Merge pull request 'v2.2.0' (#6) from v2.2.0 into main

Reviewed-on: #6
main
Steve L. Nyemba 5 days ago
commit 60bf2e8872

@ -56,6 +56,9 @@ def wait(jobs):
# def wait (jobs): # def wait (jobs):
# while jobs : # while jobs :
# jobs = [pthread for pthread in jobs if pthread.is_alive()] # jobs = [pthread for pthread in jobs if pthread.is_alive()]
def launch_etl (_args):
_etlTask = IETL(**_args)
_etlTask.read()
@app_e.command(name="run") @app_e.command(name="run")
def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")], def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
@ -75,10 +78,10 @@ def apply (path:Annotated[str,typer.Argument(help="path of the configuration fil
jobs = [] jobs = []
for _args in _config : for _args in _config :
# pthread = etl.instance(**_args) #-- automatically starts the process # pthread = etl.instance(**_args) #-- automatically starts the process
def bootup (): # def bootup ():
_worker = IETL(**_args) # _worker = IETL(**_args)
_worker.run() # _worker.read()
pthread = Process(target=bootup) pthread = Process(target=launch_etl,args=(_args,))
pthread.start() pthread.start()
jobs.append(pthread) jobs.append(pthread)
if len(jobs) == batch : if len(jobs) == batch :

@ -99,6 +99,7 @@ class IETL(IReader) :
""" """
def __init__(self,**_args): def __init__(self,**_args):
super().__init__(agent=transport.get.reader(**_args['source']),plugins=None) super().__init__(agent=transport.get.reader(**_args['source']),plugins=None)
self._source = _args['source']
if 'target' in _args: if 'target' in _args:
self._targets = _args['target'] if type(_args['target']) == list else [_args['target']] self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
else: else:
@ -108,11 +109,10 @@ class IETL(IReader) :
# If the parent is already multiprocessing # If the parent is already multiprocessing
self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess'] self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
def read(self,**_args): def read(self,**_args):
_data = super().read(**_args) _key = 'cmd' if 'cmd' in self._source else 'query'
_schema = super().meta() _kwargs = self._source[_key] if _key in self._source else None
_data = super().read(**_kwargs) if _kwargs else super().read()
for _kwargs in self._targets : for _kwargs in self._targets :
# if _schema :
# _kwargs['schema'] = _schema
self.post(_data,**_kwargs) self.post(_data,**_kwargs)
return _data return _data
@ -124,8 +124,9 @@ class IETL(IReader) :
:_args parameters associated with writer object :_args parameters associated with writer object
""" """
writer = transport.get.writer(**_args) writer = transport.get.writer(**_args)
if 'schema' in _args : # if 'schema' in _args :
writer.write(_data,schema=_args['schema']) # writer.write(_data,schema=_args['schema'])
else: # else:
writer.write(_data)
writer.write(_data)
writer.close() writer.close()
Loading…
Cancel
Save