bug fix with registry

pull/1/head
Steve Nyemba 6 months ago
parent e3efc70c01
commit 570f2294b9

@ -1,6 +1,6 @@
__app_name__ = 'data-transport'
__author__ = 'The Phi Technology'
__version__= '2.4.28'
__version__= '2.4.30'
__edition__= 'enterprise'
__email__ = "info@the-phi.com"
__license__=f"""

@ -6,14 +6,18 @@ import transport
import importlib
import importlib.util
import shutil
from io import StringIO
"""
This class manages data from the registry and allows (read only)
@TODO: add property to the DATA attribute
"""
REGISTRY_PATH=os.sep.join([os.environ.get('HOME','USERPROFILE'),'.data-transport'])
if 'HOME' in os.environ :
REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
else:
REGISTRY_PATH=os.sep.join([os.environ['USERPROFILE'],'.data-transport'])
#
# This path can be overriden by an environment variable ...
#
@ -21,161 +25,6 @@ if 'DATA_TRANSPORT_REGISTRY_PATH' in os.environ :
REGISTRY_PATH = os.environ['DATA_TRANSPORT_REGISTRY_PATH']
REGISTRY_FILE= 'transport-registry.json'
DATA = {}
# class plugins:
# #
# # This is a utility function that should enable management of plugins-registry
# # The class allows to add/remove elements
# #
# # @TODO: add read/write properties to the class (better design practice)
# #
# _data = {}
# FOLDER = os.sep.join([REGISTRY_PATH,'plugins'])
# CODE = os.sep.join([REGISTRY_PATH,'plugins','code'])
# FILE = os.sep.join([REGISTRY_PATH,'plugin-registry.json'])
# @staticmethod
# def init():
# if not os.path.exists(plugins.FOLDER) :
# os.makedirs(plugins.FOLDER)
# if not os.path.exists(plugins.CODE):
# os.makedirs(plugins.CODE)
# if not os.path.exists(plugins.FILE):
# f = open(plugins.FILE,'w')
# f.write("{}")
# f.close()
# plugins._read() #-- will load data as a side effect
# @staticmethod
# def copy (path) :
# shutil.copy2(path,plugins.CODE)
# @staticmethod
# def _read ():
# f = open(plugins.FILE)
# try:
# _data = json.loads(f.read())
# f.close()
# except Exception as e:
# print (f"Corrupted registry, resetting ...")
# _data = {}
# plugins._write(_data)
# plugins._data = _data
# @staticmethod
# def _write (_data):
# f = open(plugins.FILE,'w')
# f.write(json.dumps(_data))
# f.close()
# plugins._data = _data
# @staticmethod
# def inspect (_path):
# _names = []
# if os.path.exists(_path) :
# _filename = _path.split(os.sep)[-1]
# spec = importlib.util.spec_from_file_location(_filename, _path)
# module = importlib.util.module_from_spec(spec)
# spec.loader.exec_module(module)
# # _names = [{'name':getattr(getattr(module,_name),'name'),'pointer':getattr(module,_name)} for _name in dir(module) if type( getattr(module,_name)).__name__ == 'function']
# for _name in dir(module) :
# _pointer = getattr(module,_name)
# if hasattr(_pointer,'transport') :
# _item = {'real_name':_name,'name':getattr(_pointer,'name'),'pointer':_pointer,'version':getattr(_pointer,'version')}
# _names.append(_item)
# return _names
# @staticmethod
# def add (alias,path):
# """
# Add overwrite the registry entries
# """
# _names = plugins.inspect (path)
# _log = []
# if _names :
# #
# # We should make sure we have all the plugins with the attributes (transport,name) set
# _names = [_item for _item in _names if hasattr(_item['pointer'],'transport') ]
# if _names :
# plugins.copy(path)
# _content = []
# for _item in _names :
# _key = '@'.join([alias,_item['name']])
# _log.append(_item['name'])
# #
# # Let us update the registry
# #
# plugins.update(alias,path,_log)
# return _log
# @staticmethod
# def update (alias,path,_log) :
# """
# updating the registry entries of the plugins (management data)
# """
# # f = open(plugins.FILE)
# # _data = json.loads(f.read())
# # f.close()
# _data = plugins._data
# # _log = plugins.add(alias,path)
# if _log :
# _data[alias] = {'content':_log,'name':path.split(os.sep)[-1]}
# plugins._write(_data) #-- will update data as a side effect
# return _log
# @staticmethod
# def get(**_args) :
# # f = open(plugins.FILE)
# # _data = json.loads(f.read())
# # f.close()
# # if 'key' in _args :
# # alias,name = _args['key'].split('.') if '.' in _args['key'] else _args['key'].split('@')
# # else :
# # alias = _args['alias']
# # name = _args['name']
# # if alias in _data :
# # _path = os.sep.join([plugins.CODE,_data[alias]['name']])
# # _item = [_item for _item in plugins.inspect(_path) if name == _item['name']]
# # _item = _item[0] if _item else None
# # if _item :
# # return _item['pointer']
# # return None
# _item = plugins.has(**_args)
# return _item['pointer'] if _item else None
# @staticmethod
# def has (**_args):
# f = open(plugins.FILE)
# _data = json.loads(f.read())
# f.close()
# if 'key' in _args :
# alias,name = _args['key'].split('.') if '.' in _args['key'] else _args['key'].split('@')
# else :
# alias = _args['alias']
# name = _args['name']
# if alias in _data :
# _path = os.sep.join([plugins.CODE,_data[alias]['name']])
# _item = [_item for _item in plugins.inspect(_path) if name == _item['name']]
# _item = _item[0] if _item else None
# if _item :
# return copy.copy(_item)
# return None
# @staticmethod
# def synch():
# pass
def isloaded ():
return DATA not in [{},None]
@ -233,8 +82,11 @@ def set (label, auth_file, default=False,path=REGISTRY_PATH) :
if label == 'default' :
raise Exception ("""Invalid label name provided, please change the label name and use the switch""")
reg_file = os.sep.join([path,REGISTRY_FILE])
if os.path.exists (auth_file) and os.path.exists(path) and os.path.exists(reg_file):
f = open(auth_file)
if os.path.exists(path) and os.path.exists(reg_file):
if type(auth_file) == str and os.path.exists (auth_file) :
f = open(auth_file)
elif type(auth_file) == StringIO:
f = auth_file
_info = json.loads(f.read())
f.close()
f = open(reg_file)

Loading…
Cancel
Save