Compare commits


4 Commits
master ... v2.4

@@ -24,7 +24,8 @@ from multiprocessing import Process
 import os
 import transport
-from transport import etl
+# from transport import etl
+from transport.iowrapper import IETL
 # from transport import providers
 import typer
 from typing_extensions import Annotated
@@ -60,10 +61,13 @@ def apply (path:Annotated[str,typer.Argument(help="path of the configuration fil
         _config = [_config[ int(index)]]
     jobs = []
     for _args in _config :
-        pthread = etl.instance(**_args) #-- automatically starts the process
+        # pthread = etl.instance(**_args) #-- automatically starts the process
+        _worker = IETL(**_args)
+        pthread = Process(target=_worker.run)
+        pthread.start()
         jobs.append(pthread)
     #
     # @TODO: Log the number of processes started and estimated time
     while jobs :
         jobs = [pthread for pthread in jobs if pthread.is_alive()]
         time.sleep(1)
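The change above swaps the self-starting etl.instance for an IETL worker whose run() method is handed to multiprocessing.Process, one process per job, with a polling loop that waits for them to finish. A minimal, runnable sketch of that pattern, using a placeholder FakeWorker in place of IETL (illustrative only):

# Sketch of the one-process-per-job pattern used in apply().
# FakeWorker stands in for transport.iowrapper.IETL; it is illustrative only.
import time
from multiprocessing import Process

class FakeWorker:
    def __init__(self, **_args):
        self._args = _args
    def run(self):
        print('processing', self._args)

if __name__ == '__main__':
    _config = [{'id': 1}, {'id': 2}]
    jobs = []
    for _args in _config:
        pthread = Process(target=FakeWorker(**_args).run)
        pthread.start()
        jobs.append(pthread)
    while jobs:                       # wait until every job has finished
        jobs = [p for p in jobs if p.is_alive()]
        time.sleep(1)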
@@ -88,6 +92,7 @@ def version():
     """
     print (transport.__app_name__,'version ',transport.__version__)
+    print ()
     print (transport.__license__)
 @app.command()

@@ -1,6 +1,6 @@
 __app_name__ = 'data-transport'
 __author__ = 'The Phi Technology'
-__version__= '2.2.6'
+__version__= '2.4.0'
 __email__ = "info@the-phi.com"
 __license__=f"""
 Copyright 2010 - 2024, Steve L. Nyemba
@@ -15,7 +15,9 @@ THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR I
 __whatsnew__=f"""version {__version__}, focuses on collaborative environments like jupyter-base servers (apache zeppelin; jupyter notebook, jupyterlab, jupyterhub)
-1. simpler syntax to create readers/writers
-2. auth-file registry that can be referenced using a label
-3. duckdb support
+1. support for apache iceberg data warehouse using spark
+2. Improved ETL & performance
+3. bug fixes: mongodb
 """

@@ -19,7 +19,7 @@ args = {
     "packages": find_packages(include=['info','transport', 'transport.*'])}
 args["keywords"]=['mongodb','duckdb','couchdb','rabbitmq','file','read','write','s3','sqlite']
-args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql']
+args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql','pyspark']
 args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git"
 args['scripts'] = ['bin/transport']
 # if sys.version_info[0] == 2 :

@@ -18,7 +18,7 @@ Source Code is available under MIT License:
 """
 import numpy as np
-from transport import sql, nosql, cloud, other
+from transport import sql, nosql, cloud, other, warehouse
 import pandas as pd
 import json
 import os
@@ -33,7 +33,7 @@ PROVIDERS = {}
 def init():
     global PROVIDERS
-    for _module in [cloud,sql,nosql,other] :
+    for _module in [cloud,sql,nosql,other,warehouse] :
         for _provider_name in dir(_module) :
             if _provider_name.startswith('__') or _provider_name == 'common':
                 continue
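Adding warehouse to the module list is all it takes for the new providers to be picked up, because init() discovers providers by walking dir() on each submodule. A minimal sketch of that discovery pattern, assuming each submodule simply exposes provider submodules by name (the registry layout shown here is illustrative, not the library's exact structure):

# Sketch of dir()-based provider discovery.
import types

def discover(_module):
    _providers = {}
    for _name in dir(_module):
        if _name.startswith('__') or _name == 'common':
            continue                      # skip dunders and shared helpers
        _obj = getattr(_module, _name)
        if isinstance(_obj, types.ModuleType):
            _providers[_name] = _obj      # e.g. warehouse.iceberg, warehouse.drill
    return _providers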

@@ -1,19 +0,0 @@
-"""
-This file will be intended to handle duckdb database
-"""
-import duckdb
-from transport.common import Reader,Writer
-class Duck(Reader):
-    def __init__(self,**_args):
-        super().__init__(**_args)
-        self._path = None if 'path' not in _args else _args['path']
-        self._handler = duckdb.connect() if not self._path else duckdb.connect(self._path)
-class DuckReader(Duck) :
-    def __init__(self,**_args):
-        super().__init__(**_args)
-    def read(self,**_args) :
-        pass

@@ -103,6 +103,14 @@ class IETL(IReader) :
         #
         # If the parent is already multiprocessing
         self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
+    def run(self) :
+        """
+        We should apply the etl here, if we are in multiprocessing mode
+        """
+        _data = super().read()
+        for _kwargs in self._targets :
+            self.post(_data,**_kwargs)
     def read(self,**_args):
         _data = super().read(**_args)
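The new run() method reads the single source once and then posts the result to every configured target, which is what makes it suitable as a Process target. A small sketch of that read-once / post-to-many fan-out, with stand-in read and post functions (illustrative, not the IReader internals):

# Sketch of the fan-out that run() performs.
import pandas as pd

def _read():
    # stand-in for the source reader
    return pd.DataFrame({'id': [1, 2, 3]})

def _post(_data, **_kwargs):
    # stand-in for writing to one target (database, file, queue, ...)
    print('writing', len(_data), 'rows to', _kwargs.get('label'))

_targets = [{'label': 'warehouse'}, {'label': 'backup-file'}]
_data = _read()
for _kwargs in _targets:
    _post(_data, **_kwargs)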

@@ -44,7 +44,9 @@ PGSQL = POSTGRESQL
 AWS_S3 = 's3'
 RABBIT = RABBITMQ
+ICEBERG='iceberg'
+APACHE_ICEBERG = 'iceberg'
+DRILL = 'drill'
+APACHE_DRILL = 'drill'
 # QLISTENER = 'qlistener'
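The new constants are plain string labels, so either alias can be used wherever a provider name is expected. A small sketch; the job-dictionary shape below is illustrative only, not the library's exact ETL configuration schema:

# The new constants are interchangeable string aliases.
from transport import providers

assert providers.ICEBERG == providers.APACHE_ICEBERG == 'iceberg'
assert providers.DRILL == providers.APACHE_DRILL == 'drill'

# Hypothetical job entry referencing the new warehouse provider
# (field names are illustrative only).
_job = {
    'source': {'provider': providers.POSTGRESQL, 'table': 'events'},
    'target': [{'provider': providers.ICEBERG, 'table': 'events'}],
}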

@@ -9,11 +9,13 @@ import pandas as pd
 class Base:
     def __init__(self,**_args):
+        # print ([' ## ',_args])
         self._host = _args['host'] if 'host' in _args else 'localhost'
-        self._port = None
+        self._port = None if 'port' not in _args else _args['port']
         self._database = _args['database']
         self._table = _args['table'] if 'table' in _args else None
         self._engine= sqa.create_engine(self._get_uri(**_args),future=True)
+        self._chunksize = 0 if 'chunksize' not in _args else _args['chunksize']
     def _set_uri(self,**_args) :
         """
         :provider   provider
@@ -59,8 +61,10 @@ class Base:
         @TODO: Execution of stored procedures
         """
         if sql.lower().startswith('select') or sql.lower().startswith('with') :
-            return pd.read_sql(sql,self._engine)
+            if self._chunksize :
+                return pd.read_sql(sql,self._engine,chunksize=self._chunksize)
+            else:
+                return pd.read_sql(sql,self._engine)
         else:
             _handler = self._engine.connect()
             _handler.execute(text(sql))
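Note that when chunksize is set, pd.read_sql returns an iterator of DataFrames instead of a single frame, so callers have to loop over the result. A minimal sketch against an in-memory SQLite engine (table and column names are made up for illustration):

# Sketch: pd.read_sql with chunksize yields DataFrames in batches.
import pandas as pd
import sqlalchemy as sqa

_engine = sqa.create_engine('sqlite:///:memory:')
pd.DataFrame({'x': range(10)}).to_sql('demo', _engine, index=False)

for _chunk in pd.read_sql('SELECT * FROM demo', _engine, chunksize=4):
    print(len(_chunk))   # 4, 4, 2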
@@ -105,6 +109,7 @@ class BaseReader(SQLBase):
         if 'sql' in _args :
             sql = _args['sql']
         else:
+            # print (dir (self))
             _table = _args['table'] if 'table' in _args else self._table
             sql = f'SELECT * FROM {_table}'
         return self.apply(sql)

@ -0,0 +1,7 @@
"""
This namespace/package is intended to handle read/writes against data warehouse solutions like :
- apache iceberg
- clickhouse (...)
"""
from . import iceberg, drill

@ -0,0 +1,115 @@
"""
dependency:
- spark and SPARK_HOME environment variable must be set
"""
from pyspark.sql import SparkSession
from pyspark import SparkContext
import copy
class Iceberg :
def __init__(self,**_args):
"""
providing catalog meta information (you must get this from apache iceberg)
"""
#
# Turning off logging (it's annoying & un-professional)
#
# _spconf = SparkContext()
# _spconf.setLogLevel("ERROR")
#
# @TODO:
# Make arrangements for additional configuration elements
#
self._session = SparkSession.builder.getOrCreate()
# self._session.sparkContext.setLogLevel("ERROR")
self._catalog = self._session.catalog
self._table = _args['table'] if 'table' in _args else None
if 'catalog' in _args :
#
# Let us set the default catalog
self._catalog.setCurrentCatalog(_args['catalog'])
else:
# No current catalog has been set ...
pass
if 'database' in _args :
self._database = _args['database']
self._catalog.setCurrentDatabase(self._database)
else:
#
# Should we set the default as the first one if available ?
#
pass
self._catalogName = self._catalog.currentCatalog()
self._databaseName = self._catalog.currentDatabase()
def meta (self,**_args) :
"""
This function should return the schema of a table (only)
"""
_schema = []
try:
_tableName = self._getPrefix(**_args) + f".{_args['table']}"
print (_tableName)
_tmp = self._session.table(_tableName).schema
_schema = _tmp.jsonValue()['fields']
for _item in _schema :
del _item['nullable'],_item['metadata']
except Exception as e:
pass
return _schema
def _getPrefix (self,**_args):
_catName = self._catalogName if 'catalog' not in _args else _args['catalog']
_datName = self._databaseName if 'database' not in _args else _args['database']
return '.'.join([_catName,_datName])
def has (self,**_args):
try:
_prefix = self._getPrefix(**_args)
if _prefix.endswith('.') :
return False
return _args['table'] in [_item.name for _item in self._catalog.listTables(_prefix)]
except Exception as e:
print (e)
return False
def apply(self,sql):
pass
class Reader(Iceberg) :
def __init__(self,**_args):
super().__init__(**_args)
def read(self,**_args):
_table = self._table
_prefix = self._getPrefix(**_args)
if 'table' in _args or _table:
_table = _args['table'] if 'table' in _args else _table
_table = _prefix + f'.{_table}'
return self._session.table(_table).toPandas()
else:
sql = _args['sql']
return self._session.sql(sql).toPandas()
pass
class Writer (Iceberg):
"""
Writing data to an Apache Iceberg data warehouse (using pyspark)
"""
def __init__(self,**_args):
super().__init__(**_args)
self._mode = 'append' if 'mode' not in _args else _args['mode']
self._table = None if 'table' not in _args else _args['table']
def write(self,_data,**_args):
_prefix = self._getPrefix(**_args)
if 'table' not in _args and not self._table :
raise Exception (f"Table Name should be specified for catalog/database {_prefix}")
rdd = self._session.createDataFrame(_data)
_mode = self._mode if 'mode' not in _args else _args['mode']
_table = self._table if 'table' not in _args else _args['table']
if not self.has(table=_table) :
_mode = 'overwrite'
rdd.write.format('iceberg').mode(_mode).saveAsTable(_table)
else:
_table = f'{_prefix}.{_table}'
rdd.write.format('iceberg').mode(_mode).save(_table)
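Because the classes call SparkSession.builder.getOrCreate(), any Iceberg catalog settings must be in place before they are instantiated. A hypothetical usage sketch; the catalog, database, table names and the Spark configuration values below are illustrative assumptions, and the Iceberg Spark runtime jar is assumed to already be on the Spark classpath:

# Hypothetical usage of the new warehouse classes (illustrative values only).
import pandas as pd
from pyspark.sql import SparkSession
from transport.warehouse import iceberg

# Configure an Iceberg catalog before the classes grab the shared session;
# 'demo' and the warehouse path are placeholders.
SparkSession.builder \
    .config('spark.sql.catalog.demo', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.demo.type', 'hadoop') \
    .config('spark.sql.catalog.demo.warehouse', '/tmp/iceberg-warehouse') \
    .getOrCreate()

# Assumes the database 'db' already exists in the 'demo' catalog.
_writer = iceberg.Writer(catalog='demo', database='db', table='events')
_writer.write(pd.DataFrame({'id': [1, 2], 'value': ['a', 'b']}))

_reader = iceberg.Reader(catalog='demo', database='db', table='events')
print(_reader.read())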