diff --git a/info/__init__.py b/info/__init__.py
index ee36366..9d32231 100644
--- a/info/__init__.py
+++ b/info/__init__.py
@@ -1,6 +1,6 @@
 __app_name__ = 'data-transport'
 __author__ = 'The Phi Technology'
-__version__= '2.4.0'
+__version__= '2.4.3'
 __email__ = "info@the-phi.com"
 __license__=f"""
 Copyright 2010 - 2024, Steve L. Nyemba
diff --git a/transport/__init__.py b/transport/__init__.py
index c07a1df..64bda0d 100644
--- a/transport/__init__.py
+++ b/transport/__init__.py
@@ -38,11 +38,16 @@ def init():
         if _provider_name.startswith('__') or _provider_name == 'common':
             continue
         PROVIDERS[_provider_name] = {'module':getattr(_module,_provider_name),'type':_module.__name__}
-def _getauthfile (path) :
-    f = open(path)
-    _object = json.loads(f.read())
-    f.close()
-    return _object
+    #
+    # loading the registry
+    if not registry.isloaded() :
+        registry.load()
+
+# def _getauthfile (path) :
+#     f = open(path)
+#     _object = json.loads(f.read())
+#     f.close()
+#     return _object
 def instance (**_args):
     """
     This function returns an object to read from or write to a supported database provider/vendor
@@ -52,16 +57,7 @@ def instance (**_args):
     kwargs      These are arguments that are provider/vendor specific
     """
     global PROVIDERS
-    # if not registry.isloaded () :
-    #     if ('path' in _args and registry.exists(_args['path'] )) or registry.exists():
-    #         registry.load() if 'path' not in _args else registry.load(_args['path'])
-    #         print ([' GOT IT'])
-    # if 'label' in _args and registry.isloaded():
-    #     _info = registry.get(_args['label'])
-    #     if _info :
-    #         #
-    #         _args = dict(_args,**_info)
-
+
     if 'auth_file' in _args:
         if os.path.exists(_args['auth_file']) :
             #
@@ -78,7 +74,7 @@ def instance (**_args):
             filename = _args['auth_file']
             raise Exception(f" {filename} was not found or is invalid")
     if 'provider' not in _args and 'auth_file' not in _args :
-        if not registry.isloaded () :
+        if not registry.isloaded () :
             if ('path' in _args and registry.exists(_args['path'] )) or registry.exists():
                 registry.load() if 'path' not in _args else registry.load(_args['path'])
         _info = {}
@@ -87,8 +83,6 @@ def instance (**_args):
         else:
             _info = registry.get()
         if _info :
-            #
-            # _args = dict(_args,**_info)
             _args = dict(_info,**_args) #-- we can override the registry parameters with our own arguments

     if 'provider' in _args and _args['provider'] in PROVIDERS :
@@ -119,9 +113,27 @@ def instance (**_args):
         #     for _delegate in _params :
         #         loader.set(_delegate)

-        loader = None if 'plugins' not in _args else _args['plugins']
+        _plugins = None if 'plugins' not in _args else _args['plugins']

-        return IReader(_agent,loader) if _context == 'read' else IWriter(_agent,loader)
+        # if registry.has('logger') :
+        #     _kwa = registry.get('logger')
+        #     _lmodule = getPROVIDERS[_kwa['provider']]
+
+        if ('label' not in _args and registry.has('logger')):
+            #
+            # No label called 'logger' was requested, so we set up a logger if one is specified in the registry
+            #
+            _kwargs = registry.get('logger')
+            _kwargs['context'] = 'write'
+            _kwargs['table'] = _module.__name__.split('.')[-1]+'_logs'
+            # _logger = instance(**_kwargs)
+            _module = PROVIDERS[_kwargs['provider']]['module']
+            _logger = getattr(_module,'Writer')
+            _logger = _logger(**_kwargs)
+        else:
+            _logger = None
+        _datatransport = IReader(_agent,_plugins,_logger) if _context == 'read' else IWriter(_agent,_plugins,_logger)
+        return _datatransport

     else:
         #
@@ -138,7 +150,14 @@ class get :
         if not _args or ('provider' not in _args and 'label' not in _args):
             _args['label'] = 'default'
         _args['context'] = 'read'
-        return instance(**_args)
+        # return instance(**_args)
+        # _args['logger'] = instance(**{'label':'logger','context':'write','table':'logs'})
+
+        _handler = instance(**_args)
+        # _handler.setLogger(get.logger())
+        return _handler
+
+
     @staticmethod
     def writer(**_args):
         """
@@ -147,10 +166,26 @@ class get :
         if not _args or ('provider' not in _args and 'label' not in _args):
             _args['label'] = 'default'
         _args['context'] = 'write'
-        return instance(**_args)
+        # _args['logger'] = instance(**{'label':'logger','context':'write','table':'logs'})
+
+        _handler = instance(**_args)
+        #
+        # Implementing logging with the 'eat-your-own-dog-food' approach
+        # Using dependency injection to set the logger (problem with imports)
+        #
+        # _handler.setLogger(get.logger())
+        return _handler
+    @staticmethod
+    def logger ():
+        if registry.has('logger') :
+            _args = registry.get('logger')
+            _args['context'] = 'write'
+            return instance(**_args)
+        return None
     @staticmethod
     def etl (**_args):
         if 'source' in _args and 'target' in _args :
+
             return IETL(**_args)
         else:
             raise Exception ("Malformed input found, object must have both 'source' and 'target' attributes")
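
A minimal sketch of how the registry-driven logger above is meant to be exercised. The registry layout and the sqlite3 entries are illustrative assumptions inferred from the calls to registry.has('logger') and registry.get('logger'); only the call contract comes from the code:

    # hypothetical ~/.data-transport registry content (illustrative):
    #   {"default": {"provider": "sqlite3", "database": "/tmp/demo.db3"},
    #    "logger":  {"provider": "sqlite3", "database": "/tmp/logs.db3"}}
    import transport

    # no 'label' argument here, so instance() wires in the registry's 'logger'
    # entry as a write handler for this reader
    _reader = transport.get.reader(provider='sqlite3', database='/tmp/demo.db3')
    _logwriter = transport.get.logger()   # or fetch the logger write-handler directly
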
diff --git a/transport/cloud/bigquery.py b/transport/cloud/bigquery.py
index ba720af..205d947 100644
--- a/transport/cloud/bigquery.py
+++ b/transport/cloud/bigquery.py
@@ -15,6 +15,7 @@ import time
 MAX_CHUNK = 2000000
 class BigQuery:
+    __template__= {"private_key":None,"dataset":None,"table":None,}
     def __init__(self,**_args):
         path = _args['service_key'] if 'service_key' in _args else _args['private_key']
         self.credentials = service_account.Credentials.from_service_account_file(path)
diff --git a/transport/cloud/databricks.py b/transport/cloud/databricks.py
index 5c1ee0d..a5fa4c0 100644
--- a/transport/cloud/databricks.py
+++ b/transport/cloud/databricks.py
@@ -26,6 +26,7 @@ class Bricks:
         :cluster_path
         :table
     """
+    __template__ = {"host":None,"token":None,"cluster_path":None,"catalog":None,"schema":None}
     def __init__(self,**_args):
         _host = _args['host']
         _token= _args['token']
diff --git a/transport/cloud/nextcloud.py b/transport/cloud/nextcloud.py
index ebb44d3..16a37af 100644
--- a/transport/cloud/nextcloud.py
+++ b/transport/cloud/nextcloud.py
@@ -10,6 +10,7 @@ import json
 import nextcloud_client as nextcloud

 class Nextcloud :
+    __template__={"url":None,"token":None,"uid":None,"file":None}
     def __init__(self,**_args):
         pass
         self._delimiter = None
diff --git a/transport/cloud/s3.py b/transport/cloud/s3.py
index 81e2e69..4d7c5b9 100644
--- a/transport/cloud/s3.py
+++ b/transport/cloud/s3.py
@@ -24,6 +24,7 @@ class s3 :
     """
         @TODO: Implement a search function for a file given a bucket??
     """
+    __template__={"access_key":None,"secret_key":None,"bucket":None,"file":None,"region":None}
     def __init__(self,**args) :
         """
             This function will extract a file or set of files from s3 bucket provided
diff --git a/transport/iowrapper.py b/transport/iowrapper.py
index d5ca23e..224f618 100644
--- a/transport/iowrapper.py
+++ b/transport/iowrapper.py
@@ -8,24 +8,47 @@ NOTE: Plugins are converted to a pipeline, so we apply a pipeline when reading or writing
 from transport.plugins import PluginLoader
 import transport
 from transport import providers
-from multiprocessing import Process
+from multiprocessing import Process, RLock
 import time
 import types
 from . import registry
+from datetime import datetime
+import pandas as pd
+import os
+import sys

 class IO:
     """
     Base wrapper class for read/write and support for logs
     """
-    def __init__(self,_agent,plugins):
+    def __init__(self,_agent,plugins,_logger=None):
+
+        #
+        # We need to initialize the logger here ...
+        #
+        # registry.init()
+
+        self._logger = _logger #transport.get.writer(label='logger') #if registry.has('logger') else None
+        if not _logger and hasattr(_agent,'_logger') :
+            self._logger = getattr(_agent,'_logger')
         self._agent = _agent
+        _date = str(datetime.now())
+        self._logTable = 'logs' #'_'.join(['logs',_date[:10]+_date[11:19]]).replace(':','').replace('-','_')
         if plugins :
             self._init_plugins(plugins)
         else:
             self._plugins = None
-
+    def setLogger(self,_logger):
+        self._logger = _logger
+    def log (self,**_args):
+        if self._logger :
+            _date = str(datetime.now())
+            _data = dict({'pid':os.getpid(),'date':_date[:10],'time':_date[11:19]},**_args)
+            for key in _data :
+                _data[key] = str(_data[key])
+            self._logger.write(pd.DataFrame([_data])) #,table=self._logTable)
     def _init_plugins(self,_items):
         """
         This function will load pipelined functions as a plugin loader
@@ -33,6 +56,7 @@ class IO:
         registry.plugins.init()
         self._plugins = PluginLoader(registry=registry.plugins)
         [self._plugins.set(_name) for _name in _items]
+        self.log(action='init-plugins',caller='read', input =[_name for _name in _items])
         # if 'path' in _args and 'names' in _args :
         #     self._plugins = PluginLoader(**_args)
         # else:
@@ -69,38 +93,64 @@ class IO:
     """
     This is a wrapper for read functionalities
     """
-    def __init__(self,_agent,pipeline=None):
-        super().__init__(_agent,pipeline)
+    def __init__(self,_agent,_plugins=None,_logger=None):
+        super().__init__(_agent,_plugins,_logger)

     def _stream (self,_data ):
-        for _segment in _data :
-
-            yield self._plugins.apply(_segment)
+        # self.log(action='streaming',object=self._agent._engine.name, input= type(_data).__name__)
+        _shape = []
+        for _segment in _data :
+            _shape.append(list(_segment.shape))
+            yield self._plugins.apply(_segment,self.log)
+
+        self.log(action='streaming',object=self._agent._engine.name, input= {'shape':_shape})
+
     def read(self,**_args):
+        if 'plugins' in _args :
+            self._init_plugins(_args['plugins'])

-        _data = self._agent.read(**_args)
-        if self._plugins and self._plugins.ratio() > 0 :
-
-            if types.GeneratorType == type(_data):
-
-                return self._stream(_data)
+        _data = self._agent.read(**_args)
+        _objectName = '.'.join([self._agent.__class__.__module__,self._agent.__class__.__name__])
+        if types.GeneratorType == type(_data):
+            if self._plugins :
+                return self._stream(_data)
             else:
-                _data = self._plugins.apply(_data)
+                self.log(action='streaming',object=_objectName, input= {'memory_size':sys.getsizeof(_data)})
                 return _data
         else:
+            self.log(action='read',object=_objectName, input=_data.shape)
+            if self._plugins :
+                _data = self._plugins.apply(_data)
             return _data
+
+        # if self._plugins and self._plugins.ratio() > 0 :
+
+        #     if types.GeneratorType == type(_data):
+
+        #         return self._stream(_data)
+        #     else:
+        #         _data = self._plugins.apply(_data)
+        #         return _data
+        # else:
+        #     self.log(action='read',object=self._agent._engine.name, input=_data.shape)
+        #     return _data
 class IWriter(IO):
-    def __init__(self,_agent,pipeline=None):
-        super().__init__(_agent,pipeline)
+    lock = RLock()
+    def __init__(self,_agent,pipeline=None,_logger=None):
+        super().__init__(_agent,pipeline,_logger)
     def write(self,_data,**_args):
         if 'plugins' in _args :
             self._init_plugins(_args['plugins'])
         if self._plugins and self._plugins.ratio() > 0 :
-            _data = self._plugins.apply(_data)
-
-        self._agent.write(_data,**_args)
+            _data = self._plugins.apply(_data,self._logger)
+        try:
+            # IWriter.lock.acquire()
+            self._agent.write(_data,**_args)
+        finally:
+            # IWriter.lock.release()
+            pass

 #
 # The ETL object in its simplest form is an aggregation of read/write objects
@@ -112,7 +162,6 @@ class IETL(IReader) :
     """
     def __init__(self,**_args):
         super().__init__(transport.get.reader(**_args['source']))
-
         if 'target' in _args:
             self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
         else:
@@ -121,25 +170,25 @@ class IETL(IReader) :
         #
         # If the parent is already multiprocessing
         self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
-    def run(self) :
-        """
-        We should apply the etl here, if we are in multiprocessing mode
-        """
-        _data = super().read()
-        for _kwargs in self._targets :
-            self.post(_data,**_kwargs)
-
-    def read(self,**_args):
+    # def run(self) :
+    #     """
+    #     We should apply the etl here, if we are in multiprocessing mode
+    #     """
+    #     return self.read()
+    def run(self,**_args):
         _data = super().read(**_args)
+        self._targets = [transport.get.writer(**_kwargs) for _kwargs in self._targets]
         if types.GeneratorType == type(_data):
+            _index = 0
             for _segment in _data :
-                for _kwars in self._targets :
-                    self.post(_segment,**_kwargs)
+                _index += 1
+                for _writer in self._targets :
+                    self.post(_segment,writer=_writer,index=_index)
+                time.sleep(1)
         else:
-
-            for _kwargs in self._targets :
-                self.post(_data,**_kwargs)
+            for _writer in self._targets :
+                self.post(_data,writer=_writer)
         return _data
         # return _data
@@ -148,6 +197,19 @@ class IETL(IReader) :
         This function returns an instance of a process that will perform the write operation
         :_args      parameters associated with writer object
         """
-        writer = transport.get.writer(**_args)
-        writer.write(_data)
-        writer.close()
\ No newline at end of file
+        #writer = transport.get.writer(**_args)
+
+        try:
+            _action = 'post'
+            _shape = dict(zip(['rows','columns'],_data.shape))
+            _index = _args['index'] if 'index' in _args else 0
+            writer = _args['writer']
+
+            writer.write(_data)
+
+        except Exception as e:
+            _action = 'post-error'
+            print (e)
+            pass
+
+        self.log(action=_action,object=writer._agent.__module__, input= {'shape':_shape,'segment':_index})
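
For orientation, a hedged usage sketch of the reworked run(): it now builds one writer per target up front, posts every segment to each writer, and logs shape and segment index through IO.log. The providers, paths and table names below are illustrative only:

    import transport

    _etl = transport.get.etl(
        source={'provider': 'sqlite3', 'database': '/tmp/in.db3', 'table': 'events'},
        target=[{'provider': 'sqlite3', 'database': '/tmp/out.db3', 'table': 'events'}])
    _data = _etl.run()   # reads, fans out to every target, returns what was read
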
diff --git a/transport/nosql/couchdb.py b/transport/nosql/couchdb.py
index aa503fb..9449947 100644
--- a/transport/nosql/couchdb.py
+++ b/transport/nosql/couchdb.py
@@ -19,6 +19,7 @@ class Couch:
         @param doc      user id involved
         @param dbname   database name (target)
     """
+    __template__={"url":None,"doc":None,"dbname":None,"username":None,"password":None}
     def __init__(self,**args):
         url = args['url'] if 'url' in args else 'http://localhost:5984'
         self._id = args['doc']
diff --git a/transport/nosql/mongodb.py b/transport/nosql/mongodb.py
index 503f821..4c216bd 100644
--- a/transport/nosql/mongodb.py
+++ b/transport/nosql/mongodb.py
@@ -25,6 +25,7 @@ class Mongo :
     """
     Basic mongodb functions are captured here
     """
+    __template__={"db":None,"collection":None,"host":None,"port":None,"username":None,"password":None}
     def __init__(self,**args):
         """
         :dbname     database name/identifier
diff --git a/transport/other/files.py b/transport/other/files.py
index e0cc2af..6679285 100644
--- a/transport/other/files.py
+++ b/transport/other/files.py
@@ -27,14 +27,14 @@ class Reader (File):
     def __init__(self,**_args):
         super().__init__(**_args)
     def _stream(self,path) :
-        reader = pd.read_csv(path,delimiter=self.delimiter,chunksize=self._chunksize)
+        reader = pd.read_csv(path,sep=self.delimiter,chunksize=self._chunksize,low_memory=False)
         for segment in reader :
             yield segment
     def read(self,**args):
         _path = self.path if 'path' not in args else args['path']
         _delimiter = self.delimiter if 'delimiter' not in args else args['delimiter']
-        return pd.read_csv(_path,delimiter=self.delimiter) if not self._chunksize else self._stream(_path)
+        return pd.read_csv(_path,sep=self.delimiter) if not self._chunksize else self._stream(_path)
     def stream(self,**args):
         raise Exception ("streaming needs to be implemented")
 class Writer (File):
diff --git a/transport/plugins/__init__.py b/transport/plugins/__init__.py
index 34d88d0..db99416 100644
--- a/transport/plugins/__init__.py
+++ b/transport/plugins/__init__.py
@@ -11,6 +11,8 @@ import importlib as IL
 import importlib.util
 import sys
 import os
+import pandas as pd
+
 class Plugin :
     """
@@ -54,26 +56,7 @@ class PluginLoader :
         self._modules = {}
         self._names = []
         self._registry = _args['registry']
-        # if path and os.path.exists(path) and _names:
-        #     for _name in self._names :

-        #         spec = importlib.util.spec_from_file_location('private', path)
-        #         module = importlib.util.module_from_spec(spec)
-        #         spec.loader.exec_module(module) #--loads it into sys.modules
-        #         if hasattr(module,_name) :
-        #             if self.isplugin(module,_name) :
-        #                 self._modules[_name] = getattr(module,_name)
-        #             else:
-        #                 print ([f'Found {_name}', 'not plugin'])
-        #         else:
-        #             #
-        #             # @TODO: We should log this somewhere some how
-        #             print (['skipping ',_name, hasattr(module,_name)])
-        #             pass
-        # else:
-        #     #
-        #     # Initialization is empty
-        #     self._names = []
+
         pass
     def load (self,**_args):
         self._modules = {}
@@ -84,7 +67,6 @@ class PluginLoader :
         spec = importlib.util.spec_from_file_location(_alias, path)
         module = importlib.util.module_from_spec(spec)
         spec.loader.exec_module(module) #--loads it into sys.modules
-        # self._names = [_name for _name in dir(module) if type(getattr(module,_name)).__name__ == 'function']
         for _name in dir(module) :
             if self.isplugin(module,_name) :
                 self._module[_name] = getattr(module,_name)
@@ -97,11 +79,6 @@ class PluginLoader :
         This function will set a pointer to the list of modules to be called
         This should be used within the context of using the framework as a library
         """
-        # _name = _pointer.__name__ if type(_pointer).__name__ == 'function' else {}

-        # self._modules[_name] = _pointer
-        # self._names.append(_name)
-
         _pointer = self._registry.get(key=_key)
         if _pointer :
             self._modules[_key] = _pointer
@@ -137,12 +114,30 @@ class PluginLoader :
         _n = len(self._names)
         return len(set(self._modules.keys()) & set (self._names)) / _n

-    def apply(self,_data):
+    def apply(self,_data,_logger=None):
+        _rows = []
+        _input= {}
+
         for _name in self._modules :
-            _pointer = self._modules[_name]
-            #
-            # @TODO: add exception handling
-            _data = _pointer(_data)
+            try:
+                _input = {'action':'plugin','object':_name,'input':{'status':'PASS'}}
+                _pointer = self._modules[_name]
+                if type(_data) == list :
+                    _data = pd.DataFrame(_data)
+                _brow,_bcol = list(_data.shape)
+
+                #
+                # @TODO: add exception handling
+                _data = _pointer(_data)
+
+                _input['input']['shape'] = {'dropped':{'rows':_brow - _data.shape[0],'cols':_bcol-_data.shape[1]}}
+            except Exception as e:
+                _input['input']['status'] = 'FAILED'
+                print (e)
+
+            if _logger:
+                _logger(**_input)
+        pass
         return _data
     # def apply(self,_data,_name):
     #     """
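
A sketch of what apply() now expects from a plugin: a named callable that takes and returns a DataFrame, with the optional logger callback receiving per-plugin status and the dropped row/column counts. Only the call contract is taken from the code above; the function name is hypothetical:

    import pandas as pd

    def drop_empty(_df: pd.DataFrame) -> pd.DataFrame:
        # one pipeline step: drop all-null rows; apply() logs the delta as
        # {'shape': {'dropped': {'rows': ..., 'cols': ...}}}
        return _df.dropna(how='all')

    # loader.apply(_df, _logger) runs each registered step in turn; if a step
    # raises, its status is logged as 'FAILED' and the remaining steps still run
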
diff --git a/transport/registry.py b/transport/registry.py
index f3dc8ac..1f612dc 100644
--- a/transport/registry.py
+++ b/transport/registry.py
@@ -220,6 +220,8 @@ def init (email,path=REGISTRY_PATH,override=False,_file=REGISTRY_FILE):
 def lookup (label):
     global DATA
     return label in DATA
+has = lookup
+
 def get (label='default') :
     global DATA
     return copy.copy(DATA[label]) if label in DATA else {}
diff --git a/transport/sql/common.py b/transport/sql/common.py
index 1988dd6..6dcab6d 100644
--- a/transport/sql/common.py
+++ b/transport/sql/common.py
@@ -7,6 +7,7 @@ from sqlalchemy import text
 import pandas as pd

+
 class Base:
     def __init__(self,**_args):
         # print ([' ## ',_args])
@@ -122,23 +123,21 @@ class BaseWriter (SQLBase):
     def __init__(self,**_args):
         super().__init__(**_args)
     def write(self,_data,**_args):
-        if type(_data) == dict :
-            _df = pd.DataFrame(_data)
-        elif type(_data) == list :
+        if type(_data) in [list,dict] :
             _df = pd.DataFrame(_data)
+        # elif type(_data) == list :
+        #     _df = pd.DataFrame(_data)
         else:
             _df = _data.copy()
         #
         # We are assuming we have a data-frame at this point
         #
         _table = _args['table'] if 'table' in _args else self._table
-        _mode = {'chunksize':2000000,'if_exists':'append','index':False}
+        _mode = {'if_exists':'append','index':False}
+        if self._chunksize :
+            _mode['chunksize'] = self._chunksize
         for key in ['if_exists','index','chunksize'] :
             if key in _args :
                 _mode[key] = _args[key]
-        # if 'schema' in _args :
-        #     _mode['schema'] = _args['schema']
-        # if 'if_exists' in _args :
-        #     _mode['if_exists'] = _args['if_exists']
-
+
         _df.to_sql(_table,self._engine,**_mode)
\ No newline at end of file
diff --git a/transport/sql/duckdb.py b/transport/sql/duckdb.py
index 06f66e5..496e364 100644
--- a/transport/sql/duckdb.py
+++ b/transport/sql/duckdb.py
@@ -2,8 +2,9 @@
 This module implements the handler for duckdb (in memory or not)
 """
 from transport.sql.common import Base, BaseReader, BaseWriter
-
+from multiprocessing import RLock
 class Duck :
+    lock = RLock()
     def __init__(self,**_args):
         #
         # duckdb with none as database will operate as an in-memory database
@@ -22,3 +23,4 @@ class Writer(Duck,BaseWriter):
     def __init__(self,**_args):
         Duck.__init__(self,**_args)
         BaseWriter.__init__(self,**_args)
+
\ No newline at end of file
diff --git a/transport/sql/sqlite3.py b/transport/sql/sqlite3.py
index 50fa30a..d71441e 100644
--- a/transport/sql/sqlite3.py
+++ b/transport/sql/sqlite3.py
@@ -1,7 +1,9 @@
 import sqlalchemy
 import pandas as pd
 from transport.sql.common import Base, BaseReader, BaseWriter
-class SQLite3 (BaseReader):
+from multiprocessing import RLock
+class SQLite3 :
+    lock = RLock()
     def __init__(self,**_args):
         super().__init__(**_args)
         if 'path' in _args :
@@ -22,4 +24,10 @@ class Reader(SQLite3,BaseReader):
 class Writer (SQLite3,BaseWriter):
     def __init__(self,**_args):
-        super().__init__(**_args)
\ No newline at end of file
+        super().__init__(**_args)
+    def write(self,_data,**_kwargs):
+        try:
+            SQLite3.lock.acquire()
+            super().write(_data,**_kwargs)
+        finally:
+            SQLite3.lock.release()
\ No newline at end of file
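
On the new class-level RLock: SQLite permits only one writer per database file, so concurrent posts (for instance from the multiprocessing ETL path) can collide, and serializing Writer.write is a coarse but safe fix. A sketch of the same pattern using a context manager, equivalent to the explicit acquire/release above; the wrapper class is hypothetical:

    from multiprocessing import RLock

    class LockedWriter:
        lock = RLock()                      # shared by all instances in this process
        def __init__(self, writer):
            self._writer = writer           # wraps any transport write-handler
        def write(self, _data, **_kwargs):
            with LockedWriter.lock:         # same effect as acquire()/finally: release()
                self._writer.write(_data, **_kwargs)
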
diff --git a/transport/warehouse/iceberg.py b/transport/warehouse/iceberg.py
index a22c16e..8d54300 100644
--- a/transport/warehouse/iceberg.py
+++ b/transport/warehouse/iceberg.py
@@ -51,7 +51,6 @@ class Iceberg :
         _schema = []
         try:
             _tableName = self._getPrefix(**_args) + f".{_args['table']}"
-            print (_tableName)
             _tmp = self._session.table(_tableName).schema
             _schema = _tmp.jsonValue()['fields']
             for _item in _schema :
@@ -77,6 +76,8 @@ class Iceberg :
             return False
     def apply(self,sql):
         pass
+    def close(self):
+        self._session.stop()
 class Reader(Iceberg) :
     def __init__(self,**_args):
         super().__init__(**_args)
@@ -103,13 +104,20 @@ class Writer (Iceberg):
         _prefix = self._getPrefix(**_args)
         if 'table' not in _args and not self._table :
             raise Exception (f"Table Name should be specified for catalog/database {_prefix}")
-        rdd = self._session.createDataFrame(_data)
+
+        rdd = self._session.createDataFrame(_data,verifySchema=False)
         _mode = self._mode if 'mode' not in _args else _args['mode']
         _table = self._table if 'table' not in _args else _args['table']
-
-        if not self.has(table=_table) :
-            _mode = 'overwrite'
-        rdd.write.format('iceberg').mode(_mode).saveAsTable(_table)
+        # print (_data.shape,_mode,_table)
+
+        if not self._session.catalog.tableExists(_table):
+            # # @TODO:
+            # # add partitioning information here
+            rdd.writeTo(_table).using('iceberg').create()
+
+            # # _mode = 'overwrite'
+            # # rdd.write.format('iceberg').mode(_mode).saveAsTable(_table)
         else:
-            _table = f'{_prefix}.{_table}'
-            rdd.write.format('iceberg').mode(_mode).save(_table)
+            # rdd.writeTo(_table).append()
+            # # _table = f'{_prefix}.{_table}'
+            rdd.write.format('iceberg').mode('append').save(_table)
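
The rewritten Iceberg write path reduces to "create on first write, append afterwards". A hedged sketch of that decision in plain PySpark, assuming a session already configured with an Iceberg catalog; the records and the fully-qualified table name are hypothetical, and writeTo(...).using(...).create() is the DataFrameWriterV2 API the code above relies on:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()   # Iceberg catalog configured elsewhere
    records = [{'id': 1, 'value': 'a'}]
    df = spark.createDataFrame(records, verifySchema=False)
    table = 'catalog.db.events'                  # hypothetical fully-qualified name

    if not spark.catalog.tableExists(table):
        df.writeTo(table).using('iceberg').create()              # first write creates the table
    else:
        df.write.format('iceberg').mode('append').save(table)    # later writes append
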