poly_lithic.src.utils.messaging module🔗
- class poly_lithic.src.utils.messaging.Message(**data)[source]🔗
Bases:
BaseModel- Parameters:
topic (str | list[str])
source (str)
value (dict)
timestamp (float)
allow_unsafe (bool | None)
- topic: str | list[str]🔗
- source: str🔗
- value: dict🔗
- timestamp: float🔗
- allow_unsafe: bool | None🔗
- property keys: list[str]🔗
- property values: list[Any]🔗
- property uid: str🔗
return a unique id for the message
- model_config: ClassVar[ConfigDict] = {}🔗
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class poly_lithic.src.utils.messaging.MessageBroker[source]🔗
Bases:
object- attach(observer, topic)[source]🔗
add observer to topic
- Return type:
None- Parameters:
observer (Observer)
topic (str | list[str])
- detach(observer, topic)[source]🔗
remove observer from topic, we will probably never use this
- Return type:
None- Parameters:
observer (Observer)
topic (str | list[str])
- class poly_lithic.src.utils.messaging.TransformerObserver(transformer, topic, unpack_output=False)[source]🔗
Bases:
Observer- Parameters:
transformer (BaseTransformer)
topic (str)
unpack_output (bool)
- class poly_lithic.src.utils.messaging.InterfaceObserver(interface, topic, sanitise=True)[source]🔗
Bases:
Observer- Parameters:
interface (BaseInterface)
topic (str)
sanitise (bool)
- interface: BaseInterface🔗
- topic: str🔗
- get_all()[source]🔗
get all variables from the interface based on internal variable list
- Return type:
list[Message]