Source code for poly_lithic.src.interfaces.k2eg_interface

import uuid
from concurrent.futures import ThreadPoolExecutor

import k2eg
from poly_lithic.src.logging_utils import get_logger

from .BaseInterface import BaseInterface

executor = ThreadPoolExecutor(20)

logger = get_logger()

# _dir = os.path.dirname(os.path.abspath(__file__))
# os.environ["K2EG_PYTHON_CONFIGURATION_PATH_FOLDER"] = _dir
# print(f"K2EG_PYTHON_CONFIGURATION_PATH_FOLDER: {_dir}")


[docs] class K2EGInterface(BaseInterface): def __init__(self, config): try: self.client = k2eg.dml( 'env', 'app-test-3', group_name=f'model-deployment-{str(uuid.uuid4())[0:15]}', ) except Exception as e: print(f'Error initializing K2EGInterface: {e}') self.close() raise e pv_dict = config['variables'] pv_url_list = [] for pv in pv_dict: pv_url_list.append(pv_dict[pv]['proto'] + '://' + pv_dict[pv]['name']) self.pv_url_list = pv_url_list self.symbol_list = list(pv_dict.keys()) self.variable_list = list(pv_dict.keys()) self.url_lookup = { pv_dict[pv]['proto'] + '://' + pv_dict[pv]['name']: pv for pv in pv_dict } self.reverse_url_lookup = { pv: pv_dict[pv]['proto'] + '://' + pv_dict[pv]['name'] for pv in pv_dict } logger.debug(f'K2EGInterface initialized with pv_url_list: {self.pv_url_list}') logger.debug(f'K2EGInterface initialized with symbol_list: {self.symbol_list}') logger.debug(f'K2EGInterface initialized with url_lookup: {self.url_lookup}') logger.debug( f'K2EGInterface initialized with reverse_url_lookup: {self.reverse_url_lookup}' )
[docs] def monitor(self, handler, **kwargs): logger.debug(f'Monitoring {self.pv_url_list}') try: self.client.monitor_many(self.pv_url_list, handler, timeout=1000) except Exception as e: print(f'Error monitoring: {e}') raise e
[docs] def get(self, name, **kwargs): url = self.reverse_url_lookup[name] value = self.client.get(url) return name, value
[docs] def put(self, name, value, **kwargs): # print(f"putting {name} with value {value}") try: self.client.put(self.reverse_url_lookup[name], value) except Exception as e: print(f'Error putting: {e}') raise e
[docs] def put_many(self, data, **kwargs): results = [] for name, value in data.items(): res = executor.submit(self.put, name, value) results.append(res) for res in results: res.result()
[docs] def get_many(self, data, **kwargs): pass
[docs] def close(self): self.client.close() print('K2EGInterface closed') return True