Source code for poly_lithic.src.cli

import argparse
import asyncio
import json
import logging
import os
import sys
import time
import traceback

from poly_lithic.src.config import ConfigParser
from poly_lithic.src.logging_utils import get_logger, make_logger

from poly_lithic.src.utils.builder import Builder

from poly_lithic._version import __version__

logger = get_logger()

init_time = time.time()

# if file build-info exists, set the environment variables
if os.path.exists('build-info.json'):
    with open('build-info.json') as stream:
        data = json.load(stream)
    for key, value in data.items():
        os.environ[key] = value

print('=' * 120)
# print(f"Commit head: {os.environ['vcs-ref']}")
# print(f"Build time: {os.environ['build-date']}")
print(f'Version: {__version__}')
print('=' * 120 + '\n')


[docs] def initailize_config(config_path): """Initialize the configuration.""" try: config_parser = ConfigParser(config_path) return config_parser.parse() except Exception as e: logger.error(f'Error initializing configuration: {e}') raise e
[docs] def env_config(env_config): """Set the environment variables.""" logger.debug(f'Setting environment variables from: {env_config}') try: # load json with open(env_config) as stream: data = json.load(stream) for key, value in data.items(): os.environ[key] = value except Exception as e: logger.error(f'Error setting environment variables: {e}') raise e
[docs] def setup(): """Setup the model manager.""" parser = argparse.ArgumentParser(description='Model Manager CLI') parser.add_argument( '-d', '--debug', help='Debug mode', required=False, default=False, action='store_true', ) parser.add_argument( '-c', '--config', help='Path to the configuration file', required=False ) parser.add_argument( '-g', '--model_getter', help='Method to obtain the model', choices=['mlflow', 'local'], default='mlflow', ) # parser.add_argument( # "-n", # "--model_name", # required=False, # help="Name of the model to be loaded from mlflow", # ) parser.add_argument( '-v', '--version', help='Print Version and exit', required=False, default=False, action='store_true', ) parser.add_argument( '-r', '--reqirements', help='reqirements install only, if True requirements.txt is obtained from the model and installed, program then exits', required=False, default=False, action='store_true', ) # env parser.add_argument( '-e', '--env', help='Path to the environment configuration file, json format', required=False, ) parser.add_argument( '-o', '--one_shot', help='One shot mode, run once and exit, helpful for debugging', required=False, default=False, action='store_true', ) # publish parser.add_argument( '-p', '--publish', help='Publish data to system, if True data is published, otherwise the step is skipped', required=False, default=False, action='store_true', ) # version print and exit # parser.add_argument( # '-v', # '--version', # help='Print version and exit', # required=False, # default=False, # action='store_true', # ) args = parser.parse_args() if args.version: os.environ['version'] = __version__ print(f'Ploy-Lithic version: {os.environ["version"]}') sys.exit(0) # change logger level if args.debug: print('Debug mode') logger = make_logger(level=logging.DEBUG) os.environ['DEBUG'] = 'True' else: print('Info mode') logger = make_logger(level=logging.INFO) os.environ['DEBUG'] = 'False' logger.info('Model Manager CLI') logger.debug(f'Arguments: {args}') if args.publish: logger.warning('Publishing data to system') os.environ['PUBLISH'] = 'True' else: logger.warning('Not publishing data to system, to publish use -p or --publish') os.environ['PUBLISH'] = 'False' # env useful when running in windows if args.env: env_config(args.env) if not args.config: logger.info( 'No configuration file provided, getting config from model artifacts' ) # check if os.environ["MODEL_CONFIG_FILE"] exists if os.environ['MODEL_CONFIG_FILE']: builder = Builder(args.config) else: raise Exception( 'No configuration file provided, this should be done via command -c line or environment variable MODEL_CONFIG_FILE' ) else: logger.info(f'Configuration file provided: {args.config}') builder = Builder(args.config) broker = builder.build() return ( args, builder.config, broker, )
[docs] async def model_main(args, config, broker): """Main.""" # monitor and send to transformer handle # reintialise logger to get the correct logger and clear any previous handlers # print("model_main") logger = get_logger() logger.info('Starting model manager') time.time() os.environ['PUBLISH'] = str(args.publish) try: if config.deployment.type == 'continuous': time_start = time.time() while True: if time.time() - time_start > config.deployment.rate: time_start = time.time() broker.get_all() else: if len(broker.queue) > 0: broker.parse_queue() if len(broker.queue) > 0: broker.parse_queue() if args.one_shot: logger.info('One shot mode, exiting') break await asyncio.sleep(0.01) # sleep for 10ms else: raise Exception('Deployment type not supported') except Exception as e: logger.error(f'Error monitoring: {traceback.format_exc()}') raise e finally: logger.info('Exiting') sys.exit(0)