Compare commits

..

No commits in common. '60bf2e8872ed78f05e1829c0dcab21e5d3ae39d8' and 'f84a78e0bce7ddb4ecaa3aa892c99e2520ad55b2' have entirely different histories.

@ -56,9 +56,6 @@ def wait(jobs):
# def wait (jobs):
# while jobs :
# jobs = [pthread for pthread in jobs if pthread.is_alive()]
def launch_etl (_args):
_etlTask = IETL(**_args)
_etlTask.read()
@app_e.command(name="run")
def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
@ -78,10 +75,10 @@ def apply (path:Annotated[str,typer.Argument(help="path of the configuration fil
jobs = []
for _args in _config :
# pthread = etl.instance(**_args) #-- automatically starts the process
# def bootup ():
# _worker = IETL(**_args)
# _worker.read()
pthread = Process(target=launch_etl,args=(_args,))
def bootup ():
_worker = IETL(**_args)
_worker.run()
pthread = Process(target=bootup)
pthread.start()
jobs.append(pthread)
if len(jobs) == batch :

@ -99,7 +99,6 @@ class IETL(IReader) :
"""
def __init__(self,**_args):
super().__init__(agent=transport.get.reader(**_args['source']),plugins=None)
self._source = _args['source']
if 'target' in _args:
self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
else:
@ -109,10 +108,11 @@ class IETL(IReader) :
# If the parent is already multiprocessing
self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
def read(self,**_args):
_key = 'cmd' if 'cmd' in self._source else 'query'
_kwargs = self._source[_key] if _key in self._source else None
_data = super().read(**_kwargs) if _kwargs else super().read()
_data = super().read(**_args)
_schema = super().meta()
for _kwargs in self._targets :
# if _schema :
# _kwargs['schema'] = _schema
self.post(_data,**_kwargs)
return _data
@ -124,9 +124,8 @@ class IETL(IReader) :
:_args parameters associated with writer object
"""
writer = transport.get.writer(**_args)
# if 'schema' in _args :
# writer.write(_data,schema=_args['schema'])
# else:
if 'schema' in _args :
writer.write(_data,schema=_args['schema'])
else:
writer.write(_data)
writer.close()
Loading…
Cancel
Save