From dc9329218aaa1e5f74469ad8ffac878dff0f3626 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Tue, 26 Nov 2024 19:30:56 -0600 Subject: [PATCH] bug fixes: read, with source that accepts an sql query --- info/__init__.py | 2 +- transport/__init__.py | 6 +++++- transport/iowrapper.py | 16 +++++++++------- transport/warehouse/iceberg.py | 3 +++ 4 files changed, 18 insertions(+), 9 deletions(-) diff --git a/info/__init__.py b/info/__init__.py index 760b575..2ca3ba7 100644 --- a/info/__init__.py +++ b/info/__init__.py @@ -1,6 +1,6 @@ __app_name__ = 'data-transport' __author__ = 'The Phi Technology' -__version__= '2.4.8' +__version__= '2.4.9' __email__ = "info@the-phi.com" __license__=f""" Copyright 2010 - 2024, Steve L. Nyemba diff --git a/transport/__init__.py b/transport/__init__.py index 64bda0d..f883f20 100644 --- a/transport/__init__.py +++ b/transport/__init__.py @@ -132,7 +132,11 @@ def instance (**_args): _logger = _logger(**_kwargs) else: _logger = None - _datatransport = IReader(_agent,_plugins,_logger) if _context == 'read' else IWriter(_agent,_plugins,_logger) + _kwargs = {'agent':_agent,'plugins':_plugins,'logger':_logger} + if 'args' in _args : + _kwargs['args'] = _args['args'] + # _datatransport = IReader(_agent,_plugins,_logger) if _context == 'read' else IWriter(_agent,_plugins,_logger) + _datatransport = IReader(**_kwargs) if _context == 'read' else IWriter(**_kwargs) return _datatransport else: diff --git a/transport/iowrapper.py b/transport/iowrapper.py index 04a8531..f1598c7 100644 --- a/transport/iowrapper.py +++ b/transport/iowrapper.py @@ -23,13 +23,15 @@ class IO: """ Base wrapper class for read/write and support for logs """ - def __init__(self,_agent,plugins,_logger=None): + def __init__(self,**_args): # # We need to initialize the logger here ... 
# # registry.init() - + _agent = _args['agent'] + plugins = _args['plugins'] + _logger = _args['logger'] self._logger = _logger if not type(_agent) in [IReader,IWriter] else _agent._logger #transport.get.writer(label='logger') #if registry.has('logger') else None # if not _logger and hasattr(_agent,'_logger') : # self._logger = getattr(_agent,'_logger') @@ -100,9 +102,9 @@ class IReader(IO): """ This is a wrapper for read functionalities """ - def __init__(self,_agent,_plugins=None,_logger=None): - super().__init__(_agent,_plugins,_logger) - self._args = if 'args' in _args else None + def __init__(self,**_args): + super().__init__(**_args) + self._args = _args['args'] if 'args' in _args else None def _stream (self,_data ): # self.log(action='streaming',object=self._agent._engine.name, input= type(_data).__name__) _shape = [] @@ -148,8 +150,8 @@ class IReader(IO): class IWriter(IO): lock = RLock() - def __init__(self,_agent,pipeline=None,_logger=None): - super().__init__(_agent,pipeline,_logger) + def __init__(self,**_args): + super().__init__(**_args) def write(self,_data,**_args): if 'plugins' in _args : self._init_plugins(_args['plugins']) diff --git a/transport/warehouse/iceberg.py b/transport/warehouse/iceberg.py index 518f570..7d7297a 100644 --- a/transport/warehouse/iceberg.py +++ b/transport/warehouse/iceberg.py @@ -1,6 +1,9 @@ """ dependency: - spark and SPARK_HOME environment variable must be set +NOTE: + When using streaming option, ensure that it is in line with default (1000 rows) or increase it in spark-defaults.conf + """ from pyspark.sql import SparkSession from pyspark import SparkContext