bug fixes: read with a source that accepts an SQL query

v2.4
Steve Nyemba 9 hours ago
parent 990bb343a4
commit dc9329218a
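
The fix threads an optional 'args' payload (e.g. an SQL query) from transport.instance down to IReader, where it lands on self._args. A minimal sketch of the intended call; the provider/database keywords and the {'sql': ...} payload shape are assumptions, since this diff only shows that 'args' is forwarded:

import transport

reader = transport.instance(
    provider='postgresql',                        # assumed provider name
    database='demo',                              # assumed connection parameter
    context='read',                               # routes construction to IReader
    args={'sql': 'SELECT id, name FROM users'}    # assumed payload shape
)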

@@ -1,6 +1,6 @@
 __app_name__ = 'data-transport'
 __author__ = 'The Phi Technology'
-__version__= '2.4.8'
+__version__= '2.4.9'
 __email__ = "info@the-phi.com"
 __license__=f"""
 Copyright 2010 - 2024, Steve L. Nyemba

@@ -132,7 +132,11 @@ def instance (**_args):
             _logger = _logger(**_kwargs)
         else:
             _logger = None
-        _datatransport = IReader(_agent,_plugins,_logger) if _context == 'read' else IWriter(_agent,_plugins,_logger)
+        _kwargs = {'agent':_agent,'plugins':_plugins,'logger':_logger}
+        if 'args' in _args :
+            _kwargs['args'] = _args['args']
+        # _datatransport = IReader(_agent,_plugins,_logger) if _context == 'read' else IWriter(_agent,_plugins,_logger)
+        _datatransport = IReader(**_kwargs) if _context == 'read' else IWriter(**_kwargs)
         return _datatransport
     else:
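
The hunk above replaces positional construction with a keyword dictionary so the optional 'args' entry (the SQL payload named in the commit message) is only forwarded when the caller supplied it. A self-contained sketch of that conditional-kwargs pattern; Reader and make_reader are stand-ins, not part of data-transport:

class Reader:
    def __init__(self, **kwargs):
        self.agent = kwargs['agent']
        self.args  = kwargs.get('args')       # optional, e.g. {'sql': '...'}

def make_reader(agent, **extra):
    kwargs = {'agent': agent}
    if 'args' in extra:                       # forward 'args' only when present
        kwargs['args'] = extra['args']
    return Reader(**kwargs)

r = make_reader('my-agent', args={'sql': 'SELECT 1'})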

@@ -23,13 +23,15 @@ class IO:
     """
     Base wrapper class for read/write and support for logs
     """
-    def __init__(self,_agent,plugins,_logger=None):
+    def __init__(self,**_args):
         #
         # We need to initialize the logger here ...
         #
         # registry.init()
+        _agent = _args['agent']
+        plugins = _args['plugins']
+        _logger = _args['logger']
         self._logger = _logger if not type(_agent) in [IReader,IWriter] else _agent._logger #transport.get.writer(label='logger') #if registry.has('logger') else None
         # if not _logger and hasattr(_agent,'_logger') :
         #     self._logger = getattr(_agent,'_logger')
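
One behavioral note on the hunk above: _args['logger'] makes the logger key mandatory, whereas the old signature defaulted _logger to None (every caller in this commit does pass it). A sketch, assuming one wanted to preserve the old optional behavior, using .get-based access; Base is a stand-in class, not part of data-transport:

class Base:
    def __init__(self, **_args):
        self._agent   = _args['agent']        # required, as in IO
        self._plugins = _args.get('plugins')  # optional, defaults to None
        self._logger  = _args.get('logger')   # optional, restores the old _logger=None default
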
@@ -100,9 +102,9 @@ class IReader(IO):
     """
     This is a wrapper for read functionalities
     """
-    def __init__(self,_agent,_plugins=None,_logger=None):
-        super().__init__(_agent,_plugins,_logger)
-        self._args = if 'args' in _args else None
+    def __init__(self,**_args):
+        super().__init__(agent=_args['agent'],plugins=_args['plugins'],logger=_args['logger'])
+        self._args = _args['args'] if 'args' in _args else None
     def _stream (self,_data ):
         # self.log(action='streaming',object=self._agent._engine.name, input= type(_data).__name__)
         _shape = []
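
With the new signature, IReader is constructed from keywords and keeps the optional query payload on self._args. A hypothetical direct construction, assuming the elided parts of IO.__init__ need nothing beyond what this diff shows; the agent object is faked with object() purely for illustration:

_reader = IReader(agent=object(),                      # stand-in for a provider-specific agent
                  plugins=None,
                  logger=None,
                  args={'sql': 'SELECT * FROM users'})
assert _reader._args == {'sql': 'SELECT * FROM users'}
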
@@ -148,8 +150,8 @@ class IReader(IO):
 class IWriter(IO):
     lock = RLock()
-    def __init__(self,_agent,pipeline=None,_logger=None):
-        super().__init__(_agent,pipeline,_logger)
+    def __init__(self,**_args):
+        super().__init__(agent=_args['agent'],plugins=_args['plugins'],logger=_args['logger'])
     def write(self,_data,**_args):
         if 'plugins' in _args :
             self._init_plugins(_args['plugins'])
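
IWriter carries a class-level RLock. Whether write acquires it is outside this hunk, but the usual pattern for serializing concurrent writers looks like this self-contained sketch; SafeWriter is a stand-in, not the library's implementation:

from threading import RLock

class SafeWriter:
    lock = RLock()                        # shared by every SafeWriter instance

    def write(self, rows):
        with SafeWriter.lock:             # one writer in the critical section at a time
            print(f"writing {len(rows)} rows")

SafeWriter().write([1, 2, 3])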

@@ -1,6 +1,9 @@
 """
 dependency:
     - spark and SPARK_HOME environment variable must be set
+NOTE:
+    When using the streaming option, ensure the batch size is in line with the default (1000 rows) or increase it in spark-defaults.conf
+
 """
 from pyspark.sql import SparkSession
 from pyspark import SparkContext
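
The docstring above requires SPARK_HOME before a session is created. A minimal setup sketch; the install path and application name are assumptions, not values from this repository:

import os
os.environ.setdefault('SPARK_HOME', '/opt/spark')     # assumed install path

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('data-transport').getOrCreate()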
