adding plugin handler (enhancement)

v2.4
Steve Nyemba 1 month ago
parent fce888606c
commit 73fa9d90a9

@ -34,6 +34,8 @@ import time
from termcolor import colored from termcolor import colored
from enum import Enum from enum import Enum
from rich import print from rich import print
import plugin_ix as pix
app = typer.Typer() app = typer.Typer()
app_e = typer.Typer() #-- handles etl (run, generate) app_e = typer.Typer() #-- handles etl (run, generate)
@ -147,7 +149,7 @@ def initregistry (email:Annotated[str,typer.Argument(help="email")],
path:str=typer.Option(default=REGISTRY_PATH,help="path or location of the configuration file"), path:str=typer.Option(default=REGISTRY_PATH,help="path or location of the configuration file"),
override:bool=typer.Option(default=False,help="override existing configuration or not")): override:bool=typer.Option(default=False,help="override existing configuration or not")):
""" """
This functiion will initialize the registry and have both application and calling code loading the database parameters by a label This functiion will initialize the data-transport registry and have both application and calling code loading the database parameters by a label
""" """
try: try:
@ -179,42 +181,62 @@ def register (label:Annotated[str,typer.Argument(help="unique label that will be
pass pass
@app_x.command(name='add') @app_x.command(name='add')
def register_plugs ( def register_plugs (
alias:Annotated[str,typer.Argument(help="unique alias fo the file being registered")], alias:Annotated[str,typer.Argument(help="unique function name within a file")],
path:Annotated[str,typer.Argument(help="path of the python file, that contains functions")] path:Annotated[str,typer.Argument(help="path of the python file, that contains functions")],
folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry folder"),
): ):
""" """
This function will register a file and the functions within will be refrences <alias>.<function> in a configuration file This function will register a file and the functions within we are interested in using
""" """
transport.registry.plugins.init() if ',' in alias :
_log = transport.registry.plugins.add(alias,path) alias = [_name.strip() for _name in alias.split(',') if _name.strip() != '' ]
else:
alias = [alias.strip()]
_pregistry = pix.Registry(folder=folder,plugin_folder='plugins/code')
_log = _pregistry.set(path,alias)
# transport.registry.plugins.init()
# _log = transport.registry.plugins.add(alias,path)
_mark = TIMES_MARK if not _log else CHECK_MARK _mark = TIMES_MARK if not _log else CHECK_MARK
_msg = f"""Could NOT add the [bold]{alias}[/bold]to the registry""" if not _log else f""" successfully added {alias}, {len(_log)} functions added""" _msg = f"""Could NOT add the [bold]{alias}[/bold]to the registry""" if not _log else f""" successfully added {alias}, {_log} functions registered"""
print (f"""{_mark} {_msg}""") print (f"""{_mark} {_msg}""")
@app_x.command(name="list") @app_x.command(name="list")
def registry_list (): def registry_list (folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport configuration folder")):
"""
transport.registry.plugins.init() This function will list all the plugins (python functions/files) that are registered and can be reused
_d = [] """
for _alias in transport.registry.plugins._data : _pregistry = pix.Registry(folder=folder)
_data = transport.registry.plugins._data[_alias] _df = _pregistry.stats()
_d += [{'alias':_alias,"plugin-count":len(_data['content']),'e.g':'@'.join([_alias,_data['content'][0]]),'plugins':json.dumps(_data['content'])}] if _df.empty :
if _d: print (f"{TIMES_MARK} registry at {folder} is not ready")
print (pd.DataFrame(_d))
else: else:
print (f"""{TIMES_MARK}, Plugin registry is not available or needs initialization""") print (_df)
@app_x.command ("has")
def registry_has (alias:Annotated[str,typer.Argument(help="alias of a function function@file or file.function")],
folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry file")) :
_pregistry = pix.Registry(folder=folder)
if _pregistry.has(alias) :
_msg = f"{CHECK_MARK} {alias} was [bold] found [/bold] in registry "
else:
_msg = f"{TIMES_MARK} {alias} was [bold] NOT found [/bold] in registry "
print (_msg)
@app_x.command(name="test") @app_x.command(name="test")
def registry_test (key): def registry_test (alias:Annotated[str,typer.Argument(help="alias of a function function@file or file.function")],
folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry folder")) :
_pregistry = pix.Registry(folder=folder)
""" """
This function allows to test syntax for a plugin i.e in terms of alias@function This function allows to test syntax for a plugin i.e in terms of alias@function
""" """
_item = transport.registry.plugins.has(key=key) # _item = transport.registry.plugins.has(key=key)
if _item : _pointer = _pregistry.get(alias) if _pregistry.has(alias) else None
del _item['pointer']
print (f"""{CHECK_MARK} successfully loaded \033[1m{key}\033[0m found, version {_item['version']}""") if _pointer:
print (pd.DataFrame([_item])) print (f"""{CHECK_MARK} successfully loaded [bold] {alias}[/bold] found in {folder}""")
else: else:
print (f"{TIMES_MARK} unable to load \033[1m{key}\033[0m. Make sure it is registered") print (f"{TIMES_MARK} unable to load {alias}. Make sure it is registered")
app.add_typer(app_e,name='etl',help="This function will run etl or generate a template etl configuration file") app.add_typer(app_e,name='etl',help="This function will run etl or generate a template etl configuration file")
app.add_typer(app_r,name='registry',help='This function allows labeling database access information') app.add_typer(app_r,name='registry',help='This function allows labeling database access information')
app.add_typer(app_i,name="info",help="This function will print either license or supported database technologies") app.add_typer(app_i,name="info",help="This function will print either license or supported database technologies")

@ -19,6 +19,9 @@ import os
import sys import sys
import itertools import itertools
import json import json
import plugin_ix
class BaseIO : class BaseIO :
def __init__(self,**_args): def __init__(self,**_args):
@ -51,19 +54,19 @@ class IO(BaseIO):
# #
super().__init__(**_args) super().__init__(**_args)
_agent = _args['agent'] _agent = _args['agent']
plugins = _args['plugins'] plugins = _args['plugins'] if 'plugins' else None
# _logger = _args['logger'] if 'logger' in _args else None # _logger = _args['logger'] if 'logger' in _args else None
# self._logger = _logger if not type(_agent) in [IReader,IWriter] else _agent._logger #transport.get.writer(label='logger') #if registry.has('logger') else None # self._logger = _logger if not type(_agent) in [IReader,IWriter] else _agent._logger #transport.get.writer(label='logger') #if registry.has('logger') else None
# if not _logger and hasattr(_agent,'_logger') : # if not _logger and hasattr(_agent,'_logger') :
# self._logger = getattr(_agent,'_logger') # self._logger = getattr(_agent,'_logger')
self._agent = _agent self._agent = _agent
_date = _date = str(datetime.now()) _date = _date = str(datetime.now())
self._ixloader = plugin_ix.Loader ()
# self._logTable = 'logs' #'_'.join(['logs',_date[:10]+_date[11:19]]).replace(':','').replace('-','_') # self._logTable = 'logs' #'_'.join(['logs',_date[:10]+_date[11:19]]).replace(':','').replace('-','_')
if plugins : if plugins :
self._init_plugins(plugins) self.init_plugins(plugins)
else:
self._plugins = None
# def setLogger(self,_logger): # def setLogger(self,_logger):
# self._logger = _logger # self._logger = _logger
# def log (self,**_args): # def log (self,**_args):
@ -77,22 +80,22 @@ class IO(BaseIO):
# _data[key] = str(_data[key]) if type(_data[key]) not in [list,dict] else json.dumps(_data[key]) # _data[key] = str(_data[key]) if type(_data[key]) not in [list,dict] else json.dumps(_data[key])
# self._logger.write(pd.DataFrame([_data])) #,table=self._logTable) # self._logger.write(pd.DataFrame([_data])) #,table=self._logTable)
def _init_plugins(self,_items): # def _init_plugins(self,_items):
""" # """
This function will load pipelined functions as a plugin loader # This function will load pipelined functions as a plugin loader
""" # """
registry.plugins.init() # registry.plugins.init()
self._plugins = PluginLoader(registry=registry.plugins) # self._plugins = PluginLoader(registry=registry.plugins)
[self._plugins.set(_name) for _name in _items] # [self._plugins.set(_name) for _name in _items]
self.log(action='init-plugins',object=self.getClassName(self),input =[_name for _name in _items]) # self.log(action='init-plugins',object=self.getClassName(self),input =[_name for _name in _items])
# if 'path' in _args and 'names' in _args : # # if 'path' in _args and 'names' in _args :
# self._plugins = PluginLoader(**_args) # # self._plugins = PluginLoader(**_args)
# else: # # else:
# self._plugins = PluginLoader(registry=registry.plugins) # # self._plugins = PluginLoader(registry=registry.plugins)
# [self._plugins.set(_pointer) for _pointer in _args] # # [self._plugins.set(_pointer) for _pointer in _args]
# # #
# @TODO: We should have a way to log what plugins are loaded and ready to use # # @TODO: We should have a way to log what plugins are loaded and ready to use
def meta (self,**_args): def meta (self,**_args):
if hasattr(self._agent,'meta') : if hasattr(self._agent,'meta') :
return self._agent.meta(**_args) return self._agent.meta(**_args)
@ -120,6 +123,10 @@ class IO(BaseIO):
pointer = getattr(self._agent,_name) pointer = getattr(self._agent,_name)
return pointer(_query) return pointer(_query)
return None return None
def init_plugins(self,plugins):
for _ref in plugins :
self._ixloader.set(_ref)
class IReader(IO): class IReader(IO):
""" """
This is a wrapper for read functionalities This is a wrapper for read functionalities
@ -133,7 +140,8 @@ class IReader(IO):
for _segment in _data : for _segment in _data :
_shape += list(_segment.shape) _shape += list(_segment.shape)
if self._plugins : if self._plugins :
yield self._plugins.apply(_segment,self.log) # yield self._plugins.apply(_segment,self.log)
yield self._ixloader.visitor(_data,self.log)
else: else:
yield _segment yield _segment
_objectName = '.'.join([self._agent.__class__.__module__,self._agent.__class__.__name__]) _objectName = '.'.join([self._agent.__class__.__module__,self._agent.__class__.__name__])
@ -146,8 +154,8 @@ class IReader(IO):
def read(self,**_args): def read(self,**_args):
if 'plugins' in _args : if 'plugins' in _args :
self.init_plugins(_args['plugins'])
self._init_plugins(_args['plugins'])
if self._args : if self._args :
_data = self._agent.read(**self._args) _data = self._agent.read(**self._args)
else: else:
@ -172,9 +180,7 @@ class IReader(IO):
_input['table'] = self._agent._table _input['table'] = self._agent._table
self.log(action='read',object=_objectName, input=_input) self.log(action='read',object=_objectName, input=_input)
if self._plugins : _data = self._ixloader.visitor(_data)
_logs = []
_data = self._plugins.apply(_data,self.log)
return _data return _data
class IWriter(IO): class IWriter(IO):
@ -184,13 +190,14 @@ class IWriter(IO):
def write(self,_data,**_args): def write(self,_data,**_args):
if 'plugins' in _args : if 'plugins' in _args :
self._init_plugins(_args['plugins']) self._init_plugins(_args['plugins'])
if self._plugins and self._plugins.ratio() > 0 : # if self._plugins and self._plugins.ratio() > 0 :
_logs = [] # _logs = []
_data = self._plugins.apply(_data,_logs,self.log) # _data = self._plugins.apply(_data,_logs,self.log)
# [self.log(**_item) for _item in _logs] # [self.log(**_item) for _item in _logs]
try: try:
# IWriter.lock.acquire() # IWriter.lock.acquire()
_data = self._ixloader.visitor(_data)
self._agent.write(_data,**_args) self._agent.write(_data,**_args)
finally: finally:
# IWriter.lock.release() # IWriter.lock.release()

@ -21,161 +21,161 @@ if 'DATA_TRANSPORT_REGISTRY_PATH' in os.environ :
REGISTRY_PATH = os.environ['DATA_TRANSPORT_REGISTRY_PATH'] REGISTRY_PATH = os.environ['DATA_TRANSPORT_REGISTRY_PATH']
REGISTRY_FILE= 'transport-registry.json' REGISTRY_FILE= 'transport-registry.json'
DATA = {} DATA = {}
class plugins: # class plugins:
# # #
# This is a utility function that should enable management of plugins-registry # # This is a utility function that should enable management of plugins-registry
# The class allows to add/remove elements # # The class allows to add/remove elements
# # #
# @TODO: add read/write properties to the class (better design practice) # # @TODO: add read/write properties to the class (better design practice)
# # #
_data = {} # _data = {}
FOLDER = os.sep.join([REGISTRY_PATH,'plugins']) # FOLDER = os.sep.join([REGISTRY_PATH,'plugins'])
CODE = os.sep.join([REGISTRY_PATH,'plugins','code']) # CODE = os.sep.join([REGISTRY_PATH,'plugins','code'])
FILE = os.sep.join([REGISTRY_PATH,'plugin-registry.json']) # FILE = os.sep.join([REGISTRY_PATH,'plugin-registry.json'])
@staticmethod # @staticmethod
def init(): # def init():
if not os.path.exists(plugins.FOLDER) : # if not os.path.exists(plugins.FOLDER) :
os.makedirs(plugins.FOLDER) # os.makedirs(plugins.FOLDER)
if not os.path.exists(plugins.CODE): # if not os.path.exists(plugins.CODE):
os.makedirs(plugins.CODE) # os.makedirs(plugins.CODE)
if not os.path.exists(plugins.FILE): # if not os.path.exists(plugins.FILE):
f = open(plugins.FILE,'w') # f = open(plugins.FILE,'w')
f.write("{}") # f.write("{}")
f.close() # f.close()
plugins._read() #-- will load data as a side effect # plugins._read() #-- will load data as a side effect
@staticmethod # @staticmethod
def copy (path) : # def copy (path) :
shutil.copy2(path,plugins.CODE) # shutil.copy2(path,plugins.CODE)
@staticmethod # @staticmethod
def _read (): # def _read ():
f = open(plugins.FILE) # f = open(plugins.FILE)
try: # try:
_data = json.loads(f.read()) # _data = json.loads(f.read())
f.close() # f.close()
except Exception as e: # except Exception as e:
print (f"Corrupted registry, resetting ...") # print (f"Corrupted registry, resetting ...")
_data = {} # _data = {}
plugins._write(_data) # plugins._write(_data)
plugins._data = _data # plugins._data = _data
@staticmethod # @staticmethod
def _write (_data): # def _write (_data):
f = open(plugins.FILE,'w') # f = open(plugins.FILE,'w')
f.write(json.dumps(_data)) # f.write(json.dumps(_data))
f.close() # f.close()
plugins._data = _data # plugins._data = _data
@staticmethod # @staticmethod
def inspect (_path): # def inspect (_path):
_names = [] # _names = []
if os.path.exists(_path) : # if os.path.exists(_path) :
_filename = _path.split(os.sep)[-1] # _filename = _path.split(os.sep)[-1]
spec = importlib.util.spec_from_file_location(_filename, _path) # spec = importlib.util.spec_from_file_location(_filename, _path)
module = importlib.util.module_from_spec(spec) # module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module) # spec.loader.exec_module(module)
# _names = [{'name':getattr(getattr(module,_name),'name'),'pointer':getattr(module,_name)} for _name in dir(module) if type( getattr(module,_name)).__name__ == 'function'] # # _names = [{'name':getattr(getattr(module,_name),'name'),'pointer':getattr(module,_name)} for _name in dir(module) if type( getattr(module,_name)).__name__ == 'function']
for _name in dir(module) : # for _name in dir(module) :
_pointer = getattr(module,_name) # _pointer = getattr(module,_name)
if hasattr(_pointer,'transport') : # if hasattr(_pointer,'transport') :
_item = {'real_name':_name,'name':getattr(_pointer,'name'),'pointer':_pointer,'version':getattr(_pointer,'version')} # _item = {'real_name':_name,'name':getattr(_pointer,'name'),'pointer':_pointer,'version':getattr(_pointer,'version')}
_names.append(_item) # _names.append(_item)
return _names # return _names
@staticmethod # @staticmethod
def add (alias,path): # def add (alias,path):
""" # """
Add overwrite the registry entries # Add overwrite the registry entries
""" # """
_names = plugins.inspect (path) # _names = plugins.inspect (path)
_log = [] # _log = []
if _names : # if _names :
# # #
# We should make sure we have all the plugins with the attributes (transport,name) set # # We should make sure we have all the plugins with the attributes (transport,name) set
_names = [_item for _item in _names if hasattr(_item['pointer'],'transport') ] # _names = [_item for _item in _names if hasattr(_item['pointer'],'transport') ]
if _names : # if _names :
plugins.copy(path) # plugins.copy(path)
_content = [] # _content = []
for _item in _names : # for _item in _names :
_key = '@'.join([alias,_item['name']]) # _key = '@'.join([alias,_item['name']])
_log.append(_item['name']) # _log.append(_item['name'])
# # #
# Let us update the registry # # Let us update the registry
# # #
plugins.update(alias,path,_log) # plugins.update(alias,path,_log)
return _log # return _log
@staticmethod # @staticmethod
def update (alias,path,_log) : # def update (alias,path,_log) :
""" # """
updating the registry entries of the plugins (management data) # updating the registry entries of the plugins (management data)
""" # """
# f = open(plugins.FILE) # # f = open(plugins.FILE)
# _data = json.loads(f.read()) # # _data = json.loads(f.read())
# f.close() # # f.close()
_data = plugins._data # _data = plugins._data
# _log = plugins.add(alias,path) # # _log = plugins.add(alias,path)
if _log : # if _log :
_data[alias] = {'content':_log,'name':path.split(os.sep)[-1]} # _data[alias] = {'content':_log,'name':path.split(os.sep)[-1]}
plugins._write(_data) #-- will update data as a side effect # plugins._write(_data) #-- will update data as a side effect
return _log # return _log
@staticmethod # @staticmethod
def get(**_args) : # def get(**_args) :
# f = open(plugins.FILE) # # f = open(plugins.FILE)
# _data = json.loads(f.read()) # # _data = json.loads(f.read())
# f.close() # # f.close()
# if 'key' in _args : # # if 'key' in _args :
# alias,name = _args['key'].split('.') if '.' in _args['key'] else _args['key'].split('@') # # alias,name = _args['key'].split('.') if '.' in _args['key'] else _args['key'].split('@')
# else : # # else :
# alias = _args['alias'] # # alias = _args['alias']
# name = _args['name'] # # name = _args['name']
# if alias in _data : # # if alias in _data :
# _path = os.sep.join([plugins.CODE,_data[alias]['name']]) # # _path = os.sep.join([plugins.CODE,_data[alias]['name']])
# _item = [_item for _item in plugins.inspect(_path) if name == _item['name']] # # _item = [_item for _item in plugins.inspect(_path) if name == _item['name']]
# _item = _item[0] if _item else None # # _item = _item[0] if _item else None
# if _item : # # if _item :
# return _item['pointer'] # # return _item['pointer']
# return None # # return None
_item = plugins.has(**_args) # _item = plugins.has(**_args)
return _item['pointer'] if _item else None # return _item['pointer'] if _item else None
@staticmethod # @staticmethod
def has (**_args): # def has (**_args):
f = open(plugins.FILE) # f = open(plugins.FILE)
_data = json.loads(f.read()) # _data = json.loads(f.read())
f.close() # f.close()
if 'key' in _args : # if 'key' in _args :
alias,name = _args['key'].split('.') if '.' in _args['key'] else _args['key'].split('@') # alias,name = _args['key'].split('.') if '.' in _args['key'] else _args['key'].split('@')
else : # else :
alias = _args['alias'] # alias = _args['alias']
name = _args['name'] # name = _args['name']
if alias in _data : # if alias in _data :
_path = os.sep.join([plugins.CODE,_data[alias]['name']]) # _path = os.sep.join([plugins.CODE,_data[alias]['name']])
_item = [_item for _item in plugins.inspect(_path) if name == _item['name']] # _item = [_item for _item in plugins.inspect(_path) if name == _item['name']]
_item = _item[0] if _item else None # _item = _item[0] if _item else None
if _item : # if _item :
return copy.copy(_item) # return copy.copy(_item)
return None # return None
@staticmethod # @staticmethod
def synch(): # def synch():
pass # pass
def isloaded (): def isloaded ():
return DATA not in [{},None] return DATA not in [{},None]

Loading…
Cancel
Save