""" This class is a wrapper around read/write classes of cloud,sql,nosql,other packages The wrapper allows for application of plugins as pre-post conditions. NOTE: Plugins are converted to a pipeline, so we apply a pipeline when reading or writing: - upon initialization we will load plugins - on read/write we apply a pipeline (if passed as an argument) """ from transport.plugins import PluginLoader import transport from transport import providers from multiprocessing import Process import time import types from . import registry class IO: """ Base wrapper class for read/write and support for logs """ def __init__(self,_agent,plugins): self._agent = _agent if plugins : self._init_plugins(plugins) else: self._plugins = None def _init_plugins(self,_items): """ This function will load pipelined functions as a plugin loader """ registry.plugins.init() self._plugins = PluginLoader(registry=registry.plugins) [self._plugins.set(_name) for _name in _items] # if 'path' in _args and 'names' in _args : # self._plugins = PluginLoader(**_args) # else: # self._plugins = PluginLoader(registry=registry.plugins) # [self._plugins.set(_pointer) for _pointer in _args] # # @TODO: We should have a way to log what plugins are loaded and ready to use def meta (self,**_args): if hasattr(self._agent,'meta') : return self._agent.meta(**_args) return [] def close(self): if hasattr(self._agent,'close') : self._agent.close() def apply(self): """ applying pre/post conditions given a pipeline expression """ for _pointer in self._plugins : _data = _pointer(_data) def apply(self,_query): if hasattr(self._agent,'apply') : return self._agent.apply(_query) return None def submit(self,_query): return self.delegate('submit',_query) def delegate(self,_name,_query): if hasattr(self._agent,_name) : pointer = getattr(self._agent,_name) return pointer(_query) return None class IReader(IO): """ This is a wrapper for read functionalities """ def __init__(self,_agent,pipeline=None): super().__init__(_agent,pipeline) def _stream (self,_data ): for _segment in _data : yield self._plugins.apply(_segment) def read(self,**_args): if 'plugins' in _args : self._init_plugins(_args['plugins']) _data = self._agent.read(**_args) if self._plugins and self._plugins.ratio() > 0 : if types.GeneratorType == type(_data): return self._stream(_data) else: _data = self._plugins.apply(_data) return _data else: return _data class IWriter(IO): def __init__(self,_agent,pipeline=None): super().__init__(_agent,pipeline) def write(self,_data,**_args): if 'plugins' in _args : self._init_plugins(_args['plugins']) if self._plugins and self._plugins.ratio() > 0 : _data = self._plugins.apply(_data) self._agent.write(_data,**_args) # # The ETL object in its simplest form is an aggregation of read/write objects # @TODO: ETL can/should aggregate a writer as a plugin and apply it as a process class IETL(IReader) : """ This class performs an ETL operation by ineriting a read and adding writes as pipeline functions """ def __init__(self,**_args): super().__init__(transport.get.reader(**_args['source'])) if 'target' in _args: self._targets = _args['target'] if type(_args['target']) == list else [_args['target']] else: self._targets = [] self.jobs = [] # # If the parent is already multiprocessing self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess'] def run(self) : """ We should apply the etl here, if we are in multiprocessing mode """ _data = super().read() for _kwargs in self._targets : self.post(_data,**_kwargs) def 
#
# The ETL object in its simplest form is an aggregation of read/write objects
# @TODO: ETL can/should aggregate a writer as a plugin and apply it as a process
class IETL(IReader) :
    """
    This class performs an ETL operation by inheriting reads and adding writes as pipeline functions
    """
    def __init__(self,**_args):
        super().__init__(transport.get.reader(**_args['source']))
        if 'target' in _args:
            self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
        else:
            self._targets = []
        self.jobs = []
        #
        # If the parent is already multiprocessing
        self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
    def run(self) :
        """
        We should apply the etl here, if we are in multiprocessing mode
        """
        _data = super().read()
        for _kwargs in self._targets :
            self.post(_data,**_kwargs)
    def read(self,**_args):
        _data = super().read(**_args)
        if types.GeneratorType == type(_data):
            #
            # Streamed reads are written segment by segment; note that the
            # generator is consumed here and is returned exhausted
            for _segment in _data :
                for _kwargs in self._targets :
                    self.post(_segment,**_kwargs)
        else:
            for _kwargs in self._targets :
                self.post(_data,**_kwargs)
        return _data
    def post (self,_data,**_args) :
        """
        This function performs the write operation against a single target
        :_data  payload to be written
        :_args  parameters associated with the writer object
        """
        writer = transport.get.writer(**_args)
        writer.write(_data)
        writer.close()
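#
# Example (sketch): a minimal ETL run. The provider constants and their
# arguments below are assumptions for illustration and depend on the
# providers available in the installed package.
#
#   _etl = IETL(
#       source = {'provider': providers.SQLITE, 'database': 'sample.db3', 'table': 'logs'},
#       target = [{'provider': providers.CSV, 'path': 'logs.csv'}]
#   )
#   _etl.run()   # reads from the source and posts to every target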