Source code for poly_lithic.src.utils.builder

from poly_lithic.src.config import ConfigParser
from poly_lithic.src.interfaces import registered_interfaces
from poly_lithic.src.logging_utils import get_logger
from poly_lithic.src.transformers import registered_transformers
from poly_lithic.src.utils.messaging import (
    ModelObserver,
    InterfaceObserver,
    MessageBroker,
    TransformerObserver,
)

logger = get_logger()


[docs] class MockModel: def __init__(self): """placeholder for model"""
[docs] def evaluate(self, value): """placeholder for model prediction""" return {'not_initialized': {'value': -99999999999}}
[docs] class Builder: def __init__(self, config_path): self.config = self.__initailise_config(config_path) self.logger = get_logger() def __initailise_config(self, config_path): """Initialise the configuration.""" try: config_parser = ConfigParser(config_path) return config_parser.parse() except Exception as e: logger.error(f'Error initializing configuration: {e}') raise e
[docs] def build(self) -> MessageBroker: """Build the model manager.""" self.__build_observers() self.__build_broker() for name, observer in self.loaded_observers.items(): self.broker.attach(observer, self.config.modules[name].sub) return self.broker
def __build_observers(self): """Build the observers.""" loaded_observers: dict[str, object] = {} for module in self.config.modules: try: module_type, module_subtype = self.config.modules[module].type.split( '.' ) except ValueError: raise ValueError( f"Invalid module type: {self.config.modules[module].type} must be of the form 'type.subtype'" ) if module_type == 'interface': if self.config.modules[module].module_args is not None: args = self.config.modules[module].module_args else: args = {} interface = InterfaceObserver( registered_interfaces[module_subtype]( self.config.modules[module].config ), self.config.modules[module].pub, *args, ) loaded_observers[module] = interface elif module_type == 'transformer': if self.config.modules[module].module_args is not None: args = self.config.modules[module].module_args else: args = {} transformer = TransformerObserver( registered_transformers[module_subtype]( self.config.modules[module].config ), self.config.modules[module].pub, *args, ) loaded_observers[module] = transformer elif module_type == 'model': if self.config.modules[module].module_args is not None: args = self.config.modules[module].module_args else: args = {} observer = ModelObserver( config=self.config.modules[module].config, topic=self.config.modules[module].pub, *args, ) loaded_observers[module] = observer else: raise ValueError(f'Invalid module type: {module_type}') logger.debug(f'Loaded observers: {loaded_observers}') # lets validate all of them are Observers for observer in loaded_observers: if not isinstance( loaded_observers[observer], (ModelObserver, InterfaceObserver, TransformerObserver), ): raise ValueError( f'Invalid observer: {observer} must be of type ModelObserver, InterfaceObserver or TransformerObserver' ) self.loaded_observers = loaded_observers return None def __build_broker(self): """Build the message broker.""" self.broker = MessageBroker() logger.debug(f'Built broker: {self.broker}')