@@ -5,40 +5,103 @@ NOTE: Plugins are converted to a pipeline, so we apply a pipeline when reading or writing:
        - upon initialization we will load plugins
        - on read/write we apply a pipeline (if passed as an argument)
"""
from transport.plugins import plugin, PluginLoader
from transport.plugins import PluginLoader
import transport
from transport import providers
from multiprocessing import Process
from multiprocessing import Process, RLock
import time
import types
from . import registry
from datetime import datetime
import pandas as pd
import numpy as np
import os
import sys
import itertools
import json
import plugin_ix
class IO:
class BaseIO:
    def __init__(self,**_args):
        self._logger = _args['logger'] if 'logger' in _args else None
        self._logTable = 'logs' if 'logTable' not in _args else _args['logTable']
    def setLogger(self,_logger):
        self._logger = _logger
    def log(self,**_args):
        if self._logger :
            _date = str(datetime.now())
            _data = dict({'pid':os.getpid(),'date':_date[:10],'time':_date[11:19]},**_args)
            for key in _data :
                if type(_data[key]) == list :
                    _data[key] = [_item.__name__ if type(_item).__name__ == 'function' else _item for _item in _data[key]]
                _data[key] = str(_data[key]) if type(_data[key]) not in [list,dict] else json.dumps(_data[key])
            self._logger.write(pd.DataFrame([_data])) #,table=self._logTable)
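#
# Sketch of the record BaseIO.log emits, assuming a logger whose write()
# accepts a DataFrame (the label used below is illustrative):
#
#   _io = BaseIO(logger=transport.get.writer(label='logger'))
#   _io.log(action='read',input={'shape':[10,3]})
#   # -> one-row DataFrame with pid, date, time, action and a JSON-encoded input
#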
class IO(BaseIO):
    """
    Base wrapper class for read/write and support for logs
    """
    def __init__(self,_agent,plugins):
    def __init__(self,**_args):
        #
        # We need to initialize the logger here ...
        #
        super().__init__(**_args)
        _agent = _args['agent']
        plugins = _args['plugins'] if 'plugins' in _args else None
        # _logger = _args['logger'] if 'logger' in _args else None
        # self._logger = _logger if not type(_agent) in [IReader,IWriter] else _agent._logger #transport.get.writer(label='logger') #if registry.has('logger') else None
        # if not _logger and hasattr(_agent,'_logger') :
        #     self._logger = getattr(_agent,'_logger')
        self._agent = _agent
        _date = str(datetime.now())
        self._ixloader = plugin_ix.Loader(registry=plugin_ix.Registry(folder=transport.registry.REGISTRY_PATH))
        # self._logTable = 'logs' #'_'.join(['logs',_date[:10]+_date[11:19]]).replace(':','').replace('-','_')
        if plugins :
            self._init_plugins(plugins)
        else:
            self._plugins = None
        self.init_plugins(plugins)
    # def setLogger(self,_logger):
    #     self._logger = _logger
    # def log (self,**_args):
    #     if self._logger :
    #         _date = str(datetime.now())
    #         _data = dict({'pid':os.getpid(),'date':_date[:10],'time':_date[11:19]},**_args)
    #         for key in _data :
    #             if type(_data[key]) == list :
    #                 _data[key] = [_item.__name__ if type(_item).__name__== 'function' else _item for _item in _data[key]]
    #             _data[key] = str(_data[key]) if type(_data[key]) not in [list,dict] else json.dumps(_data[key])
    #         self._logger.write(pd.DataFrame([_data])) #,table=self._logTable)
    # def _init_plugins(self,_items):
    #     """
    #     This function will load pipelined functions as a plugin loader
    #     """
    #     registry.plugins.init()
    #     self._plugins = PluginLoader(registry=registry.plugins)
    #     [self._plugins.set(_name) for _name in _items]
    def _init_plugins(self,_args):
        """
        This function will load pipelined functions as a plugin loader
        """
        if 'path' in _args and 'names' in _args :
            self._plugins = PluginLoader(**_args)
        else:
            self._plugins = PluginLoader()
            [self._plugins.set(_pointer) for _pointer in _args]
        #
        # @TODO: We should have a way to log what plugins are loaded and ready to use
        # self.log(action='init-plugins',object=self.getClassName(self),input=[_name for _name in _items])
        # # if 'path' in _args and 'names' in _args :
        # #     self._plugins = PluginLoader(**_args)
        # # else:
        # #     self._plugins = PluginLoader(registry=registry.plugins)
        # #     [self._plugins.set(_pointer) for _pointer in _args]
        # #
        # # @TODO: We should have a way to log what plugins are loaded and ready to use
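    #
    # Sketch of the two shapes _init_plugins accepts; the path and the
    # function names below are hypothetical:
    #
    #   self._init_plugins({'path':'plugins.py','names':['scrub','dedupe']})
    #   self._init_plugins([scrub,'registered-plugin-name'])
    #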
    def meta(self,**_args):
        if hasattr(self._agent,'meta') :
            return self._agent.meta(**_args)
        return []
    def getClassName(self,_object):
        return '.'.join([_object.__class__.__module__,_object.__class__.__name__])
    def close(self):
        if hasattr(self._agent,'close') :
            self._agent.close()
@@ -48,6 +111,7 @@ class IO:
"""
for _pointer in self . _plugins :
_data = _pointer ( _data )
time . sleep ( 1 )
def apply ( self , _query ) :
if hasattr ( self . _agent , ' apply ' ) :
return self . _agent . apply ( _query )
@@ -59,62 +123,183 @@ class IO:
            pointer = getattr(self._agent,_name)
            return pointer(_query)
        return None
    def init_plugins(self,plugins):
        # Guard: __init__ passes None when no plugins were supplied
        if not plugins :
            return
        for _ref in plugins :
            self._ixloader.set(_ref)
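    #
    # Sketch of how plugins reach the plugin_ix loader: a reference may be
    # a callable or a registered name (both names below are hypothetical):
    #
    #   io.init_plugins([scrub_dates,'my-registered-plugin'])
    #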
class IReader(IO):
    """
    This is a wrapper for read functionalities
    """
    def __init__(self,_agent,pipeline=None):
        super().__init__(_agent,pipeline)
    def __init__(self,**_args):
        super().__init__(**_args)
        self._args = _args['args'] if 'args' in _args else None
    def _stream(self,_data):
        # self.log(action='streaming',object=self._agent._engine.name, input= type(_data).__name__)
        _shape = []
        for _segment in _data :
            _shape += list(_segment.shape)
            if self._plugins :
                # yield self._plugins.apply(_segment,self.log)
                yield self._ixloader.visitor(_segment,self.log)
            else:
                yield _segment
        _objectName = '.'.join([self._agent.__class__.__module__,self._agent.__class__.__name__])
        _input = {'shape':_shape}
        if hasattr(self._agent,'_table') :
            _input['table'] = self._agent._table
        self.log(action='streaming',object=_objectName,input=_input)
    def read(self,**_args):
        if 'plugins' in _args :
            self._init_plugins(_args['plugins'])
        _data = self._agent.read(**_args)
        if self._plugins and self._plugins.ratio() > 0 :
            _data = self._plugins.apply(_data)
        #
        # output data
        return _data
            self.init_plugins(_args['plugins'])
        if self._args :
            _data = self._agent.read(**self._args)
        else:
            _data = self._agent.read(**_args)
        _objectName = '.'.join([self._agent.__class__.__module__,self._agent.__class__.__name__])
        if types.GeneratorType == type(_data) :
            return self._stream(_data)
        # if self._plugins :
        #     return self._stream(_data)
        # else:
        #     _count = 0
        #     for _segment in _data :
        #         _count += 1
        #         yield _segment
        #     self.log(action='streaming',object=_objectName, input= {'segments':_count})
        # return _data
        elif type(_data) == pd.DataFrame :
            _shape = _data.shape #[0,0] if not _data.shape[] else list(_data.shape)
            _input = {'shape':_shape}
            if hasattr(self._agent,'_table') :
                _input['table'] = self._agent._table
            self.log(action='read',object=_objectName,input=_input)
        _data = self._ixloader.visitor(_data)
        return _data
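#
# A minimal usage sketch for IReader (the provider/database values and the
# plugin callable are hypothetical; transport.get.reader and the plugins=
# argument come from this module):
#
#   import transport
#   _reader = transport.get.reader(provider='sqlite',database='sample.db3',table='users')
#   _df = _reader.read(plugins=[my_cleanup_function])
#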
class IWriter(IO):
    def __init__(self,_agent,pipeline=None):
        super().__init__(_agent,pipeline)
    lock = RLock()
    def __init__(self,**_args):
        super().__init__(**_args)
    def write(self,_data,**_args):
        if 'plugins' in _args :
            self._init_plugins(_args['plugins'])
        if self._plugins and self._plugins.ratio() > 0 :
            _data = self._plugins.apply(_data)
        self._agent.write(_data,**_args)
        # if self._plugins and self._plugins.ratio() > 0 :
        #     _logs = []
        #     _data = self._plugins.apply(_data,_logs,self.log)
        #     [self.log(**_item) for _item in _logs]
        try:
            # IWriter.lock.acquire()
            _data = self._ixloader.visitor(_data)
            self._agent.write(_data,**_args)
        finally:
            # IWriter.lock.release()
            pass
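#
# Design note: IWriter carries a class-level RLock so concurrent writes
# could be serialized; the acquire/release calls are commented out above.
# A sketch of the locked form (an assumption, not active code):
#
#   try:
#       IWriter.lock.acquire()
#       _data = self._ixloader.visitor(_data)
#       self._agent.write(_data,**_args)
#   finally:
#       IWriter.lock.release()
#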
#
# The ETL object in its simplest form is an aggregation of read/write objects
# @TODO: ETL can/should aggregate a writer as a plugin and apply it as a process
class IETL(IReader):
class IETL(BaseIO):
    """
    This class performs an ETL operation by inheriting a read and adding writes as pipeline functions
    """
    def __init__(self,**_args):
        super().__init__(transport.get.reader(**_args['source']))
        if 'target' in _args :
            self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
        else:
            self._targets = []
        self.jobs = []
        # _source = _args['source']
        # _plugins = _source['plugins'] if 'plugins' in _source else None
        # # super().__init__(transport.get.reader(**_args['source']))
        # super().__init__(agent=transport.get.reader(**_source),plugins=_plugins)
        # # _logger =
        # if 'target' in _args:
        #     self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
        # else:
        #     self._targets = []
        # self.jobs = []
        # #
        # # If the parent is already multiprocessing
        # if 'token' in _source :
        #     self._logToken = _source['token']
        # self._sourceArgs = _source['args'] if 'args' in _source else None
        # self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
        super().__init__()
        self._source = _args['source']
        self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
        #
        # If the parent is already multiprocessing
        self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
    def read(self,**_args):
        _data = super().read(**_args)
        for _kwargs in self._targets :
            self.post(_data,**_kwargs)
    # ETL Initialization, we should provide some measure of context ...
    #
    # def run(self) :
    #     """
    #     We should apply the etl here, if we are in multiprocessing mode
    #     """
    #     return self.read()
    def run(self,**_args):
        # _data = super().read(**_args) if not self._sourceArgs else super().read(**self._sourceArgs)
        # self._targets = [transport.get.writer(**_kwargs) for _kwargs in self._targets]
        _reader = transport.get.reader(**self._source)
        if hasattr(_reader,'_logger') :
            self.setLogger(_reader._logger)
        self.log(action='init-etl',input={'source':self._source,'target':self._targets})
        _data = _reader.read(**self._source['args']) if 'args' in self._source else _reader.read()
        _reader.close()
        _writers = [transport.get.writer(**_kwargs) for _kwargs in self._targets]
        _schema = [] if not getattr(_reader._agent,'_table',None) else _reader.meta()
        if types.GeneratorType == type(_data) :
            _index = 0
            for _segment in _data :
                _index += 1
                for _writer in _writers :
                    self.post(_segment,writer=_writer,index=_index,schema=_schema)
                time.sleep(1)
        else:
            for _writer in _writers :
                self.post(_data,writer=_writer,schema=_schema)
        # pass
        return _data
        # return _data
    def post(self,_data,**_args):
        """
        This function performs the write operation for one segment of data
        :_args    parameters associated with the writer object
        """
        writer = transport.get.writer(**_args)
        writer.write(_data)
        writer.close()
        # writer = transport.get.writer(**_args)
        _input = {}
        try:
            _action = 'post'
            writer = _args['writer']
            _schema = _args['schema']
            _index = _args['index'] if 'index' in _args else 0
            _shape = dict(zip(['rows','columns'],_data.shape))
            #
            # -- things to log
            _input = {'shape':_shape,'segment':_index}
            if hasattr(writer._agent,'_table') :
                _input['table'] = writer._agent._table
            #
            # Integer columns may carry NaN after a read; coerce them back to
            # int64 so the target schema is honored
            for _item in _schema :
                if _item['type'] in ['INTEGER','BIGINT','INT'] :
                    _column = _item['name']
                    _data[_column] = _data[_column].copy().fillna(0).astype(np.int64)
            writer.write(_data,schema=_schema)
        except Exception as e:
            _action = 'post-error'
            _input['error'] = str(e)
            print([e])
            pass
        self.log(action=_action,object=writer._agent.__module__,input=_input)
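#
# A minimal end-to-end sketch. The provider names, paths and the get.etl
# factory call reflect common data-transport usage but are assumptions here;
# the source/target layout mirrors IETL.__init__ above:
#
#   import transport
#   _etl = transport.get.etl(
#       source={'provider':'sqlite','database':'sample.db3','table':'users'},
#       target=[{'provider':'csv','path':'users.csv'}])
#   _etl.run()
#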