diff --git a/info/__init__.py b/info/__init__.py
index 3eded86..ee36366 100644
--- a/info/__init__.py
+++ b/info/__init__.py
@@ -1,6 +1,6 @@
 __app_name__ = 'data-transport'
 __author__ = 'The Phi Technology'
-__version__= '2.2.6'
+__version__= '2.4.0'
 __email__ = "info@the-phi.com"
 __license__=f"""
 Copyright 2010 - 2024, Steve L. Nyemba
@@ -15,7 +15,9 @@ THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR I
 __whatsnew__=f"""version {__version__},
 focuses on collaborative environments like jupyter-base servers (apache zeppelin; jupyter notebook, jupyterlab, jupyterhub)
 
-    1. simpler syntax to create readers/writers
-    2. auth-file registry that can be referenced using a label
-    3. duckdb support
+    1. support for apache iceberg data warehouse using spark
+    2. Improved ETL & performance
+    3. bug fixes: mongodb
+
+
 """
diff --git a/transport/__init__.py b/transport/__init__.py
index b934760..8ab354a 100644
--- a/transport/__init__.py
+++ b/transport/__init__.py
@@ -18,7 +18,7 @@ Source Code is available under MIT License:
 """
 
 import numpy as np
-from transport import sql, nosql, cloud, other
+from transport import sql, nosql, cloud, other, warehouse
 import pandas as pd
 import json
 import os
@@ -33,7 +33,7 @@ PROVIDERS = {}
 
 def init():
 	global PROVIDERS
-	for _module in [cloud,sql,nosql,other] :
+	for _module in [cloud,sql,nosql,other,warehouse] :
 		for _provider_name in dir(_module) :
 			if _provider_name.startswith('__') or _provider_name == 'common':
diff --git a/transport/providers/__init__.py b/transport/providers/__init__.py
index 6422d74..35779d1 100644
--- a/transport/providers/__init__.py
+++ b/transport/providers/__init__.py
@@ -44,7 +44,8 @@ PGSQL = POSTGRESQL
 AWS_S3 = 's3'
 RABBIT = RABBITMQ
 
 
-
+ICEBERG='iceberg'
+APACHE_ICEBERG = 'iceberg'
 # QLISTENER = 'qlistener'
\ No newline at end of file
diff --git a/transport/warehouse/__init__.py b/transport/warehouse/__init__.py
new file mode 100644
index 0000000..d82324e
--- /dev/null
+++ b/transport/warehouse/__init__.py
@@ -0,0 +1,7 @@
+"""
+This namespace/package is intended to handle read/writes against data warehouse solutions like :
+    - apache iceberg
+    - clickhouse (...)
+"""
+
+from . import iceberg
diff --git a/transport/warehouse/iceberg.py b/transport/warehouse/iceberg.py
new file mode 100644
index 0000000..3083e68
--- /dev/null
+++ b/transport/warehouse/iceberg.py
@@ -0,0 +1,119 @@
+"""
+Read/write support for an Apache Iceberg data warehouse using a pyspark session.
+"""
+from pyspark.sql import SparkSession
+
+
+class Iceberg :
+    def __init__(self,**_args):
+        """
+        Initialize a spark session against an iceberg catalog
+        :catalog    optional catalog name, defaults to the session's current catalog
+        :database   optional database (namespace) name
+        :table      optional default table name
+        """
+        #
+        # @TODO:
+        #    Make arrangements for additional configuration elements
+        #
+        self._session = SparkSession.builder.getOrCreate()
+        self._catalog = self._session.catalog
+        self._table = _args['table'] if 'table' in _args else None
+
+        if 'catalog' in _args :
+            #
+            # Let us set the default catalog
+            self._catalog.setCurrentCatalog(_args['catalog'])
+        else:
+            # No current catalog has been set ...
+            pass
+        if 'database' in _args :
+            self._database = _args['database']
+            self._catalog.setCurrentDatabase(self._database)
+        else:
+            #
+            # Should we set the default as the first one if available ?
+            #
+            pass
+        self._catalogName = self._catalog.currentCatalog()
+        self._databaseName = self._catalog.currentDatabase()
+    def meta (self,**_args) :
+        """
+        Return the schema (list of field descriptors) of a table, [] if unavailable
+        :table      table name, qualified by catalog/database overrides if provided
+        """
+        _schema = []
+        try:
+            _tableName = self._getPrefix(**_args) + f".{_args['table']}"
+            _tmp = self._session.table(_tableName).schema
+            _schema = _tmp.jsonValue()['fields']
+            for _item in _schema :
+                # keep only name/type; pop defensively in case a field lacks a key
+                _item.pop('nullable',None)
+                _item.pop('metadata',None)
+        except Exception as e:
+            # best-effort: an unknown/unreachable table yields an empty schema
+            pass
+        return _schema
+    def _getPrefix (self,**_args):
+        #
+        # build the "<catalog>.<database>" prefix, honoring per-call overrides
+        _catName = self._catalogName if 'catalog' not in _args else _args['catalog']
+        _datName = self._databaseName if 'database' not in _args else _args['database']
+
+        return '.'.join([_catName,_datName])
+
+    def has (self,**_args):
+        """
+        Return True if the given table exists in the catalog/database
+        """
+        try:
+            _prefix = self._getPrefix(**_args)
+            if _prefix.endswith('.') :
+                return False
+            return _args['table'] in [_item.name for _item in self._catalog.listTables(_prefix)]
+        except Exception as e:
+            print (e)
+            return False
+    def apply(self,sql):
+        pass
+class Reader(Iceberg) :
+    def __init__(self,**_args):
+        super().__init__(**_args)
+    def read(self,**_args):
+        """
+        Read a table (per-call or default) or free-form sql into a pandas dataframe
+        """
+        _table = self._table
+        _prefix = self._getPrefix(**_args)
+        if 'table' in _args or _table:
+            _table = _args['table'] if 'table' in _args else _table
+            _table = _prefix + f'.{_table}'
+            return self._session.table(_table).toPandas()
+        else:
+            sql = _args['sql']
+            return self._session.sql(sql).toPandas()
+class Writer (Iceberg):
+    """
+    Writing data to an Apache Iceberg data warehouse (using pyspark)
+    """
+    def __init__(self,**_args):
+        # super() already captures the default table name from _args
+        super().__init__(**_args)
+        # default write disposition; can be overridden per call with mode=...
+        self._mode = 'append'
+    def write(self,_data,**_args):
+        """
+        Write a dataframe to an iceberg table
+        :_data      pandas (or spark-compatible) dataframe
+        :table      optional table name, defaults to the one set at construction
+        :mode       optional write mode (e.g. append/overwrite), defaults to self._mode
+        """
+        _prefix = self._getPrefix(**_args)
+        if 'table' not in _args and not self._table :
+            raise Exception (f"Table Name should be specified for catalog/database {_prefix}")
+        rdd = self._session.createDataFrame(_data)
+        _mode = self._mode if 'mode' not in _args else _args['mode']
+        _table = self._table if 'table' not in _args else _args['table']
+        _table = f'{_prefix}.{_table}'
+        # use the per-call mode (bug fix: previously self._mode, which ignored mode=... overrides)
+        rdd.write.format('iceberg').mode(_mode).save(_table)