parent
1efa722337
commit
669e0f2adc
@ -1,92 +1,5 @@
|
||||
#!/usr/bin/env python
|
||||
__doc__ = """
|
||||
|
||||
|
||||
"""
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
from multiprocessing import Process
|
||||
|
||||
import os
|
||||
import typer
|
||||
from typing_extensions import Annotated
|
||||
from typing import Optional
|
||||
import time
|
||||
from termcolor import colored
|
||||
from enum import Enum
|
||||
from rich import print
|
||||
# from rich.console import Console
|
||||
from rich.table import Table
|
||||
import plugins
|
||||
|
||||
app = typer.Typer()
|
||||
# app_e = typer.Typer() #-- handles etl (run, generate)
|
||||
# app_x = typer.Typer() #-- handles plugins (list,add, test)
|
||||
# app_i = typer.Typer() #-- handles information (version, license)
|
||||
appr = typer.Typer() #-- handles registry
|
||||
# REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
|
||||
# REGISTRY_FILE= 'transport-registry.json'
|
||||
CHECK_MARK = '[ [green]\u2713[/green] ]' #' '.join(['[',colored(u'\u2713', 'green'),']'])
|
||||
TIMES_MARK= '[ [red]\u2717[/red] ]' #' '.join(['[',colored(u'\u2717','red'),']'])
|
||||
# @app.command()
|
||||
@app.command(name="plugin")
|
||||
def inspect (file: Annotated[str,typer.Argument(help="python file that contains functions look into")],
|
||||
decorator:str=typer.Option(default=None,help="decorator attribute name (if any) ") ) :
|
||||
"""
|
||||
This function allows plugin management / testing
|
||||
"""
|
||||
loader = plugins.Loader()
|
||||
if loader.load(file=file,decorator= decorator) :
|
||||
n = len(loader._modules.keys())
|
||||
print (f"""{CHECK_MARK} Found {n} functions in [bold]{file}[/bold]""")
|
||||
else:
|
||||
_msg = f"""{TIMES_MARK} Invalid python file {file}"""
|
||||
print (_msg)
|
||||
@appr.command(name="add")
|
||||
def add_registry(
|
||||
registry_folder: Annotated[str,typer.Argument(help="registry folder")],
|
||||
python_file: Annotated[str,typer.Argument(help="python file that contains functions to be used as plugins")]):
|
||||
"""
|
||||
This function will add/override a file to the registry
|
||||
"""
|
||||
# reg = plugins.Registry(rg_file)
|
||||
loader = plugins.Loader(file=python_file)
|
||||
if loader.get() :
|
||||
_names = list(loader.get().keys())
|
||||
reg = plugins.Registry(registry_folder)
|
||||
reg.set(python_file,_names)
|
||||
print (f"""{CHECK_MARK} Import was [bold]successful[/bold] into {registry_folder}""")
|
||||
else:
|
||||
print (f"""{TIMES_MARK} Import [bold]Failed[/bold] into {registry_folder}""")
|
||||
|
||||
def to_Table(df: pd.DataFrame):
|
||||
"""Displays a Pandas DataFrame as a rich table."""
|
||||
table = Table(show_header=True, header_style="bold magenta")
|
||||
|
||||
for col in df.columns:
|
||||
table.add_column(col)
|
||||
|
||||
for _, row in df.iterrows():
|
||||
table.add_row(*row.astype(str).tolist())
|
||||
|
||||
# console.print(table)
|
||||
return table
|
||||
@appr.command(name="list")
|
||||
def list_registry(folder: Annotated[str,typer.Argument(help="registry folder where")]) :
|
||||
"""
|
||||
This function will summarize the registry in a table
|
||||
"""
|
||||
reg = plugins.Registry(folder)
|
||||
|
||||
print (to_Table(reg.stats()))
|
||||
|
||||
|
||||
def exe(file,name,_args):
|
||||
pass
|
||||
|
||||
app.add_typer(appr,name='registry',help='This enables registry management')
|
||||
from plugins import cli
|
||||
if __name__ == '__main__' :
|
||||
app()
|
||||
cli.app()
|
||||
|
@ -1,186 +1,9 @@
|
||||
"""
|
||||
implementing a plugin loader i.e can load a function from a file given parameters
|
||||
"""
|
||||
import importlib as IL
|
||||
import importlib.util
|
||||
import os
|
||||
import json
|
||||
import shutil
|
||||
import pandas as pd
|
||||
|
||||
class Loader :
|
||||
"""
|
||||
This class is intended to load a plugin and make it available and assess the quality of the developed plugin
|
||||
"""
|
||||
|
||||
def __init__(self,**_args):
|
||||
"""
|
||||
"""
|
||||
# _names = _args['names'] if 'names' in _args else None
|
||||
# path = _args['path'] if 'path' in _args else None
|
||||
# self._names = _names if type(_names) == list else [_names]
|
||||
self._modules = {}
|
||||
self._names = []
|
||||
if 'file' in _args :
|
||||
self.load(**_args)
|
||||
# self._registry = _args['registry']
|
||||
|
||||
def load (self,**_args):
|
||||
"""
|
||||
This function loads a plugin from a given location
|
||||
:file location of the file
|
||||
"""
|
||||
self._modules = {}
|
||||
self._names = []
|
||||
path = _args ['file']
|
||||
_decoratorName = None if 'decorator' not in _args else _args['decorator']
|
||||
|
||||
if os.path.exists(path) :
|
||||
_alias = path.split(os.sep)[-1]
|
||||
spec = importlib.util.spec_from_file_location(_alias, path)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module) #--loads it into sys.modules
|
||||
for _name in dir(module) :
|
||||
if self.isplugin(module,_name,_decoratorName) :
|
||||
self._modules[_name] = getattr(module,_name)
|
||||
return self._modules is not None
|
||||
# self._names [_name]
|
||||
# def format (self,**_args):
|
||||
# uri = _args['alias'],_args['name']
|
||||
# def set(self,_pointer) :
|
||||
def set(self,_key) :
|
||||
"""
|
||||
This function will set a pointer to the list of modules to be called
|
||||
This should be used within the context of using the framework as a library
|
||||
"""
|
||||
if type(_key).__name__ == 'function':
|
||||
#
|
||||
# The pointer is in the code provided by the user and loaded in memory
|
||||
#
|
||||
_pointer = _key
|
||||
_key = 'inline@'+_key.__name__
|
||||
# self._names.append(_key.__name__)
|
||||
else:
|
||||
_pointer = self._registry.get(key=_key)
|
||||
|
||||
if _pointer :
|
||||
self._modules[_key] = _pointer
|
||||
self._names.append(_key)
|
||||
|
||||
def isplugin(self,module,name,attr=None):
|
||||
"""
|
||||
This function determines if a module is a recognized plugin
|
||||
:module module object loaded from importlib
|
||||
:name name of the functiion of interest
|
||||
:attr decorator attribute name (if any)
|
||||
"""
|
||||
|
||||
p = type(getattr(module,name)).__name__ =='function'
|
||||
q = True if not attr else hasattr(getattr(module,name),attr)
|
||||
#
|
||||
# @TODO: add a generated key, and more indepth validation
|
||||
return p and q
|
||||
|
||||
def has(self,_name):
|
||||
"""
|
||||
This will determine if the module name is loaded or not
|
||||
"""
|
||||
return _name in self._modules
|
||||
def names (self):
|
||||
return list(self._modules.keys())
|
||||
def get (self,_name=None):
|
||||
"""
|
||||
This functiion determines how many modules loaded vs unloaded given the list of names
|
||||
"""
|
||||
return self._modules.get(_name,None) if _name else self._modules
|
||||
def apply(self,_name,**_args):
|
||||
_pointer = self.get(_name)
|
||||
if _pointer :
|
||||
return _pointer (**_args) if _args else _pointer()
|
||||
|
||||
#
|
||||
# we should have a way to register these functions using rudimentary means
|
||||
#
|
||||
|
||||
class Registry :
|
||||
def __init__(self,folder,reader = None) :
|
||||
"""
|
||||
|
||||
"""
|
||||
self._folder = folder
|
||||
self._filename = os.sep.join([folder,'plugins-registry.json'])
|
||||
# self._context = self._folder.split(os.sep)[-1]
|
||||
self._reader = reader
|
||||
self._data = {}
|
||||
self.make(self._folder) #-- making the folder just in case we need to
|
||||
# self.make(os.sep.join([self._folder,'code']))
|
||||
self.load()
|
||||
|
||||
|
||||
def set(self,filename,names) :
|
||||
"""
|
||||
:filename this is the python file
|
||||
:names names of the functions within the file
|
||||
"""
|
||||
if os.path.exists(filename) and names:
|
||||
_file = filename.split(os.sep)[-1].split('.')[0]
|
||||
_newlocation = os.sep.join([self._folder,'code',filename.split(os.sep)[-1]])
|
||||
self._data[_file] = {"content":names,"path":_newlocation}
|
||||
#
|
||||
# we need to copy the file to the new location
|
||||
#
|
||||
shutil.copyfile(filename, _newlocation)
|
||||
self.write()
|
||||
return 1
|
||||
else:
|
||||
return 0
|
||||
def stats (self):
|
||||
return pd.DataFrame([{'file':_key,'count': len(self._data[_key]['content']),'example':'@'.join([self._data[_key]['content'][0],_key]),'functions':json.dumps(self._data[_key]['content'])} for _key in self._data])
|
||||
def make (self,_folder):
|
||||
"""
|
||||
make registry folder
|
||||
"""
|
||||
|
||||
# _folder = self._folder if not _folder else _folder
|
||||
_codepath = os.sep.join([self._folder,'code'])
|
||||
if not os.path.exists(_folder) :
|
||||
os.makedirs(self._folder)
|
||||
self.write()
|
||||
if not os.path.exists(_codepath):
|
||||
os.makedirs(_codepath)
|
||||
|
||||
#
|
||||
# adding
|
||||
def load (self):
|
||||
if os.path.exists(self._filename) :
|
||||
f = open(self._filename) #if _filename else open(_filename)
|
||||
#_context = self._context if not _context else _context
|
||||
try:
|
||||
|
||||
self._data = json.loads(f.read())
|
||||
except Exception as e:
|
||||
pass
|
||||
finally:
|
||||
f.close()
|
||||
def has (self,_key):
|
||||
"""
|
||||
_key can be formatted as _name@file with
|
||||
"""
|
||||
if '@' in _key :
|
||||
_name,_file = _key.split('@')
|
||||
else:
|
||||
_name = _key
|
||||
_file = None
|
||||
if len(self._data.keys()) == 1 :
|
||||
_file = list(self._data.keys())[0]
|
||||
if _file in self._data :
|
||||
return _name in self._data[_file]['content']
|
||||
return False
|
||||
|
||||
def write (self):
|
||||
#
|
||||
# will only write the main
|
||||
f = open(self._filename,'w+')
|
||||
f.write(json.dumps(self._data))
|
||||
f.close()
|
||||
pass
|
||||
from . import cli
|
||||
from .loader import Loader
|
||||
from .registry import Registry
|
||||
#from plugins import cli
|
||||
#from plugins.loader import Loader
|
||||
#from plugins.registry import Registry
|
||||
|
@ -0,0 +1,102 @@
|
||||
#!/usr/bin/env python
|
||||
__doc__ = """
|
||||
|
||||
|
||||
"""
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
from multiprocessing import Process
|
||||
|
||||
import os
|
||||
import typer
|
||||
from typing_extensions import Annotated
|
||||
from typing import Optional
|
||||
import time
|
||||
from termcolor import colored
|
||||
from enum import Enum
|
||||
from rich import print
|
||||
# from rich.console import Console
|
||||
from rich.table import Table
|
||||
import plugins
|
||||
app = typer.Typer()
|
||||
# app_e = typer.Typer() #-- handles etl (run, generate)
|
||||
# app_x = typer.Typer() #-- handles plugins (list,add, test)
|
||||
# app_i = typer.Typer() #-- handles information (version, license)
|
||||
appr = typer.Typer() #-- handles registry
|
||||
# REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
|
||||
# REGISTRY_FILE= 'transport-registry.json'
|
||||
CHECK_MARK = '[ [green]\u2713[/green] ]' #' '.join(['[',colored(u'\u2713', 'green'),']'])
|
||||
TIMES_MARK= '[ [red]\u2717[/red] ]' #' '.join(['[',colored(u'\u2717','red'),']'])
|
||||
# @app.command()
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@app.command(name="plugin")
|
||||
def inspect (file: Annotated[str,typer.Argument(help="python file that contains functions look into")],
|
||||
decorator:str=typer.Option(default=None,help="decorator attribute name (if any) ") ) :
|
||||
"""
|
||||
This function allows plugin management / testing
|
||||
"""
|
||||
loader = plugins.Loader()
|
||||
if loader.load(file=file,decorator= decorator) :
|
||||
n = len(loader._modules.keys())
|
||||
print (f"""{CHECK_MARK} Found {n} functions in [bold]{file}[/bold]""")
|
||||
else:
|
||||
_msg = f"""{TIMES_MARK} Invalid python file {file}"""
|
||||
print (_msg)
|
||||
@appr.command(name="add")
|
||||
def add_registry(
|
||||
registry_folder: Annotated[str,typer.Argument(help="registry folder")],
|
||||
python_file: Annotated[str,typer.Argument(help="python file that contains functions to be used as plugins")]):
|
||||
"""
|
||||
This function will add/override a file to the registry
|
||||
"""
|
||||
# reg = plugins.Registry(rg_file)
|
||||
loader = plugins.Loader(file=python_file)
|
||||
if loader.get() :
|
||||
_names = list(loader.get().keys())
|
||||
reg = plugins.Registry(registry_folder)
|
||||
reg.set(python_file,_names)
|
||||
print (f"""{CHECK_MARK} Import was [bold]successful[/bold] into {registry_folder}""")
|
||||
else:
|
||||
print (f"""{TIMES_MARK} Import [bold]Failed[/bold] into {registry_folder}""")
|
||||
|
||||
def to_Table(df: pd.DataFrame):
|
||||
"""Displays a Pandas DataFrame as a rich table."""
|
||||
table = Table(show_header=True, header_style="bold magenta")
|
||||
|
||||
for col in df.columns:
|
||||
table.add_column(col)
|
||||
|
||||
for _, row in df.iterrows():
|
||||
table.add_row(*row.astype(str).tolist())
|
||||
|
||||
# console.print(table)
|
||||
return table
|
||||
@appr.command(name="list")
|
||||
def list_registry(
|
||||
folder:str=typer.Option(default=os.environ.get('REGISTRY_FOLDER',None),help="path of the plugin registry folder")):
|
||||
#folder: Annotated[str,typer.Argument(help="registry folder where")]=plugins.REGISTRY_PATH) :
|
||||
"""
|
||||
This function will summarize the registry in a table
|
||||
"""
|
||||
try:
|
||||
reg = plugins.Registry(folder)
|
||||
|
||||
print (to_Table(reg.stats()))
|
||||
except Exception as e :
|
||||
print (e)
|
||||
print (f"""{TIMES_MARK} Please provide registry folder or set environment REGISTRY_FOLDER """)
|
||||
|
||||
def exe(file,name,_args):
|
||||
pass
|
||||
|
||||
app.add_typer(appr,name='registry',help='This enables registry management')
|
||||
if __name__ == '__main__' :
|
||||
|
||||
app()
|
@ -0,0 +1,98 @@
|
||||
"""
|
||||
implementing a plugin loader i.e can load a function from a file given parameters
|
||||
"""
|
||||
import importlib as IL
|
||||
import importlib.util
|
||||
import os
|
||||
import json
|
||||
import shutil
|
||||
import pandas as pd
|
||||
|
||||
#
|
||||
# we should have a way to register these functions using rudimentary means
|
||||
#
|
||||
|
||||
REGISTRY_PATH=None
|
||||
class Registry :
|
||||
|
||||
def __init__(self,folder=None,reader = None) :
|
||||
"""
|
||||
|
||||
"""
|
||||
self._folder = folder if folder else os.environ.get('REGISTRY_FOLDER',None)
|
||||
self._filename = os.sep.join([folder,'plugins-registry.json'])
|
||||
# self._context = self._folder.split(os.sep)[-1]
|
||||
self._reader = reader
|
||||
self._data = {}
|
||||
self.make(self._folder) #-- making the folder just in case we need to
|
||||
# self.make(os.sep.join([self._folder,'code']))
|
||||
self.load()
|
||||
|
||||
|
||||
def set(self,filename,names) :
|
||||
"""
|
||||
:filename this is the python file
|
||||
:names names of the functions within the file
|
||||
"""
|
||||
if os.path.exists(filename) and names:
|
||||
_file = filename.split(os.sep)[-1].split('.')[0]
|
||||
_newlocation = os.sep.join([self._folder,'code',filename.split(os.sep)[-1]])
|
||||
self._data[_file] = {"content":names,"path":_newlocation}
|
||||
#
|
||||
# we need to copy the file to the new location
|
||||
#
|
||||
shutil.copyfile(filename, _newlocation)
|
||||
self.write()
|
||||
return 1
|
||||
else:
|
||||
return 0
|
||||
def stats (self):
|
||||
return pd.DataFrame([{'file':_key,'count': len(self._data[_key]['content']),'example':'@'.join([self._data[_key]['content'][0],_key]),'functions':json.dumps(self._data[_key]['content'])} for _key in self._data])
|
||||
def make (self,_folder):
|
||||
"""
|
||||
make registry folder
|
||||
"""
|
||||
|
||||
# _folder = self._folder if not _folder else _folder
|
||||
_codepath = os.sep.join([self._folder,'code'])
|
||||
if not os.path.exists(_folder) :
|
||||
os.makedirs(self._folder)
|
||||
self.write()
|
||||
if not os.path.exists(_codepath):
|
||||
os.makedirs(_codepath)
|
||||
|
||||
#
|
||||
# adding
|
||||
def load (self):
|
||||
if os.path.exists(self._filename) :
|
||||
f = open(self._filename) #if _filename else open(_filename)
|
||||
#_context = self._context if not _context else _context
|
||||
try:
|
||||
|
||||
self._data = json.loads(f.read())
|
||||
except Exception as e:
|
||||
pass
|
||||
finally:
|
||||
f.close()
|
||||
def has (self,_key):
|
||||
"""
|
||||
_key can be formatted as _name@file with
|
||||
"""
|
||||
if '@' in _key :
|
||||
_name,_file = _key.split('@')
|
||||
else:
|
||||
_name = _key
|
||||
_file = None
|
||||
if len(self._data.keys()) == 1 :
|
||||
_file = list(self._data.keys())[0]
|
||||
if _file in self._data :
|
||||
return _name in self._data[_file]['content']
|
||||
return False
|
||||
|
||||
def write (self):
|
||||
#
|
||||
# will only write the main
|
||||
f = open(self._filename,'w+')
|
||||
f.write(json.dumps(self._data))
|
||||
f.close()
|
||||
pass
|
Loading…
Reference in new issue