Source code for cmind.cli

# Collective Mind command line wrapper
#
# Written by Grigori Fursin

import sys

############################################################
def run(argv = None):
    """
    Execute a CM automation action requested from the command line.

    CM command line format:

    cm {action} {automation} (artifacts) (--flags) (@input.yaml) (@input.json)

    Args:
        argv (list | string): command line arguments
                              (sys.argv[1:] is used when None)

    Returns:
        This function does not return normally: it always finishes with
        sys.exit() using the "return" code of the CM output dictionary:

        * return (int): return code == 0 if no error and >0 if error
        * (error) (str): error string if return>0

        * Output from a CM automation action
    """

    # Access CM
    from cmind.core import CM

    cm = CM()

    cli_args = sys.argv[1:] if argv is None else argv

    result = cm.access(cli_args, out='con')

    # Save the output dictionary to a JSON file if requested
    json_file = cm.save_to_json
    if json_file != '':
        from cmind import utils
        utils.save_json(json_file, meta=result)

    # Either dump the output as JSON or report an error to the console
    output_mode = cm.output
    if output_mode == 'json':
        from cmind import utils
        utils.dump_safe_json(result)
    elif result['return'] > 0 and (output_mode is None or output_mode == 'con'):
        cm.error(result)

    # The CM return code becomes the process exit code
    sys.exit(result['return'])
############################################################
def runx(argv = None):
    """
    Execute a CM automation action requested from the new command line.

    CM command line format:

    cm {action} {automation} (artifacts) (-params) (--inputs) (@input.yaml) (@input.json)

    Args:
        argv (list | string): command line arguments
                              (sys.argv[1:] is used when None)

    Returns:
        This function does not return normally: it always finishes with
        sys.exit() using the "return" code of the CM output dictionary:

        * return (int): return code == 0 if no error and >0 if error
        * (error) (str): error string if return>0

        * Output from a CM automation action
    """

    # Access CM
    from cmind.core import CM

    cm = CM()

    cli_args = sys.argv[1:] if argv is None else argv

    result = cm.x(cli_args, out='con')

    # Report the error unless another output mode was selected
    if result['return'] > 0:
        if cm.output is None or cm.output == 'con':
            cm.errorx(result)

    # The CM return code becomes the process exit code
    sys.exit(result['return'])
############################################################
def run_script(argv = None):
    """
    Shortcut to "cm run script ..."

    Prepends "run script" to the given arguments and forwards them to run().

    Args:
        argv (list | string): command line arguments
                              (sys.argv[1:] is used when None);
                              a string is split into arguments with shlex

    Returns:
        Whatever run() returns. Note that run() finishes with sys.exit(),
        so this call normally terminates the process with the CM return code:

        * return (int): return code == 0 if no error and >0 if error
        * (error) (str): error string if return>0

        * Output from a CM automation action
    """

    if argv is None:
        argv = sys.argv[1:]
    elif isinstance(argv, str):
        # A string cannot be concatenated to the prefix list below,
        # so convert it to an argument list first
        import shlex
        argv = shlex.split(argv)

    # list() also accepts tuples and other sequences of arguments
    return run(['run', 'script'] + list(argv))
############################################################
def docker_script(argv = None):
    """
    Shortcut to "cm docker script ..."

    Prepends "docker script" to the given arguments and forwards them to run().

    Args:
        argv (list | string): command line arguments
                              (sys.argv[1:] is used when None);
                              a string is split into arguments with shlex

    Returns:
        Whatever run() returns. Note that run() finishes with sys.exit(),
        so this call normally terminates the process with the CM return code:

        * return (int): return code == 0 if no error and >0 if error
        * (error) (str): error string if return>0

        * Output from a CM automation action
    """

    if argv is None:
        argv = sys.argv[1:]
    elif isinstance(argv, str):
        # A string cannot be concatenated to the prefix list below,
        # so convert it to an argument list first
        import shlex
        argv = shlex.split(argv)

    # list() also accepts tuples and other sequences of arguments
    return run(['docker', 'script'] + list(argv))
############################################################
def gui_script(argv = None):
    """
    Shortcut to "cm gui script ..."

    Prepends "gui script" to the given arguments and forwards them to run().

    Args:
        argv (list | string): command line arguments
                              (sys.argv[1:] is used when None);
                              a string is split into arguments with shlex

    Returns:
        Whatever run() returns. Note that run() finishes with sys.exit(),
        so this call normally terminates the process with the CM return code:

        * return (int): return code == 0 if no error and >0 if error
        * (error) (str): error string if return>0

        * Output from a CM automation action
    """

    if argv is None:
        argv = sys.argv[1:]
    elif isinstance(argv, str):
        # A string cannot be concatenated to the prefix list below,
        # so convert it to an argument list first
        import shlex
        argv = shlex.split(argv)

    # list() also accepts tuples and other sequences of arguments
    return run(['gui', 'script'] + list(argv))
############################################################
def run_experiment(argv = None):
    """
    Shortcut to "cm run experiment ..."

    Prepends "run experiment" to the given arguments and forwards them to run().

    Args:
        argv (list | string): command line arguments
                              (sys.argv[1:] is used when None);
                              a string is split into arguments with shlex

    Returns:
        Whatever run() returns. Note that run() finishes with sys.exit(),
        so this call normally terminates the process with the CM return code:

        * return (int): return code == 0 if no error and >0 if error
        * (error) (str): error string if return>0

        * Output from a CM automation action
    """

    if argv is None:
        argv = sys.argv[1:]
    elif isinstance(argv, str):
        # A string cannot be concatenated to the prefix list below,
        # so convert it to an argument list first
        import shlex
        argv = shlex.split(argv)

    # list() also accepts tuples and other sequences of arguments
    return run(['run', 'experiment'] + list(argv))
############################################################
def parse(cmd):
    """
    Parse CM command line into CM input dictionary.

    Recognized arguments (after the optional leading action and automation):

    * "--"            : everything after it is stored as is in "unparsed_cmd"
    * "@file"         : JSON or YAML file merged into the CM input
    * "-key", "--key" : flag set to True ("--key-" sets it to False)
    * "--key=value"   : flag set to the string value
    * "--key,=a,b"    : flag set to a list (["a", "b"]); "--key," gives [True]
    * "--a.b=value"   : nested dictionary {"a": {"b": value}}
    * anything else   : artifact (extra ones are collected in "artifacts")

    "-" is converted to "_" in flag names (for dotted names only in the
    first component).

    Args:
        cmd (str | list) : arguments as a string or list.
                           NOTE: a list is modified in place (the action and
                           automation are popped from it) and this same list
                           is stored in cm_input['cmd'].

    Returns:
        (CM return dict):

        * return (int): return code == 0 if no error and >0 if error
        * (error) (str): error string if return>0

        * cm_input (dict): CM unified input to the CM access function
    """

    # If input is string, convert to argv
    # We use shlex to properly convert ""
    if cmd is None:
        argv = []
    elif isinstance(cmd, str):
        import shlex
        argv = shlex.split(cmd)
    else:
        argv = cmd

    cm_input = {}

    # Positional arguments: action and then automation,
    # unless the argument looks like a flag or an input file
    special_cli_characters = ('-', '@')

    for name in ('action', 'automation'):
        if len(argv) > 0:
            first = argv[0].strip()
            if first != '' and first[0] not in special_cli_characters:
                cm_input[name] = argv.pop(0)

    # Check if just one artifact or multiple ones
    artifact = ''
    artifacts = []  # Only added if more than 1 artifact!

    for index, arg in enumerate(argv):
        if arg == '--':
            # The rest of the command line is passed through unparsed
            cm_input['unparsed_cmd'] = argv[index + 1:]
            break

        if arg.startswith('@'):
            # Load JSON or YAML file
            from cmind import utils
            r = utils.load_json_or_yaml(file_name=arg[1:], check_if_exists=True)
            if r['return'] > 0:
                return r

            cm_input.update(r['meta'])
            continue

        if not arg.startswith('-'):
            # Artifact
            if artifact == '':
                artifact = arg
                cm_input['artifact'] = arg
            else:
                artifacts.append(arg)
            continue

        # Flags
        eq_pos = arg.find('=')  # find first =
        if eq_pos > 0:
            key = arg[:eq_pos].strip()
            value = arg[eq_pos + 1:].strip()
        elif arg.endswith('-'):
            key = arg[:-1]
            value = False
        else:
            key = arg
            value = True

        # Remove at most two leading dashes
        if key.startswith('-'):
            key = key[1:]
        if key.startswith('-'):
            key = key[1:]

        # Trailing comma in the key means that the value is a list
        if key.endswith(','):
            key = key[:-1]
            if isinstance(value, bool):
                # Flag without "=value": a bool can't be split on commas
                value = [value]
            elif value != '':
                value = value.split(',')
            else:
                value = []

        if '.' in key:
            # Dotted key addresses a nested dictionary
            parts = key.split('.')
            parts[0] = parts[0].replace('-', '_')

            target = cm_input
            for part in parts[:-1]:
                if part not in target:
                    target[part] = {}
                target = target[part]

            target[parts[-1]] = value
        else:
            cm_input[key.replace('-', '_')] = value

    # Add extra artifacts if specified
    if len(artifacts) > 0:
        cm_input['artifacts'] = artifacts

    cm_input['cmd'] = cmd

    return {'return': 0, 'cm_input': cm_input}
############################################################
def parsex(cmd):
    """
    Parse the new CM command line into the CMX input dictionary.

    Flags starting with "--" (and positional "key=value" arguments) are
    added to the input itself, flags starting with "-" are added to its
    "control" dictionary, "@file" arguments are loaded and merged, and
    everything after "--" is kept unparsed in control["_unparsed_cmd"].

    Args:
        cmd (str | list) : arguments as a string or list
                           (the action and automation are popped
                           from a list passed by the caller)

    Returns:
        (CM return dict):

        * return (int): return code == 0 if no error and >0 if error
        * (error) (str): error string if return>0

        * cmx_input (dict): CM unified input to the CM access function
    """

    # If input is string, convert to argv
    # We use shlex to properly convert ""
    if cmd is None:
        argv = []
    elif type(cmd) == str:
        import shlex
        argv = shlex.split(cmd)
    else:
        argv = cmd

    # Keep a copy of the full command line before it is consumed below
    cmx_input = {'control': {'_cmd': argv.copy()}}

    # First positional arguments
    for name in ['action', 'automation']:
        if len(argv) > 0:
            head = argv[0].strip()
            if head != '' and head[0] not in ['-', '@']:
                cmx_input[name] = argv.pop(0)

    # Check if just one artifact or multiple ones
    first_artifact = ''
    extra_artifacts = []  # Only added if more than 1 artifact!

    for position, arg in enumerate(argv):
        if arg == '--':
            # Everything after "--" is passed through as is
            cmx_input['control']['_unparsed_cmd'] = argv[position + 1:]
            break

        if arg.startswith('@'):
            # Load JSON or YAML file
            from cmind import utils

            r = utils.load_json_or_yaml(file_name = arg[1:], check_if_exists=True)
            if r['return'] > 0:
                return r

            meta = r['meta']

            # "control" from the file is merged key by key, not replaced
            file_control = meta.pop('control', {})
            for k in file_control:
                cmx_input['control'][k] = file_control[k]

            cmx_input.update(meta)
            continue

        # Check flags
        if arg.startswith('---'):
            return {'return':1, 'error': f'flag "{arg}" has unknown prefix'}

        if arg.startswith('--'):
            split_flag(arg[2:], cmx_input)
            continue

        if arg.startswith('-'):
            split_flag(arg[1:], cmx_input['control'])
            continue

        # Positional argument: key=value input or artifact
        if '=' in arg:
            split_flag(arg, cmx_input)
        elif first_artifact == '':
            first_artifact = arg
            cmx_input['artifact'] = arg
        else:
            extra_artifacts.append(arg)

    # Add extra artifacts if specified
    if len(extra_artifacts) > 0:
        cmx_input['artifacts'] = extra_artifacts

    return {'return':0, 'cmx_input':cmx_input}
###########################################################################
def split_flag(flag, array, value=None):
    """
    Split flag key=value and add to array

    Supported forms of the flag:

    * "key=value" : array[key] = value (both stripped of spaces)
    * "key"       : array[key] = True  (or the "value" argument if given)
    * "key-"      : array[key] = False (only if "value" is not given)
    * "key,=a,b"  : array[key] = ["a", "b"]; "key,=" gives [] and
                    "key," gives [True]
    * "a.b=value" : nested dictionaries: array["a"]["b"] = value

    Args:
        flag (str): flag of format "key=value"
        array (dict): array where to accumulate flags (updated in place)
        value (str): if not None use it (for "-key value" flags)

    Returns:
        key (str): key that was set; for a dotted flag this is the last
                   intermediate component (e.g. "b" for "a.b.c=1")
        value (str): value that was set
        array (dict): dictionary that received the value
                      (the innermost nested one for a dotted flag)
    """

    eq_pos = flag.find('=')  # find first =
    if eq_pos > 0:
        key = flag[:eq_pos].strip()
        value = flag[eq_pos + 1:].strip()
    else:
        key = flag
        if value is None:
            # Boolean flag: trailing "-" turns it off
            if key.endswith('-'):
                key = key[:-1]
                value = False
            else:
                value = True

    # Decided not to do that (20241006)
    # Force '-' to '_' in keys in CLI
    # key = key.replace('-', '_')

    # Trailing comma in the key means that the value is a list
    if key.endswith(','):
        key = key[:-1]

        if value in (True, False):
            value = [value]
        elif value != '':
            value = value.split(',')
        else:
            value = []

    if '.' in key:
        # Walk (and create when missing) nested dictionaries
        keys = key.split('.')

        for key in keys[:-1]:
            if key not in array:
                array[key] = {}
            array = array[key]

        array[keys[-1]] = value
    else:
        array[key] = value

    return key, value, array
########################################################################### if __name__ == "__main__": run()