|
|
|
@ -31,7 +31,7 @@ class IO:
|
|
|
|
|
# registry.init()
|
|
|
|
|
_agent = _args['agent']
|
|
|
|
|
plugins = _args['plugins']
|
|
|
|
|
_logger = _args['logger']
|
|
|
|
|
_logger = _args['logger'] if 'logger' in _args else None
|
|
|
|
|
self._logger = _logger if not type(_agent) in [IReader,IWriter] else _agent._logger #transport.get.writer(label='logger') #if registry.has('logger') else None
|
|
|
|
|
# if not _logger and hasattr(_agent,'_logger') :
|
|
|
|
|
# self._logger = getattr(_agent,'_logger')
|
|
|
|
@ -103,7 +103,7 @@ class IReader(IO):
|
|
|
|
|
This is a wrapper for read functionalities
|
|
|
|
|
"""
|
|
|
|
|
def __init__(self,**_args):
|
|
|
|
|
super().__init__(_args['agent'],_args['plugins'],_args['logger'])
|
|
|
|
|
super().__init__(**_args)
|
|
|
|
|
self._args = _args['args']if 'args' in _args else None
|
|
|
|
|
def _stream (self,_data ):
|
|
|
|
|
# self.log(action='streaming',object=self._agent._engine.name, input= type(_data).__name__)
|
|
|
|
@ -151,7 +151,7 @@ class IReader(IO):
|
|
|
|
|
class IWriter(IO):
|
|
|
|
|
lock = RLock()
|
|
|
|
|
def __init__(self,**_args):
|
|
|
|
|
super().__init__(_args['agent'],_args['plugins'],_args['logger'])
|
|
|
|
|
super().__init__(**_args)
|
|
|
|
|
def write(self,_data,**_args):
|
|
|
|
|
if 'plugins' in _args :
|
|
|
|
|
self._init_plugins(_args['plugins'])
|
|
|
|
@ -180,7 +180,7 @@ class IETL(IReader) :
|
|
|
|
|
_plugins = _source['plugins'] if 'plugins' in _source else None
|
|
|
|
|
|
|
|
|
|
# super().__init__(transport.get.reader(**_args['source']))
|
|
|
|
|
super().__init__(transport.get.reader(**_source),_plugins)
|
|
|
|
|
super().__init__(agent=transport.get.reader(**_source),plugins=_plugins)
|
|
|
|
|
# _logger =
|
|
|
|
|
if 'target' in _args:
|
|
|
|
|
self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
|
|
|
|
|