diff --git a/bin/transport b/bin/transport index 20d1f7a..bb35f7a 100755 --- a/bin/transport +++ b/bin/transport @@ -46,7 +46,7 @@ def wait(jobs): jobs = [thread for thread in jobs if thread.is_alive()] time.sleep(1) -@app.command(name="apply") +@app.command(name="apply-etl") def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")], index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed")): """ @@ -57,7 +57,7 @@ def apply (path:Annotated[str,typer.Argument(help="path of the configuration fil file = open(path) _config = json.loads (file.read() ) file.close() - if index : + if index : _config = [_config[ int(index)]] jobs = [] for _args in _config : diff --git a/info/__init__.py b/info/__init__.py index 056d8ac..99deb57 100644 --- a/info/__init__.py +++ b/info/__init__.py @@ -1,6 +1,6 @@ __app_name__ = 'data-transport' __author__ = 'The Phi Technology' -__version__= '2.4.6' +__version__= '2.4.7' __email__ = "info@the-phi.com" __license__=f""" Copyright 2010 - 2024, Steve L. Nyemba diff --git a/transport/iowrapper.py b/transport/iowrapper.py index 2732003..a464fde 100644 --- a/transport/iowrapper.py +++ b/transport/iowrapper.py @@ -48,6 +48,9 @@ class IO: _date = str(datetime.now()) _data = dict({'pid':os.getpid(),'date':_date[:10],'time':_date[11:19]},**_args) for key in _data : + if type(_data[key]) == list : + _data[key] = [_item.__name__ if type(_item).__name__== 'function' else _item for _item in _data[key]] + _data[key] = str(_data[key]) if type(_data[key]) not in [list,dict] else json.dumps(_data[key]) self._logger.write(pd.DataFrame([_data])) #,table=self._logTable) def _init_plugins(self,_items): @@ -57,7 +60,8 @@ class IO: registry.plugins.init() self._plugins = PluginLoader(registry=registry.plugins) [self._plugins.set(_name) for _name in _items] - self.log(action='init-plugins',caller='read', input =[_name for _name in _items]) + + self.log(action='init-plugins',caller='read',object=self.getClassName(self),input =[_name for _name in _items]) # if 'path' in _args and 'names' in _args : # self._plugins = PluginLoader(**_args) # else: @@ -69,7 +73,8 @@ class IO: if hasattr(self._agent,'meta') : return self._agent.meta(**_args) return [] - + def getClassName (self,_object): + return '.'.join([_object.__class__.__module__,_object.__class__.__name__]) def close(self): if hasattr(self._agent,'close') : self._agent.close() @@ -79,6 +84,7 @@ class IO: """ for _pointer in self._plugins : _data = _pointer(_data) + time.sleep(1) def apply(self,_query): if hasattr(self._agent,'apply') : return self._agent.apply(_query) diff --git a/transport/plugins/__init__.py b/transport/plugins/__init__.py index 2fa77e3..760b66c 100644 --- a/transport/plugins/__init__.py +++ b/transport/plugins/__init__.py @@ -12,7 +12,7 @@ import importlib.util import sys import os import pandas as pd - +import time class Plugin : """ @@ -79,16 +79,19 @@ class PluginLoader : This function will set a pointer to the list of modules to be called This should be used within the context of using the framework as a library """ - _pointer = self._registry.get(key=_key) - if _pointer : - self._modules[_key] = _pointer - self._names.append(_key) - elif type(_key).__name__ == 'function': + if type(_key).__name__ == 'function': # # The pointer is in the code provided by the user and loaded in memory # _pointer = _key - self._names.append(_key.__name__) + _key = 'inline@'+_key.__name__ + # self._names.append(_key.__name__) + else: + _pointer = self._registry.get(key=_key) + + if _pointer : + self._modules[_key] = _pointer + self._names.append(_key) def isplugin(self,module,name): """ @@ -129,13 +132,16 @@ class PluginLoader : # @TODO: add exception handling _data = _pointer(_data) - _input['input']['shape'] = {'dropped':{'rows':_brow - _data.shape[0],'cols':_bcol-_data.shape[1]}} + _input['input']['shape'] = {'rows-dropped':_brow - _data.shape[0]} except Exception as e: _input['input']['status'] = 'FAILED' print (e) - + time.sleep(1) if _logger: - _logger(_input) + try: + _logger(**_input) + except Exception as e: + pass return _data # def apply(self,_data,_name): # """ diff --git a/transport/warehouse/iceberg.py b/transport/warehouse/iceberg.py index 77e16ad..518f570 100644 --- a/transport/warehouse/iceberg.py +++ b/transport/warehouse/iceberg.py @@ -21,7 +21,7 @@ class Iceberg : # @TODO: # Make arrangements for additional configuration elements # - self._session = SparkSession.builder.getOrCreate() + self._session = SparkSession.builder.appName("data-transport").getOrCreate() # self._session.sparkContext.setLogLevel("ERROR") self._catalog = self._session.catalog self._table = _args['table'] if 'table' in _args else None @@ -123,4 +123,4 @@ class Writer (Iceberg): else: # rdd.writeTo(_table).append() # # _table = f'{_prefix}.{_table}' - rdd.write.format('iceberg').mode('append').save(_table) + rdd.coalesce(10).write.format('iceberg').mode('append').save(_table)