poly_lithic.src.utils.messaging module🔗
- class poly_lithic.src.utils.messaging.Message(**data)[source]🔗
Bases:
BaseModel
- Parameters:
topic (str | list[str])
source (str)
value (dict)
timestamp (float)
allow_unsafe (bool | None)
-
topic:
Union
[str
,list
[str
]]🔗
-
source:
str
🔗
-
value:
dict
🔗
-
timestamp:
float
🔗
-
allow_unsafe:
Optional
[bool
]🔗
- property keys: list[str]🔗
- property values: list[Any]🔗
- property uid: str🔗
return a unique id for the message
- model_config: ClassVar[ConfigDict] = {}🔗
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class poly_lithic.src.utils.messaging.MessageBroker[source]🔗
Bases:
object
- attach(observer, topic)[source]🔗
add observer to topic
- Return type:
None
- Parameters:
observer (Observer)
topic (str | list[str])
- detach(observer, topic)[source]🔗
remove observer from topic, we will probably never use this
- Return type:
None
- Parameters:
observer (Observer)
topic (str | list[str])
- class poly_lithic.src.utils.messaging.TransformerObserver(transformer, topic, unpack_output=False)[source]🔗
Bases:
Observer
- Parameters:
transformer (<module 'poly_lithic.src.transformers.BaseTransformer' from '/home/gbm96348/nfs_home/lume-deployment/poly_lithic/src/transformers/BaseTransformer.py'>)
topic (str)
unpack_output (bool)
- class poly_lithic.src.utils.messaging.InterfaceObserver(interface, topic, sanitise=True)[source]🔗
Bases:
Observer
- Parameters:
interface (<module 'poly_lithic.src.interfaces.BaseInterface' from '/home/gbm96348/nfs_home/lume-deployment/poly_lithic/src/interfaces/BaseInterface.py'>)
topic (str)
sanitise (bool)
- get_all()[source]🔗
get all variables from the interface based on internal variable list
- Return type:
list
[Message
]