From 5c423205c580791bb82d677d84ece4546382af8d Mon Sep 17 00:00:00 2001
From: Steve Nyemba
Date: Fri, 27 Dec 2024 12:55:04 -0600
Subject: [PATCH] bug fixes and enhancements, iceberg casting, typer parameters, etl throttling

---
 bin/transport                  |  44 +++++++---
 info/__init__.py               |   2 +-
 transport/__init__.py          |   3 +-
 transport/iowrapper.py         | 155 ++++++++++++++++++++++-----------
 transport/warehouse/iceberg.py |  31 ++++++-
 5 files changed, 167 insertions(+), 68 deletions(-)

diff --git a/bin/transport b/bin/transport
index bb35f7a..97332ec 100755
--- a/bin/transport
+++ b/bin/transport
@@ -32,8 +32,11 @@
 from typing_extensions import Annotated
 from typing import Optional
 import time
 from termcolor import colored
+from enum import Enum
+from typing import Tuple
 app = typer.Typer()
+app_x = typer.Typer()
 REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
 REGISTRY_FILE= 'transport-registry.json'
 CHECK_MARK = ' '.join(['[',colored(u'\u2713', 'green'),']'])
@@ -45,10 +48,15 @@
 def wait(jobs):
     while jobs :
         jobs = [thread for thread in jobs if thread.is_alive()]
         time.sleep(1)
+def wait (jobs):
+    while jobs :
+        jobs = [pthread for pthread in jobs if pthread.is_alive()]
+        time.sleep(1)
 
-@app.command(name="apply-etl")
+@app.command(name="etl")
 def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
-    index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed")):
+    index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed"),
+    batch:int = typer.Option(default=5, help="The number of parallel processes to run at once")
+    ):
     """
     This function applies the data transport ETL feature to read data from one source and write it to one or several others
     """
@@ -57,20 +65,28 @@ def apply (path:Annotated[str,typer.Argument(help="path of the configuration fil
     file = open(path)
     _config = json.loads (file.read() )
     file.close()
-    if index :
+    if index is not None:
         _config = [_config[ int(index)]]
-    jobs = []
+    jobs = []
     for _args in _config :
         # pthread = etl.instance(**_args) #-- automatically starts the process
-        _worker = IETL(**_args)
-        pthread = Process(target=_worker.run)
+        def bootup ():
+            _worker = IETL(**_args)
+            _worker.run()
+        pthread = Process(target=bootup)
         pthread.start()
         jobs.append(pthread)
+        if len(jobs) == batch :
+            wait(jobs)
+            jobs = []
+
+    if jobs :
+        wait (jobs)
     #
     # @TODO: Log the number of processes started and estimated time
-    while jobs :
-        jobs = [pthread for pthread in jobs if pthread.is_alive()]
-        time.sleep(1)
+    # while jobs :
+    #     jobs = [pthread for pthread in jobs if pthread.is_alive()]
+    #     time.sleep(1)
     #
     # @TODO: Log the job termination here ...
 @app.command(name="providers")
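A note on the throttling hunk above: `apply` now starts at most `batch` worker processes before blocking in `wait()`. One caveat worth flagging is that `bootup` is defined inside the loop, and locally defined functions cannot be pickled under the 'spawn' start method (the default on macOS and Windows); a module-level target with explicit arguments sidesteps that. A minimal sketch of the same batch-throttling pattern under that assumption (`run_job` and `apply_throttled` are illustrative names, not part of the patch):

    import time
    from multiprocessing import Process
    from transport.iowrapper import IETL   # as defined in transport/iowrapper.py in this patch

    def run_job(_args):
        # module-level target: picklable under both 'fork' and 'spawn'
        IETL(**_args).run()

    def apply_throttled(_config, batch=5):
        jobs = []
        for _args in _config:
            pthread = Process(target=run_job, args=(_args,))
            pthread.start()
            jobs.append(pthread)
            if len(jobs) == batch:
                # block until the current batch drains before starting more
                while jobs:
                    jobs = [p for p in jobs if p.is_alive()]
                    time.sleep(1)
        while jobs:
            # drain the final, possibly partial, batch
            jobs = [p for p in jobs if p.is_alive()]
            time.sleep(1)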
@app.command(name="providers") @@ -150,8 +166,9 @@ def register (label:Annotated[str,typer.Argument(help="unique label that will be except Exception as e: _msg = f"""{TIMES_MARK} {e}""" print (_msg) - -@app.command(name='plugin-add') + + pass +@app_x.command(name='add') def register_plugs ( alias:Annotated[str,typer.Argument(help="unique alias fo the file being registered")], path:Annotated[str,typer.Argument(help="path of the python file, that contains functions")] @@ -164,7 +181,7 @@ def register_plugs ( _mark = TIMES_MARK if not _log else CHECK_MARK _msg = f"""Could NOT add the \033[1m{alias}\033[0m to the registry""" if not _log else f""" successfully added {alias}, {len(_log)} functions added""" print (f"""{_mark} {_msg}""") -@app.command(name="plugin-list") +@app_x.command(name="list") def registry_list (): transport.registry.plugins.init() @@ -177,7 +194,7 @@ def registry_list (): else: print (f"""{TIMES_MARK}, Plugin registry is not available or needs initialization""") -@app.command(name="plugin-test") +@app_x.command(name="test") def registry_test (key): """ This function allows to test syntax for a plugin i.e in terms of alis@function @@ -190,6 +207,7 @@ def registry_test (key): else: print (f"{TIMES_MARK} unable to load \033[1m{key}\033[0m. Make sure it is registered") +app.add_typer(app_x, name="plugins") if __name__ == '__main__' : app() diff --git a/info/__init__.py b/info/__init__.py index 2ca3ba7..33a7edf 100644 --- a/info/__init__.py +++ b/info/__init__.py @@ -1,6 +1,6 @@ __app_name__ = 'data-transport' __author__ = 'The Phi Technology' -__version__= '2.4.9' +__version__= '2.4.12 __email__ = "info@the-phi.com" __license__=f""" Copyright 2010 - 2024, Steve L. Nyemba diff --git a/transport/__init__.py b/transport/__init__.py index d9070db..33a3261 100644 --- a/transport/__init__.py +++ b/transport/__init__.py @@ -119,7 +119,7 @@ def instance (**_args): # _kwa = registry.get('logger') # _lmodule = getPROVIDERS[_kwa['provider']] - if ('label' not in _args and registry.has('logger')): + if ( ('label' in _args and _args['label'] != 'logger') and registry.has('logger')): # # We did not request label called logger, so we are setting up a logger if it is specified in the registry # @@ -132,6 +132,7 @@ def instance (**_args): _logger = _logger(**_kwargs) else: _logger = None + _kwargs = {'agent':_agent,'plugins':_plugins,'logger':_logger} if 'args' in _args : _kwargs['args'] = _args['args'] diff --git a/transport/iowrapper.py b/transport/iowrapper.py index 2633eed..1703fc0 100644 --- a/transport/iowrapper.py +++ b/transport/iowrapper.py @@ -14,12 +14,33 @@ import types from . 
diff --git a/transport/iowrapper.py b/transport/iowrapper.py
index 2633eed..1703fc0 100644
--- a/transport/iowrapper.py
+++ b/transport/iowrapper.py
@@ -14,12 +14,33 @@ import types
 from . import registry
 from datetime import datetime
 import pandas as pd
+import numpy as np
 import os
 import sys
 import itertools
 import json
+class BaseIO :
+    def __init__(self,**_args):
+
+        self._logger = _args['logger'] if 'logger' in _args else None
+        self._logTable = 'logs' if 'logTable' not in _args else _args['logTable']
 
-class IO:
+    def setLogger(self,_logger):
+        self._logger = _logger
+    def log (self,**_args):
+
+        if self._logger :
+            _date = str(datetime.now())
+            _data = dict({'pid':os.getpid(),'date':_date[:10],'time':_date[11:19]},**_args)
+            for key in _data :
+                if type(_data[key]) == list :
+                    _data[key] = [_item.__name__ if type(_item).__name__== 'function' else _item for _item in _data[key]]
+
+                _data[key] = str(_data[key]) if type(_data[key]) not in [list,dict] else json.dumps(_data[key])
+
+            self._logger.write(pd.DataFrame([_data])) #,table=self._logTable)
+
+class IO(BaseIO):
     """
     Base wrapper class for read/write and support for logs
     """
@@ -28,33 +49,34 @@ class IO:
         #
         # We need to initialize the logger here ...
         #
-        # registry.init()
+        super().__init__(**_args)
         _agent = _args['agent']
         plugins = _args['plugins']
-        _logger = _args['logger'] if 'logger' in _args else None
-        self._logger = _logger if not type(_agent) in [IReader,IWriter] else _agent._logger #transport.get.writer(label='logger') #if registry.has('logger') else None
+        # _logger = _args['logger'] if 'logger' in _args else None
+        # self._logger = _logger if not type(_agent) in [IReader,IWriter] else _agent._logger #transport.get.writer(label='logger') #if registry.has('logger') else None
        # if not _logger and hasattr(_agent,'_logger') :
        #     self._logger = getattr(_agent,'_logger')
         self._agent = _agent
         _date = str(datetime.now())
-        self._logTable = 'logs' #'_'.join(['logs',_date[:10]+_date[11:19]]).replace(':','').replace('-','_')
+        # self._logTable = 'logs' #'_'.join(['logs',_date[:10]+_date[11:19]]).replace(':','').replace('-','_')
         if plugins :
             self._init_plugins(plugins)
         else:
             self._plugins = None
 
-    def setLogger(self,_logger):
-        self._logger = _logger
-    def log (self,**_args):
-        if self._logger :
-            _date = str(datetime.now())
-            _data = dict({'pid':os.getpid(),'date':_date[:10],'time':_date[11:19]},**_args)
-            for key in _data :
-                if type(_data[key]) == list :
-                    _data[key] = [_item.__name__ if type(_item).__name__== 'function' else _item for _item in _data[key]]
 
-                _data[key] = str(_data[key]) if type(_data[key]) not in [list,dict] else json.dumps(_data[key])
-            self._logger.write(pd.DataFrame([_data])) #,table=self._logTable)
+    # def setLogger(self,_logger):
+    #     self._logger = _logger
+    # def log (self,**_args):
+    #     if self._logger :
+    #         _date = str(datetime.now())
+    #         _data = dict({'pid':os.getpid(),'date':_date[:10],'time':_date[11:19]},**_args)
+    #         for key in _data :
+    #             if type(_data[key]) == list :
+    #                 _data[key] = [_item.__name__ if type(_item).__name__== 'function' else _item for _item in _data[key]]
 
+    #             _data[key] = str(_data[key]) if type(_data[key]) not in [list,dict] else json.dumps(_data[key])
+
+    #             self._logger.write(pd.DataFrame([_data])) #,table=self._logTable)
     def _init_plugins(self,_items):
         """
         This function will load pipelined functions as a plugin loader
         """
@@ -63,7 +85,7 @@ class IO:
         self._plugins = PluginLoader(registry=registry.plugins)
         [self._plugins.set(_name) for _name in _items]
-        self.log(action='init-plugins',caller='read',object=self.getClassName(self),input =[_name for _name in _items])
+        self.log(action='init-plugins',object=self.getClassName(self),input =[_name for _name in _items])
         # if 'path' in _args and 'names' in _args :
         #     self._plugins = PluginLoader(**_args)
         # else:
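To make the new `BaseIO.log` behavior concrete: every record is stamped with pid/date/time, function objects inside lists are logged by name, lists and dicts are JSON-encoded, and everything else is stringified before the one-row DataFrame is handed to the logger. A standalone approximation of that normalization (illustrative only; the real method writes the frame through `self._logger` instead of returning it):

    import json, os
    from datetime import datetime
    import pandas as pd

    def make_log_record(**_args):
        _date = str(datetime.now())
        _data = dict({'pid': os.getpid(), 'date': _date[:10], 'time': _date[11:19]}, **_args)
        for key in _data:
            if isinstance(_data[key], list):
                # functions are logged by name, everything else verbatim
                _data[key] = [_item.__name__ if callable(_item) else _item for _item in _data[key]]
            _data[key] = json.dumps(_data[key]) if isinstance(_data[key], (list, dict)) else str(_data[key])
        return pd.DataFrame([_data])

    print(make_log_record(action='init-plugins', input=['clean', 'mask']))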
@@ -109,13 +131,17 @@ class IReader(IO):
             # self.log(action='streaming',object=self._agent._engine.name, input= type(_data).__name__)
             _shape = []
             for _segment in _data :
-                _shape.append(list(_segment.shape))
+                _shape += list(_segment.shape)
                 if self._plugins :
                     yield self._plugins.apply(_segment,self.log)
                 else:
                     yield _segment
             _objectName = '.'.join([self._agent.__class__.__module__,self._agent.__class__.__name__])
-            self.log(action='streaming',object=_objectName, input= {'shape':_shape})
+
+            _input = {'shape':_shape}
+            if hasattr(self._agent,'_table') :
+                _input['table'] = self._agent._table
+            self.log(action='streaming',object=_objectName, input= _input)
 
     def read(self,**_args):
 
@@ -141,8 +167,11 @@ class IReader(IO):
         #     return _data
         elif type(_data) == pd.DataFrame :
             _shape = _data.shape #[0,0] if not _data.shape[0] else list(_data.shape)
+            _input = {'shape':_shape}
+            if hasattr(self._agent,'_table') :
+                _input['table'] = self._agent._table
 
-            self.log(action='read',object=_objectName, input=_shape)
+            self.log(action='read',object=_objectName, input=_input)
             if self._plugins :
                 _logs = []
                 _data = self._plugins.apply(_data,self.log)
@@ -171,48 +200,67 @@ class IWriter(IO):
 # The ETL object in its simplest form is an aggregation of read/write objects
 # @TODO: ETL can/should aggregate a writer as a plugin and apply it as a process
 
-class IETL(IReader) :
+class IETL(BaseIO) :
     """
     This class performs an ETL operation by inheriting a read and adding writes as pipeline functions
     """
     def __init__(self,**_args):
-        _source = _args['source']
-        _plugins = _source['plugins'] if 'plugins' in _source else None
+        # _source = _args['source']
+        # _plugins = _source['plugins'] if 'plugins' in _source else None
 
-        # super().__init__(transport.get.reader(**_args['source']))
-        super().__init__(agent=transport.get.reader(**_source),plugins=_plugins)
-        # _logger =
-        if 'target' in _args:
-            self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
-        else:
-            self._targets = []
-        self.jobs = []
+        # # super().__init__(transport.get.reader(**_args['source']))
+        # super().__init__(agent=transport.get.reader(**_source),plugins=_plugins)
+        # # _logger =
+        # if 'target' in _args:
+        #     self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
+        # else:
+        #     self._targets = []
+        # self.jobs = []
+
+        # #
+        # # If the parent is already multiprocessing
+        # if 'token' in _source :
+        #     self._logToken = _source['token']
+        # self._sourceArgs = _source['args'] if 'args' in _source else None
+        # self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
+        super().__init__()
+        self._source = _args['source']
+        self._targets= _args['target'] if type(_args['target']) == list else [_args['target']]
         #
-        # If the parent is already multiprocessing
-        if 'token' in _source :
-            self._logToken = _source['token']
-        self._sourceArgs = _source['args'] if 'args' in _source else None
-        self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
+        # ETL Initialization, we should provide some measure of context ...
+        #
+
     # def run(self) :
     #     """
     #     We should apply the etl here, if we are in multiprocessing mode
     #     """
     #     return self.read()
     def run(self,**_args):
-        _data = super().read(**_args) if not self._sourceArgs else super().read(**self._sourceArgs)
-        self._targets = [transport.get.writer(**_kwargs) for _kwargs in self._targets]
+        # _data = super().read(**_args) if not self._sourceArgs else super().read(**self._sourceArgs)
+        # self._targets = [transport.get.writer(**_kwargs) for _kwargs in self._targets]
+
+        _reader = transport.get.reader(**self._source)
+        if hasattr(_reader,'_logger') :
+            self.setLogger(_reader._logger)
+
+        self.log(action='init-etl',input={'source':self._source,'target':self._targets})
+        _data = _reader.read(**self._source['args']) if 'args' in self._source else _reader.read()
+        _reader.close()
+        _writers = [transport.get.writer(**_kwargs) for _kwargs in self._targets]
+        _schema = [] if not getattr(_reader._agent,'_table',None) else _reader.meta()
         if types.GeneratorType == type(_data):
             _index = 0
             for _segment in _data :
                 _index += 1
-                for _writer in self._targets :
-                    self.post(_segment,writer=_writer,index=_index)
+                for _writer in _writers :
+                    self.post(_segment,writer=_writer,index=_index,schema=_schema)
                 time.sleep(1)
+
         else:
-            for _writer in self._targets :
-                self.post(_data,writer=_writer)
-
+            for _writer in _writers :
+                self.post(_data,writer=_writer,schema=_schema)
+            # pass
         return _data
     #     return _data
     def post (self,_data,**_args) :
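Condensing the reworked `run()` above: the reader is built lazily from the stored source config, its logger is adopted for ETL-level logging, data is read once (honoring optional source args), and each batch is fanned out to every writer. A rough standalone sketch of that control flow, with logging, schema handling, and error handling elided:

    import types
    import transport

    def run_etl(_source, _targets):
        _reader = transport.get.reader(**_source)
        _data = _reader.read(**_source['args']) if 'args' in _source else _reader.read()
        _writers = [transport.get.writer(**_kwargs) for _kwargs in _targets]
        if type(_data) == types.GeneratorType:
            # streamed read: push each segment to every writer as it arrives
            for _index, _segment in enumerate(_data, 1):
                for _writer in _writers:
                    _writer.write(_segment)
        else:
            for _writer in _writers:
                _writer.write(_data)
        return _data

One thing worth double-checking in the patch itself: `_reader.close()` runs before the generator branch consumes `_data`, which could invalidate a streaming cursor for sources that yield lazily.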
@@ -221,18 +269,27 @@ class IETL(IReader) :
         """
         :_args    parameters associated with writer object
         """
         #writer = transport.get.writer(**_args)
-
+        _input = {}
         try:
             _action = 'post'
             _shape = dict(zip(['rows','columns'],_data.shape))
             _index = _args['index'] if 'index' in _args else 0
-            writer = _args['writer']
-            writer.write(_data)
+            writer = _args['writer']
+            _schema= _args['schema']
+            for _item in _schema :
+                if _item['type'] == 'INTEGER' :
+                    _data.loc[:,_item['name']] = _data[_item['name']].copy().astype(np.int64)
+            writer.write(_data,schema=_schema)
+            #
+            # -- things to log
+            _input = {'shape':_shape,'segment':_index}
+            if hasattr(writer._agent,'_table'):
+                _input['table'] = writer._agent._table
         except Exception as e:
             _action = 'post-error'
-            print (e)
+            print ([e])
             pass
 
-        self.log(action=_action,object=writer._agent.__module__, input= {'shape':_shape,'segment':_index})
+        self.log(action=_action,object=writer._agent.__module__, input= _input)
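The INTEGER branch in `post()` is the "iceberg casting" piece of the commit message: pandas often infers integer columns as float64 or object (for instance after a JSON read), and the cast realigns them with the warehouse schema before `write()` runs. A self-contained illustration (the sample frame and field list are made up; the `{'name','type'}` shape mirrors what `meta()` returns in the patch):

    import numpy as np
    import pandas as pd

    _schema = [{'name': 'id', 'type': 'INTEGER'}, {'name': 'label', 'type': 'STRING'}]
    _data = pd.DataFrame({'id': [1.0, 2.0, 3.0], 'label': ['a', 'b', 'c']})  # id inferred as float64

    for _item in _schema:
        if _item['type'] == 'INTEGER':
            _data.loc[:, _item['name']] = _data[_item['name']].copy().astype(np.int64)

    print(_data.dtypes)  # id is now int64, matching the warehouse column

Note that `astype(np.int64)` raises on NaN, so columns with missing integers would need pandas' nullable `Int64` dtype instead.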
diff --git a/transport/warehouse/iceberg.py b/transport/warehouse/iceberg.py
index b6926c9..4e73c62 100644
--- a/transport/warehouse/iceberg.py
+++ b/transport/warehouse/iceberg.py
@@ -7,7 +7,8 @@ NOTE:
 """
 from pyspark.sql import SparkSession
 from pyspark import SparkContext
-
+from pyspark.sql.types import *
+from pyspark.sql.functions import col, to_date, to_timestamp
 import copy
 
 class Iceberg :
@@ -25,6 +26,7 @@ class Iceberg :
         # Make arrangements for additional configuration elements
         #
         self._session = SparkSession.builder.appName("data-transport").getOrCreate()
+        self._session.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
         # self._session.sparkContext.setLogLevel("ERROR")
         self._catalog = self._session.catalog
         self._table = _args['table'] if 'table' in _args else None
@@ -53,7 +55,8 @@ class Iceberg :
         """
         _schema = []
         try:
-            _tableName = self._getPrefix(**_args) + f".{_args['table']}"
+            _table = _args['table'] if 'table' in _args else self._table
+            _tableName = self._getPrefix(**_args) + f".{_table}"
             _tmp = self._session.table(_tableName).schema
             _schema = _tmp.jsonValue()['fields']
             for _item in _schema :
@@ -106,15 +109,34 @@ class Writer (Iceberg):
         super().__init__(**_args)
         self._mode = 'append' if 'mode' not in _args else _args['mode']
         self._table = None if 'table' not in _args else _args['table']
+    def format (self,_schema) :
+        _iceSchema = StructType([])
+        _map = {'integer':IntegerType(),'float':DoubleType(),'double':DoubleType(),'date':DateType(),
+                'timestamp':TimestampType(),'datetime':TimestampType(),'string':StringType(),'varchar':StringType()}
+        for _item in _schema :
+            _name = _item['name']
+            _type = _item['type'].lower()
+            if _type not in _map :
+                _iceType = StringType()
+            else:
+                _iceType = _map[_type]
+
+            _iceSchema.add (StructField(_name,_iceType,True))
+        return _iceSchema if len(_iceSchema) else []
     def write(self,_data,**_args):
         _prefix = self._getPrefix(**_args)
         if 'table' not in _args and not self._table :
             raise Exception (f"Table Name should be specified for catalog/database {_prefix}")
-        rdd = self._session.createDataFrame(_data,verifySchema=False)
+        _schema = self.format(_args['schema']) if 'schema' in _args else []
+        if not _schema :
+            rdd = self._session.createDataFrame(_data,verifySchema=False)
+        else :
+            rdd = self._session.createDataFrame(_data,schema=_schema,verifySchema=True)
         _mode = self._mode if 'mode' not in _args else _args['mode']
         _table = self._table if 'table' not in _args else _args['table']
+        # print (_data.shape,_mode,_table)
-
+
         if not self._session.catalog.tableExists(_table):
             # # @TODO:
             # # add partitioning information here
@@ -125,4 +147,5 @@ class Writer (Iceberg):
         else:
             # rdd.writeTo(_table).append()
             # # _table = f'{_prefix}.{_table}'
+            rdd.coalesce(10).write.format('iceberg').mode('append').save(_table)
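A quick way to sanity-check `Writer.format()` is to feed it a `meta()`-style field list and inspect the Spark `StructType` it produces; the sketch below mirrors the mapping above. One detail that may deserve a follow-up: the ETL side casts INTEGER columns to `np.int64`, while Spark's `IntegerType` is 32-bit, so mapping `'integer'` to `LongType()` could be safer once `verifySchema=True` is in play.

    from pyspark.sql.types import (StructType, StructField, IntegerType, DoubleType,
                                   StringType, TimestampType, DateType)

    _map = {'integer': IntegerType(), 'float': DoubleType(), 'double': DoubleType(),
            'date': DateType(), 'timestamp': TimestampType(), 'datetime': TimestampType(),
            'string': StringType(), 'varchar': StringType()}

    def format(_schema):
        # mirrors Writer.format: unknown types degrade to StringType
        _iceSchema = StructType([])
        for _item in _schema:
            _iceSchema.add(StructField(_item['name'], _map.get(_item['type'].lower(), StringType()), True))
        return _iceSchema

    print(format([{'name': 'id', 'type': 'INTEGER'}, {'name': 'updated', 'type': 'TIMESTAMP'}]))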