From 3b9203c0a0a2aee60dd8ed1baec5a151cc599146 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Wed, 17 Dec 2025 12:50:14 -0600 Subject: [PATCH 1/2] etl bug fix --- bin/transport | 11 +++++++---- transport/iowrapper.py | 17 +++++++++-------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/bin/transport b/bin/transport index f6c511e..33c2a0d 100755 --- a/bin/transport +++ b/bin/transport @@ -56,6 +56,9 @@ def wait(jobs): # def wait (jobs): # while jobs : # jobs = [pthread for pthread in jobs if pthread.is_alive()] +def launch_etl (_args): + _etlTask = IETL(**_args) + _etlTask.read() @app_e.command(name="run") def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")], @@ -75,10 +78,10 @@ def apply (path:Annotated[str,typer.Argument(help="path of the configuration fil jobs = [] for _args in _config : # pthread = etl.instance(**_args) #-- automatically starts the process - def bootup (): - _worker = IETL(**_args) - _worker.run() - pthread = Process(target=bootup) + # def bootup (): + # _worker = IETL(**_args) + # _worker.read() + pthread = Process(target=launch_etl,args=(_args,)) pthread.start() jobs.append(pthread) if len(jobs) == batch : diff --git a/transport/iowrapper.py b/transport/iowrapper.py index 105d024..5261eac 100644 --- a/transport/iowrapper.py +++ b/transport/iowrapper.py @@ -99,6 +99,7 @@ class IETL(IReader) : """ def __init__(self,**_args): super().__init__(agent=transport.get.reader(**_args['source']),plugins=None) + self._source = _args['source'] if 'target' in _args: self._targets = _args['target'] if type(_args['target']) == list else [_args['target']] else: @@ -108,11 +109,10 @@ class IETL(IReader) : # If the parent is already multiprocessing self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess'] def read(self,**_args): - _data = super().read(**_args) - _schema = super().meta() + _key = 'cmd' if 'cmd' in self._source else 'query' + _kwargs = self._source[_key] if _key in self._source else None + _data = super().read(**_kwargs) for _kwargs in self._targets : - # if _schema : - # _kwargs['schema'] = _schema self.post(_data,**_kwargs) return _data @@ -124,8 +124,9 @@ class IETL(IReader) : :_args parameters associated with writer object """ writer = transport.get.writer(**_args) - if 'schema' in _args : - writer.write(_data,schema=_args['schema']) - else: - writer.write(_data) + # if 'schema' in _args : + # writer.write(_data,schema=_args['schema']) + # else: + print ("writing .... ",_data.shape) + writer.write(_data) writer.close() \ No newline at end of file From f035f70fa09d64d6ed633042820d6a125c11b04b Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Wed, 17 Dec 2025 13:02:25 -0600 Subject: [PATCH 2/2] bug fix --- transport/iowrapper.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/transport/iowrapper.py b/transport/iowrapper.py index 5261eac..7c7c173 100644 --- a/transport/iowrapper.py +++ b/transport/iowrapper.py @@ -111,7 +111,7 @@ class IETL(IReader) : def read(self,**_args): _key = 'cmd' if 'cmd' in self._source else 'query' _kwargs = self._source[_key] if _key in self._source else None - _data = super().read(**_kwargs) + _data = super().read(**_kwargs) if _kwargs else super().read() for _kwargs in self._targets : self.post(_data,**_kwargs) @@ -127,6 +127,6 @@ class IETL(IReader) : # if 'schema' in _args : # writer.write(_data,schema=_args['schema']) # else: - print ("writing .... ",_data.shape) + writer.write(_data) writer.close() \ No newline at end of file