Compare commits

11 Commits
v2.4 ... master

Author  SHA1  Message  Date
Steve L. Nyemba  492dc8f374  Merge pull request 'new provider console and bug fixes with applied commands' (#25) from v2.2.0 into master  (1 month ago)
Steve L. Nyemba  e848367378  Merge pull request 'bug fix, duckdb in-memory handling' (#24) from v2.2.0 into master  (1 month ago)
Steve L. Nyemba  c872ba8cc2  Merge pull request 'v2.2.0 - Bug fixes with mongodb, console' (#23) from v2.2.0 into master  (1 month ago)
Steve L. Nyemba  baa8164f16  Merge pull request 'aws s3 notebook, brief example' (#22) from v2.2.0 into master  (3 months ago)
Steve L. Nyemba  31556ebd32  Merge pull request 'v2.2.0 bug fix - AWS-S3' (#21) from v2.2.0 into master  (3 months ago)
Steve L. Nyemba  1e7839198a  Merge pull request 'v2.2.0 - shared environment support and duckdb support' (#20) from v2.2.0 into master  (3 months ago)
Steve L. Nyemba  dce50a967e  Merge pull request 'documentation ...' (#19) from v2.0.4 into master  (4 months ago)
Steve L. Nyemba  5ccb073865  Merge pull request 'refactor: etl,better reusability & streamlined and threaded' (#18) from v2.0.4 into master  (4 months ago)
Steve L. Nyemba  3081fb98e7  Merge pull request 'version 2.0 - Refactored, Plugins support' (#17) from v2.0 into master  (6 months ago)
Steve L. Nyemba  58959359ad  Merge pull request 'bug fix: psycopg2 with numpy' (#14) from dev into master  (8 months ago)
Steve L. Nyemba  68b8f6af5f  Merge pull request 'fixes 2024 pandas-gbq and sqlalchemy' (#10) from dev into master  (8 months ago)

@@ -24,8 +24,7 @@ from multiprocessing import Process
import os
import transport
# from transport import etl
from transport.iowrapper import IETL
from transport import etl
# from transport import providers
import typer
from typing_extensions import Annotated
@@ -61,13 +60,10 @@ def apply (path:Annotated[str,typer.Argument(help="path of the configuration fil
_config = [_config[ int(index)]]
jobs = []
for _args in _config :
# pthread = etl.instance(**_args) #-- automatically starts the process
_worker = IETL(**_args)
pthread = Process(target=_worker.run)
pthread.start()
pthread = etl.instance(**_args) #-- automatically starts the process
jobs.append(pthread)
#
# @TODO: Log the number of processes started and estfrom transport impfrom transport impimated time
# @TODO: Log the number of processes started and estimated time
while jobs :
jobs = [pthread for pthread in jobs if pthread.is_alive()]
time.sleep(1)
@@ -92,7 +88,6 @@ def version():
"""
print (transport.__app_name__,'version ',transport.__version__)
print ()
print (transport.__license__)
@app.command()
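
For orientation, a minimal standalone sketch of the pattern the apply hunk above shows: each entry of the ETL configuration is handed to an IETL worker, executed in its own Process, and the parent polls until every job has finished. The run_jobs name and the shape of _config are illustrative, not from the repository.

import time
from multiprocessing import Process
from transport.iowrapper import IETL

def run_jobs(_config):
    # _config is a list of keyword dictionaries, one per ETL job (shape assumed)
    jobs = []
    for _args in _config:
        _worker = IETL(**_args)                 # one worker per configuration entry
        pthread = Process(target=_worker.run)   # run the worker in its own process
        pthread.start()
        jobs.append(pthread)
    while jobs:                                 # poll until every worker has exited
        jobs = [pthread for pthread in jobs if pthread.is_alive()]
        time.sleep(1)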

@@ -1,6 +1,6 @@
__app_name__ = 'data-transport'
__author__ = 'The Phi Technology'
__version__= '2.4.0'
__version__= '2.2.6'
__email__ = "info@the-phi.com"
__license__=f"""
Copyright 2010 - 2024, Steve L. Nyemba
@@ -15,9 +15,7 @@ THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR I
__whatsnew__=f"""version {__version__}, focuses on collaborative environments like jupyter-base servers (apache zeppelin; jupyter notebook, jupyterlab, jupyterhub)
1. support for apache iceberg data warehouse using spark
2. Improved ETL & performance
3. bug fixes: mongodb
1. simpler syntax to create readers/writers
2. auth-file registry that can be referenced using a label
3. duckdb support
"""

@@ -19,7 +19,7 @@ args = {
"packages": find_packages(include=['info','transport', 'transport.*'])}
args["keywords"]=['mongodb','duckdb','couchdb','rabbitmq','file','read','write','s3','sqlite']
args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql','pyspark']
args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql']
args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git"
args['scripts'] = ['bin/transport']
# if sys.version_info[0] == 2 :

@@ -18,7 +18,7 @@ Source Code is available under MIT License:
"""
import numpy as np
from transport import sql, nosql, cloud, other, warehouse
from transport import sql, nosql, cloud, other
import pandas as pd
import json
import os
@@ -33,7 +33,7 @@ PROVIDERS = {}
def init():
global PROVIDERS
for _module in [cloud,sql,nosql,other,warehouse] :
for _module in [cloud,sql,nosql,other] :
for _provider_name in dir(_module) :
if _provider_name.startswith('__') or _provider_name == 'common':
continue
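
The hunk above cuts off right after the continue; for readability, a hedged guess at how the registration loop might populate PROVIDERS. Only the iteration over dir(_module) and the skip conditions come from the diff; what exactly gets stored per provider is an assumption.

from transport import sql, nosql, cloud, other, warehouse   # imports as shown in this file's hunk

PROVIDERS = {}

def init():
    global PROVIDERS
    for _module in [cloud, sql, nosql, other, warehouse]:
        for _provider_name in dir(_module):
            if _provider_name.startswith('__') or _provider_name == 'common':
                continue                                     # skip dunders and shared helpers
            # Assumed: index each provider entry by its lowercase name
            PROVIDERS[_provider_name.lower()] = getattr(_module, _provider_name)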

@@ -0,0 +1,19 @@
"""
This file will be intended to handle duckdb database
"""
import duckdb
from transport.common import Reader,Writer
class Duck(Reader):
def __init__(self,**_args):
super().__init__(**_args)
self._path = None if 'path' not in _args else _args['path']
self._handler = duckdb.connect() if not self._path else duckdb.connect(self._path)
class DuckReader(Duck) :
def __init__(self,**_args):
super().__init__(**_args)
def read(self,**_args) :
pass
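
The new DuckReader.read above is still a stub (pass). Purely as a hedged illustration, not the repository's implementation, this is what a working read could look like with the standard duckdb Python API; the sql/table keywords are assumptions.

import duckdb

class DuckReaderSketch:
    """Hypothetical completion of DuckReader for illustration only."""
    def __init__(self, path=None, table=None):
        # file-backed connection when a path is given, otherwise in-memory
        self._handler = duckdb.connect(path) if path else duckdb.connect()
        self._table = table
    def read(self, sql=None, **_args):
        if not sql:
            sql = f"SELECT * FROM {self._table}"      # default to a full-table read
        return self._handler.execute(sql).fetchdf()   # duckdb hands back a pandas DataFrame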

@@ -103,14 +103,6 @@ class IETL(IReader) :
#
# If the parent is already multiprocessing
self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
def run(self) :
"""
We should apply the etl here, if we are in multiprocessing mode
"""
_data = super().read()
for _kwargs in self._targets :
self.post(_data,**_kwargs)
def read(self,**_args):
_data = super().read(**_args)
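
A hedged usage sketch of driving the IETL worker directly, matching the run() shown in this hunk (read the source once, post to every target); the configuration keys ('source', 'target') and the provider values below are assumptions for illustration only.

from transport.iowrapper import IETL

# Keys and provider values are illustrative, not taken from the diff.
_config = {
    "source": {"provider": "sqlite", "database": "/tmp/demo.db3", "table": "friends"},
    "target": [{"provider": "file", "path": "/tmp/friends.csv"}]
}
worker = IETL(**_config)
worker.run()    # reads the source once, then posts the same frame to each target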

@@ -44,9 +44,7 @@ PGSQL = POSTGRESQL
AWS_S3 = 's3'
RABBIT = RABBITMQ
ICEBERG='iceberg'
APACHE_ICEBERG = 'iceberg'
DRILL = 'drill'
APACHE_DRILL = 'drill'
# QLISTENER = 'qlistener'

@@ -9,13 +9,11 @@ import pandas as pd
class Base:
def __init__(self,**_args):
# print ([' ## ',_args])
self._host = _args['host'] if 'host' in _args else 'localhost'
self._port = None if 'port' not in _args else _args['port']
self._port = None
self._database = _args['database']
self._table = _args['table'] if 'table' in _args else None
self._engine= sqa.create_engine(self._get_uri(**_args),future=True)
self._chunksize = 0 if 'chunksize' not in _args else _args['chunksize']
def _set_uri(self,**_args) :
"""
:provider provider
@@ -61,9 +59,7 @@ class Base:
@TODO: Execution of stored procedures
"""
if sql.lower().startswith('select') or sql.lower().startswith('with') :
if self._chunksize :
return pd.read_sql(sql,self._engine,chunksize=self._chunksize)
else:
return pd.read_sql(sql,self._engine)
else:
_handler = self._engine.connect()
@@ -109,7 +105,6 @@ class BaseReader(SQLBase):
if 'sql' in _args :
sql = _args['sql']
else:
# print (dir (self))
_table = _args['table'] if 'table' in _args else self._table
sql = f'SELECT * FROM {_table}'
return self.apply(sql)
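
For context, a small standalone sketch of the chunked read these hunks deal with: when a chunksize is configured, pandas.read_sql returns an iterator of DataFrames instead of a single frame. The connection string and table name are illustrative.

import pandas as pd
import sqlalchemy as sqa

# Illustrative connection string and table name.
engine = sqa.create_engine('sqlite:///demo.db3', future=True)
sql = 'SELECT * FROM friends'
chunksize = 1000

if chunksize:
    for chunk in pd.read_sql(sql, engine, chunksize=chunksize):   # iterator of DataFrames
        print(chunk.shape)                                        # placeholder per-chunk work
else:
    df = pd.read_sql(sql, engine)                                 # single DataFrame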

@@ -1,7 +0,0 @@
"""
This namespace/package is intended to handle read/writes against data warehouse solutions like :
- apache iceberg
- clickhouse (...)
"""
from . import iceberg, drill

@@ -1,115 +0,0 @@
"""
dependency:
- spark and SPARK_HOME environment variable must be set
"""
from pyspark.sql import SparkSession
from pyspark import SparkContext
import copy
class Iceberg :
def __init__(self,**_args):
"""
providing catalog meta information (you must get this from apache iceberg)
"""
#
# Turning off logging (it's annoying & un-professional)
#
# _spconf = SparkContext()
# _spconf.setLogLevel("ERROR")
#
# @TODO:
# Make arrangements for additional configuration elements
#
self._session = SparkSession.builder.getOrCreate()
# self._session.sparkContext.setLogLevel("ERROR")
self._catalog = self._session.catalog
self._table = _args['table'] if 'table' in _args else None
if 'catalog' in _args :
#
# Let us set the default catalog
self._catalog.setCurrentCatalog(_args['catalog'])
else:
# No current catalog has been set ...
pass
if 'database' in _args :
self._database = _args['database']
self._catalog.setCurrentDatabase(self._database)
else:
#
# Should we set the default as the first one if available ?
#
pass
self._catalogName = self._catalog.currentCatalog()
self._databaseName = self._catalog.currentDatabase()
def meta (self,**_args) :
"""
This function should return the schema of a table (only)
"""
_schema = []
try:
_tableName = self._getPrefix(**_args) + f".{_args['table']}"
print (_tableName)
_tmp = self._session.table(_tableName).schema
_schema = _tmp.jsonValue()['fields']
for _item in _schema :
del _item['nullable'],_item['metadata']
except Exception as e:
pass
return _schema
def _getPrefix (self,**_args):
_catName = self._catalogName if 'catalog' not in _args else _args['catalog']
_datName = self._databaseName if 'database' not in _args else _args['database']
return '.'.join([_catName,_datName])
def has (self,**_args):
try:
_prefix = self._getPrefix(**_args)
if _prefix.endswith('.') :
return False
return _args['table'] in [_item.name for _item in self._catalog.listTables(_prefix)]
except Exception as e:
print (e)
return False
def apply(self,sql):
pass
class Reader(Iceberg) :
def __init__(self,**_args):
super().__init__(**_args)
def read(self,**_args):
_table = self._table
_prefix = self._getPrefix(**_args)
if 'table' in _args or _table:
_table = _args['table'] if 'table' in _args else _table
_table = _prefix + f'.{_table}'
return self._session.table(_table).toPandas()
else:
sql = _args['sql']
return self._session.sql(sql).toPandas()
pass
class Writer (Iceberg):
"""
Writing data to an Apache Iceberg data warehouse (using pyspark)
"""
def __init__(self,**_args):
super().__init__(**_args)
self._mode = 'append' if 'mode' not in _args else _args['mode']
self._table = None if 'table' not in _args else _args['table']
def write(self,_data,**_args):
_prefix = self._getPrefix(**_args)
if 'table' not in _args and not self._table :
raise Exception (f"Table Name should be specified for catalog/database {_prefix}")
rdd = self._session.createDataFrame(_data)
_mode = self._mode if 'mode' not in _args else _args['mode']
_table = self._table if 'table' not in _args else _args['table']
if not self.has(table=_table) :
_mode = 'overwrite'
rdd.write.format('iceberg').mode(_mode).saveAsTable(_table)
else:
_table = f'{_prefix}.{_table}'
rdd.write.format('iceberg').mode(_mode).save(_table)
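
Finally, a hedged usage sketch for the Iceberg reader/writer defined in the file above; the constructor keywords come from the classes themselves, but the catalog, database and table names are invented, and spark plus SPARK_HOME must be available as the file's docstring notes.

from transport.warehouse import iceberg   # module path as shown in this compare

# Catalog, database and table names are illustrative.
reader = iceberg.Reader(catalog='local', database='demo', table='friends')
df = reader.read()                         # full-table read, returned as a pandas DataFrame

writer = iceberg.Writer(catalog='local', database='demo', table='friends', mode='append')
writer.write(df)                           # appends; falls back to overwrite when the table does not exist yet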