|
|
|
@ -59,8 +59,8 @@ class IReader(IO):
|
|
|
|
|
def __init__(self,_agent,pipeline=None):
|
|
|
|
|
super().__init__(_agent,pipeline)
|
|
|
|
|
def read(self,**_args):
|
|
|
|
|
if 'pipeline' in _args :
|
|
|
|
|
self._init_plugins(_args['pipeline'])
|
|
|
|
|
if 'plugins' in _args :
|
|
|
|
|
self._init_plugins(_args['plugins'])
|
|
|
|
|
_data = self._agent.read(**_args)
|
|
|
|
|
if self._plugins and self._plugins.ratio() > 0 :
|
|
|
|
|
_data = self._plugins.apply(_data)
|
|
|
|
@ -71,8 +71,8 @@ class IWriter(IO):
|
|
|
|
|
def __init__(self,_agent,pipeline=None):
|
|
|
|
|
super().__init__(_agent,pipeline)
|
|
|
|
|
def write(self,_data,**_args):
|
|
|
|
|
if 'pipeline' in _args :
|
|
|
|
|
self._init_plugins(_args['pipeline'])
|
|
|
|
|
if 'plugins' in _args :
|
|
|
|
|
self._init_plugins(_args['plugins'])
|
|
|
|
|
if self._plugins and self._plugins.ratio() > 0 :
|
|
|
|
|
_data = self._plugins.apply(_data)
|
|
|
|
|
|
|
|
|
@ -82,10 +82,6 @@ class IWriter(IO):
|
|
|
|
|
# The ETL object in its simplest form is an aggregation of read/write objects
|
|
|
|
|
# @TODO: ETL can/should aggregate a writer as a plugin and apply it as a process
|
|
|
|
|
|
|
|
|
|
def _ProcessWriter (_data,_args):
|
|
|
|
|
writer = transport.get.writer(**_args)
|
|
|
|
|
writer.write(_data)
|
|
|
|
|
|
|
|
|
|
class IETL(IReader) :
|
|
|
|
|
"""
|
|
|
|
|
This class performs an ETL operation by ineriting a read and adding writes as pipeline functions
|
|
|
|
@ -105,14 +101,6 @@ class IETL(IReader) :
|
|
|
|
|
|
|
|
|
|
for _kwargs in self._targets :
|
|
|
|
|
self.post(_data,**_kwargs)
|
|
|
|
|
# pthread = Process(target=_ProcessWriter,args=(_data,_kwargs))
|
|
|
|
|
# pthread.start()
|
|
|
|
|
# self.jobs.append(pthread)
|
|
|
|
|
|
|
|
|
|
# if not self._hasParentProcess :
|
|
|
|
|
# while self.jobs :
|
|
|
|
|
# jobs = [pthread for pthread in self.jobs if pthread.is_alive()]
|
|
|
|
|
# time.sleep(1)
|
|
|
|
|
|
|
|
|
|
return _data
|
|
|
|
|
def post (self,_data,**_args) :
|
|
|
|
|