bug fixes: drill & iceberg, etl

v2.4
Steve Nyemba 13 hours ago
parent a2ab60660e
commit bcf25a4e27

@ -46,7 +46,7 @@ def wait(jobs):
jobs = [thread for thread in jobs if thread.is_alive()] jobs = [thread for thread in jobs if thread.is_alive()]
time.sleep(1) time.sleep(1)
@app.command(name="apply") @app.command(name="apply-etl")
def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")], def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed")): index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed")):
""" """
@ -57,7 +57,7 @@ def apply (path:Annotated[str,typer.Argument(help="path of the configuration fil
file = open(path) file = open(path)
_config = json.loads (file.read() ) _config = json.loads (file.read() )
file.close() file.close()
if index : if index :
_config = [_config[ int(index)]] _config = [_config[ int(index)]]
jobs = [] jobs = []
for _args in _config : for _args in _config :

@ -1,6 +1,6 @@
__app_name__ = 'data-transport' __app_name__ = 'data-transport'
__author__ = 'The Phi Technology' __author__ = 'The Phi Technology'
__version__= '2.4.6' __version__= '2.4.7'
__email__ = "info@the-phi.com" __email__ = "info@the-phi.com"
__license__=f""" __license__=f"""
Copyright 2010 - 2024, Steve L. Nyemba Copyright 2010 - 2024, Steve L. Nyemba

@ -48,6 +48,9 @@ class IO:
_date = str(datetime.now()) _date = str(datetime.now())
_data = dict({'pid':os.getpid(),'date':_date[:10],'time':_date[11:19]},**_args) _data = dict({'pid':os.getpid(),'date':_date[:10],'time':_date[11:19]},**_args)
for key in _data : for key in _data :
if type(_data[key]) == list :
_data[key] = [_item.__name__ if type(_item).__name__== 'function' else _item for _item in _data[key]]
_data[key] = str(_data[key]) if type(_data[key]) not in [list,dict] else json.dumps(_data[key]) _data[key] = str(_data[key]) if type(_data[key]) not in [list,dict] else json.dumps(_data[key])
self._logger.write(pd.DataFrame([_data])) #,table=self._logTable) self._logger.write(pd.DataFrame([_data])) #,table=self._logTable)
def _init_plugins(self,_items): def _init_plugins(self,_items):
@ -57,7 +60,8 @@ class IO:
registry.plugins.init() registry.plugins.init()
self._plugins = PluginLoader(registry=registry.plugins) self._plugins = PluginLoader(registry=registry.plugins)
[self._plugins.set(_name) for _name in _items] [self._plugins.set(_name) for _name in _items]
self.log(action='init-plugins',caller='read', input =[_name for _name in _items])
self.log(action='init-plugins',caller='read',object=self.getClassName(self),input =[_name for _name in _items])
# if 'path' in _args and 'names' in _args : # if 'path' in _args and 'names' in _args :
# self._plugins = PluginLoader(**_args) # self._plugins = PluginLoader(**_args)
# else: # else:
@ -69,7 +73,8 @@ class IO:
if hasattr(self._agent,'meta') : if hasattr(self._agent,'meta') :
return self._agent.meta(**_args) return self._agent.meta(**_args)
return [] return []
def getClassName (self,_object):
return '.'.join([_object.__class__.__module__,_object.__class__.__name__])
def close(self): def close(self):
if hasattr(self._agent,'close') : if hasattr(self._agent,'close') :
self._agent.close() self._agent.close()
@ -79,6 +84,7 @@ class IO:
""" """
for _pointer in self._plugins : for _pointer in self._plugins :
_data = _pointer(_data) _data = _pointer(_data)
time.sleep(1)
def apply(self,_query): def apply(self,_query):
if hasattr(self._agent,'apply') : if hasattr(self._agent,'apply') :
return self._agent.apply(_query) return self._agent.apply(_query)

@ -12,7 +12,7 @@ import importlib.util
import sys import sys
import os import os
import pandas as pd import pandas as pd
import time
class Plugin : class Plugin :
""" """
@ -79,16 +79,19 @@ class PluginLoader :
This function will set a pointer to the list of modules to be called This function will set a pointer to the list of modules to be called
This should be used within the context of using the framework as a library This should be used within the context of using the framework as a library
""" """
_pointer = self._registry.get(key=_key) if type(_key).__name__ == 'function':
if _pointer :
self._modules[_key] = _pointer
self._names.append(_key)
elif type(_key).__name__ == 'function':
# #
# The pointer is in the code provided by the user and loaded in memory # The pointer is in the code provided by the user and loaded in memory
# #
_pointer = _key _pointer = _key
self._names.append(_key.__name__) _key = 'inline@'+_key.__name__
# self._names.append(_key.__name__)
else:
_pointer = self._registry.get(key=_key)
if _pointer :
self._modules[_key] = _pointer
self._names.append(_key)
def isplugin(self,module,name): def isplugin(self,module,name):
""" """
@ -129,13 +132,16 @@ class PluginLoader :
# @TODO: add exception handling # @TODO: add exception handling
_data = _pointer(_data) _data = _pointer(_data)
_input['input']['shape'] = {'dropped':{'rows':_brow - _data.shape[0],'cols':_bcol-_data.shape[1]}} _input['input']['shape'] = {'rows-dropped':_brow - _data.shape[0]}
except Exception as e: except Exception as e:
_input['input']['status'] = 'FAILED' _input['input']['status'] = 'FAILED'
print (e) print (e)
time.sleep(1)
if _logger: if _logger:
_logger(_input) try:
_logger(**_input)
except Exception as e:
pass
return _data return _data
# def apply(self,_data,_name): # def apply(self,_data,_name):
# """ # """

@ -21,7 +21,7 @@ class Iceberg :
# @TODO: # @TODO:
# Make arrangements for additional configuration elements # Make arrangements for additional configuration elements
# #
self._session = SparkSession.builder.getOrCreate() self._session = SparkSession.builder.appName("data-transport").getOrCreate()
# self._session.sparkContext.setLogLevel("ERROR") # self._session.sparkContext.setLogLevel("ERROR")
self._catalog = self._session.catalog self._catalog = self._session.catalog
self._table = _args['table'] if 'table' in _args else None self._table = _args['table'] if 'table' in _args else None
@ -123,4 +123,4 @@ class Writer (Iceberg):
else: else:
# rdd.writeTo(_table).append() # rdd.writeTo(_table).append()
# # _table = f'{_prefix}.{_table}' # # _table = f'{_prefix}.{_table}'
rdd.write.format('iceberg').mode('append').save(_table) rdd.coalesce(10).write.format('iceberg').mode('append').save(_table)

Loading…
Cancel
Save