|
|
|
@ -99,6 +99,7 @@ class IETL(IReader) :
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
def __init__(self,**_args):
|
|
|
|
def __init__(self,**_args):
|
|
|
|
super().__init__(agent=transport.get.reader(**_args['source']),plugins=None)
|
|
|
|
super().__init__(agent=transport.get.reader(**_args['source']),plugins=None)
|
|
|
|
|
|
|
|
self._source = _args['source']
|
|
|
|
if 'target' in _args:
|
|
|
|
if 'target' in _args:
|
|
|
|
self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
|
|
|
|
self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
@ -108,11 +109,10 @@ class IETL(IReader) :
|
|
|
|
# If the parent is already multiprocessing
|
|
|
|
# If the parent is already multiprocessing
|
|
|
|
self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
|
|
|
|
self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
|
|
|
|
def read(self,**_args):
|
|
|
|
def read(self,**_args):
|
|
|
|
_data = super().read(**_args)
|
|
|
|
_key = 'cmd' if 'cmd' in self._source else 'query'
|
|
|
|
_schema = super().meta()
|
|
|
|
_kwargs = self._source[_key] if _key in self._source else None
|
|
|
|
|
|
|
|
_data = super().read(**_kwargs)
|
|
|
|
for _kwargs in self._targets :
|
|
|
|
for _kwargs in self._targets :
|
|
|
|
# if _schema :
|
|
|
|
|
|
|
|
# _kwargs['schema'] = _schema
|
|
|
|
|
|
|
|
self.post(_data,**_kwargs)
|
|
|
|
self.post(_data,**_kwargs)
|
|
|
|
|
|
|
|
|
|
|
|
return _data
|
|
|
|
return _data
|
|
|
|
@ -124,8 +124,9 @@ class IETL(IReader) :
|
|
|
|
:_args parameters associated with writer object
|
|
|
|
:_args parameters associated with writer object
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
writer = transport.get.writer(**_args)
|
|
|
|
writer = transport.get.writer(**_args)
|
|
|
|
if 'schema' in _args :
|
|
|
|
# if 'schema' in _args :
|
|
|
|
writer.write(_data,schema=_args['schema'])
|
|
|
|
# writer.write(_data,schema=_args['schema'])
|
|
|
|
else:
|
|
|
|
# else:
|
|
|
|
|
|
|
|
print ("writing .... ",_data.shape)
|
|
|
|
writer.write(_data)
|
|
|
|
writer.write(_data)
|
|
|
|
writer.close()
|
|
|
|
writer.close()
|