Source code for poly_lithic.src.cli

import click
import asyncio
import json
import logging
import os
import sys
import time
import traceback


from poly_lithic._version import __version__
from poly_lithic.src.logging_utils import get_logger, make_logger

[docs] def import_poly_lithic_deps(): from poly_lithic.src.config import ConfigParser from poly_lithic.src.utils.builder import Builder
# ============================================================================ # Helper Functions # ============================================================================
[docs] def load_build_info(): """Load build info from build-info.json if it exists.""" if os.path.exists('build-info.json'): with open('build-info.json') as stream: data = json.load(stream) for key, value in data.items(): os.environ[key] = value
[docs] def setup_logging(debug): """Setup logging based on debug flag.""" if debug: click.echo('Debug mode') logger = make_logger(level=logging.DEBUG) os.environ['DEBUG'] = 'True' else: click.echo('Info mode') logger = make_logger(level=logging.INFO) os.environ['DEBUG'] = 'False' return logger
[docs] def load_env_config(env_path): """Load environment configuration from JSON file.""" logger = get_logger() logger.debug(f'Setting environment variables from: {env_path}') try: with open(env_path) as stream: data = json.load(stream) for key, value in data.items(): os.environ[key] = value logger.info(f'Loaded {len(data)} environment variables') except Exception as e: logger.error(f'Error setting environment variables: {e}') raise e
[docs] async def model_main(args, config, broker): """ Main async function for running the model manager. Args: args: Parsed arguments namespace config: Configuration object broker: Broker instance """ logger = get_logger() logger.info('Starting model manager') os.environ['PUBLISH'] = str(args.publish) try: if config.deployment.type == 'continuous': time_start = time.time() while True: if time.time() - time_start > config.deployment.rate: time_start = time.time() broker.get_all() else: if len(broker.queue) > 0: broker.parse_queue() if len(broker.queue) > 0: broker.parse_queue() if args.one_shot: logger.info('One shot mode, exiting') break await asyncio.sleep(0.01) else: raise Exception(f'Deployment type "{config.deployment.type}" not supported') except Exception as e: logger.error(f'Error in model main loop: {traceback.format_exc()}') raise e finally: logger.info('Exiting')
# ============================================================================ # CLI Commands # ============================================================================ @click.group(invoke_without_command=True) @click.pass_context @click.version_option(version=__version__, prog_name='poly-lithic') def cli(ctx): """ Poly-Lithic - A modular ML model deployment framework with plugin support. Run without subcommands to start the model manager (default behavior). Use subcommands for additional functionality. Examples: poly-lithic --config config.yaml poly-lithic --config config.yaml --debug poly-lithic plugin init --name my-plugin """ load_build_info() print_banner() if ctx.invoked_subcommand is None: ctx.invoke(run_model, **ctx.params) @cli.command(name='run') @click.option('--config', '-c', type=click.Path(exists=True), help='Path to the configuration file') @click.option('--model-getter', '-g', type=click.Choice(['mlflow', 'local'], case_sensitive=False), default='mlflow', help='Method to obtain the model') @click.option('--debug', '-d', is_flag=True, help='Enable debug mode') @click.option('--env', '-e', type=click.Path(exists=True), help='Path to environment configuration file (JSON format)') @click.option('--one-shot', '-o', is_flag=True, help='One shot mode - run once and exit (helpful for debugging)') @click.option('--publish', '-p', is_flag=True, help='Publish data to system') @click.option('--requirements', '-r', is_flag=True, help='Requirements install only - install requirements.txt from model and exit') def run_model(config, model_getter, debug, env, one_shot, publish, requirements): """ Run the model manager with the specified configuration. This is the default command when no subcommand is specified. Examples: poly-lithic run --config config.yaml poly-lithic run --config config.yaml --debug --publish poly-lithic run --config config.yaml --one-shot """ try: logger = setup_logging(debug) logger.info('Model Manager CLI') if publish: logger.warning('Publishing data to system') os.environ['PUBLISH'] = 'True' else: logger.warning('Not publishing data to system. To publish, use --publish') os.environ['PUBLISH'] = 'False' if env: load_env_config(env) if not config: logger.info('No configuration file provided, getting config from model artifacts') if 'MODEL_CONFIG_FILE' not in os.environ: raise click.ClickException( 'No configuration file provided. Use --config or set MODEL_CONFIG_FILE environment variable' ) config = os.environ['MODEL_CONFIG_FILE'] else: logger.info(f'Configuration file provided: {config}') # Import heavy dependencies only when needed from poly_lithic.src.utils.builder import Builder click.echo('Building model manager...') builder = Builder(config) broker = builder.build() if requirements: click.echo('Requirements-only mode - exiting after installation') sys.exit(0) # ugly but we needed to do some legacy support, this will eventially all be click handled import argparse args = argparse.Namespace( config=config, model_getter=model_getter, debug=debug, env=env, one_shot=one_shot, publish=publish, requirements=requirements, ) logger.info('Starting model manager main loop') asyncio.run(model_main(args, builder.config, broker)) except KeyboardInterrupt: click.echo('\n\nInterrupted by user') sys.exit(0) except Exception as e: click.echo(click.style(f'\n✗ Error: {e}', fg='red'), err=True) if debug: traceback.print_exc() sys.exit(1) # ============================================================================ # Plugin Commands # ============================================================================ @cli.group() def plugin(): """ Manage poly_lithic plugins. Commands for creating, listing, and managing plugins. """ pass @plugin.command() @click.option('--name', '-n', default=None, help='Name of the plugin package') @click.option('--author', '-a', default=None, help='Author name') @click.option('--email', default=None, help='Author email') @click.option('--description', '-d', default=None, help='Plugin description') @click.option('--output-dir', '--dir', '-o', type=click.Path(), default='.', help='Output directory for the plugin project (default: current directory)') @click.option('--no-prompt', is_flag=True, help='Skip interactive prompts (use defaults or provided values)') def init(name, author, email, description, output_dir, no_prompt): """ Initialize a new plugin project from template. Creates a simple plugin project with examples of all plugin types. Comment out the types you don't need. Examples: poly-lithic plugin init --name my-plugin poly-lithic plugin init -n my-plugin --dir ./plugins poly-lithic plugin init --name my-plugin --no-prompt """ try: from poly_lithic.src.utils.plugin_generator import PluginGenerator from pathlib import Path click.echo(click.style('\n🚀 Creating Plugin Project\n', fg='cyan', bold=True)) # Handle prompts manually based on no_prompt flag if no_prompt: name = name or 'my_plugin' author = author or 'Author Name' email = email or 'example@email.com' description = description or 'A poly_lithic plugin package' else: name = name or click.prompt('Plugin name') author = author or click.prompt('Author name', default='') email = email or click.prompt('Author email', default='example@email.com') description = description or click.prompt('Short description', default='') output_path = Path(output_dir).expanduser().resolve() plugin_dir_name = PluginGenerator._normalize_package_name(name) if not output_path.exists(): if no_prompt or click.confirm(f"Directory '{output_path}' doesn't exist. Create it?", default=True): output_path.mkdir(parents=True, exist_ok=True) else: click.echo("Aborted.") sys.exit(1) generator = PluginGenerator() project_path = generator.generate( name=name, author=author, email=email, description=description, output_dir=str(output_path), ) click.echo(click.style(f"✓ Plugin project created: {project_path.name}", fg='green', bold=True)) click.echo(click.style("\n📝 Next steps:", fg='yellow', bold=True)) click.echo(f" 1. cd {project_path.name}") click.echo(" 2. Edit the plugin files and comment out types you don't need") click.echo(" 3. pip install -e .") click.echo(" 4. Test the plugin: pl run --config test_deployment.yaml --debug --one-shot") click.echo(" 5. Run tests: pytest") click.echo(click.style("\n💡 Tips:", fg='cyan')) click.echo(" • The template includes examples of all three plugin types") click.echo(" • Comment out types you don't need in:") click.echo(f" - {plugin_dir_name}/__init__.py") click.echo(f" - pyproject.toml (entry points section)") click.echo(f" - test_deployment.yaml (module configurations)") click.echo() except FileExistsError as e: click.echo(click.style(f'✗ {e}', fg='red'), err=True) click.echo(click.style(' Tip: Choose a different name or remove the existing directory', fg='yellow')) sys.exit(1) except Exception as e: click.echo(click.style(f'✗ Error creating plugin: {e}', fg='red'), err=True) if os.environ.get('DEBUG') == 'True': traceback.print_exc() sys.exit(1) @plugin.command() @click.option('--type', '-t', 'plugin_type', type=click.Choice(['interface', 'transformer', 'model_getter', 'all'], case_sensitive=False), help='Filter by plugin type') def list(plugin_type): """ List all available plugins. Shows all plugins discovered via entry points in the current environment. Examples: poly-lithic plugin list poly-lithic plugin list --type interface """ from poly_lithic.src.utils.plugin_registry import ( interface_plugin_registry, transformer_plugin_registry, model_getter_plugin_registry, ) try: from yaspin import yaspin from yaspin.spinners import Spinners use_spinner = True except ImportError: use_spinner = False registries = [] if not plugin_type or plugin_type in ['interface', 'all']: registries.append(('Interfaces', interface_plugin_registry)) if not plugin_type or plugin_type in ['transformer', 'all']: registries.append(('Transformers', transformer_plugin_registry)) if not plugin_type or plugin_type in ['model_getter', 'all']: registries.append(('Model Getters', model_getter_plugin_registry)) if use_spinner: with yaspin(Spinners.dots12, text="Discovering plugins...", color="green") as spinner: for name, registry in registries: spinner.text = f"Scanning {name.lower()}..." registry.discover_plugins() spinner.text = "Scanning complete!" spinner.ok("✓") else: with click.progressbar(registries, label='Scanning for plugins', bar_template='%(label)s [%(bar)s] %(info)s', show_percent=False, show_pos=True) as bar: for name, registry in bar: registry.discover_plugins() click.echo(click.style("\n📦 Available Plugins\n", fg='cyan', bold=True)) for name, registry in registries: plugins = registry.list_plugins() if plugins: click.echo(click.style(f"{name}:", fg='yellow', bold=True)) for plugin_name in sorted(plugins): click.echo(f" • {plugin_name}") click.echo() else: click.echo(click.style(f"{name}: None found", fg='yellow')) click.echo() if not any(reg[1].list_plugins() for reg in registries): click.echo(click.style("No plugins found", fg='yellow')) @plugin.command() @click.argument('plugin_name') @click.option('--type', '-t', 'plugin_type', type=click.Choice(['interface', 'transformer', 'model_getter'], case_sensitive=False), help='Plugin type (optional, will search all if not specified)') def info(plugin_name, plugin_type): """ Show detailed information about a specific plugin. Examples: poly-lithic plugin info my_interface poly-lithic plugin info my_interface --type interface """ from poly_lithic.src.utils.plugin_registry import ( interface_plugin_registry, transformer_plugin_registry, model_getter_plugin_registry, ) registries = {} if not plugin_type or plugin_type == 'interface': registries['interface'] = interface_plugin_registry if not plugin_type or plugin_type == 'transformer': registries['transformer'] = transformer_plugin_registry if not plugin_type or plugin_type == 'model_getter': registries['model_getter'] = model_getter_plugin_registry found = False for reg_type, registry in registries.items(): registry.discover_plugins() if registry.has_plugin(plugin_name): found = True plugin_class = registry.get(plugin_name) click.echo(click.style(f"\n📋 Plugin: {plugin_name}", fg='cyan', bold=True)) click.echo(f"Type: {reg_type}") click.echo(f"Class: {plugin_class.__module__}.{plugin_class.__name__}") if plugin_class.__doc__: click.echo(f"\nDescription:") click.echo(f" {plugin_class.__doc__.strip()}") methods = [m for m in dir(plugin_class) if not m.startswith('_') and callable(getattr(plugin_class, m))] if methods: click.echo(f"\nPublic Methods:") for method in sorted(methods): click.echo(f" • {method}()") click.echo() if not found: click.echo(click.style(f"✗ Plugin '{plugin_name}' not found", fg='red'), err=True) click.echo(click.style("\nTip: Run 'poly-lithic plugin list' to see available plugins", fg='yellow')) sys.exit(1) # ============================================================================ # Utility Commands # ============================================================================ @cli.command() @click.argument('config_file', type=click.Path(exists=True)) @click.option('--env', '-e', type=click.Path(exists=True), help='Path to environment configuration file (JSON format)') def validate(config_file, env): """ Validate a configuration file without running the model. Examples: poly-lithic validate config.yaml poly-lithic validate config.yaml --env env.json """ try: from poly_lithic.src.config import ConfigParser logger = make_logger(level=logging.INFO) if env: load_env_config(env) click.echo(f'Validating configuration file: {config_file}') config_parser = ConfigParser(config_file) config = config_parser.parse() click.echo(click.style('✓ Configuration is valid', fg='green', bold=True)) click.echo('\nConfiguration Summary:') click.echo(f' Deployment type: {config.deployment.type}') click.echo(f' Deployment rate: {config.deployment.rate}') if hasattr(config, 'models'): click.echo(f' Models: {len(config.models) if hasattr(config.models, "__len__") else "configured"}') if hasattr(config, 'variables'): click.echo(f' Variables: {len(config.variables)}') if hasattr(config, 'transformers'): click.echo(f' Transformers: {len(config.transformers) if hasattr(config.transformers, "__len__") else "configured"}') click.echo() except Exception as e: click.echo(click.style(f'✗ Configuration is invalid: {e}', fg='red'), err=True) if os.environ.get('DEBUG') == 'True': traceback.print_exc() sys.exit(1) @cli.command() @click.argument('config_file', type=click.Path(exists=True)) @click.option('--output', '-o', type=click.Path(), required=True, help='Image file output path') def visualize(config_file, output): """ Visualize the configuration as an image file """ try: from poly_lithic.src.config import ConfigParser parser = ConfigParser(config_file) cfg = parser.parse() cfg.save_routing_graph(output) click.echo(click.style(f'Saved to {output}', fg='green')) except Exception as e: click.echo(click.style(f'Configuration error: {e}', fg='red'), err=True) sys.exit(1) # ============================================================================ # Legacy Support Functions # ============================================================================
[docs] def setup(): """ Legacy setup function for backward compatibility. This function is kept for existing code that imports and calls setup(). New code should use the Click CLI directly. """ import argparse parser = argparse.ArgumentParser(description='Model Manager CLI') parser.add_argument('-d', '--debug', action='store_true', help='Debug mode') parser.add_argument('-c', '--config', help='Path to the configuration file') parser.add_argument('-g', '--model_getter', choices=['mlflow', 'local'], default='mlflow') parser.add_argument('-v', '--version', action='store_true', help='Print version and exit') parser.add_argument('-r', '--requirements', action='store_true') parser.add_argument('-e', '--env', help='Path to environment configuration file') parser.add_argument('-o', '--one_shot', action='store_true') parser.add_argument('-p', '--publish', action='store_true') args = parser.parse_args() if args.version: print(f'Poly-Lithic version: {__version__}') sys.exit(0) logger = setup_logging(args.debug) if args.publish: os.environ['PUBLISH'] = 'True' else: os.environ['PUBLISH'] = 'False' if args.env: load_env_config(args.env) if not args.config and 'MODEL_CONFIG_FILE' not in os.environ: raise Exception('No configuration file provided') from poly_lithic.src.utils.builder import Builder config = args.config or os.environ['MODEL_CONFIG_FILE'] builder = Builder(config) broker = builder.build() return args, builder.config, broker
# ============================================================================ # Main Entry Point # ============================================================================ if __name__ == '__main__': cli()