# Source code for cmind.repos

# Collective Mind repositories
#
# Written by Grigori Fursin

import os

from cmind.repo import Repo
from cmind import utils

class Repos:
    """
    CM repositories class

    Keeps the list of registered CM repositories (stored in a JSON index file),
    the initialized Repo objects and a uid/alias lookup table.
    All public methods except __init__ return a "CM return dict":
    {'return': 0, ...} on success or {'return': >0, 'error': str} on failure.
    """

    def __init__(self, path, cfg, path_to_internal_repo = '', cmx = False):
        """
        Initialize CM repositories class

        Args:
            path (str): path to directory with repos.json where all paths of registered CM repositories are stored
            cfg (dict): CM configuration
            (path_to_internal_repo) (str): path to the internal CM repository (useful during development/debugging)
            (cmx) (bool): stored as self.cmx and passed to Repo.load() when repositories are loaded

        Returns:
            (python class) with the following vars:

            * path (str): path to CM repositories and index file
            * full_path_to_repos (str): path to CM repositories
            * path_to_internal_repo (str): path to the internal CM repository
            * full_path_to_repo_paths (str): full path to repos.json
            * paths (list): paths to registered CM repositories
            * lst (list): list of initialized CM repository classes
            * extra_info (dict): initialized CM repository classes by uid and by alias
        """

        self.path = path
        self.cfg = cfg

        self.full_path_to_repos = ''

        # Paths to CM repos
        self.paths = []

        # List of initialized repositories
        self.lst = []

        # Potential path to internal repo
        self.path_to_internal_repo = path_to_internal_repo

        self.full_path_to_repo_paths = ''

        # Repo objects addressable by uid and by alias
        self.extra_info = {}

        self.cmx = cmx

    ############################################################
    def load(self, init = False):
        """
        Load or initialize repos.json file with repositories

        Args:
            init (bool): if True, forget previously loaded paths, repository
                         objects and uid/alias lookups before loading
                         (used to reload after the index file was updated)

        Returns:
            (CM return dict):

            * return (int): return code == 0 if no error and >0 if error
            * (error) (str): error string if return>0
        """

        # If reload after updating repos
        if init:
            self.paths = []
            self.lst = []
            # Lookups must be rebuilt too, otherwise repositories that were
            # unregistered stay reachable by uid/alias after the reload
            self.extra_info = {}

        # Check if home directory exists. Create it otherwise.
        if not os.path.isdir(self.path):
            os.makedirs(self.path)

        # Check repos holder
        full_path_to_repos = os.path.join(self.path, self.cfg['dir_repos'])
        self.full_path_to_repos = full_path_to_repos

        # If placeholder for repos doesn't exist, create it
        if not os.path.isdir(full_path_to_repos):
            os.makedirs(full_path_to_repos)

        # Search if there is a file with repos
        full_path_to_repo_paths = os.path.join(self.path, self.cfg['file_repos'])
        self.full_path_to_repo_paths = full_path_to_repo_paths

        if os.path.isfile(full_path_to_repo_paths):
            r = utils.load_json(full_path_to_repo_paths)
            if r['return']>0: return r

            self.paths = r['meta']
        else:
            r = utils.save_json(full_path_to_repo_paths, meta = self.paths)
            if r['return']>0: return r

        # Check internal repo (will be after local)
        if self.path_to_internal_repo != '' and os.path.isdir(self.path_to_internal_repo):
            self.paths.insert(0, self.path_to_internal_repo)

        # Prepare path for local repo (will be the first in the search as a local scratchpad)
        path_local_repo = os.path.join(full_path_to_repos, self.cfg['local_repo_name'])

        if not os.path.isdir(path_local_repo):
            os.makedirs(path_local_repo)

        path_local_repo_meta = os.path.join(path_local_repo, self.cfg['file_meta_repo']+'.yaml')

        if not os.path.isfile(path_local_repo_meta):
            r = utils.save_yaml(path_local_repo_meta, meta = self.cfg['local_repo_meta'])
            if r['return']>0: return r

        if path_local_repo not in self.paths:
            self.paths.insert(0, path_local_repo)

        # Check that repository exists and load meta description
        checked_self_paths = []

        for path_to_repo in self.paths:
            # First try concatenated path and then full path (if imported)
            found = False

            for full_path_to_repo in [os.path.join(full_path_to_repos, path_to_repo), path_to_repo]:
                if not os.path.isdir(full_path_to_repo):
                    continue

                # Load description
                repo = Repo(full_path_to_repo, self.cfg)

                r = repo.load(cmx = self.cmx)
                # Return code 16 is tolerated here: such a candidate is just skipped
                if r['return']>0 and r['return']!=16: return r

                # Load only if desc exists
                if r['return']!=16:
                    # Set only after all initializations
                    self.lst.append(repo)

                    repo_uid = repo.meta['uid']
                    if repo_uid!='':
                        self.extra_info[repo_uid]=repo

                    repo_alias = repo.meta['alias']
                    if repo_alias!='':
                        self.extra_info[repo_alias]=repo

                    found = True
                    break

            # Keep only paths where a repository was really loaded
            if found:
                checked_self_paths.append(path_to_repo)
            else:
                print ('WARNING: repository path {} not found (check file {})'.format(path_to_repo, full_path_to_repo_paths))

        # Save with correct paths
        if len(checked_self_paths) != len(self.paths):
            skip_fix_paths = os.environ.get('CM_CORE_SKIP_FIX_REPO_PATH','').strip().lower()

            if skip_fix_paths not in ['1', 'true', 'yes']:
                self.paths = list(checked_self_paths)

                # The internal repo is added at load time and must not be written to the index file
                if self.path_to_internal_repo in checked_self_paths:
                    checked_self_paths.remove(self.path_to_internal_repo)

                print ('WARNING: fixed repo list file {}'.format(full_path_to_repo_paths))

                r = utils.save_json(full_path_to_repo_paths, meta = checked_self_paths)
                if r['return']>0: return r

        return {'return':0}

    ############################################################
    def process(self, repo_path, mode='add'):
        """
        Add or delete CM repository

        Args:
            repo_path (str): path to CM repository
            (mode) (str): "add" (default) or "delete"

        Returns:
            (CM return dict):

            * return (int): return code == 0 if no error and >0 if error
            * (error) (str): error string if return>0

            * (warnings) (list of str): warnings to install more CM repositories
        """

        # Load clean file with repo paths
        r = utils.load_json(self.full_path_to_repo_paths)
        if r['return']>0: return r

        paths = r['meta']

        modified = False
        warnings = []

        if mode == 'add':
            if repo_path not in paths:
                # Load meta of the current repo
                path_to_repo_desc = os.path.join(repo_path, self.cfg['file_meta_repo'])

                r=utils.load_yaml_and_json(file_name_without_ext=path_to_repo_desc)
                if r['return']>0: return r

                meta = r['meta']

                alias = meta.get('alias', '')
                uid = meta.get('uid', '')

                # Iterated below as a list of dicts
                deps_on_other_repos = meta.get('deps', [])

                # Check that no repos exist with the same alias and/or uid
                # (to avoid adding forks and original repos)
                for path in paths:
                    path_to_existing_repo_desc = os.path.join(path, self.cfg['file_meta_repo'])

                    r=utils.load_yaml_and_json(file_name_without_ext=path_to_existing_repo_desc)
                    if r['return']>0: return r

                    existing_meta = r['meta']

                    existing_alias = existing_meta.get('alias', '')
                    existing_uid = existing_meta.get('uid', '')

                    # Check if repository already exists under different name
                    same_alias = alias != '' and alias == existing_alias
                    same_uid = uid != '' and uid == existing_uid

                    if same_alias or same_uid:
                        return {'return':1, 'error':'CM repository with the same alias "{}" and/or uid "{}" already exists in {}'.format(alias, uid, path)}

                    # Check if there is a conflict
                    for d in deps_on_other_repos:
                        d_alias = d.get('alias', '')
                        d_uid = d.get('uid', '')

                        r = utils.match_objects(existing_uid, existing_alias, d_uid, d_alias)
                        if r['return']>0: return r

                        if r['match']:
                            if d.get('conflict', False):
                                return {'return':1, 'error':'Can\'t install this repository because it conflicts with the already installed one ({}) - you may need to remove it to proceed (cm rm repo {})'.format(d_alias,d_alias)}

                            d['matched'] = True

                            break

                # Check if has missing deps on other CM repos
                for d in deps_on_other_repos:
                    if not d.get('conflict', False) and not d.get('matched', False):
                        # A dep may be declared only by uid
                        d_name = d.get('alias', '') or d.get('uid', '')
                        warnings.append('You must install extra CM repository: cm pull repo {}'.format(d_name))

                paths.append(repo_path)
                modified = True

        elif mode == 'delete':
            new_paths = [p for p in paths if p != repo_path]
            if len(new_paths) != len(paths):
                modified = True
            paths = new_paths

        if modified:
            r = utils.save_json(self.full_path_to_repo_paths, meta = paths)
            if r['return']>0: return r

            # Reload repos
            r = self.load(init=True)
            if r['return']>0: return r

        rr = {'return':0}

        if len(warnings)>0:
            rr['warnings'] = warnings

        return rr

    ############################################################
    @staticmethod
    def _unpack_zip(pack_file, repo_path, skip_zip_parent_dir = False):
        """
        Unpack a CM repository ZIP archive into a directory

        Args:
            pack_file (str): path to the ZIP archive
            repo_path (str): directory where files are extracted
            (skip_zip_parent_dir) (bool): if True, strip the leading directory of
                                          the first archive entry from extracted paths

        Entries starting with '..', '/' or '\\' and entries that would resolve
        outside repo_path are skipped.
        """

        import zipfile

        root = os.path.realpath(repo_path)
        parent_dir = ''

        with zipfile.ZipFile(pack_file) as repo_pack_zip:
            for f in repo_pack_zip.namelist():
                if f.startswith(('..', '/', '\\')):
                    continue

                # The parent dir is the first path component of the first entry,
                # whether or not the archive stores explicit directory entries
                if skip_zip_parent_dir and parent_dir == '' and '/' in f:
                    parent_dir = f[:f.index('/') + 1]

                ff = f[len(parent_dir):] if parent_dir != '' and f.startswith(parent_dir) else f

                file_path = os.path.join(repo_path, ff)

                # A '..' component in the middle of a name must not escape the repository
                target = os.path.realpath(file_path)
                if target != root and not target.startswith(root + os.sep):
                    continue

                if f.endswith('/'):
                    # create directory
                    if not os.path.exists(file_path):
                        os.makedirs(file_path)
                else:
                    dir_name = os.path.dirname(file_path)
                    if not os.path.exists(dir_name):
                        os.makedirs(dir_name)

                    # extract file
                    with open(file_path, 'wb') as file_out:
                        file_out.write(repo_pack_zip.read(f))

    ############################################################
    def pull(self, alias, url = '', branch = '', checkout = '', console = False, desc = '', prefix = '', depth = None, path_to_repo = None, checkout_only = False, skip_zip_parent_dir = False, extra_cmd_git = '', extra_cmd_pip = '', new_branch = ''):
        """
        Clone or pull CM repository

        Args:
            alias (str): CM repository alias
            (url) (str): Git repository URL (a URL containing ".zip" is downloaded and unpacked instead)
            (branch) (str): Git repository branch
            (new_branch) (str): Create new branch
            (checkout) (str): Git repository checkout
            (checkout_only) (bool): only checkout existing Git repository but don't pull
            (depth) (int): Git repository depth
            (console) (bool): if True, print some info to console
            (desc) (str): optional repository description
            (prefix) (str): sub-directory to be used inside this CM repository to store artifacts
            (path_to_repo) (str): force path to repo (useful to pull imported repos with non-standard path)
            (skip_zip_parent_dir) (bool): skip parent dir in CM ZIP repo (useful when downloading CM repo archives from GitHub)
            (extra_cmd_git) (str): add this string to git clone
            (extra_cmd_pip) (str): add this string to pip install when installing requirements from CM repositories

        Returns:
            (CM return dict):

            * return (int): return code == 0 if no error and >0 if error
            * (error) (str): error string if return>0

            * (meta) (dict): meta of the CM repository

            * (warnings) (list of str): warnings to install more CM repositories
        """

        # Prepare path
        if path_to_repo is None:
            path_to_repo = os.path.join(self.full_path_to_repos, alias)

        if console:
            print ('Local path: '+path_to_repo)
            print ('')

        # Check if repository already exists but corrupted
        path_to_repo_desc = os.path.join(path_to_repo, self.cfg['file_meta_repo'])
        r=utils.is_file_json_or_yaml(file_name = path_to_repo_desc)
        if r['return']>0: return r

        repo_desc_exists=r['is_file']

        if os.path.isdir(path_to_repo) and not repo_desc_exists:
            print ('')
            print ('WARNING: directory {} already exists but without cmr.yaml - maybe clone or download was corrupted!'.format(path_to_repo))

            x = input('Delete this repo (Y/n)? ')
            if x.strip().lower() not in ['n','no']:
                import shutil

                print ('')
                print ('Deleting {} ...'.format(path_to_repo))
                shutil.rmtree(path_to_repo, onerror=utils.rm_read_only)

            print ('')

        # The steps below change the working directory; it is restored on
        # every exit path (including error returns) by the finally clause
        cur_dir = os.getcwd()

        try:
            clone=False

            download = url.find('.zip')>0

            # NOTE(review): commands are assembled as shell strings from
            # url/alias/branch/extra_cmd_* and run with os.system -
            # these values must come from a trusted source
            if checkout_only:
                if not os.path.isdir(path_to_repo):
                    return {'return':1, 'error':'Trying to checkout repo "{}" that was not pulled'.format(alias)}

            else:
                if download:
                    # If CM repo already exists
                    if os.path.isdir(path_to_repo):
                        return {'return':1, 'error':'repository is already installed'}

                    os.makedirs(path_to_repo)

                    os.chdir(path_to_repo)

                    cmd = 'wget --no-check-certificate "'+url+'" -O '+alias

                elif os.path.isdir(path_to_repo):
                    # Attempt to update
                    os.chdir(path_to_repo)

                    cmd = 'git pull'

                else:
                    # Attempt to clone
                    clone = True

                    os.chdir(self.full_path_to_repos)

                    cmd = 'git clone ' + url + ' ' + alias

                    # Check if depth is set
                    if depth is not None and depth != '':
                        cmd += ' --depth ' + str(depth)

                    if extra_cmd_git !='' :
                        cmd +=' ' + extra_cmd_git

                if console:
                    print (cmd)
                    print ('')

                r = os.system(cmd)

                if clone and not os.path.isdir(path_to_repo):
                    return {'return':1, 'error':'repository was not cloned'}

            os.chdir(path_to_repo)

            if download and not checkout_only:
                # wget stored the archive inside the repository under the alias name
                pack_file = os.path.join(path_to_repo, alias)

                if console:
                    print ('Unpacking {} to {} ...'.format(pack_file, path_to_repo))

                self._unpack_zip(pack_file, path_to_repo, skip_zip_parent_dir)

                # remove original file
                os.remove(pack_file)

            # Check if branch
            if new_branch != '':
                cmd = 'git checkout -b ' + new_branch

                if console:
                    print ('')
                    print (cmd)
                    print ('')

                r = os.system(cmd)

                if r>0:
                    return {'return':1, 'error':'creating new git branch failed'}

            if branch != '' or checkout != '':
                cmd = 'git checkout'

                if branch != '':
                    cmd = 'git fetch && git checkout ' + branch

                if checkout!='':
                    cmd += ' ' + checkout

                if console:
                    print ('')
                    print (cmd)
                    print ('')

                r = os.system(cmd)

                if r>0:
                    return {'return':1, 'error':'git checkout for repository failed'}

            # Check if repo description exists
            r=utils.is_file_json_or_yaml(file_name = path_to_repo_desc)
            if r['return']>0: return r

            must_update_repo_desc = False

            if r['is_file']:
                # Load meta from the repository
                #
                # The alias stored in the repository description is kept even if the
                # user pulled the repository under another alias: when pulling a fork
                # (for example ctuning@mlcommons-ck of mlcommons@ck) we do not want
                # to rewrite .cmr.yaml with the new alias to avoid ambiguities
                # (see also https://github.com/mlcommons/ck/issues/196)
                r=utils.load_yaml_and_json(file_name_without_ext=path_to_repo_desc)
                if r['return']>0: return r

                meta = r['meta']
            else:
                # Prepare meta
                r=utils.gen_uid()
                if r['return']>0: return r

                repo_uid = r['uid']

                meta = {
                         'uid': repo_uid,
                         'alias': alias,
                         'git':True,
                       }

                if desc!='':
                    meta['desc']=desc

                if prefix!='':
                    meta['prefix']=prefix

                must_update_repo_desc = True

            if must_update_repo_desc:
                r=utils.save_yaml(path_to_repo_desc + '.yaml', meta=meta)
                if r['return']>0: return r

            # Check path to repo with prefix
            path_to_repo_with_prefix = path_to_repo

            if prefix!='':
                path_to_repo_with_prefix = os.path.join(path_to_repo, prefix)

                if not os.path.isdir(path_to_repo_with_prefix):
                    os.makedirs(path_to_repo_with_prefix)

            # Check min CM version requirement
            min_cm_version = meta.get('min_cm_version','').strip()
            if min_cm_version != '':
                from cmind import __version__ as current_cm_version

                comparison = utils.compare_versions(current_cm_version, min_cm_version)

                if comparison < 0:
                    return {'return':1, 'error':'This repository requires CM version >= {} while current CM version is {} - please update using "pip install cmind -U"'.format(min_cm_version, current_cm_version)}

            # Get final alias
            alias = meta.get('alias', '')

            # Update repo list
            # TBD: make it more safe (reload and save)
            r = self.process(path_to_repo, 'add')
            if r['return']>0: return r

            warnings = r.get('warnings', [])

            # Check if need to install requirements
            # (runs in the repository directory, so requirements.txt is the repository's one)
            install_python_requirements = meta.get('install_python_requirements', False)

            if install_python_requirements:
                import sys

                python_exec = sys.executable

                cmd = python_exec + ' -m pip install -r requirements.txt'

                if extra_cmd_pip !='' :
                    cmd +=' ' + extra_cmd_pip

                if console:
                    print ('')
                    print (cmd)
                    print ('')

                r = os.system(cmd)

                if r>0:
                    return {'return':1, 'error':'pip install -r requirements failed for this CM repository'}

        finally:
            # Go back to original directory
            os.chdir(cur_dir)

        if console:
            print ('')
            print ('CM alias for this repository: {}'.format(alias))

        rr = {'return':0, 'meta':meta}

        if len(warnings)>0:
            rr['warnings'] = warnings

        return rr

    ############################################################
    def init(self, alias, uid, path = '', console = False, desc = '', prefix = '', only_register = False):
        """
        Init CM repository in a given path

        Args:
            alias (str): CM repository alias
            uid (str): CM repository UID (generated if empty)
            (path) (str): local path to a given repository (otherwise the directory with CM repositories is used)
            (console) (bool): if True, print some info to console
            (desc) (str): optional repository description
            (prefix) (str): sub-directory to be used inside this CM repository to store artifacts
            (only_register) (bool): if True, only register path in the CM index of repositories but do not recreate cmr.yaml

        Returns:
            (CM return dict):

            * return (int): return code == 0 if no error and >0 if error
            * (error) (str): error string if return>0

            * meta (dict): meta of the CM repository

            * path_to_repo (str): path to repository
            * path_to_repo_desc (str): path to repository description
            * path_to_repo_with_prefix (str): path to repository with prefix (== path_to_repo if prefix == "")

            * (warnings) (list of str): warnings to install more CM repositories
        """

        # Prepare path
        if uid == '':
            r=utils.gen_uid()
            if r['return']>0: return r
            uid = r['uid']

        repo_name=alias if alias!='' else uid

        path_to_repo = os.path.join(self.full_path_to_repos, repo_name) if path=='' else path

        # Convert potentially relative path into an absolute path
        # For example, it's needed when we initialize repo in the current directory
        # or use . and ..
        path_to_repo = os.path.abspath(path_to_repo)

        if console:
            print ('Local path to the CM repository: '+path_to_repo)
            print ('')

        # If only register (description already exists), skip meta preparation
        path_to_repo_desc = os.path.join(path_to_repo, self.cfg['file_meta_repo'])

        if not only_register:
            if not os.path.isdir(path_to_repo):
                os.makedirs(path_to_repo)

            # Check if file already exists
            r=utils.is_file_json_or_yaml(file_name=path_to_repo_desc)
            if r['return'] >0: return r

            if r['is_file']:
                return {'return':1, 'error':'Repository description already exists in {}'.format(path_to_repo)}

            meta = {
                     'uid': uid,
                     'alias': alias,
                     'git':False,
                   }

            if desc!='':
                meta['desc']=desc

            if prefix!='':
                meta['prefix']=prefix

            r=utils.save_yaml(path_to_repo_desc + '.yaml', meta=meta)
            if r['return']>0: return r

            # 20241006: requirements.txt is no longer created here -
            # the check for min cmind version was moved to _cmr.yaml

        else:
            r=utils.load_yaml_and_json(file_name_without_ext=path_to_repo_desc)
            if r['return'] >0: return r

            meta = r['meta']

        # Check if exist and create if not
        path_to_repo_with_prefix = path_to_repo

        if prefix!='':
            path_to_repo_with_prefix = os.path.join(path_to_repo, prefix)

            if not os.path.isdir(path_to_repo_with_prefix):
                os.makedirs(path_to_repo_with_prefix)

        # Update repo list
        # TBD: make it more safe (reload and save)
        r = self.process(path_to_repo, 'add')
        if r['return']>0: return r

        warnings = r.get('warnings', [])

        rr = {'return':0, 'meta':meta,
                          'path_to_repo': path_to_repo,
                          'path_to_repo_desc': path_to_repo_desc,
                          'path_to_repo_with_prefix': path_to_repo_with_prefix}

        if len(warnings)>0:
            rr['warnings'] = warnings

        return rr

    ############################################################
    def delete(self, lst, remove_all = False, console = False, force = False):
        """
        Delete CM repository or repositories with or without content

        Args:
            lst (list of CM repository classes): list of CM repositories
            remove_all (bool): if True, remove the content and unregister CM repository
            console (bool): if True, output some info to console
                            (confirmation is requested only in console mode)
            force (bool): if True, do not ask questions

        Returns:
            (CM return dict):

            * return (int): return code == 0 if no error and >0 if error
            * (error) (str): error string if return>0
        """

        # Navigate through repos from the search function
        for repo in lst:
            # Prepare path
            path_to_repo = repo.path

            if console:
                print ('Local path to a CM repository: '+path_to_repo)

            if path_to_repo not in self.paths:
                return {'return':16, 'error':'repository not found in CM index - weird'}

            if console and not force:
                ask = input(' Are you sure you want to delete this repository (y/N): ')
                ask = ask.strip().lower()

                if ask!='y':
                    print (' Skipped!')
                    continue

            # TBD: make it more safe (reload and save)
            r = self.process(path_to_repo, 'delete')
            if r['return']>0: return r

            # Check if remove all
            if remove_all:
                import shutil

                if console:
                    print (' Deleting repository content ...')

                shutil.rmtree(path_to_repo, onerror=utils.rm_read_only)
            else:
                if console:
                    print (' CM repository was unregistered from CM but its content was not deleted ...')

        return {'return':0}