Source code for poly_lithic.src.config.config_object
from typing import Any, Optional, Union
from pydantic import computed_field
import pydantic
import networkx as nx
from ..transformers import registered_transformers
from matplotlib import pyplot as plt
from uuid import uuid4
import os
import logging
allowed_transformers = list(registered_transformers.keys())
[docs]
class ModuleConfig(pydantic.BaseModel):
type: str
name: str
pub: Optional[Union[str, list]] = None
sub: Optional[Union[str, list]] = None
module_args: Optional[Union[dict[str, Union[str, dict, bool]], str]] = None
config: Any = None # kind of a free for all for now but we can narrow down the specifics later
[docs]
@pydantic.field_validator('module_args', mode='before')
def validate_module_args(cls, v):
if isinstance(v, str):
return {}
return v
[docs]
class DeploymentConfig(pydantic.BaseModel):
type: str
rate: Optional[Union[float, int]] = None
[docs]
class ConfigObject(pydantic.BaseModel):
deployment: DeploymentConfig
modules: dict[str, ModuleConfig]
[docs]
class Config:
arbitrary_types_allowed = True # to allow nx.DiGraph
@computed_field(return_type=nx.DiGraph)
@property
def graph(self):
G = nx.DiGraph()
nodes = []
edges = []
# to collect edges we need to go through each item , look at what its publishing and find matching subsribers, this will form an edge
for key, value in self.modules.items():
if value.pub is not None or value.pub.lower() != 'none':
if isinstance(value.pub, str):
value.pub = [value.pub]
for key2, value2 in self.modules.items():
if value2.sub is not None:
# normalise them all to lists
if isinstance(value2.sub, str):
value2.sub = [value2.sub]
for pub in value.pub:
for sub in value2.sub:
if pub == sub and value.name != value2.name:
edges.append((value.name, value2.name))
nodes.append(value.name)
logging.debug(f'Nodes: {nodes}')
logging.debug(f'Edges: {edges}')
nodes = list(set(nodes))
nodes = [x for x in nodes if x is not None]
edges = [tuple(x) for x in edges]
G.add_nodes_from(nodes)
G.add_edges_from(edges)
return G
[docs]
@pydantic.field_validator('graph')
def check_routing(cls, v):
isolated_nodes = list(nx.isolates(v))
if isolated_nodes:
raise ValueError(
f'Isolated nodes found in routing graph: {isolated_nodes}, this means that a modules is niether a publisher or subscriber to any other module'
)
return v
[docs]
def draw_routing_graph(self):
G = self.graph
plt.figure(figsize=(10, 10))
nx.draw(G, with_labels=True)
if not os.path.exists('./graphs'):
os.makedirs('./graphs')
plt.savefig('./graphs/{}_routing_graph.png'.format(uuid4()))