From 8904c7184ae4426871b1ce526780a3e915cf8291 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Tue, 29 Oct 2024 03:11:00 -0500 Subject: [PATCH] warehouse support, plugin registry and streaming --- bin/transport | 46 ++++++- setup.py | 2 +- transport/__init__.py | 3 +- transport/iowrapper.py | 51 +++++-- transport/other/files.py | 10 +- transport/plugins/__init__.py | 98 +++++++++----- transport/providers/__init__.py | 2 +- transport/registry.py | 172 +++++++++++++++++++++++- transport/sql/__init__.py | 4 +- transport/sql/{sqlite.py => sqlite3.py} | 6 +- transport/warehouse/drill.py | 54 ++++++++ 11 files changed, 380 insertions(+), 68 deletions(-) rename transport/sql/{sqlite.py => sqlite3.py} (83%) create mode 100644 transport/warehouse/drill.py diff --git a/bin/transport b/bin/transport index 39c362c..371d579 100755 --- a/bin/transport +++ b/bin/transport @@ -136,8 +136,10 @@ def register (label:Annotated[str,typer.Argument(help="unique label that will be default:bool=typer.Option(default=False,help="set the auth_file as default"), path:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry file")): """ - This function will register an auth-file i.e database connection and assign it a label, + This function create a registery of either: + an auth-file entries given an auth-file i.e database connection and assign it a label, Learn more about auth-file at https://healthcareio.the-phi.com/data-transport + """ try: if transport.registry.exists(path) : @@ -148,8 +150,46 @@ def register (label:Annotated[str,typer.Argument(help="unique label that will be except Exception as e: _msg = f"""{TIMES_MARK} {e}""" print (_msg) - - pass + +@app.command(name='plugin-add') +def register_plugs ( + alias:Annotated[str,typer.Argument(help="unique alias fo the file being registered")], + path:Annotated[str,typer.Argument(help="path of the python file, that contains functions")] + ): + """ + This function will register a file and the functions within will be refrences . in a configuration file + """ + transport.registry.plugins.init() + _log = transport.registry.plugins.add(alias,path) + _mark = TIMES_MARK if not _log else CHECK_MARK + _msg = f"""Could NOT add the \033[1m{alias}\033[0m to the registry""" if not _log else f""" successfully added {alias}, {len(_log)} functions added""" + print (f"""{_mark} {_msg}""") +@app.command(name="plugin-list") +def registry_list (): + + transport.registry.plugins.init() + _d = [] + for _alias in transport.registry.plugins._data : + _data = transport.registry.plugins._data[_alias] + _d += [{'alias':_alias,"plugin-count":len(_data['content']),'e.g':'@'.join([_alias,_data['content'][0]]),'plugins':json.dumps(_data['content'])}] + if _d: + print (pd.DataFrame(_d)) + else: + print (f"""{TIMES_MARK}, Plugin registry is not available or needs initialization""") + +@app.command(name="plugin-test") +def registry_test (key): + """ + This function allows to test syntax for a plugin i.e in terms of alis@function + """ + _item = transport.registry.plugins.has(key=key) + if _item : + del _item['pointer'] + print (f"""{CHECK_MARK} successfully loaded \033[1m{key}\033[0m found, version {_item['version']}""") + print (pd.DataFrame([_item])) + else: + print (f"{TIMES_MARK} unable to load \033[1m{key}\033[0m. Make sure it is registered") + if __name__ == '__main__' : app() diff --git a/setup.py b/setup.py index dcb52a0..c7ae339 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,7 @@ args = { "packages": find_packages(include=['info','transport', 'transport.*'])} args["keywords"]=['mongodb','duckdb','couchdb','rabbitmq','file','read','write','s3','sqlite'] -args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql','pyspark'] +args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql','pyspark','pydrill'] args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git" args['scripts'] = ['bin/transport'] # if sys.version_info[0] == 2 : diff --git a/transport/__init__.py b/transport/__init__.py index 8ab354a..c07a1df 100644 --- a/transport/__init__.py +++ b/transport/__init__.py @@ -28,7 +28,7 @@ from transport.plugins import PluginLoader from transport import providers import copy from transport import registry - +from transport.plugins import Plugin PROVIDERS = {} def init(): @@ -120,6 +120,7 @@ def instance (**_args): # loader.set(_delegate) loader = None if 'plugins' not in _args else _args['plugins'] + return IReader(_agent,loader) if _context == 'read' else IWriter(_agent,loader) else: diff --git a/transport/iowrapper.py b/transport/iowrapper.py index e78fca9..d5ca23e 100644 --- a/transport/iowrapper.py +++ b/transport/iowrapper.py @@ -5,11 +5,13 @@ NOTE: Plugins are converted to a pipeline, so we apply a pipeline when reading o - upon initialization we will load plugins - on read/write we apply a pipeline (if passed as an argument) """ -from transport.plugins import plugin, PluginLoader +from transport.plugins import PluginLoader import transport from transport import providers from multiprocessing import Process import time +import types +from . import registry class IO: @@ -18,20 +20,24 @@ class IO: """ def __init__(self,_agent,plugins): self._agent = _agent + if plugins : self._init_plugins(plugins) else: self._plugins = None - def _init_plugins(self,_args): + def _init_plugins(self,_items): """ This function will load pipelined functions as a plugin loader """ - if 'path' in _args and 'names' in _args : - self._plugins = PluginLoader(**_args) - else: - self._plugins = PluginLoader() - [self._plugins.set(_pointer) for _pointer in _args] + registry.plugins.init() + self._plugins = PluginLoader(registry=registry.plugins) + [self._plugins.set(_name) for _name in _items] + # if 'path' in _args and 'names' in _args : + # self._plugins = PluginLoader(**_args) + # else: + # self._plugins = PluginLoader(registry=registry.plugins) + # [self._plugins.set(_pointer) for _pointer in _args] # # @TODO: We should have a way to log what plugins are loaded and ready to use def meta (self,**_args): @@ -65,15 +71,26 @@ class IReader(IO): """ def __init__(self,_agent,pipeline=None): super().__init__(_agent,pipeline) + + def _stream (self,_data ): + for _segment in _data : + + yield self._plugins.apply(_segment) def read(self,**_args): if 'plugins' in _args : self._init_plugins(_args['plugins']) _data = self._agent.read(**_args) + if self._plugins and self._plugins.ratio() > 0 : - _data = self._plugins.apply(_data) - # - # output data - return _data + + if types.GeneratorType == type(_data): + + return self._stream(_data) + else: + _data = self._plugins.apply(_data) + return _data + else: + return _data class IWriter(IO): def __init__(self,_agent,pipeline=None): super().__init__(_agent,pipeline) @@ -95,6 +112,7 @@ class IETL(IReader) : """ def __init__(self,**_args): super().__init__(transport.get.reader(**_args['source'])) + if 'target' in _args: self._targets = _args['target'] if type(_args['target']) == list else [_args['target']] else: @@ -113,11 +131,18 @@ class IETL(IReader) : def read(self,**_args): _data = super().read(**_args) + if types.GeneratorType == type(_data): + for _segment in _data : + for _kwars in self._targets : + self.post(_segment,**_kwargs) + else: + - for _kwargs in self._targets : - self.post(_data,**_kwargs) + for _kwargs in self._targets : + self.post(_data,**_kwargs) return _data + # return _data def post (self,_data,**_args) : """ This function returns an instance of a process that will perform the write operation diff --git a/transport/other/files.py b/transport/other/files.py index 62ee3c4..e0cc2af 100644 --- a/transport/other/files.py +++ b/transport/other/files.py @@ -12,7 +12,7 @@ class File : """ self.path = params['path'] if 'path' in params else None self.delimiter = params['delimiter'] if 'delimiter' in params else ',' - + self._chunksize = None if 'chunksize' not in params else int(params['chunksize']) def isready(self): return os.path.exists(self.path) def meta(self,**_args): @@ -26,11 +26,15 @@ class Reader (File): def __init__(self,**_args): super().__init__(**_args) - + def _stream(self,path) : + reader = pd.read_csv(path,delimiter=self.delimiter,chunksize=self._chunksize) + for segment in reader : + yield segment def read(self,**args): _path = self.path if 'path' not in args else args['path'] _delimiter = self.delimiter if 'delimiter' not in args else args['delimiter'] - return pd.read_csv(_path,delimiter=self.delimiter) + + return pd.read_csv(_path,delimiter=self.delimiter) if not self._chunksize else self._stream(_path) def stream(self,**args): raise Exception ("streaming needs to be implemented") class Writer (File): diff --git a/transport/plugins/__init__.py b/transport/plugins/__init__.py index 26e5782..34d88d0 100644 --- a/transport/plugins/__init__.py +++ b/transport/plugins/__init__.py @@ -12,7 +12,7 @@ import importlib.util import sys import os -class plugin : +class Plugin : """ Implementing function decorator for data-transport plugins (post-pre)-processing """ @@ -22,8 +22,9 @@ class plugin : :mode restrict to reader/writer :about tell what the function is about """ - self._name = _args['name'] - self._about = _args['about'] + self._name = _args['name'] if 'name' in _args else None + self._version = _args['version'] if 'version' in _args else '0.1' + self._doc = _args['doc'] if 'doc' in _args else "N/A" self._mode = _args['mode'] if 'mode' in _args else 'rw' def __call__(self,pointer,**kwargs): def wrapper(_args,**kwargs): @@ -32,57 +33,86 @@ class plugin : # @TODO: # add attributes to the wrapper object # + self._name = pointer.__name__ if not self._name else self._name setattr(wrapper,'transport',True) setattr(wrapper,'name',self._name) - setattr(wrapper,'mode',self._mode) - setattr(wrapper,'about',self._about) + setattr(wrapper,'version',self._version) + setattr(wrapper,'doc',self._doc) return wrapper - class PluginLoader : """ This class is intended to load a plugin and make it available and assess the quality of the developed plugin """ + def __init__(self,**_args): """ - :path location of the plugin (should be a single file) - :_names of functions to load """ - _names = _args['names'] if 'names' in _args else None - path = _args['path'] if 'path' in _args else None - self._names = _names if type(_names) == list else [_names] + # _names = _args['names'] if 'names' in _args else None + # path = _args['path'] if 'path' in _args else None + # self._names = _names if type(_names) == list else [_names] self._modules = {} self._names = [] - if path and os.path.exists(path) and _names: - for _name in self._names : + self._registry = _args['registry'] + # if path and os.path.exists(path) and _names: + # for _name in self._names : - spec = importlib.util.spec_from_file_location('private', path) - module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) #--loads it into sys.modules - if hasattr(module,_name) : - if self.isplugin(module,_name) : - self._modules[_name] = getattr(module,_name) - else: - print ([f'Found {_name}', 'not plugin']) - else: - # - # @TODO: We should log this somewhere some how - print (['skipping ',_name, hasattr(module,_name)]) - pass - else: - # - # Initialization is empty - self._names = [] + # spec = importlib.util.spec_from_file_location('private', path) + # module = importlib.util.module_from_spec(spec) + # spec.loader.exec_module(module) #--loads it into sys.modules + # if hasattr(module,_name) : + # if self.isplugin(module,_name) : + # self._modules[_name] = getattr(module,_name) + # else: + # print ([f'Found {_name}', 'not plugin']) + # else: + # # + # # @TODO: We should log this somewhere some how + # print (['skipping ',_name, hasattr(module,_name)]) + # pass + # else: + # # + # # Initialization is empty + # self._names = [] pass - def set(self,_pointer) : + def load (self,**_args): + self._modules = {} + self._names = [] + path = _args ['path'] + if os.path.exists(path) : + _alias = path.split(os.sep)[-1] + spec = importlib.util.spec_from_file_location(_alias, path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) #--loads it into sys.modules + # self._names = [_name for _name in dir(module) if type(getattr(module,_name)).__name__ == 'function'] + for _name in dir(module) : + if self.isplugin(module,_name) : + self._module[_name] = getattr(module,_name) + # self._names [_name] + def format (self,**_args): + uri = _args['alias'],_args['name'] + # def set(self,_pointer) : + def set(self,_key) : """ This function will set a pointer to the list of modules to be called This should be used within the context of using the framework as a library """ - _name = _pointer.__name__ + # _name = _pointer.__name__ if type(_pointer).__name__ == 'function' else {} + + # self._modules[_name] = _pointer + # self._names.append(_name) + + _pointer = self._registry.get(key=_key) + if _pointer : + self._modules[_key] = _pointer + self._names.append(_key) + elif type(_key).__name__ == 'function': + # + # The pointer is in the code provided by the user and loaded in memory + # + _pointer = _key + self._names.append(_key.__name__) - self._modules[_name] = _pointer - self._names.append(_name) def isplugin(self,module,name): """ This function determines if a module is a recognized plugin diff --git a/transport/providers/__init__.py b/transport/providers/__init__.py index 556df39..b4cf37a 100644 --- a/transport/providers/__init__.py +++ b/transport/providers/__init__.py @@ -11,7 +11,7 @@ BIGQUERY ='bigquery' FILE = 'file' ETL = 'etl' -SQLITE = 'sqlite' +SQLITE = 'sqlite3' SQLITE3= 'sqlite3' DUCKDB = 'duckdb' diff --git a/transport/registry.py b/transport/registry.py index 6764f1b..f3dc8ac 100644 --- a/transport/registry.py +++ b/transport/registry.py @@ -3,6 +3,10 @@ import json from info import __version__ import copy import transport +import importlib +import importlib.util +import shutil + """ This class manages data from the registry and allows (read only) @@ -16,28 +20,182 @@ REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport']) if 'DATA_TRANSPORT_REGISTRY_PATH' in os.environ : REGISTRY_PATH = os.environ['DATA_TRANSPORT_REGISTRY_PATH'] REGISTRY_FILE= 'transport-registry.json' - DATA = {} +class plugins: + # + # This is a utility function that should enable management of plugins-registry + # The class allows to add/remove elements + # + # @TODO: add read/write properties to the class (better design practice) + # + _data = {} + FOLDER = os.sep.join([REGISTRY_PATH,'plugins']) + CODE = os.sep.join([REGISTRY_PATH,'plugins','code']) + FILE = os.sep.join([REGISTRY_PATH,'plugin-registry.json']) + @staticmethod + def init(): + + if not os.path.exists(plugins.FOLDER) : + os.makedirs(plugins.FOLDER) + if not os.path.exists(plugins.CODE): + os.makedirs(plugins.CODE) + if not os.path.exists(plugins.FILE): + f = open(plugins.FILE,'w') + f.write("{}") + f.close() + plugins._read() #-- will load data as a side effect + + @staticmethod + def copy (path) : + + shutil.copy2(path,plugins.CODE) + @staticmethod + def _read (): + f = open(plugins.FILE) + try: + _data = json.loads(f.read()) + f.close() + except Exception as e: + print (f"Corrupted registry, resetting ...") + _data = {} + plugins._write(_data) + + plugins._data = _data + @staticmethod + def _write (_data): + f = open(plugins.FILE,'w') + f.write(json.dumps(_data)) + f.close() + plugins._data = _data + + @staticmethod + def inspect (_path): + _names = [] + + if os.path.exists(_path) : + _filename = _path.split(os.sep)[-1] + spec = importlib.util.spec_from_file_location(_filename, _path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + # _names = [{'name':getattr(getattr(module,_name),'name'),'pointer':getattr(module,_name)} for _name in dir(module) if type( getattr(module,_name)).__name__ == 'function'] + for _name in dir(module) : + _pointer = getattr(module,_name) + if hasattr(_pointer,'transport') : + _item = {'real_name':_name,'name':getattr(_pointer,'name'),'pointer':_pointer,'version':getattr(_pointer,'version')} + _names.append(_item) + + + return _names + @staticmethod + def add (alias,path): + """ + Add overwrite the registry entries + """ + _names = plugins.inspect (path) + _log = [] + + if _names : + # + # We should make sure we have all the plugins with the attributes (transport,name) set + _names = [_item for _item in _names if hasattr(_item['pointer'],'transport') ] + if _names : + plugins.copy(path) + _content = [] + + for _item in _names : + _key = '@'.join([alias,_item['name']]) + _log.append(_item['name']) + # + # Let us update the registry + # + plugins.update(alias,path,_log) + return _log + + @staticmethod + def update (alias,path,_log) : + """ + updating the registry entries of the plugins (management data) + """ + # f = open(plugins.FILE) + # _data = json.loads(f.read()) + # f.close() + _data = plugins._data + # _log = plugins.add(alias,path) + + if _log : + _data[alias] = {'content':_log,'name':path.split(os.sep)[-1]} + plugins._write(_data) #-- will update data as a side effect + + return _log + @staticmethod + def get(**_args) : + # f = open(plugins.FILE) + # _data = json.loads(f.read()) + # f.close() + # if 'key' in _args : + # alias,name = _args['key'].split('.') if '.' in _args['key'] else _args['key'].split('@') + # else : + # alias = _args['alias'] + # name = _args['name'] + + # if alias in _data : + + # _path = os.sep.join([plugins.CODE,_data[alias]['name']]) + # _item = [_item for _item in plugins.inspect(_path) if name == _item['name']] + + # _item = _item[0] if _item else None + # if _item : + + # return _item['pointer'] + # return None + _item = plugins.has(**_args) + return _item['pointer'] if _item else None + + @staticmethod + def has (**_args): + f = open(plugins.FILE) + _data = json.loads(f.read()) + f.close() + if 'key' in _args : + alias,name = _args['key'].split('.') if '.' in _args['key'] else _args['key'].split('@') + else : + alias = _args['alias'] + name = _args['name'] + + if alias in _data : + + _path = os.sep.join([plugins.CODE,_data[alias]['name']]) + _item = [_item for _item in plugins.inspect(_path) if name == _item['name']] + + _item = _item[0] if _item else None + if _item : + + return copy.copy(_item) + return None + @staticmethod + def synch(): + pass def isloaded (): return DATA not in [{},None] -def exists (path=REGISTRY_PATH) : +def exists (path=REGISTRY_PATH,_file=REGISTRY_FILE) : """ This function determines if there is a registry at all """ p = os.path.exists(path) - q = os.path.exists( os.sep.join([path,REGISTRY_FILE])) + q = os.path.exists( os.sep.join([path,_file])) return p and q -def load (_path=REGISTRY_PATH): +def load (_path=REGISTRY_PATH,_file=REGISTRY_FILE): global DATA if exists(_path) : - path = os.sep.join([_path,REGISTRY_FILE]) + path = os.sep.join([_path,_file]) f = open(path) DATA = json.loads(f.read()) f.close() -def init (email,path=REGISTRY_PATH,override=False): +def init (email,path=REGISTRY_PATH,override=False,_file=REGISTRY_FILE): """ Initializing the registry and will raise an exception in the advent of an issue """ @@ -47,7 +205,7 @@ def init (email,path=REGISTRY_PATH,override=False): _config = {"email":email,'version':__version__} if not os.path.exists(path): os.makedirs(path) - filename = os.sep.join([path,REGISTRY_FILE]) + filename = os.sep.join([path,_file]) if not os.path.exists(filename) or override == True : f = open(filename,'w') diff --git a/transport/sql/__init__.py b/transport/sql/__init__.py index b5aaa98..17bdbf3 100644 --- a/transport/sql/__init__.py +++ b/transport/sql/__init__.py @@ -3,7 +3,7 @@ This namespace/package wrap the sql functionalities for a certain data-stores - netezza, postgresql, mysql and sqlite - mariadb, redshift (also included) """ -from . import postgresql, mysql, netezza, sqlite, sqlserver, duckdb +from . import postgresql, mysql, netezza, sqlite3, sqlserver, duckdb # @@ -11,7 +11,7 @@ from . import postgresql, mysql, netezza, sqlite, sqlserver, duckdb # mariadb = mysql redshift = postgresql -sqlite3 = sqlite +# sqlite3 = sqlite # from transport import sql diff --git a/transport/sql/sqlite.py b/transport/sql/sqlite3.py similarity index 83% rename from transport/sql/sqlite.py rename to transport/sql/sqlite3.py index 734ab24..50fa30a 100644 --- a/transport/sql/sqlite.py +++ b/transport/sql/sqlite3.py @@ -1,7 +1,7 @@ import sqlalchemy import pandas as pd from transport.sql.common import Base, BaseReader, BaseWriter -class SQLite (BaseReader): +class SQLite3 (BaseReader): def __init__(self,**_args): super().__init__(**_args) if 'path' in _args : @@ -12,7 +12,7 @@ class SQLite (BaseReader): path = self._database return f'sqlite:///{path}' # ensure this is the correct path for the sqlite file. -class Reader(SQLite,BaseReader): +class Reader(SQLite3,BaseReader): def __init__(self,**_args): super().__init__(**_args) # def read(self,**_args): @@ -20,6 +20,6 @@ class Reader(SQLite,BaseReader): # return pd.read_sql(sql,self._engine) -class Writer (SQLite,BaseWriter): +class Writer (SQLite3,BaseWriter): def __init__(self,**_args): super().__init__(**_args) \ No newline at end of file diff --git a/transport/warehouse/drill.py b/transport/warehouse/drill.py new file mode 100644 index 0000000..551ac6f --- /dev/null +++ b/transport/warehouse/drill.py @@ -0,0 +1,54 @@ +import sqlalchemy +import pandas as pd +from .. sql.common import BaseReader , BaseWriter +import sqlalchemy as sqa + +class Drill : + def __init__(self,**_args): + + self._host = _args['host'] if 'host' in _args else 'localhost' + self._port = _args['port'] if 'port' in _args else self.get_default_port() + self._ssl = False if 'ssl' not in _args else _args['ssl'] + + self._table = _args['table'] if 'table' in _args else None + if self._table and '.' in self._table : + _seg = self._table.split('.') + if len(_seg) > 2 : + self._schema,self._database = _seg[:2] + else: + + self._database=_args['database'] + self._schema = self._database.split('.')[0] + + def _get_uri(self,**_args): + return f'drill+sadrill://{self._host}:{self._port}/{self._schema}?use_ssl={self._ssl}' + def get_provider(self): + return "drill+sadrill" + def get_default_port(self): + return "8047" + def meta(self,**_args): + _table = _args['table'] if 'table' in _args else self._table + if '.' in _table : + _schema = _table.split('.')[:2] + _schema = '.'.join(_schema) + _table = _table.split('.')[-1] + else: + _schema = self._schema + + # _sql = f"select COLUMN_NAME AS name, CASE WHEN DATA_TYPE ='CHARACTER VARYING' THEN 'CHAR ( 125 )' ELSE DATA_TYPE END AS type from INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA='{_schema}' and TABLE_NAME='{_table}'" + _sql = f"select COLUMN_NAME AS name, CASE WHEN DATA_TYPE ='CHARACTER VARYING' THEN 'CHAR ( '||COLUMN_SIZE||' )' ELSE DATA_TYPE END AS type from INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA='{_schema}' and TABLE_NAME='{_table}'" + try: + _df = pd.read_sql(_sql,self._engine) + return _df.to_dict(orient='records') + except Exception as e: + print (e) + pass + return [] +class Reader (Drill,BaseReader) : + def __init__(self,**_args): + super().__init__(**_args) + self._chunksize = 0 if 'chunksize' not in _args else _args['chunksize'] + self._engine= sqa.create_engine(self._get_uri(),future=True) +class Writer(Drill,BaseWriter): + def __init__(self,**_args): + super().__init__(self,**_args) \ No newline at end of file