Compare commits

...

2 Commits

Author	SHA1	Message	Date
Steve Nyemba	685aac7d6b	bug fix: write when table doesn't exist	3 days ago
Steve Nyemba	07be81bace	adding warehouse support (iceberg)	3 days ago

@@ -24,7 +24,8 @@ from multiprocessing import Process
 import os
 import transport
-from transport import etl
+# from transport import etl
+from transport.iowrapper import IETL
 # from transport import providers
 import typer
 from typing_extensions import Annotated
@@ -60,10 +61,13 @@ def apply (path:Annotated[str,typer.Argument(help="path of the configuration fil
         _config = [_config[ int(index)]]
     jobs = []
     for _args in _config :
-        pthread = etl.instance(**_args) #-- automatically starts the process
+        # pthread = etl.instance(**_args) #-- automatically starts the process
+        _worker = IETL(**_args)
+        pthread = Process(target=_worker.run)
+        pthread.start()
         jobs.append(pthread)
     #
     # @TODO: Log the number of processes started and estimated time
     while jobs :
         jobs = [pthread for pthread in jobs if pthread.is_alive()]
         time.sleep(1)
@@ -88,6 +92,7 @@ def version():
     """
     print (transport.__app_name__,'version ',transport.__version__)
+    print ()
     print (transport.__license__)
 @app.command()

@@ -1,6 +1,6 @@
 __app_name__ = 'data-transport'
 __author__ = 'The Phi Technology'
-__version__= '2.2.6'
+__version__= '2.4.0'
 __email__ = "info@the-phi.com"
 __license__=f"""
 Copyright 2010 - 2024, Steve L. Nyemba
@@ -15,7 +15,9 @@ THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR I
 __whatsnew__=f"""version {__version__}, focuses on collaborative environments like jupyter-base servers (apache zeppelin; jupyter notebook, jupyterlab, jupyterhub)
-1. simpler syntax to create readers/writers
-2. auth-file registry that can be referenced using a label
-3. duckdb support
+1. support for apache iceberg data warehouse using spark
+2. Improved ETL & performance
+3. bug fixes: mongodb
 """

@@ -18,7 +18,7 @@ Source Code is available under MIT License:
 """
 import numpy as np
-from transport import sql, nosql, cloud, other
+from transport import sql, nosql, cloud, other, warehouse
 import pandas as pd
 import json
 import os
@@ -33,7 +33,7 @@ PROVIDERS = {}
 def init():
     global PROVIDERS
-    for _module in [cloud,sql,nosql,other] :
+    for _module in [cloud,sql,nosql,other,warehouse] :
         for _provider_name in dir(_module) :
             if _provider_name.startswith('__') or _provider_name == 'common':
                 continue

@@ -103,6 +103,14 @@ class IETL(IReader) :
         #
         # If the parent is already multiprocessing
         self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
+    def run(self) :
+        """
+        We should apply the etl here, if we are in multiprocessing mode
+        """
+        _data = super().read()
+        for _kwargs in self._targets :
+            self.post(_data,**_kwargs)
     def read(self,**_args):
         _data = super().read(**_args)
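
Below is a minimal sketch (not part of the diff) of how the new IETL.run() is meant to be driven, mirroring the CLI change above: the wrapper is given one job entry from the configuration file and its run() method becomes the target of a multiprocessing.Process. The empty job dictionary is a placeholder; the real schema (source and target provider settings) comes from the configuration file passed to the CLI.

    from multiprocessing import Process
    from transport.iowrapper import IETL

    _job = {}  # placeholder: one ETL job entry (source + targets) loaded from a config file
    _worker = IETL(**_job)
    pthread = Process(target=_worker.run)  # run() reads from the source, then posts to every target
    pthread.start()
    pthread.join()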

@@ -44,7 +44,8 @@ PGSQL = POSTGRESQL
 AWS_S3 = 's3'
 RABBIT = RABBITMQ
+ICEBERG='iceberg'
+APACHE_ICEBERG = 'iceberg'
 # QLISTENER = 'qlistener'
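
As a quick illustration (not part of the diff), both new constants resolve to the same provider string, so either spelling can be used when selecting the Iceberg provider:

    from transport import providers

    assert providers.ICEBERG == providers.APACHE_ICEBERG == 'iceberg'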

@@ -0,0 +1,7 @@
+"""
+This namespace/package is intended to handle read/writes against data warehouse solutions like :
+    - apache iceberg
+    - clickhouse (...)
+"""
+from . import iceberg

@@ -0,0 +1,103 @@
+from pyspark.sql import SparkSession
+import copy
+
+class Iceberg :
+    def __init__(self,**_args):
+        """
+        providing catalog meta information (you must get this from apache iceberg)
+        """
+        #
+        # @TODO:
+        # Make arrangements for additional configuration elements
+        #
+        self._session = SparkSession.builder.getOrCreate()
+        self._catalog = self._session.catalog
+        self._table = _args['table'] if 'table' in _args else None
+        if 'catalog' in _args :
+            #
+            # Let us set the default catalog
+            self._catalog.setCurrentCatalog(_args['catalog'])
+        else:
+            # No current catalog has been set ...
+            pass
+        if 'database' in _args :
+            self._database = _args['database']
+            self._catalog.setCurrentDatabase(self._database)
+        else:
+            #
+            # Should we set the default as the first one if available ?
+            #
+            pass
+        self._catalogName = self._catalog.currentCatalog()
+        self._databaseName = self._catalog.currentDatabase()
+    def meta (self,**_args) :
+        """
+        This function should return the schema of a table (only)
+        """
+        _schema = []
+        try:
+            _tableName = self._getPrefix(**_args) + f".{_args['table']}"
+            print (_tableName)
+            _tmp = self._session.table(_tableName).schema
+            _schema = _tmp.jsonValue()['fields']
+            for _item in _schema :
+                del _item['nullable'],_item['metadata']
+        except Exception as e:
+            pass
+        return _schema
+    def _getPrefix (self,**_args):
+        _catName = self._catalogName if 'catalog' not in _args else _args['catalog']
+        _datName = self._databaseName if 'database' not in _args else _args['database']
+        return '.'.join([_catName,_datName])
+    def has (self,**_args):
+        try:
+            _prefix = self._getPrefix(**_args)
+            if _prefix.endswith('.') :
+                return False
+            return _args['table'] in [_item.name for _item in self._catalog.listTables(_prefix)]
+        except Exception as e:
+            print (e)
+            return False
+    def apply(self,sql):
+        pass
+
+class Reader(Iceberg) :
+    def __init__(self,**_args):
+        super().__init__(**_args)
+    def read(self,**_args):
+        _table = self._table
+        _prefix = self._getPrefix(**_args)
+        if 'table' in _args or _table:
+            _table = _args['table'] if 'table' in _args else _table
+            _table = _prefix + f'.{_table}'
+            return self._session.table(_table).toPandas()
+        else:
+            sql = _args['sql']
+            return self._session.sql(sql).toPandas()
+        pass
+
+class Writer (Iceberg):
+    """
+    Writing data to an Apache Iceberg data warehouse (using pyspark)
+    """
+    def __init__(self,**_args):
+        super().__init__(**_args)
+        self._mode = 'append' if 'mode' not in _args else _args['mode']
+        self._table = None if 'table' not in _args else _args['table']
+    def write(self,_data,**_args):
+        _prefix = self._getPrefix(**_args)
+        if 'table' not in _args and not self._table :
+            raise Exception (f"Table Name should be specified for catalog/database {_prefix}")
+        rdd = self._session.createDataFrame(_data)
+        _mode = self._mode if 'mode' not in _args else _args['mode']
+        _table = self._table if 'table' not in _args else _args['table']
+        if not self.has(table=_table) :
+            _mode = 'overwrite'
+            rdd.write.format('iceberg').mode(_mode).saveAsTable(_table)
+        else:
+            _table = f'{_prefix}.{_table}'
+            rdd.write.format('iceberg').mode(_mode).save(_table)
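
A minimal usage sketch (not part of the diff) for the new module, assuming pyspark is installed and the Spark session is already configured with an Iceberg catalog; the catalog, database and table names below are placeholders:

    import pandas as pd
    from transport.warehouse import iceberg

    # write a dataframe; the table is created with mode='overwrite' if it does not exist yet
    _writer = iceberg.Writer(catalog='demo_catalog', database='demo_db', table='events')
    _writer.write(pd.DataFrame([{'id': 1, 'name': 'alice'}]))

    # read it back as a pandas dataframe and inspect the schema
    _reader = iceberg.Reader(catalog='demo_catalog', database='demo_db', table='events')
    _df = _reader.read()
    print (_reader.meta(table='events'))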