Compare commits

..

No commits in common. '47e68e257697f7edc4b3921c54c1b90d8183c407' and '80005acd52fd1866d881d225d09c49c0358ef2e3' have entirely different histories.

@ -56,9 +56,6 @@ def wait(jobs):
# def wait (jobs): # def wait (jobs):
# while jobs : # while jobs :
# jobs = [pthread for pthread in jobs if pthread.is_alive()] # jobs = [pthread for pthread in jobs if pthread.is_alive()]
def launch_etl (_args):
_etlTask = IETL(**_args)
_etlTask.read()
@app_e.command(name="run") @app_e.command(name="run")
def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")], def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
@ -78,10 +75,10 @@ def apply (path:Annotated[str,typer.Argument(help="path of the configuration fil
jobs = [] jobs = []
for _args in _config : for _args in _config :
# pthread = etl.instance(**_args) #-- automatically starts the process # pthread = etl.instance(**_args) #-- automatically starts the process
# def bootup (): def bootup ():
# _worker = IETL(**_args) _worker = IETL(**_args)
# _worker.read() _worker.run()
pthread = Process(target=launch_etl,args=(_args,)) pthread = Process(target=bootup)
pthread.start() pthread.start()
jobs.append(pthread) jobs.append(pthread)
if len(jobs) == batch : if len(jobs) == batch :

@ -99,7 +99,6 @@ class IETL(IReader) :
""" """
def __init__(self,**_args): def __init__(self,**_args):
super().__init__(agent=transport.get.reader(**_args['source']),plugins=None) super().__init__(agent=transport.get.reader(**_args['source']),plugins=None)
self._source = _args['source']
if 'target' in _args: if 'target' in _args:
self._targets = _args['target'] if type(_args['target']) == list else [_args['target']] self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
else: else:
@ -109,10 +108,11 @@ class IETL(IReader) :
# If the parent is already multiprocessing # If the parent is already multiprocessing
self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess'] self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
def read(self,**_args): def read(self,**_args):
_key = 'cmd' if 'cmd' in self._source else 'query' _data = super().read(**_args)
_kwargs = self._source[_key] if _key in self._source else None _schema = super().meta()
_data = super().read(**_kwargs) if _kwargs else super().read()
for _kwargs in self._targets : for _kwargs in self._targets :
# if _schema :
# _kwargs['schema'] = _schema
self.post(_data,**_kwargs) self.post(_data,**_kwargs)
return _data return _data
@ -124,9 +124,8 @@ class IETL(IReader) :
:_args parameters associated with writer object :_args parameters associated with writer object
""" """
writer = transport.get.writer(**_args) writer = transport.get.writer(**_args)
# if 'schema' in _args : if 'schema' in _args :
# writer.write(_data,schema=_args['schema']) writer.write(_data,schema=_args['schema'])
# else: else:
writer.write(_data)
writer.write(_data)
writer.close() writer.close()
Loading…
Cancel
Save