|
|
@ -16,7 +16,8 @@ from datetime import datetime
|
|
|
|
import pandas as pd
|
|
|
|
import pandas as pd
|
|
|
|
import os
|
|
|
|
import os
|
|
|
|
import sys
|
|
|
|
import sys
|
|
|
|
|
|
|
|
import itertools
|
|
|
|
|
|
|
|
import json
|
|
|
|
|
|
|
|
|
|
|
|
class IO:
|
|
|
|
class IO:
|
|
|
|
"""
|
|
|
|
"""
|
|
|
@ -29,9 +30,9 @@ class IO:
|
|
|
|
#
|
|
|
|
#
|
|
|
|
# registry.init()
|
|
|
|
# registry.init()
|
|
|
|
|
|
|
|
|
|
|
|
self._logger = _logger #transport.get.writer(label='logger') #if registry.has('logger') else None
|
|
|
|
self._logger = _logger if not type(_agent) in [IReader,IWriter] else _agent._logger #transport.get.writer(label='logger') #if registry.has('logger') else None
|
|
|
|
if not _logger and hasattr(_agent,'_logger') :
|
|
|
|
# if not _logger and hasattr(_agent,'_logger') :
|
|
|
|
self._logger = getattr(_agent,'_logger')
|
|
|
|
# self._logger = getattr(_agent,'_logger')
|
|
|
|
self._agent = _agent
|
|
|
|
self._agent = _agent
|
|
|
|
_date = _date = str(datetime.now())
|
|
|
|
_date = _date = str(datetime.now())
|
|
|
|
self._logTable = 'logs' #'_'.join(['logs',_date[:10]+_date[11:19]]).replace(':','').replace('-','_')
|
|
|
|
self._logTable = 'logs' #'_'.join(['logs',_date[:10]+_date[11:19]]).replace(':','').replace('-','_')
|
|
|
@ -47,8 +48,11 @@ class IO:
|
|
|
|
_date = str(datetime.now())
|
|
|
|
_date = str(datetime.now())
|
|
|
|
_data = dict({'pid':os.getpid(),'date':_date[:10],'time':_date[11:19]},**_args)
|
|
|
|
_data = dict({'pid':os.getpid(),'date':_date[:10],'time':_date[11:19]},**_args)
|
|
|
|
for key in _data :
|
|
|
|
for key in _data :
|
|
|
|
_data[key] = str(_data[key])
|
|
|
|
_data[key] = str(_data[key]) if type(_data[key]) not in [list,dict] else json.dumps(_data[key])
|
|
|
|
self._logger.write(pd.DataFrame([_data])) #,table=self._logTable)
|
|
|
|
self._logger.write(pd.DataFrame([_data])) #,table=self._logTable)
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
print ([' ********** '])
|
|
|
|
|
|
|
|
print (_args)
|
|
|
|
def _init_plugins(self,_items):
|
|
|
|
def _init_plugins(self,_items):
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
This function will load pipelined functions as a plugin loader
|
|
|
|
This function will load pipelined functions as a plugin loader
|
|
|
@ -117,12 +121,19 @@ class IReader(IO):
|
|
|
|
if self._plugins :
|
|
|
|
if self._plugins :
|
|
|
|
return self._stream(_data)
|
|
|
|
return self._stream(_data)
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
self.log(action='streaming',object=_objectName, input= {'memory_size':sys.getsizeof(_data)})
|
|
|
|
_count = 0
|
|
|
|
return _data
|
|
|
|
for _segment in _data :
|
|
|
|
|
|
|
|
_count += 1
|
|
|
|
|
|
|
|
yield _segment
|
|
|
|
|
|
|
|
self.log(action='streaming',object=_objectName, input= {'segments':_count})
|
|
|
|
|
|
|
|
# return _data
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.log(action='read',object=_objectName, input=_data.shape)
|
|
|
|
self.log(action='read',object=_objectName, input=_data.shape)
|
|
|
|
if self._plugins :
|
|
|
|
if self._plugins :
|
|
|
|
_data = self._plugins.apply(_data)
|
|
|
|
_logs = []
|
|
|
|
|
|
|
|
_data = self._plugins.apply(_data,self.log)
|
|
|
|
return _data
|
|
|
|
return _data
|
|
|
|
|
|
|
|
|
|
|
|
# if self._plugins and self._plugins.ratio() > 0 :
|
|
|
|
# if self._plugins and self._plugins.ratio() > 0 :
|
|
|
@ -144,7 +155,10 @@ class IWriter(IO):
|
|
|
|
if 'plugins' in _args :
|
|
|
|
if 'plugins' in _args :
|
|
|
|
self._init_plugins(_args['plugins'])
|
|
|
|
self._init_plugins(_args['plugins'])
|
|
|
|
if self._plugins and self._plugins.ratio() > 0 :
|
|
|
|
if self._plugins and self._plugins.ratio() > 0 :
|
|
|
|
_data = self._plugins.apply(_data,self._logger)
|
|
|
|
_logs = []
|
|
|
|
|
|
|
|
_data = self._plugins.apply(_data,_logs,self.log)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# [self.log(**_item) for _item in _logs]
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
# IWriter.lock.acquire()
|
|
|
|
# IWriter.lock.acquire()
|
|
|
|
self._agent.write(_data,**_args)
|
|
|
|
self._agent.write(_data,**_args)
|
|
|
@ -161,7 +175,13 @@ class IETL(IReader) :
|
|
|
|
This class performs an ETL operation by inheriting a read and adding writes as pipeline functions
|
|
|
|
This class performs an ETL operation by inheriting a read and adding writes as pipeline functions
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
def __init__(self,**_args):
|
|
|
|
def __init__(self,**_args):
|
|
|
|
super().__init__(transport.get.reader(**_args['source']))
|
|
|
|
_source = _args['source']
|
|
|
|
|
|
|
|
_plugins = _source['plugins'] if 'plugins' in _source else None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# super().__init__(transport.get.reader(**_args['source']))
|
|
|
|
|
|
|
|
super().__init__(transport.get.reader(**_source),_plugins)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# _logger =
|
|
|
|
if 'target' in _args:
|
|
|
|
if 'target' in _args:
|
|
|
|
self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
|
|
|
|
self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|