"""
This class is a wrapper around read/write classes of cloud,sql,nosql,other packages
The wrapper allows for application of plugins as pre-post conditions.
NOTE: Plugins are converted to a pipeline, so we apply a pipeline when reading or writing:
- upon initialization we will load plugins
- on read/write we apply a pipeline (if passed as an argument)
"""
from transport.plugins import PluginLoader
import transport
from multiprocessing import RLock
import time
import types
from . import registry
from datetime import datetime
import pandas as pd
import numpy as np
import os
import json
class BaseIO :
def __init__(self,**_args):
self._logger = _args['logger'] if 'logger' in _args else None
self._logTable = 'logs' if 'logTable' not in _args else _args['logTable']
def setLogger(self,_logger):
self._logger = _logger
    def log (self,**_args):
        """
        Writes a one-row dataframe to the logger (if any), stamped with pid/date/time
        """
        if self._logger :
            _date = str(datetime.now())
            _data = dict({'pid':os.getpid(),'date':_date[:10],'time':_date[11:19]},**_args)
            for key in _data :
                #
                # functions are replaced by their names, containers are serialized as JSON
                if type(_data[key]) == list :
                    _data[key] = [_item.__name__ if type(_item).__name__ == 'function' else _item for _item in _data[key]]
                _data[key] = str(_data[key]) if type(_data[key]) not in [list,dict] else json.dumps(_data[key])
            self._logger.write(pd.DataFrame([_data]))
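    #
    # e.g. self.log(action='read',input={'shape':(10,3)}) writes a one-row frame
    # with pid/date/time columns added, provided a logger was attached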
class IO(BaseIO):
"""
Base wrapper class for read/write and support for logs
"""
    def __init__(self,**_args):
        #
        # The logger is initialized in the base class
        #
        super().__init__(**_args)
        self._agent = _args['agent']
        plugins = _args['plugins']
        if plugins :
            self._init_plugins(plugins)
        else:
            self._plugins = None
    def _init_plugins(self,_items):
        """
        Loads the named plugins from the registry and chains them into a pipeline
        """
        registry.plugins.init()
        self._plugins = PluginLoader(registry=registry.plugins)
        for _name in _items :
            self._plugins.set(_name)
        self.log(action='init-plugins',object=self.getClassName(self),input=list(_items))
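    #
    # e.g. self._init_plugins(['etl.clean','etl.dedupe']) -- hypothetical names of
    # functions previously registered against the plugin registry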
def meta (self,**_args):
if hasattr(self._agent,'meta') :
return self._agent.meta(**_args)
return []
def getClassName (self,_object):
return '.'.join([_object.__class__.__module__,_object.__class__.__name__])
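    # e.g. self.getClassName(pd.DataFrame()) -> 'pandas.core.frame.DataFrame'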
def close(self):
if hasattr(self._agent,'close') :
self._agent.close()
    def apply(self,_query):
        """
        Delegates a free-form query to the underlying agent, if the agent supports it
        """
        if hasattr(self._agent,'apply') :
            return self._agent.apply(_query)
        return None
def submit(self,_query):
return self.delegate('submit',_query)
def delegate(self,_name,_query):
if hasattr(self._agent,_name) :
pointer = getattr(self._agent,_name)
return pointer(_query)
return None
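    #
    # e.g. _io.submit('SELECT COUNT(*) FROM logs') forwards the query to the
    # agent's own submit method when one exists (SQL-capable agents), else None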
class IReader(IO):
"""
This is a wrapper for read functionalities
"""
    def __init__(self,**_args):
        super().__init__(**_args)
        self._args = _args['args'] if 'args' in _args else None
    def _stream (self,_data ):
        """
        Streams each dataframe segment of a generator through the plugin pipeline
        (if any) and logs the cumulative shape once the generator is exhausted
        """
        _shape = []
        for _segment in _data :
            _shape += list(_segment.shape)
            if self._plugins :
                yield self._plugins.apply(_segment,self.log)
            else:
                yield _segment
        _objectName = '.'.join([self._agent.__class__.__module__,self._agent.__class__.__name__])
        _input = {'shape':_shape}
        if hasattr(self._agent,'_table') :
            _input['table'] = self._agent._table
        self.log(action='streaming',object=_objectName, input=_input)
    def read(self,**_args):
        if 'plugins' in _args :
            self._init_plugins(_args['plugins'])
        _data = self._agent.read(**self._args) if self._args else self._agent.read(**_args)
        _objectName = '.'.join([self._agent.__class__.__module__,self._agent.__class__.__name__])
        if types.GeneratorType == type(_data):
            return self._stream(_data)
        elif type(_data) == pd.DataFrame :
            _input = {'shape':_data.shape}
            if hasattr(self._agent,'_table') :
                _input['table'] = self._agent._table
            self.log(action='read',object=_objectName, input=_input)
        if self._plugins :
            _data = self._plugins.apply(_data,self.log)
        return _data
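    #
    # e.g. a one-off pipeline can be applied at read time (hypothetical plugin name):
    #   _df = _reader.read(plugins=['my_module.scrub'])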
class IWriter(IO):
    """
    This is a wrapper for write functionalities
    """
    lock = RLock()      # class-level lock, available for serializing writes
    def __init__(self,**_args):
        super().__init__(**_args)
    def write(self,_data,**_args):
        if 'plugins' in _args :
            self._init_plugins(_args['plugins'])
        if self._plugins and self._plugins.ratio() > 0 :
            _data = self._plugins.apply(_data,self.log)
        self._agent.write(_data,**_args)
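#
# e.g. (hypothetical provider arguments):
#   _writer = transport.get.writer(provider='sqlite',database='/tmp/demo.db3',table='events')
#   _writer.write(pd.DataFrame([{'name':'demo'}]))
#   _writer.close()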
#
# The ETL object in its simplest form is an aggregation of read/write objects
# @TODO: ETL can/should aggregate a writer as a plugin and apply it as a process
class IETL(BaseIO) :
"""
    This class performs an ETL operation by inheriting a read and adding writes as pipeline functions
"""
def __init__(self,**_args):
        super().__init__()
        self._source = _args['source']
        self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
    def run(self,**_args):
        """
        Runs the ETL: reads from the source then posts to every target,
        one segment at a time if the source yields a generator
        """
        _reader = transport.get.reader(**self._source)
        if hasattr(_reader,'_logger') :
            self.setLogger(_reader._logger)
        self.log(action='init-etl',input={'source':self._source,'target':self._targets})
        _data = _reader.read(**self._source['args']) if 'args' in self._source else _reader.read()
        _schema = _reader.meta() if getattr(_reader._agent,'_table',None) else []
        _reader.close()
        _writers = [transport.get.writer(**_kwargs) for _kwargs in self._targets]
        if types.GeneratorType == type(_data):
            _index = 0
            for _segment in _data :
                _index += 1
                for _writer in _writers :
                    self.post(_segment,writer=_writer,index=_index,schema=_schema)
                time.sleep(1)
        else:
            for _writer in _writers :
                self.post(_data,writer=_writer,schema=_schema)
        return _data
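    #
    # e.g. (hypothetical provider arguments):
    #   _etl = IETL(source={'provider':'sqlite','database':'/tmp/demo.db3','table':'events'},
    #               target=[{'provider':'csv','path':'/tmp/events.csv'}])
    #   _etl.run()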
    def post (self,_data,**_args) :
        """
        Writes a dataframe (or segment thereof) to a single target writer
        :_data   dataframe to be written
        :_args   parameters associated with the writer (writer, index, schema)
        """
        _input = {}
        writer = _args['writer']
        try:
            _action = 'post'
            _shape = dict(zip(['rows','columns'],_data.shape))
            _index = _args['index'] if 'index' in _args else 0
            _schema= _args['schema']
            for _item in _schema :
                #
                # coerce INTEGER columns, pandas may have inferred them as floats
                if _item['type'] == 'INTEGER' :
                    _data.loc[:,_item['name']] = _data[_item['name']].copy().astype(np.int64)
            writer.write(_data,schema=_schema)
            _input = {'shape':_shape,'segment':_index}
            if hasattr(writer._agent,'_table'):
                _input['table'] = writer._agent._table
        except Exception as e:
            _action = 'post-error'
            print ([e])
        self.log(action=_action,object=writer._agent.__module__, input=_input)