adding warehouse support (iceberg)

v2.4
Steve Nyemba 1 month ago
parent 2df926da12
commit 07be81bace

@@ -1,6 +1,6 @@
 __app_name__ = 'data-transport'
 __author__ = 'The Phi Technology'
-__version__= '2.2.6'
+__version__= '2.4.0'
 __email__ = "info@the-phi.com"
 __license__=f"""
 Copyright 2010 - 2024, Steve L. Nyemba
@@ -15,7 +15,9 @@ THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR I
 __whatsnew__=f"""version {__version__}, focuses on collaborative environments like jupyter-base servers (apache zeppelin; jupyter notebook, jupyterlab, jupyterhub)
-1. simpler syntax to create readers/writers
-2. auth-file registry that can be referenced using a label
-3. duckdb support
+1. support for apache iceberg data warehouse using spark
+2. improved ETL & performance
+3. bug fixes: mongodb
 """

@@ -18,7 +18,7 @@ Source Code is available under MIT License:
 """
 import numpy as np
-from transport import sql, nosql, cloud, other
+from transport import sql, nosql, cloud, other, warehouse
 import pandas as pd
 import json
 import os
@@ -33,7 +33,7 @@ PROVIDERS = {}
 def init():
     global PROVIDERS
-    for _module in [cloud,sql,nosql,other] :
+    for _module in [cloud,sql,nosql,other,warehouse] :
         for _provider_name in dir(_module) :
            if _provider_name.startswith('__') or _provider_name == 'common':
                 continue

@@ -44,7 +44,8 @@ PGSQL = POSTGRESQL
 AWS_S3 = 's3'
 RABBIT = RABBITMQ
+ICEBERG='iceberg'
+APACHE_ICEBERG = 'iceberg'
 # QLISTENER = 'qlistener'
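
With these constants in place, an iceberg instance can be requested by label like any other provider. A hedged sketch, assuming the factory-style entry point from earlier data-transport releases (`transport.factory.instance`) and hypothetical catalog/database/table names; consult the project README for the exact call:

```python
import transport
from transport import providers

# 'local', 'demo' and 'events' are hypothetical names for illustration
_reader = transport.factory.instance(
    provider=providers.ICEBERG,   # alias of providers.APACHE_ICEBERG
    catalog='local',
    database='demo',
    table='events'
)
_df = _reader.read()
```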

@@ -0,0 +1,7 @@
"""
This namespace/package is intended to handle reads/writes against data warehouse solutions like :
    - apache iceberg
    - clickhouse (...)
"""
from . import iceberg
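
The `Iceberg` class below calls `SparkSession.builder.getOrCreate()`, so the session must already be configured for Iceberg (hence the `@TODO` about configuration elements). A minimal sketch of such a session, assuming a local hadoop-type catalog; the catalog name and warehouse path are placeholders:

```python
from pyspark.sql import SparkSession

# standard Iceberg-on-Spark settings; 'local' and the path are placeholders
spark = (SparkSession.builder
    .appName('data-transport-iceberg')
    .config('spark.sql.extensions',
            'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
    .config('spark.sql.catalog.local', 'org.apache.iceberg.spark.SparkCatalog')
    .config('spark.sql.catalog.local.type', 'hadoop')
    .config('spark.sql.catalog.local.warehouse', '/tmp/iceberg-warehouse')
    .getOrCreate())
```

This also assumes the `iceberg-spark-runtime` package is on the Spark classpath (e.g. via `spark.jars.packages`).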

@@ -0,0 +1,98 @@
from pyspark.sql import SparkSession
import copy

class Iceberg :
    def __init__(self,**_args):
        """
        providing catalog meta information (you must get this from apache iceberg)
        """
        #
        # @TODO:
        #   Make arrangements for additional configuration elements
        #
        self._session = SparkSession.builder.getOrCreate()
        self._catalog = self._session.catalog
        self._table = _args['table'] if 'table' in _args else None
        if 'catalog' in _args :
            #
            # Let us set the default catalog
            self._catalog.setCurrentCatalog(_args['catalog'])
        else:
            # No current catalog has been set ...
            pass
        if 'database' in _args :
            self._database = _args['database']
            self._catalog.setCurrentDatabase(self._database)
        else:
            #
            # Should we set the default as the first one if available ?
            #
            pass
        self._catalogName = self._catalog.currentCatalog()
        self._databaseName = self._catalog.currentDatabase()

    def meta (self,**_args) :
        """
        This function returns the schema of a table (only)
        """
        _schema = []
        try:
            _tableName = self._getPrefix(**_args) + f".{_args['table']}"
            _tmp = self._session.table(_tableName).schema
            _schema = _tmp.jsonValue()['fields']
            for _item in _schema :
                del _item['nullable'],_item['metadata']
        except Exception as e:
            # an unresolved table simply yields an empty schema
            pass
        return _schema

    def _getPrefix (self,**_args):
        _catName = self._catalogName if 'catalog' not in _args else _args['catalog']
        _datName = self._databaseName if 'database' not in _args else _args['database']
        return '.'.join([_catName,_datName])

    def has (self,**_args):
        try:
            _prefix = self._getPrefix(**_args)
            if _prefix.endswith('.') :
                return False
            return _args['table'] in [_item.name for _item in self._catalog.listTables(_prefix)]
        except Exception as e:
            print (e)
            return False

    def apply(self,sql):
        pass

class Reader(Iceberg) :
    def __init__(self,**_args):
        super().__init__(**_args)

    def read(self,**_args):
        _table = self._table
        _prefix = self._getPrefix(**_args)
        if 'table' in _args or _table:
            _table = _args['table'] if 'table' in _args else _table
            _table = _prefix + f'.{_table}'
            return self._session.table(_table).toPandas()
        else:
            sql = _args['sql']
            return self._session.sql(sql).toPandas()

class Writer (Iceberg):
    """
    Writing data to an Apache Iceberg data warehouse (using pyspark)
    """
    def __init__(self,**_args):
        super().__init__(**_args)
        self._mode = 'append'
        self._table = None if 'table' not in _args else _args['table']

    def write(self,_data,**_args):
        _prefix = self._getPrefix(**_args)
        if 'table' not in _args and not self._table :
            raise Exception (f"Table name should be specified for catalog/database {_prefix}")
        _df = self._session.createDataFrame(_data)
        _mode = self._mode if 'mode' not in _args else _args['mode']
        _table = self._table if 'table' not in _args else _args['table']
        _table = f'{_prefix}.{_table}'
        # honor the per-call mode override (previously self._mode was used here,
        # which ignored a 'mode' passed to write())
        _df.write.format('iceberg').mode(_mode).save(_table)
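
Taken together, the new classes read and write through the current Spark session. A short usage sketch against the module above; the catalog/database/table names are hypothetical, and the Iceberg session configuration from the earlier sketch is assumed:

```python
import pandas as pd
from transport.warehouse.iceberg import Reader, Writer

# write a small frame; mode defaults to 'append', 'overwrite' can be
# passed per call via write(..., mode='overwrite')
_writer = Writer(catalog='local', database='demo', table='events')
_writer.write(pd.DataFrame([{'id': 1, 'name': 'alice'}]))

# read it back, either by table name or with raw SQL
_reader = Reader(catalog='local', database='demo')
_df = _reader.read(table='events')
_df = _reader.read(sql='SELECT COUNT(*) AS n FROM local.demo.events')
```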