# Collective Mind automation
#
# Written by Grigori Fursin
import os
from cmind import utils
from cmind.artifact import Artifact
self_path = __file__
[docs]
class Automation:
"""
CM automation class
"""
def __init__(self, cmind, automation_file):
"""
Initialize CM automation class
Args:
cmind (CM class)
automation_file (str): path to the CM automation Python module
Returns:
(python class) with the following vars:
* automation_file_path (str): full path to the CM automation Python module
* path (str): directory with the CM automation
"""
self.cmind = cmind
self.automation_file_path = automation_file
self.path = os.path.dirname(self.automation_file_path)
############################################################
[docs]
def version(self, i):
"""
Print CM version
Args:
(CM input dict):
(out) (str): if 'con', output to console
Returns:
(CM return dict):
* return (int): return code == 0 if no error and >0 if error
* (error) (str): error string if return>0
"""
console = i.get('out') == 'con'
import cmind
version = cmind.__version__
if console:
print (version)
return {'return':0, 'version':version}
############################################################
[docs]
def test(self, i):
"""
Test CM and print various info
Args:
(CM input dict):
(out) (str): if 'con', output to console
Returns:
(CM return dict):
* return (int): return code == 0 if no error and >0 if error
* (error) (str): error string if return>0
"""
import sys
console = i.get('out') == 'con'
r = self.version({})
if r['return']>0: return r
version = r['version']
print ('CM version: {}'.format(version))
# Check if repository is broken
try:
from cmind import net
rn = net.request(
{'get': {'action': 'get-cm-version-notes', 'version': version}})
if rn['return'] == 0:
notes = rn.get('dict', {}).get('notes', '')
if notes !='':
print ('')
print (notes)
except Exception as e:
print ('Warning: {}'.format(e))
pass
x = sys.executable
if x != None and x != '':
print ('')
print ('Python executable used by CK: {}'.format(x))
print ('')
print ('Path to CM package: {}'.format(self.cmind.path_to_cmind))
print ('Path to CM core module: {}'.format(self.cmind.path_to_cmind_core_module))
print ('Path to CM internal repo: {}'.format(self.cmind.repos.path_to_internal_repo))
print ('')
print ('Path to CM repositories: {}'.format(self.cmind.repos_path))
print ('')
print ('GitHub for CM developments: https://github.com/mlcommons/ck/tree/master/cm')
print ('GitHub for CM automation scripts: https://github.com/mlcommons/cm4mlops')
print ('Reporting issues and ideas: https://github.com/mlcommons/ck/issues')
print ('MLCommons taskforce developing CM: https://github.com/mlcommons/ck/blob/master/docs/taskforce.md')
return {'return':0}
############################################################
[docs]
def search(self, i):
"""
Find CM artifacts (slow - we plan to accelerate it in the future using different indexing mechanisms)
Args:
(CM input dict):
(out) (str): if 'con', output to console
parsed_automation (list): prepared in CM CLI or CM access function
[ (automation alias, automation UID) ] or
[ (automation alias, automation UID), (automation repo alias, automation repo UID) ]
parsed_artifact (list): prepared in CM CLI or CM access function
[ (artifact alias, artifact UID) ] or
[ (artifact alias, artifact UID), (artifact repo alias, artifact repo UID) ]
ignore_inheritance (bool): if 'True', ignore artifact meta description inheritance
and just load meta to match UID/alias
(skip_index_search) (bool): If True, skip indexing
(force_index_add) (bool): If True, force index add (for reindexing)
Returns:
(CM return dict):
* return (int): return code == 0 if no error and >0 if error
* (error) (str): error string if return>0
* list (list): list of CM artifact objects
"""
import copy
console = i.get('out') == 'con'
skip_index_search = i.get('skip_index_search', False)
force_index_add = i.get('force_index_add', False)
lst = []
# Check tags
tags = utils.get_list_from_cli(i, key='tags')
and_tags = []
no_tags = []
for t in tags:
t=t.strip()
if t!='':
if t.startswith('-'):
no_tags.append(t[1:])
else:
and_tags.append(t)
ignore_inheritance = i.get('ignore_inheritance',False) == True
# Get parsed automation
# First object in a list is an automation
# Second optional object in a list is a repo
parsed_automation = i.get('parsed_automation',[])
automation_name = parsed_automation[0] if len(parsed_automation)>0 else ('','')
# Get parsed artifact
parsed_artifact = i.get('parsed_artifact',[])
artifact_obj = parsed_artifact[0] if len(parsed_artifact)>0 else ('','')
artifact_repo = parsed_artifact[1] if len(parsed_artifact)>1 else None
artifact_repo_wildcards = False
if artifact_repo is not None:
if '*' in artifact_repo[0] or '?' in artifact_repo[0]:
artifact_repo_wildcards = True
# Prune repos if needed (check wildcards too)
repos = self.cmind.repos.lst
pruned_repos = []
for repo in repos:
add_repo = True
if artifact_repo is not None:
repo_uid = repo.meta['uid']
if repo_uid == '': repo_uid = None
repo_alias = repo.meta['alias']
if repo_alias == '': repo_alias = None
r = utils.match_objects(uid = repo_uid,
alias = repo_alias,
uid2 = artifact_repo[1],
alias2 = artifact_repo[0])
if r['return']>0: return r
add_repo = r['match']
if add_repo:
pruned_repos.append(repo)
# Check index and bypass search
if not skip_index_search:
r = self.cmind.index.find(automation_name, artifact_obj, artifact_repo, pruned_repos, and_tags, no_tags)
if r['return']==0:
artifacts = r.get('list',[])
if len(artifacts)>0:
lst = []
repo_extra_info = self.cmind.repos.extra_info
for artifact in artifacts:
artifact_object = Artifact(cmind = self.cmind, path = artifact['path'])
for j in [0,1]:
x = artifact['repo'][j]
if x in repo_extra_info:
# Add extra info to artifact about the repo
artifact_object.repo_path = repo_extra_info[x].path
artifact_object.repo_meta = repo_extra_info[x].meta
r = artifact_object.load(ignore_inheritance = ignore_inheritance)
if r['return']>0: return r
lst.append(artifact_object)
# Output to console if forced
if console:
print (artifact_object.path)
return {'return':0, 'list':lst}
# Iterate over pruned repositories
for repo in pruned_repos:
path_repo = repo.path_with_prefix
repo_meta = copy.deepcopy(repo.meta)
# May or may not be automation
automations = sorted(os.listdir(path_repo))
for automation in automations:
path_automations = os.path.join(path_repo, automation)
if os.path.isdir(path_automations):
# Pruning by automation
# Will need to get first entry to get UID and alias of the automation
path_artifacts = os.path.join(path_repo, path_automations)
# Potential CM artifacts or just some directories
artifacts = sorted(os.listdir(path_artifacts))
# Check if artifact is first to get meta about automation UID/alias
first_artifact = True
for artifact in artifacts:
path_artifact = os.path.join(path_artifacts, artifact)
if os.path.isdir(path_artifact):
# Check if has CM meta to make sure that it's a CM object
path_artifact_meta = os.path.join(path_artifact, self.cmind.cfg['file_cmeta'])
r = utils.is_file_json_or_yaml(file_name = path_artifact_meta)
if r['return']>0: return r
if r['is_file']:
# Load artifact class
artifact_object = Artifact(cmind = self.cmind, path = path_artifact)
# Add extra info to artifact about the repo
artifact_object.repo_path = path_repo
artifact_object.repo_meta = repo_meta
# Force no inheritance if first artifact just to check automation UID and alias
tmp_ignore_inheritance = True if first_artifact else ignore_inheritance
# Force no inheritance to get basic info about aliases and UIDs
r = artifact_object.load(ignore_inheritance = tmp_ignore_inheritance)
if r['return']>0: return r
# Check permanent meta
meta = artifact_object.meta
uid = meta.get('uid', '')
if uid==None: uid = ''
alias = meta.get('alias', '')
if alias==None: alias = ''
if first_artifact:
# Need to check if automation matches
r = utils.match_objects(uid = meta.get('automation_uid'),
alias = meta.get('automation_alias'),
uid2 = automation_name[1],
alias2 = automation_name[0],
more_strict = True)
if r['return']>0: return r
if not r['match']:
break
# Match object
# print ("'{}','{}','{}','{}'".format(uid,alias,artifact_obj[1],artifact_obj[0]))
r = utils.match_objects(uid = uid,
alias = alias,
uid2 = artifact_obj[1],
alias2 = artifact_obj[0])
if r['return']>0: return r
if r['match']:
# Reload object with inheritance if needed before checking tags
if first_artifact:
r = artifact_object.load(ignore_inheritance = ignore_inheritance)
if r['return']>0: return r
meta = artifact_object.meta
# Index
if self.cmind.use_index or force_index_add:
r=self.cmind.index.add(meta, path_artifact,
(repo.meta['alias'],
repo.meta['uid']))
# Ignore index output to continue working if issues
# Check if tags match
if not utils.tags_matched(meta.get('tags',[]), and_tags, no_tags):
continue
lst.append(artifact_object)
# Output to console if forced
if console:
print (artifact_object.path)
if first_artifact:
first_artifact = False
return {'return':0, 'list':lst}
############################################################
[docs]
def add(self, i):
"""
Add CM artifact
Args:
(CM input dict):
(out) (str): if 'con', output to console
parsed_automation (list): prepared in CM CLI or CM access function
[ (automation alias, automation UID) ] or
[ (automation alias, automation UID), (automation repo alias, automation repo UID) ]
parsed_artifact (list): prepared in CM CLI or CM access function
[ (artifact alias, artifact UID) ] or
[ (artifact alias, artifact UID), (artifact repo alias, artifact repo UID) ]
meta (dict): meta description of this artifact
(tags) (string or list): tags to be added to meta
(new_tags) (string or list): new tags to be added to meta (the same as tags)
(yaml) (bool): if True, record YAML instead of JSON
Returns:
(CM return dict):
* return (int): return code == 0 if no error and >0 if error
* (error) (str): error string if return>0
* meta (dict): final meta of the artifact
* path (str): path to the created artifact
"""
console = i.get('out') == 'con'
# Get parsed automation
if 'parsed_automation' not in i:
return {'return':1, 'error':'automation is not specified'}
parsed_automation = i.get('parsed_automation',[])
automation_name = parsed_automation[0] if len(parsed_automation)>0 else ('','')
# Get parsed artifact
parsed_artifact = i.get('parsed_artifact',[])
artifact_obj = parsed_artifact[0] if len(parsed_artifact)>0 else ('','')
artifact_repo = parsed_artifact[1] if len(parsed_artifact)>1 else None
# If artifact repo is not defined, use "local" (like scratchpad)
if artifact_repo == None:
artifact_repo = (self.cmind.cfg['local_repo_meta']['alias'],
self.cmind.cfg['local_repo_meta']['uid'])
# Find repository
r = self.cmind.access({'automation':'repo',
'action':'find',
'artifact':artifact_repo[0]+','+artifact_repo[1]})
if r['return']>0: return r
lst = r['list']
if len(lst)==0:
return {'return':16, 'error':'repository {} not found'.format(artifact_repo)}
if len(lst)>1:
return {'return':1, 'error':'more than 1 repository found (ambiguity)'}
repo = lst[0]
repo_path = repo.path_with_prefix
# Check automation (if exists)
automation_path = os.path.join(repo_path, automation_name[0]) if automation_name[0]!='' else os.path.join(repo_path, automation_name[1])
if not os.path.isdir(automation_path):
os.makedirs(automation_path)
# Check object UID
if artifact_obj[1]=='' or artifact_obj[1] is None:
r=utils.gen_uid()
if r['return']>0: return r
artifact_obj=(artifact_obj[0],r['uid'])
obj_path = os.path.join(automation_path, artifact_obj[0]) if artifact_obj[0]!='' else os.path.join(automation_path, artifact_obj[1])
meta_path_yaml = os.path.join(obj_path, self.cmind.cfg['file_cmeta']+'.yaml')
meta_path_json = os.path.join(obj_path, self.cmind.cfg['file_cmeta']+'.json')
# Check if artifact meta exists - then object already exists
if os.path.isfile(meta_path_yaml) or os.path.isfile(meta_path_json):
return {'return':8, 'error':'artifact already exists in the path {}'.format(obj_path)}
if not os.path.isdir(obj_path):
os.makedirs(obj_path)
# Prepare meta
meta = i.get('meta',{})
tags = utils.get_list_from_cli(i, key='tags')
if meta.get('alias','')=='': meta['alias']=artifact_obj[0]
if meta.get('uid','')=='': meta['uid']=artifact_obj[1]
if meta.get('automation_alias','')=='': meta['automation_alias']=automation_name[0]
if meta.get('automation_uid','')=='': meta['automation_uid']=automation_name[1]
existing_tags = meta.get('tags',[])
new_tags_list = []
if i.get('new_tags','')!='':
new_tags_list = i['new_tags'].split(',')
if len(tags)>0:
new_tags_list += tags
if len(new_tags_list)>0:
for xtag in new_tags_list:
if xtag!='' and xtag not in existing_tags:
existing_tags.append(xtag)
meta['tags'] = existing_tags
# Record meta
if i.get('yaml', False):
r = utils.save_yaml(meta_path_yaml, meta=meta)
else:
r = utils.save_json(meta_path_json, meta=meta)
if r['return']>0: return r
# Index
if self.cmind.use_index:
r=self.cmind.index.add(meta, obj_path,
(repo.meta['alias'], repo.meta['uid']),
update = True)
# Ignore index output to continue working if issues
if console:
print ('Added CM object at {}'.format(obj_path))
return {'return':0, 'meta':meta, 'path':obj_path}
############################################################
[docs]
def delete(self, i):
"""
Delete CM artifact
Args:
(CM input dict):
(out) (str): if 'con', output to console
parsed_automation (list): prepared in CM CLI or CM access function
[ (automation alias, automation UID) ] or
[ (automation alias, automation UID), (automation repo alias, automation repo UID) ]
parsed_artifact (list): prepared in CM CLI or CM access function
[ (artifact alias, artifact UID) ] or
[ (artifact alias, artifact UID), (artifact repo alias, artifact repo UID) ]
(tags) (string or list): optional tags to find artifacts
(force) (bool): force deleting CM artifacts without asking a user
(f) (bool): the same as "force"
Returns:
(CM return dict):
* return (int): return code == 0 if no error and >0 if error
* (error) (str): error string if return>0
* deleted_list (list): a list of deleted CM artifacts
"""
# Check parsed automation
if 'parsed_automation' not in i:
return {'return':1, 'error':'automation is not specified'}
import shutil
console = i.get('out') == 'con'
force = True if i.get('f', False) or i.get('force',False) else False
# Find CM artifact(s)
i['out']=None
r = self.search(i)
if r['return']>0: return r
lst = r['list']
if len(lst)==0:
return {'return':16, 'error':'artifact(s) not found'}
deleted_lst = []
for artifact in lst:
path_to_artifact = artifact.path
if console:
tags = artifact.meta.get('tags',[])
x = '' if len(tags)=='' else ' with tags "{}"'.format(','.join(tags))
print ('Deleting CM artifact in {}{} ...'.format(path_to_artifact, x))
if not force:
ask = input(' Are you sure you want to delete this artifact (y/N): ')
ask = ask.strip().lower()
if ask!='y':
print (' Skipped!')
continue
elif not force:
# If not console mode skip unless forced
continue
# Deleting artifact
deleted_lst.append(artifact)
if os.name == 'nt':
# To be able to remove .git files
shutil.rmtree(path_to_artifact, onerror = utils.rm_read_only)
# shutil.rmtree(path_to_artifact, ignore_errors = False, onerror = delete_helper)
else:
shutil.rmtree(path_to_artifact)
# Index
if self.cmind.use_index:
r=self.cmind.index.add(artifact.meta, artifact.path,
(artifact.repo_meta['alias'], artifact.repo_meta['uid']),
update = True, delete = True)
# Ignore index output to continue working if issues
if console:
print (' Deleted!')
return {'return':0, 'deleted_list':deleted_lst}
############################################################
[docs]
def load(self, i):
"""
Load CM artifact
Args:
(CM input dict):
(out) (str): if 'con', output to console
parsed_automation (list): prepared in CM CLI or CM access function
[ (automation alias, automation UID) ] or
[ (automation alias, automation UID), (automation repo alias, automation repo UID) ]
parsed_artifact (list): prepared in CM CLI or CM access function
[ (artifact alias, artifact UID) ] or
[ (artifact alias, artifact UID), (artifact repo alias, artifact repo UID) ]
(tags) (string or list): optional tags to find artifacts
Returns:
(CM return dict):
* return (int): return code == 0 if no error and >0 if error
* (error) (str): error string if return>0
* path (str): path to a loaded CM artifact
* original_meta (dict): meta description of the loaded CM artifact before inheritance
* meta (dict): meta description of the loaded CM artifact after inheritance
* artifact (CM artifact object)
"""
# Check parsed automation
if 'parsed_automation' not in i:
return {'return':1, 'error':'automation is not specified'}
console = i.get('out') == 'con'
# Find CM artifact(s)
i['out'] = None
r = self.search(i)
if r['return']>0: return r
lst = r['list']
if len(lst)==0:
return {'return':16, 'error':'artifact not found: {}'.format(i)}
elif len(lst)>1:
return {'return':1, 'error':'more than 1 artifact found', 'list':lst}
artifact = lst[0]
path = artifact.path
meta = artifact.meta
original_meta = artifact.original_meta
# Output if console
if console:
if str(i.get('yaml','')).lower() in ['true','yes']:
import yaml
print (yaml.dump(meta))
else:
import json
print (json.dumps(meta, indent=2, sort_keys=True))
return {'return':0, 'path':path, 'meta':meta, 'original_meta':original_meta, 'artifact':artifact}
############################################################
[docs]
def update(self, i):
"""
Update CM artifact.
Note: not thread safe - we expect one pipeline running on a system
can make it thread safe when needed (similar to CK)
Args:
(CM input dict):
(out) (str): if 'con', output to console
parsed_automation (list): prepared in CM CLI or CM access function
[ (automation alias, automation UID) ] or
[ (automation alias, automation UID), (automation repo alias, automation repo UID) ]
parsed_artifact (list): prepared in CM CLI or CM access function
[ (artifact alias, artifact UID) ] or
[ (artifact alias, artifact UID), (artifact repo alias, artifact repo UID) ]
(search_tags) (string or list): optional tags to find artifacts
(meta) (dict): new meta description to be merged or to replace the original meta description of a CM artifact
(tags) (string or list): tags to be added to meta
(replace) (bool): if True, replace original meta description (False by default)
(force) (bool): if True, force updates if more than 1 artifact found (False by default)
(replace_lists) (bool): if True, replace lists in meta description rather than appending them (False by default)
Returns:
(CM return dict):
* return (int): return code == 0 if no error and >0 if error
* (error) (str): error string if return>0
* list (list): list of updated CM artifacts
"""
console = i.get('out') == 'con'
meta = i.get('meta',{})
tags = utils.get_list_from_cli(i, key='tags')
replace = i.get('replace', False)
replace_lists = i.get('replace_lists', False)
# Find CM artifact(s)
ii=utils.sub_input(i, self.cmind.cfg['artifact_keys'])
if 'search_tags' in i: ii['tags']=i['search_tags']
ii['out'] = None
r = self.search(ii)
if r['return']>0: return r
lst = r['list']
if len(lst)==0:
# Attempt to add if doesn't exist
r = self.add(i)
if r['return']>0: return r
# Load this artifact
r = self.load(i)
if r['return']>0: return r
lst = [r['artifact']]
return {'return':0, 'list':lst}
# If more than one ask if update all or force
if len(lst)>1:
force = True if i.get('f', False) or i.get('force',False) else False
if not force:
ask = input('More than 1 artifact found. Are you sure you want to update them all (y/N): ')
ask = ask.strip().lower()
if ask!='y':
return {'return':1, 'error':'more than 1 artifact found', 'list':lst}
print ('')
# Updating artifacts
for artifact in lst:
# Check if need to update tags
meta_tags = []
if not replace and not replace_lists:
meta_tags = artifact.meta.get('tags',[])
if len(tags)>0:
for t in tags:
if t not in meta_tags:
meta_tags.append(t)
r = artifact.update(meta, replace = replace, append_lists = not replace_lists, tags = meta_tags)
# Index
if self.cmind.use_index:
r=self.cmind.index.add(artifact.meta, artifact.path,
(artifact.repo_meta['alias'], artifact.repo_meta['uid']),
update = True)
# Ignore index output to continue working if issues
# Output if console
if console:
print ('Updated {}'.format(artifact.path))
return {'return':0, 'list':lst}
############################################################
[docs]
def move(self, i):
"""
Rename CM artifacts and/or move them to another CM repository.
Args:
(CM input dict):
(out) (str): if 'con', output to console
parsed_automation (list): prepared in CM CLI or CM access function
[ (automation alias, automation UID) ] or
[ (automation alias, automation UID), (automation repo alias, automation repo UID) ]
parsed_artifact (list): prepared in CM CLI or CM access function
[ (artifact alias, artifact UID) ] or
[ (artifact alias, artifact UID), (artifact repo alias, artifact repo UID) ]
parsed_artifacts (list): prepared in CM CLI or CM access function - 1st entry has a new artifact name and repository:
[ (artifact alias, artifact UID) ] or
[ (artifact alias, artifact UID), (artifact repo alias, artifact repo UID) ]
Returns:
(CM return dict):
* return (int): return code == 0 if no error and >0 if error
* (error) (str): error string if return>0
* list (list): list of renamed/moved CM artifacts
"""
import shutil
console = i.get('out') == 'con'
parsed_artifacts = i.get('parsed_artifacts',[])
if len(parsed_artifacts)==0:
return {'return':1, 'error':'artifact is not specified'}
elif len(parsed_artifacts)>1:
return {'return':1, 'error':'more than 1 artifact specified'}
# Find CM artifact(s)
i['out'] = None
r = self.search(i)
if r['return']>0: return r
lst = r['list']
if len(lst)==0:
return {'return':16, 'error':'artifact not found: {}'}
# Check target artifact
target_artifact = parsed_artifacts[0]
target_artifact_obj = target_artifact[0]
target_artifact_obj_alias = target_artifact_obj[0]
target_artifact_obj_uid = target_artifact_obj[1].lower()
# Check target repo
target_artifact_repo = target_artifact[1] if len(target_artifact)>1 else None
target_artifact_repo_obj = None
target_repo_path = None
if target_artifact_repo is not None:
r = self.cmind.access({'action':'search',
'automation':'repo',
'artifact': utils.assemble_cm_object2(target_artifact_repo)})
if r['return']>0: return r
target_repo_list = r['list']
if len(target_repo_list) == 0:
return {'return':1, 'error':'target repo "{}" not found'.format(target_artifact_repo)}
elif len(target_repo_list) >1:
return {'return':1, 'error':'more than 1 target repo found "{}"'.format(target_artifact_repo)}
target_artifact_repo_obj = target_repo_list[0]
target_repo_path = os.path.abspath(target_artifact_repo_obj.path_with_prefix)
# Updating artifacts
for artifact in lst:
# Index
if self.cmind.use_index:
r=self.cmind.index.add(artifact.meta, artifact.path,
(artifact.repo_meta['alias'], artifact.repo_meta['uid']),
update = True, delete = True)
# Ignore index output to continue working if issues
artifact_path = os.path.abspath(artifact.path)
artifact_meta = artifact.original_meta
artifact_alias = artifact_meta.get('alias','')
artifact_uid = artifact_meta.get('uid','')
artifact_dir_name = os.path.basename(artifact_path)
artifact_automation_dir = os.path.dirname(artifact_path)
artifact_automation = os.path.basename(artifact_automation_dir)
must_update_meta = False
# Prepare new path
new_name = artifact_dir_name
if target_artifact_obj_alias != '':
new_name = target_artifact_obj_alias
# Use either new repo or the repo of the original artifact
new_artifact_path = os.path.join(target_repo_path, artifact_automation, new_name) if target_repo_path is not None \
else os.path.join(artifact_automation_dir, new_name)
# Check if need to update meta
if target_artifact_obj_alias != '' and artifact_alias.lower() != target_artifact_obj_alias.lower():
must_update_meta = True
artifact_meta['alias']=target_artifact_obj_alias
if target_artifact_obj_uid != '' and artifact_uid.lower() != target_artifact_obj_uid:
must_update_meta = True
artifact_meta['uid']=target_artifact_obj_uid
artifact.path = new_artifact_path
# Move
if artifact_path != new_artifact_path:
if console:
print ('* Moving "{}" to'.format(artifact_path))
print (' "{}"'.format(new_artifact_path))
if os.path.isdir(new_artifact_path):
return {'return':1, 'error':'target artifact (directory) already exists'}
shutil.move(artifact_path, new_artifact_path)
# Update meta
if must_update_meta:
if console:
print ('- Updating meta in "{}"'.format(new_artifact_path))
# If only yaml, update yaml and not json
meta_path_yaml = os.path.join(new_artifact_path, self.cmind.cfg['file_cmeta']+'.yaml')
meta_path_json = os.path.join(new_artifact_path, self.cmind.cfg['file_cmeta']+'.json')
if os.path.isfile(meta_path_yaml) and not os.path.isfile(meta_path_json):
update_meta={'alias':artifact_meta['alias'],
'uid':artifact_meta['uid']}
r = utils.update_yaml(meta_path_yaml, update_meta)
if r['return']>0: return r
else:
r = artifact.update({})
if r['return'] >0: return r
# Index
if self.cmind.use_index:
if target_artifact_repo_obj is not None:
tmp_repo_index = (target_artifact_repo_obj.meta['alias'],
target_artifact_repo_obj.meta['uid'])
else:
tmp_repo_index = (artifact.repo_meta['alias'],
artifact.repo_meta['uid'])
r=self.cmind.index.add(artifact.meta, artifact.path,
tmp_repo_index, update = True)
# Ignore index output to continue working if issues
return {'return':0, 'list':lst}
############################################################
[docs]
def copy(self, i):
"""
Copy CM artifact(s) either to a new artifact or to a new repository
Args:
(CM input dict):
(out) (str): if 'con', output to console
parsed_automation (list): prepared in CM CLI or CM access function
[ (automation alias, automation UID) ] or
[ (automation alias, automation UID), (automation repo alias, automation repo UID) ]
parsed_artifact (list): prepared in CM CLI or CM access function
[ (artifact alias, artifact UID) ] or
[ (artifact alias, artifact UID), (artifact repo alias, artifact repo UID) ]
parsed_artifacts (list): prepared in CM CLI or CM access function - 1st entry has a new artifact name and repository:
[ (artifact alias, artifact UID) ] or
[ (artifact alias, artifact UID), (artifact repo alias, artifact repo UID) ]
new_tags (string): add new tags (separated by comma) to new artifacts
Returns:
(CM return dict):
* return (int): return code == 0 if no error and >0 if error
* (error) (str): error string if return>0
* list (list): list of copied CM artifacts
"""
import shutil
console = i.get('out') == 'con'
parsed_artifacts = i.get('parsed_artifacts',[])
if len(parsed_artifacts)==0:
return {'return':1, 'error':'artifact is not specified'}
elif len(parsed_artifacts)>1:
return {'return':1, 'error':'more than 1 artifact specified'}
# Find CM artifact(s)
i['out'] = None
r = self.search(i)
if r['return']>0: return r
lst = r['list']
if len(lst)==0:
return {'return':16, 'error':'artifact(s) not found: {}'}
# Check target artifact
target_artifact = parsed_artifacts[0]
target_artifact_obj = target_artifact[0]
target_artifact_obj_alias = target_artifact_obj[0]
target_artifact_obj_uid = target_artifact_obj[1].lower()
# Check target repo
target_artifact_repo = target_artifact[1] if len(target_artifact)>1 else None
target_artifact_repo_obj = None
target_repo_path = None
if target_artifact_repo is not None:
r = self.cmind.access({'action':'search',
'automation':'repo',
'artifact': utils.assemble_cm_object2(target_artifact_repo)})
if r['return']>0: return r
target_repo_list = r['list']
if len(target_repo_list) == 0:
return {'return':1, 'error':'target repo "{}" not found'.format(target_artifact_repo)}
elif len(target_repo_list) >1:
return {'return':1, 'error':'more than 1 target repo found "{}"'.format(target_artifact_repo)}
target_artifact_repo_obj = target_repo_list[0]
target_repo_path = os.path.abspath(target_artifact_repo_obj.path_with_prefix)
# Updating artifacts
new_tags_list = []
if i.get('new_tags','')!='':
new_tags_list = i['new_tags'].split(',')
for artifact in lst:
artifact_path = os.path.abspath(artifact.path)
artifact_meta = artifact.original_meta
artifact_alias = artifact_meta.get('alias','')
r=utils.gen_uid()
if r['return']>0: return r
target_artifact_obj_uid=r['uid']
artifact_dir_name = os.path.basename(artifact_path)
artifact_automation_dir = os.path.dirname(artifact_path)
artifact_automation = os.path.basename(artifact_automation_dir)
# Prepare new path
new_name = artifact_dir_name
if target_artifact_obj_alias != '':
new_name = target_artifact_obj_alias
# Use either new repo or the repo of the original artifact
new_artifact_path = os.path.join(target_repo_path, artifact_automation, new_name) if target_repo_path is not None \
else os.path.join(artifact_automation_dir, new_name)
if target_artifact_obj_alias != '' and artifact_alias.lower() != target_artifact_obj_alias.lower():
artifact_meta['alias']=target_artifact_obj_alias
artifact_meta['uid']=target_artifact_obj_uid
if len(new_tags_list)>0:
xtags = artifact_meta.get('tags',[])
for xtag in new_tags_list:
if xtag!='' and xtag not in xtags:
xtags.append(xtag)
artifact_meta['tags'] = xtags
artifact.path = new_artifact_path
# Copy
if artifact_path != new_artifact_path:
if console:
print ('* Copying "{}" to'.format(artifact_path))
print (' "{}"'.format(new_artifact_path))
if os.path.isdir(new_artifact_path):
return {'return':1, 'error':'target artifact (directory) already exists'}
shutil.copytree(artifact_path, new_artifact_path)
# Update meta
if console:
print ('- Updating meta in "{}"'.format(new_artifact_path))
# If only yaml, update yaml and not json
meta_path_yaml = os.path.join(new_artifact_path, self.cmind.cfg['file_cmeta']+'.yaml')
meta_path_json = os.path.join(new_artifact_path, self.cmind.cfg['file_cmeta']+'.json')
if os.path.isfile(meta_path_yaml) and not os.path.isfile(meta_path_json):
update_meta={'alias':artifact_meta['alias'],
'uid':artifact_meta['uid']}
r = utils.update_yaml(meta_path_yaml, update_meta)
if r['return']>0: return r
else:
r = artifact.update({})
if r['return'] >0: return r
# Index added artifact
if self.cmind.use_index:
# Old
if target_artifact_repo_obj is not None:
tmp_repo_index = (target_artifact_repo_obj.meta['alias'],
target_artifact_repo_obj.meta['uid'])
else:
tmp_repo_index = (artifact.repo_meta['alias'],
artifact.repo_meta['uid'])
r=self.cmind.index.add(artifact.meta, artifact.path,
tmp_repo_index, update = True)
# Ignore index output to continue working if issues
return {'return':0, 'list':lst}
############################################################
[docs]
def info(self, i):
"""
Get info about artifacts
Args:
(CM input dict):
(out) (str): if 'con', output to console
parsed_automation (list): prepared in CM CLI or CM access function
[ (automation alias, automation UID) ] or
[ (automation alias, automation UID), (automation repo alias, automation repo UID) ]
parsed_artifact (list): prepared in CM CLI or CM access function
[ (artifact alias, artifact UID) ] or
[ (artifact alias, artifact UID), (artifact repo alias, artifact repo UID) ]
parsed_artifacts (list): prepared in CM CLI or CM access function - 1st entry has a new artifact name and repository:
[ (artifact alias, artifact UID) ] or
[ (artifact alias, artifact UID), (artifact repo alias, artifact repo UID) ]
(uid) (bool): if True, copy CID in UID::UID format to clipboard
(md) (bool): if True, copy CID in [](UID::UID) format to clipboard
Returns:
(CM return dict):
* return (int): return code == 0 if no error and >0 if error
* (error) (str): error string if return>0
* list (list): list of CM artifacts
"""
# Check parsed automation
if 'parsed_automation' not in i:
return {'return':1, 'error':'automation is not specified'}
console = i.get('out') == 'con'
# Find CM artifact(s)
i['out'] = None
r = self.search(i)
if r['return']>0: return r
lst = r['list']
if len(lst)==0:
return {'return':16, 'error':'artifact not found: {}'.format(i)}
cid1 = ''
for artifact in lst:
if console and len(lst)>1:
print ('****************************************************')
path = artifact.path
meta = artifact.meta
original_meta = artifact.original_meta
alias = meta.get('alias','')
uid = meta.get('uid','')
automation_alias = meta.get('automation_alias','')
automation_uid = meta.get('automation_uid','')
x = utils.assemble_cm_object(alias,uid)
if ' ' in x: x = '"' + x + '"'
cid = utils.assemble_cm_object(automation_alias,automation_uid) + \
'::' + \
x
cid1 = automation_uid+'::'+uid
# cid_user_friendly = automation_alias if automation_alias != '' else automation_uid
# cid_user_friendly += '::' + (alias if alias != '' else uid)
# cid_misc = automation_alias if automation_alias != '' else automation_uid
# cid_misc += '::' + uid
# cid = automation_uid + '::' + uid
# Output if console
full_cid = '(' + cid + ')'
if console:
print ('CID = ' + cid)
print ('CID1 = ' + cid1)
print ('CID2 = ' + full_cid)
print ('')
print ('Path = ' + path)
p_dirs=[]
p_files=[]
for p in os.listdir(path):
p_dirs.append(p) if os.path.isdir(os.path.join(path,p)) else p_files.append(p)
if len(p_dirs)>0 or len(p_files)>0:
for x in [('Directories',p_dirs),
('Files',p_files)]:
x0 = x[0]
x1 = x[1]
if len(x1)>0:
print ('')
print (' '+x0+':')
for p in sorted(x1):
print (' ' + p)
# Attempt to copy to clipboard the last CID
if cid1 !='':
clipboard = full_cid
if i.get('uid', False): clipboard = cid1
if i.get('md', False): clipboard = '[]'+full_cid
r = utils.copy_to_clipboard({'string':clipboard, 'skip_fail':True})
# Skip output
return {'return':0, 'list': lst}
#############################################################
#def delete_helper(func, path, ret):
# import stat, errno
#
# if ret[1].errno != errno.EACCES:
# raise
# else:
# clean_attr = stat.S_IRWXG | stat.S_IRWXO | stat.S_IRWXU
# os.chmod(path, clean_attr)
# func(path)
#
# return