diff --git a/bin/transport b/bin/transport index 371d579..20d1f7a 100755 --- a/bin/transport +++ b/bin/transport @@ -104,7 +104,7 @@ def generate (path:Annotated[str,typer.Argument(help="path of the ETL configurat { "source":{"provider":"http","url":"https://raw.githubusercontent.com/codeforamerica/ohana-api/master/data/sample-csv/addresses.csv"}, "target": - [{"provider":"files","path":"addresses.csv","delimiter":","},{"provider":"sqlite","database":"sample.db3","table":"addresses"}] + [{"provider":"files","path":"addresses.csv","delimiter":","},{"provider":"sqlite3","database":"sample.db3","table":"addresses"}] } ] file = open(path,'w') diff --git a/info/__init__.py b/info/__init__.py index 9d32231..4d32224 100644 --- a/info/__init__.py +++ b/info/__init__.py @@ -1,6 +1,6 @@ __app_name__ = 'data-transport' __author__ = 'The Phi Technology' -__version__= '2.4.3' +__version__= '2.4.4' __email__ = "info@the-phi.com" __license__=f""" Copyright 2010 - 2024, Steve L. Nyemba diff --git a/transport/cloud/bigquery.py b/transport/cloud/bigquery.py index 205d947..cf2e3d6 100644 --- a/transport/cloud/bigquery.py +++ b/transport/cloud/bigquery.py @@ -15,7 +15,7 @@ import time MAX_CHUNK = 2000000 class BigQuery: - __template__= {"private_key":None,"dataset":None,"table":None,} + __template__= {"private_key":None,"dataset":None,"table":None} def __init__(self,**_args): path = _args['service_key'] if 'service_key' in _args else _args['private_key'] self.credentials = service_account.Credentials.from_service_account_file(path) diff --git a/transport/iowrapper.py b/transport/iowrapper.py index 224f618..e555ff5 100644 --- a/transport/iowrapper.py +++ b/transport/iowrapper.py @@ -16,7 +16,8 @@ from datetime import datetime import pandas as pd import os import sys - +import itertools +import json class IO: """ @@ -29,9 +30,9 @@ class IO: # # registry.init() - self._logger = _logger #transport.get.writer(label='logger') #if registry.has('logger') else None - if not _logger and hasattr(_agent,'_logger') : - self._logger = getattr(_agent,'_logger') + self._logger = _logger if not type(_agent) in [IReader,IWriter] else _agent._logger #transport.get.writer(label='logger') #if registry.has('logger') else None + # if not _logger and hasattr(_agent,'_logger') : + # self._logger = getattr(_agent,'_logger') self._agent = _agent _date = _date = str(datetime.now()) self._logTable = 'logs' #'_'.join(['logs',_date[:10]+_date[11:19]]).replace(':','').replace('-','_') @@ -47,8 +48,11 @@ class IO: _date = str(datetime.now()) _data = dict({'pid':os.getpid(),'date':_date[:10],'time':_date[11:19]},**_args) for key in _data : - _data[key] = str(_data[key]) + _data[key] = str(_data[key]) if type(_data[key]) not in [list,dict] else json.dumps(_data[key]) self._logger.write(pd.DataFrame([_data])) #,table=self._logTable) + else: + print ([' ********** ']) + print (_args) def _init_plugins(self,_items): """ This function will load pipelined functions as a plugin loader @@ -117,12 +121,19 @@ class IReader(IO): if self._plugins : return self._stream(_data) else: - self.log(action='streaming',object=_objectName, input= {'memory_size':sys.getsizeof(_data)}) - return _data + _count = 0 + for _segment in _data : + _count += 1 + yield _segment + self.log(action='streaming',object=_objectName, input= {'segments':_count}) + # return _data else: + + self.log(action='read',object=_objectName, input=_data.shape) if self._plugins : - _data = self._plugins.apply(_data) + _logs = [] + _data = self._plugins.apply(_data,self.log) return _data # if self._plugins and self._plugins.ratio() > 0 : @@ -144,7 +155,10 @@ class IWriter(IO): if 'plugins' in _args : self._init_plugins(_args['plugins']) if self._plugins and self._plugins.ratio() > 0 : - _data = self._plugins.apply(_data,self._logger) + _logs = [] + _data = self._plugins.apply(_data,_logs,self.log) + + # [self.log(**_item) for _item in _logs] try: # IWriter.lock.acquire() self._agent.write(_data,**_args) @@ -161,7 +175,13 @@ class IETL(IReader) : This class performs an ETL operation by ineriting a read and adding writes as pipeline functions """ def __init__(self,**_args): - super().__init__(transport.get.reader(**_args['source'])) + _source = _args['source'] + _plugins = _source['plugins'] if 'plugins' in _source else None + + # super().__init__(transport.get.reader(**_args['source'])) + super().__init__(transport.get.reader(**_source),_plugins) + + # _logger = if 'target' in _args: self._targets = _args['target'] if type(_args['target']) == list else [_args['target']] else: diff --git a/transport/plugins/__init__.py b/transport/plugins/__init__.py index db99416..2fa77e3 100644 --- a/transport/plugins/__init__.py +++ b/transport/plugins/__init__.py @@ -114,8 +114,7 @@ class PluginLoader : _n = len(self._names) return len(set(self._modules.keys()) & set (self._names)) / _n - def apply(self,_data,_logger=None): - _rows = [] + def apply(self,_data,_logger=[]): _input= {} for _name in self._modules : @@ -136,8 +135,7 @@ class PluginLoader : print (e) if _logger: - _logger(**_input) - pass + _logger(_input) return _data # def apply(self,_data,_name): # """ diff --git a/transport/sql/common.py b/transport/sql/common.py index 6dcab6d..988a4ba 100644 --- a/transport/sql/common.py +++ b/transport/sql/common.py @@ -9,6 +9,7 @@ import pandas as pd class Base: + __template__={"host":None,"port":1,"database":None,"table":None,"username":None,"password":None} def __init__(self,**_args): # print ([' ## ',_args]) self._host = _args['host'] if 'host' in _args else 'localhost' diff --git a/transport/sql/sqlserver.py b/transport/sql/sqlserver.py index 6a53842..6cc9e38 100644 --- a/transport/sql/sqlserver.py +++ b/transport/sql/sqlserver.py @@ -7,6 +7,7 @@ from transport.sql.common import Base, BaseReader, BaseWriter class MsSQLServer: + def __init__(self,**_args) : super().__init__(**_args) pass