Compare commits
11 Commits
Author | SHA1 | Date |
---|---|---|
Steve L. Nyemba | 492dc8f374 | 1 month ago |
Steve L. Nyemba | e848367378 | 1 month ago |
Steve L. Nyemba | c872ba8cc2 | 1 month ago |
Steve L. Nyemba | baa8164f16 | 3 months ago |
Steve L. Nyemba | 31556ebd32 | 3 months ago |
Steve L. Nyemba | 1e7839198a | 3 months ago |
Steve L. Nyemba | dce50a967e | 4 months ago |
Steve L. Nyemba | 5ccb073865 | 4 months ago |
Steve L. Nyemba | 3081fb98e7 | 6 months ago |
Steve L. Nyemba | 58959359ad | 8 months ago |
Steve L. Nyemba | 68b8f6af5f | 8 months ago |
@ -0,0 +1,19 @@
|
|||||||
|
"""
|
||||||
|
This file will be intended to handle duckdb database
|
||||||
|
"""
|
||||||
|
|
||||||
|
import duckdb
|
||||||
|
from transport.common import Reader,Writer
|
||||||
|
|
||||||
|
class Duck(Reader):
|
||||||
|
def __init__(self,**_args):
|
||||||
|
super().__init__(**_args)
|
||||||
|
self._path = None if 'path' not in _args else _args['path']
|
||||||
|
self._handler = duckdb.connect() if not self._path else duckdb.connect(self._path)
|
||||||
|
|
||||||
|
|
||||||
|
class DuckReader(Duck) :
|
||||||
|
def __init__(self,**_args):
|
||||||
|
super().__init__(**_args)
|
||||||
|
def read(self,**_args) :
|
||||||
|
pass
|
@ -1,7 +0,0 @@
|
|||||||
"""
|
|
||||||
This namespace/package is intended to handle read/writes against data warehouse solutions like :
|
|
||||||
- apache iceberg
|
|
||||||
- clickhouse (...)
|
|
||||||
"""
|
|
||||||
|
|
||||||
from . import iceberg, drill
|
|
@ -1,115 +0,0 @@
|
|||||||
"""
|
|
||||||
dependency:
|
|
||||||
- spark and SPARK_HOME environment variable must be set
|
|
||||||
"""
|
|
||||||
from pyspark.sql import SparkSession
|
|
||||||
from pyspark import SparkContext
|
|
||||||
|
|
||||||
import copy
|
|
||||||
|
|
||||||
class Iceberg :
|
|
||||||
def __init__(self,**_args):
|
|
||||||
"""
|
|
||||||
providing catalog meta information (you must get this from apache iceberg)
|
|
||||||
"""
|
|
||||||
#
|
|
||||||
# Turning off logging (it's annoying & un-professional)
|
|
||||||
#
|
|
||||||
# _spconf = SparkContext()
|
|
||||||
# _spconf.setLogLevel("ERROR")
|
|
||||||
#
|
|
||||||
# @TODO:
|
|
||||||
# Make arrangements for additional configuration elements
|
|
||||||
#
|
|
||||||
self._session = SparkSession.builder.getOrCreate()
|
|
||||||
# self._session.sparkContext.setLogLevel("ERROR")
|
|
||||||
self._catalog = self._session.catalog
|
|
||||||
self._table = _args['table'] if 'table' in _args else None
|
|
||||||
|
|
||||||
if 'catalog' in _args :
|
|
||||||
#
|
|
||||||
# Let us set the default catalog
|
|
||||||
self._catalog.setCurrentCatalog(_args['catalog'])
|
|
||||||
|
|
||||||
else:
|
|
||||||
# No current catalog has been set ...
|
|
||||||
pass
|
|
||||||
if 'database' in _args :
|
|
||||||
self._database = _args['database']
|
|
||||||
self._catalog.setCurrentDatabase(self._database)
|
|
||||||
else:
|
|
||||||
#
|
|
||||||
# Should we set the default as the first one if available ?
|
|
||||||
#
|
|
||||||
pass
|
|
||||||
self._catalogName = self._catalog.currentCatalog()
|
|
||||||
self._databaseName = self._catalog.currentDatabase()
|
|
||||||
def meta (self,**_args) :
|
|
||||||
"""
|
|
||||||
This function should return the schema of a table (only)
|
|
||||||
"""
|
|
||||||
_schema = []
|
|
||||||
try:
|
|
||||||
_tableName = self._getPrefix(**_args) + f".{_args['table']}"
|
|
||||||
print (_tableName)
|
|
||||||
_tmp = self._session.table(_tableName).schema
|
|
||||||
_schema = _tmp.jsonValue()['fields']
|
|
||||||
for _item in _schema :
|
|
||||||
del _item['nullable'],_item['metadata']
|
|
||||||
except Exception as e:
|
|
||||||
|
|
||||||
pass
|
|
||||||
return _schema
|
|
||||||
def _getPrefix (self,**_args):
|
|
||||||
_catName = self._catalogName if 'catalog' not in _args else _args['catalog']
|
|
||||||
_datName = self._databaseName if 'database' not in _args else _args['database']
|
|
||||||
|
|
||||||
return '.'.join([_catName,_datName])
|
|
||||||
|
|
||||||
def has (self,**_args):
|
|
||||||
try:
|
|
||||||
_prefix = self._getPrefix(**_args)
|
|
||||||
if _prefix.endswith('.') :
|
|
||||||
return False
|
|
||||||
return _args['table'] in [_item.name for _item in self._catalog.listTables(_prefix)]
|
|
||||||
except Exception as e:
|
|
||||||
print (e)
|
|
||||||
return False
|
|
||||||
def apply(self,sql):
|
|
||||||
pass
|
|
||||||
class Reader(Iceberg) :
|
|
||||||
def __init__(self,**_args):
|
|
||||||
super().__init__(**_args)
|
|
||||||
def read(self,**_args):
|
|
||||||
_table = self._table
|
|
||||||
_prefix = self._getPrefix(**_args)
|
|
||||||
if 'table' in _args or _table:
|
|
||||||
_table = _args['table'] if 'table' in _args else _table
|
|
||||||
_table = _prefix + f'.{_table}'
|
|
||||||
return self._session.table(_table).toPandas()
|
|
||||||
else:
|
|
||||||
sql = _args['sql']
|
|
||||||
return self._session.sql(sql).toPandas()
|
|
||||||
pass
|
|
||||||
class Writer (Iceberg):
|
|
||||||
"""
|
|
||||||
Writing data to an Apache Iceberg data warehouse (using pyspark)
|
|
||||||
"""
|
|
||||||
def __init__(self,**_args):
|
|
||||||
super().__init__(**_args)
|
|
||||||
self._mode = 'append' if 'mode' not in _args else _args['mode']
|
|
||||||
self._table = None if 'table' not in _args else _args['table']
|
|
||||||
def write(self,_data,**_args):
|
|
||||||
_prefix = self._getPrefix(**_args)
|
|
||||||
if 'table' not in _args and not self._table :
|
|
||||||
raise Exception (f"Table Name should be specified for catalog/database {_prefix}")
|
|
||||||
rdd = self._session.createDataFrame(_data)
|
|
||||||
_mode = self._mode if 'mode' not in _args else _args['mode']
|
|
||||||
_table = self._table if 'table' not in _args else _args['table']
|
|
||||||
|
|
||||||
if not self.has(table=_table) :
|
|
||||||
_mode = 'overwrite'
|
|
||||||
rdd.write.format('iceberg').mode(_mode).saveAsTable(_table)
|
|
||||||
else:
|
|
||||||
_table = f'{_prefix}.{_table}'
|
|
||||||
rdd.write.format('iceberg').mode(_mode).save(_table)
|
|
Loading…
Reference in new issue