Source code for cmind.cli

# Collective Mind command line wrapper
#
# Written by Grigori Fursin

import sys

############################################################
[docs] def run(argv = None): """ Run CM automation actions from the command line. CM command line format: cm {action} {automation} (artifacts) (--flags) (@input.yaml) (@input.json) Args: argv (list | string): command line arguments Returns: (CM return dict): * return (int): return code == 0 if no error and >0 if error * (error) (str): error string if return>0 * Output from a CM automation action """ # Access CM from cmind.core import CM cm = CM() if argv is None: argv = sys.argv[1:] r = cm.access(argv, out='con') # Check if save to json if cm.save_to_json != '': from cmind import utils utils.save_json(cm.save_to_json, meta=r) # Check if output to console if cm.output=='json': from cmind import utils utils.dump_safe_json(r) elif r['return']>0 and (cm.output is None or cm.output=='con'): cm.error(r) sys.exit(r['return'])
############################################################
[docs] def run_script(argv = None): """ Shortcut to "cm run script ..." CM command line format: Args: argv (list | string): command line arguments Returns: (CM return dict): * return (int): return code == 0 if no error and >0 if error * (error) (str): error string if return>0 * Output from a CM automation action """ # Access CM if argv is None: argv = sys.argv[1:] return run(['run', 'script'] + argv)
############################################################
[docs] def docker_script(argv = None): """ Shortcut to "cm docker script ..." CM command line format: Args: argv (list | string): command line arguments Returns: (CM return dict): * return (int): return code == 0 if no error and >0 if error * (error) (str): error string if return>0 * Output from a CM automation action """ # Access CM if argv is None: argv = sys.argv[1:] return run(['docker', 'script'] + argv)
############################################################
[docs] def gui_script(argv = None): """ Shortcut to "cm gui script ..." CM command line format: Args: argv (list | string): command line arguments Returns: (CM return dict): * return (int): return code == 0 if no error and >0 if error * (error) (str): error string if return>0 * Output from a CM automation action """ # Access CM if argv is None: argv = sys.argv[1:] return run(['gui', 'script'] + argv)
############################################################
[docs] def run_experiment(argv = None): """ Shortcut to "cm run experiment ..." CM command line format: Args: argv (list | string): command line arguments Returns: (CM return dict): * return (int): return code == 0 if no error and >0 if error * (error) (str): error string if return>0 * Output from a CM automation action """ # Access CM if argv is None: argv = sys.argv[1:] return run(['run', 'experiment'] + argv)
############################################################
[docs] def parse(cmd): """ Parse CM command line into CM input dictionary. Args: cmd (str | list) : arguments as a string or list Returns: (CM return dict): * return (int): return code == 0 if no error and >0 if error * (error) (str): error string if return>0 * cm_input (dict): CM unified input to the CM access function """ # If input is string, convert to argv # We use shlex to properly convert "" if cmd is None: argv = [] elif type(cmd) == str: import shlex argv = shlex.split(cmd) else: argv = cmd # Positional arguments cm_input = {} # First argument: automation special_cli_characters=['-', '@'] for key in ['action', 'automation']: if len(argv) > 0: x = argv[0].strip() if x != '' and x[0] not in special_cli_characters: cm_input[key] = argv.pop(0) # Check if just one artifact or multiple ones artifact='' artifacts=[] # Only added if more than 1 artifact! for index in range(0, len(argv)): a=argv[index] if a=='--': unparsed = [] if index>len(argv) else argv[index+1:] cm_input['unparsed_cmd']=unparsed break elif a.startswith('@'): # Load JSON or YAML file from cmind import utils r = utils.load_json_or_yaml(file_name = a[1:], check_if_exists=True) if r['return'] >0 : return r meta = r['meta'] cm_input.update(meta) elif not a.startswith('-'): # artifact if artifact=='': artifact=a cm_input['artifact']=a else: artifacts.append(a) else: # flags j = a.find('=') # find first = if j>0: key = a[:j].strip() value = a[j+1:].strip() else: key=a value=True if key.startswith('-'): key=key[1:] if key.startswith('-'): key=key[1:] if key.endswith(','): key = key[:-1] value = value.split(',') if value!="" else [] if '.' in key: keys = key.split('.') new_cm_input = cm_input first = True for key in keys[:-1]: if first: key = key.replace('-','_') first = False if key not in new_cm_input: new_cm_input[key] = {} new_cm_input = new_cm_input[key] new_cm_input[keys[-1]]=value else: key = key.replace('-','_') cm_input[key] = value # Add extra artifacts if specified if len(artifacts) > 0: cm_input['artifacts'] = artifacts cm_input['cmd'] = cmd return {'return':0, 'cm_input':cm_input}
if __name__ == "__main__": run()