v2.2.0 #7

Merged
steve merged 2 commits from v2.2.0 into master 5 days ago

@ -56,6 +56,9 @@ def wait(jobs):
# def wait (jobs):
# while jobs :
# jobs = [pthread for pthread in jobs if pthread.is_alive()]
def launch_etl (_args):
_etlTask = IETL(**_args)
_etlTask.read()
@app_e.command(name="run")
def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
@ -75,10 +78,10 @@ def apply (path:Annotated[str,typer.Argument(help="path of the configuration fil
jobs = []
for _args in _config :
# pthread = etl.instance(**_args) #-- automatically starts the process
def bootup ():
_worker = IETL(**_args)
_worker.run()
pthread = Process(target=bootup)
# def bootup ():
# _worker = IETL(**_args)
# _worker.read()
pthread = Process(target=launch_etl,args=(_args,))
pthread.start()
jobs.append(pthread)
if len(jobs) == batch :

@ -99,6 +99,7 @@ class IETL(IReader) :
"""
def __init__(self,**_args):
super().__init__(agent=transport.get.reader(**_args['source']),plugins=None)
self._source = _args['source']
if 'target' in _args:
self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
else:
@ -108,11 +109,10 @@ class IETL(IReader) :
# If the parent is already multiprocessing
self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
def read(self,**_args):
_data = super().read(**_args)
_schema = super().meta()
_key = 'cmd' if 'cmd' in self._source else 'query'
_kwargs = self._source[_key] if _key in self._source else None
_data = super().read(**_kwargs) if _kwargs else super().read()
for _kwargs in self._targets :
# if _schema :
# _kwargs['schema'] = _schema
self.post(_data,**_kwargs)
return _data
@ -124,8 +124,9 @@ class IETL(IReader) :
:_args parameters associated with writer object
"""
writer = transport.get.writer(**_args)
if 'schema' in _args :
writer.write(_data,schema=_args['schema'])
else:
# if 'schema' in _args :
# writer.write(_data,schema=_args['schema'])
# else:
writer.write(_data)
writer.close()
Loading…
Cancel
Save