diff --git a/bin/transport b/bin/transport index f6c511e..33c2a0d 100755 --- a/bin/transport +++ b/bin/transport @@ -56,6 +56,9 @@ def wait(jobs): # def wait (jobs): # while jobs : # jobs = [pthread for pthread in jobs if pthread.is_alive()] +def launch_etl (_args): + _etlTask = IETL(**_args) + _etlTask.read() @app_e.command(name="run") def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")], @@ -75,10 +78,10 @@ def apply (path:Annotated[str,typer.Argument(help="path of the configuration fil jobs = [] for _args in _config : # pthread = etl.instance(**_args) #-- automatically starts the process - def bootup (): - _worker = IETL(**_args) - _worker.run() - pthread = Process(target=bootup) + # def bootup (): + # _worker = IETL(**_args) + # _worker.read() + pthread = Process(target=launch_etl,args=(_args,)) pthread.start() jobs.append(pthread) if len(jobs) == batch : diff --git a/transport/iowrapper.py b/transport/iowrapper.py index 105d024..7c7c173 100644 --- a/transport/iowrapper.py +++ b/transport/iowrapper.py @@ -99,6 +99,7 @@ class IETL(IReader) : """ def __init__(self,**_args): super().__init__(agent=transport.get.reader(**_args['source']),plugins=None) + self._source = _args['source'] if 'target' in _args: self._targets = _args['target'] if type(_args['target']) == list else [_args['target']] else: @@ -108,11 +109,10 @@ class IETL(IReader) : # If the parent is already multiprocessing self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess'] def read(self,**_args): - _data = super().read(**_args) - _schema = super().meta() + _key = 'cmd' if 'cmd' in self._source else 'query' + _kwargs = self._source[_key] if _key in self._source else None + _data = super().read(**_kwargs) if _kwargs else super().read() for _kwargs in self._targets : - # if _schema : - # _kwargs['schema'] = _schema self.post(_data,**_kwargs) return _data @@ -124,8 +124,9 @@ class IETL(IReader) : :_args parameters associated with writer object """ writer = transport.get.writer(**_args) - if 'schema' in _args : - writer.write(_data,schema=_args['schema']) - else: - writer.write(_data) + # if 'schema' in _args : + # writer.write(_data,schema=_args['schema']) + # else: + + writer.write(_data) writer.close() \ No newline at end of file