poly_lithic.src.utils.messaging module

poly_lithic.src.utils.messaging.profileit(func)

class poly_lithic.src.utils.messaging.Message(**data)

Bases: BaseModel

Parameters:
  • topic (str | list[str])

  • source (str)

  • value (dict)

  • timestamp (float)

  • allow_unsafe (bool | None)

topic: Union[str, list[str]]
source: str
value: dict
timestamp: float
allow_unsafe: Optional[bool]
classmethod check_topic(topic)
classmethod check_value(value)
property keys: list[str]
property values: list[Any]
property uid: str

Return a unique ID for the message.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
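
The field list above can be illustrated with a small stand-in. This is a minimal sketch using a standard-library dataclass rather than the library's pydantic model; the name MessageSketch is hypothetical, and the behaviour of keys and values (mirroring the value dict) is an assumption, since the page does not document it. The check_topic and check_value validators and the uid property are not modelled.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Union


@dataclass
class MessageSketch:
    """Hypothetical stand-in with the documented fields; not the real Message."""

    topic: Union[str, List[str]]
    source: str
    value: Dict[str, Any]
    timestamp: float
    allow_unsafe: Optional[bool] = None

    @property
    def keys(self) -> List[str]:
        # Assumption: keys lists the names stored in the value dict.
        return list(self.value.keys())

    @property
    def values(self) -> List[Any]:
        # Assumption: values lists the payloads stored in the value dict.
        return list(self.value.values())


msg = MessageSketch(
    topic="model",
    source="example_source",
    value={"x": {"value": 1.0}},
    timestamp=0.0,
)
```

Here msg.keys gives the variable names and msg.values the matching payloads, in insertion order.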

class poly_lithic.src.utils.messaging.Observer

Bases: ABC

abstractmethod update(message)
Return type:

Message

Parameters:

message (Message)
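
Because Observer derives from ABC and declares update as abstract, a subclass has to implement update before it can be instantiated. The sketch below shows that standard-library behaviour with hypothetical stand-in classes (ObserverSketch, EchoObserver); it does not import the library, and a plain dict stands in for Message.

```python
from abc import ABC, abstractmethod


class ObserverSketch(ABC):
    """Hypothetical stand-in for the documented abstract Observer."""

    @abstractmethod
    def update(self, message):
        """Handle one message and return a message."""


class EchoObserver(ObserverSketch):
    """Concrete observer that returns the message unchanged."""

    def update(self, message):
        return message


# The abstract base cannot be instantiated directly.
try:
    ObserverSketch()
    instantiated = True
except TypeError:
    instantiated = False

echoed = EchoObserver().update({"topic": "demo", "value": {}})
```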

class poly_lithic.src.utils.messaging.MessageBroker

Bases: object

attach(observer, topic)

Add an observer to a topic.

Return type:

None

Parameters:
  • observer (Observer)

  • topic (str | list[str])

detach(observer, topic)

Remove an observer from a topic. This is unlikely to be needed in practice.

Return type:

None

Parameters:
  • observer (Observer)

  • topic (str | list[str])

notify(message)

Notify all observers of a message.

Return type:

None

Parameters:

message (Message)

get_stats()
get_all()
Return type:

None

parse_queue()
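
The attach, detach, and notify methods follow the usual topic-based observer pattern. The following is a minimal sketch of that pattern under stated assumptions, not the library's implementation: BrokerSketch and Recorder are hypothetical names, messages are plain dicts, delivery is immediate, and the queue behind parse_queue, get_stats, and get_all is not modelled.

```python
from collections import defaultdict


class BrokerSketch:
    """Hypothetical topic-to-observers registry; delivery is synchronous."""

    def __init__(self):
        self._observers = defaultdict(list)

    @staticmethod
    def _as_list(topic):
        # Accept either a single topic or a list of topics.
        return [topic] if isinstance(topic, str) else list(topic)

    def attach(self, observer, topic):
        for name in self._as_list(topic):
            if observer not in self._observers[name]:
                self._observers[name].append(observer)

    def detach(self, observer, topic):
        for name in self._as_list(topic):
            if observer in self._observers[name]:
                self._observers[name].remove(observer)

    def notify(self, message):
        # Deliver once per topic the message carries.
        for name in self._as_list(message["topic"]):
            for observer in list(self._observers[name]):
                observer.update(message)


class Recorder:
    """Observer that records every value it receives."""

    def __init__(self):
        self.seen = []

    def update(self, message):
        self.seen.append(message["value"])


broker = BrokerSketch()
first, second = Recorder(), Recorder()
broker.attach(first, ["model", "out"])
broker.attach(second, "model")
broker.notify({"topic": "model", "value": 1})
broker.detach(second, "model")
broker.notify({"topic": ["model", "out"], "value": 2})
```

In this sketch an observer attached to several topics receives a multi-topic message once per matching topic; whether the real broker deduplicates such deliveries is not stated on this page.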
class poly_lithic.src.utils.messaging.TransformerObserver(transformer, topic, unpack_output=False)

Bases: Observer

Parameters:
  • transformer (poly_lithic.src.transformers.BaseTransformer)

  • topic (str)

  • unpack_output (bool)

update(message)
Return type:

Message | list[Message]

Parameters:

message (Message)

class poly_lithic.src.utils.messaging.InterfaceObserver(interface, topic, sanitise=True)

Bases: Observer

Parameters:
  • interface (poly_lithic.src.interfaces.BaseInterface)

  • topic (str)

  • sanitise (bool)

update(message)
Return type:

Message | list[Message]

Parameters:

message (Message)

get(message)

Get a single variable from the interface.

Return type:

list[Message]

Parameters:

message (Message)

get_all()

Get all variables from the interface, based on its internal variable list.

Return type:

list[Message]

get_many(message)

Get many variables from the interface.

Return type:

list[Message]

Parameters:

message (Message)

put(message)

Put a single variable into the interface.

Return type:

None

Parameters:

message (Message)

put_many(message)

Put many variables into the interface.

Return type:

None

Parameters:

message (Message)
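
The get and put family above wraps an interface object and exchanges messages with it. The sketch below illustrates one way such a wrapper could behave, under loud assumptions: DictInterface and InterfaceObserverSketch are hypothetical, the interface is an in-memory dict, messages are plain dicts whose value maps variable names to payloads, and the sanitise option is not modelled. The real BaseInterface API is not shown on this page and may differ.

```python
class DictInterface:
    """Hypothetical in-memory interface holding named variables."""

    def __init__(self, variables):
        self.variables = dict(variables)

    def get(self, name):
        return self.variables[name]

    def put(self, name, value):
        self.variables[name] = value


class InterfaceObserverSketch:
    """Hypothetical wrapper that reads and writes variables via messages."""

    def __init__(self, interface, topic):
        self.interface = interface
        self.topic = topic

    def get_all(self):
        # One outgoing message per variable the interface knows about.
        return [
            {"topic": self.topic, "value": {name: self.interface.get(name)}}
            for name in self.interface.variables
        ]

    def put_many(self, message):
        # Assumption: message["value"] maps variable names to new values.
        for name, value in message["value"].items():
            self.interface.put(name, value)


observer = InterfaceObserverSketch(DictInterface({"a": 1, "b": 2}), "in_interface")
observer.put_many({"topic": "in_interface", "value": {"a": 10, "b": 20}})
readback = observer.get_all()
```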

class poly_lithic.src.utils.messaging.MockModel

Bases: object

evaluate(value)

Placeholder for model prediction.

class poly_lithic.src.utils.messaging.ModelObserver(model=None, config=None, topic='model', unpack_input=True, pack_output=True)

Bases: Observer

Parameters:
  • topic (str)

  • unpack_input (bool)

  • pack_output (bool)

update(message)
Return type:

list[Message]

Parameters:

message (Message)