From 870c1caed3688205b5808c5f90d70dabb24f03c4 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Mon, 10 Jun 2024 01:03:59 -0500 Subject: [PATCH] bug fix: use plugins to refer to plugins --- transport/iowrapper.py | 22 +++++----------------- 1 file changed, 5 insertions(+), 17 deletions(-) diff --git a/transport/iowrapper.py b/transport/iowrapper.py index e3ff611..d6cba1c 100644 --- a/transport/iowrapper.py +++ b/transport/iowrapper.py @@ -59,8 +59,8 @@ class IReader(IO): def __init__(self,_agent,pipeline=None): super().__init__(_agent,pipeline) def read(self,**_args): - if 'pipeline' in _args : - self._init_plugins(_args['pipeline']) + if 'plugins' in _args : + self._init_plugins(_args['plugins']) _data = self._agent.read(**_args) if self._plugins and self._plugins.ratio() > 0 : _data = self._plugins.apply(_data) @@ -71,8 +71,8 @@ class IWriter(IO): def __init__(self,_agent,pipeline=None): super().__init__(_agent,pipeline) def write(self,_data,**_args): - if 'pipeline' in _args : - self._init_plugins(_args['pipeline']) + if 'plugins' in _args : + self._init_plugins(_args['plugins']) if self._plugins and self._plugins.ratio() > 0 : _data = self._plugins.apply(_data) @@ -82,10 +82,6 @@ class IWriter(IO): # The ETL object in its simplest form is an aggregation of read/write objects # @TODO: ETL can/should aggregate a writer as a plugin and apply it as a process -def _ProcessWriter (_data,_args): - writer = transport.get.writer(**_args) - writer.write(_data) - class IETL(IReader) : """ This class performs an ETL operation by ineriting a read and adding writes as pipeline functions @@ -105,15 +101,7 @@ class IETL(IReader) : for _kwargs in self._targets : self.post(_data,**_kwargs) - # pthread = Process(target=_ProcessWriter,args=(_data,_kwargs)) - # pthread.start() - # self.jobs.append(pthread) - - # if not self._hasParentProcess : - # while self.jobs : - # jobs = [pthread for pthread in self.jobs if pthread.is_alive()] - # time.sleep(1) - + return _data def post (self,_data,**_args) : """