v2.4 #5

Merged
steve merged 2 commits from v2.4 into main 5 days ago

@@ -13,20 +13,28 @@ Data transport is a simple framework that:
 ## Installation
-Within the virtual environment perform the following :
+Within the virtual environment, run the following (this installs everything):
 
-    pip install git+https://github.com/lnyemba/data-transport.git
+    pip install data-transport[all]@git+https://github.com/lnyemba/data-transport.git
 
-Options to install components in square brackets
+The optional components, passed in square brackets, are **nosql**, **cloud**, **other**, and **warehouse**:
 
-    pip install data-transport[nosql,cloud,warehouse,all]@git+https://github.com/lnyemba/data-transport.git
+    pip install data-transport[nosql,cloud,other,warehouse,all]@git+https://github.com/lnyemba/data-transport.git
+
+The components available:
+
+1. sql (installed by default): netezza; mysql; postgresql; duckdb; sqlite3; sqlserver
+2. nosql: mongodb/ferretdb; couchdb
+3. cloud: s3; bigquery; databricks
+4. other: files; http; rabbitmq
+5. warehouse: apache drill; apache iceberg
+
 ## Additional features
-- In addition to read/write, there is support for functions for pre/post processing
+- Reads are separated from writes to avoid accidental writes.
+- Streaming (for large volumes of data) by specifying chunksize
 - CLI interface to add to registry, run ETL
-- scales and integrates into shared environments like apache zeppelin; jupyterhub; SageMaker; ...
+- Implements best-practices for collaborative environments like apache zeppelin; jupyterhub; SageMaker; ...
 ## Learn More

@@ -53,6 +53,8 @@ def wait(jobs):
     while jobs :
         jobs = [thread for thread in jobs if thread.is_alive()]
         time.sleep(1)
+def job (_args):
+    pass
 @app_e.command(name="run")
 def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
         index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed"),
@@ -177,7 +179,25 @@ def register (label:Annotated[str,typer.Argument(help="unique label that will be
     except Exception as e:
         _msg = f"""{TIMES_MARK} {e}"""
         print (_msg)
+@app_r.command(name="template")
+def template(name:Annotated[str,typer.Argument(help="database technology provider" ) ]):
+    """
+    This function will generate a template entry for the registry (content of an auth file)
+    """
+    #
+    # retrieve the provider and display the template if it has one
+    for _module in ['sql','cloud','warehouse','nosql','other'] :
+        ref = getattr(transport,_module) if hasattr(transport,_module) else None
+        _entry = {}
+        if ref :
+            if hasattr(ref,name) :
+                _pointer = getattr(ref,name)
+                _entry = dict({'provider':name},**_pointer.template()) if hasattr(_pointer,'template') else {}
+                break
+    #
+    #
+    print ( json.dumps(_entry))
+    pass
 @app_r.command(name="list")
 def register_list ():
     """

@@ -14,7 +14,7 @@
  },
  {
   "cell_type": "code",
-  "execution_count": 1,
+  "execution_count": null,
   "metadata": {},
   "outputs": [
    {
@@ -58,7 +58,7 @@
  },
  {
   "cell_type": "code",
-  "execution_count": 2,
+  "execution_count": null,
   "metadata": {},
   "outputs": [
    {
@@ -103,7 +103,7 @@
  },
  {
   "cell_type": "code",
-  "execution_count": 4,
+  "execution_count": null,
   "metadata": {},
   "outputs": [
    {
@@ -131,16 +131,28 @@
   ]
  },
  {
-  "cell_type": "code",
-  "execution_count": null,
+  "cell_type": "markdown",
+  "metadata": {},
+  "source": [
+   "#### Streaming Large Volumes of Data\n",
+   "\n",
+   "For large volumes of data it is recommended to stream the data by passing **chunksize** as a parameter\n",
+   "\n",
+   "1. in the **read** method\n",
+   "2. or **transport.get.reader(\\*\\*...,chunksize=1000)**\n",
+   "\n",
+   "Use streaming because, with large volumes of data, some databases limit the volume of data in a single transaction in order to guarantee **data integrity**"
+  ]
+ },
+ {
+  "cell_type": "markdown",
   "metadata": {},
-  "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
-  "display_name": "Python 3",
+  "display_name": "python (3.10.12)",
   "language": "python",
   "name": "python3"
  },
@@ -154,7 +166,7 @@
  "name": "python",
  "nbconvert_exporter": "python",
  "pygments_lexer": "ipython3",
-  "version": "3.9.7"
+  "version": "3.10.12"
 }
 },
 "nbformat": 4,

@@ -14,6 +14,9 @@ import numpy as np
 import time
 MAX_CHUNK = 2000000
+def template ():
+    return {'provider':'bigquery','private_key':'path-to-key','dataset':'name-of-dataset','table':'table','chunksize':MAX_CHUNK}
 class BigQuery:
     __template__= {"private_key":None,"dataset":None,"table":None}
     def __init__(self,**_args):
@@ -24,6 +27,7 @@ class BigQuery:
         self.dtypes = _args['dtypes'] if 'dtypes' in _args else None
         self.table = _args['table'] if 'table' in _args else None
         self.client = bq.Client.from_service_account_json(self.path)
+        self._chunksize = int(_args['chunksize']) if 'chunksize' in _args else None
     def meta(self,**_args):
         """
         This function returns meta data for a given table or query with dataset/table properly formatted
@@ -82,6 +86,13 @@ class Reader (BigQuery):
         if (':dataset' in SQL or ':DATASET' in SQL) and self.dataset:
             SQL = SQL.replace(':dataset',self.dataset).replace(':DATASET',self.dataset)
         _info = {'credentials':self.credentials,'dialect':'standard'}
+        #
+        # @Ent-Feature : adding streaming capability here
+        #
+        if 'chunksize' in _args :
+            self._chunksize = int(_args['chunksize'])
+        if self._chunksize :
+            _info['chunksize'] = self._chunksize
         return pd_gbq.read_gbq(SQL,**_info) if SQL else None
         # return self.client.query(SQL).to_dataframe() if SQL else None

@@ -17,6 +17,8 @@ import sqlalchemy
 # from transport.common import Reader,Writer
 import pandas as pd
+def template ():
+    return {'provider':'databricks','host':'fqn-host','token':'token','cluster_path':'path-of-cluster','catalog':'name-of-catalog','database':'schema-or-database','table':'table','chunksize':10000}
 class Bricks:
     """
@@ -42,6 +44,7 @@ class Bricks:
         _uri = f'''databricks+connector://token:{_token}@{_host}?http_path={_cluster_path}&catalog={_catalog}&schema={self._schema}'''
         self._engine = sqlalchemy.create_engine (_uri)
+        self._chunksize = int(_args['chunksize']) if 'chunksize' in _args else None
         pass
     def meta(self,**_args):
         table = _args['table'] if 'table' in _args else self._table
@@ -64,7 +67,14 @@ class Bricks:
     def apply(self,_sql):
         try:
             if _sql.lower().startswith('select') :
-                return pd.read_sql(_sql,self._engine)
+                #
+                # @ENT-Feature: adding streaming functions/variables
+                if not self._chunksize :
+                    return pd.read_sql(_sql,self._engine)
+                else:
+                    return pd.read_sql(_sql,self._engine,chunksize=self._chunksize)
         except Exception as e:
             pass
@@ -84,7 +94,10 @@ class Reader(Bricks):
             sql = f'SELECT * FROM {table}'
         if limit :
             sql = sql + f' LIMIT {limit}'
+        #
+        # @ENT-Feature: adding streaming functions/variables
+        if 'chunksize' in _args :
+            self._chunksize = int(_args['chunksize'])
         if 'sql' in _args or 'table' in _args :
             return self.apply(sql)
         else:

@@ -8,7 +8,8 @@ import pandas as pd
 from io import StringIO
 import json
 import nextcloud_client as nextcloud
+def template():
+    return {"url":None,"token":None,"uid":None,"file":None}
 class Nextcloud :
     __template__={"url":None,"token":None,"uid":None,"file":None}
     def __init__(self,**_args):

@@ -20,6 +20,9 @@ from io import StringIO
 import pandas as pd
 import json
+def template():
+    return {'access_key':'access-key','secret_key':'secret-key','region':'region','bucket':'name-of-bucket','file':'file-name','chunksize':10000}
 class s3 :
     """
     @TODO: Implement a search function for a file given a bucket??
@@ -38,6 +41,7 @@ class s3 :
             self._bucket_name = args['bucket']
             self._file_name = args['file']
             self._region = args['region']
+            self._chunksize = int(args['chunksize']) if 'chunksize' in args else None
         except Exception as e :
             print (e)
         pass
@@ -89,7 +93,10 @@ class Reader(s3) :
         if not _stream :
             return None
         if _object['ContentType'] in ['text/csv'] :
-            return pd.read_csv(StringIO(str(_stream).replace("\\n","\n").replace("\\r","").replace("\'","")))
+            if not self._chunksize :
+                return pd.read_csv(StringIO(str(_stream).replace("\\n","\n").replace("\\r","").replace("\'","")))
+            else:
+                return pd.read_csv(StringIO(str(_stream).replace("\\n","\n").replace("\\r","").replace("\'","")),chunksize=self._chunksize)
         else:
             return _stream
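
For reference on the chunked branch above: pandas returns an iterable chunk reader when chunksize is passed to read_csv. A standalone illustration, independent of S3:

```python
import pandas as pd
from io import StringIO

_stream = "a,b\n1,2\n3,4\n5,6\n"
# without chunksize read_csv returns a DataFrame; with chunksize it returns an
# iterable TextFileReader that yields DataFrames of at most `chunksize` rows
for _frame in pd.read_csv(StringIO(_stream), chunksize=2):
    print(_frame)
```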

@@ -1,6 +1,6 @@
 __app_name__ = 'data-transport'
 __author__ = 'Steve L. Nyemba'
-__version__= '2.4.32'
+__version__= '2.4.34'
 __edition__= 'enterprise'
 __email__ = "info@the-phi.com"
 __license__=f"""

@@ -163,6 +163,7 @@ class IReader(IO):
         _objectName = '.'.join([self._agent.__class__.__module__,self._agent.__class__.__name__])
         if types.GeneratorType == type(_data):
             return self._stream(_data)
         # if self._plugins :
         #     return self._stream(_data)
@@ -278,7 +279,7 @@ class IETL(BaseIO) :
             if _item['type'] in ['INTEGER','BIGINT','INT'] :
                 _column = _item['name']
                 _data[_column] = _data[_column].copy().fillna(0).astype(np.int64)
-            writer.write(_data,schema=_schema)
+            writer.write(_data)
         except Exception as e:
             _action = 'post-error'
             _input['error'] = str(e)
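
Background for the GeneratorType check retained above: with current pandas releases, pd.read_sql called with a chunksize returns a generator object, so the streamed reads introduced in this PR are routed to `self._stream`. A self-contained check (throwaway in-memory database):

```python
import types
import pandas as pd
import sqlalchemy as sqa

_engine = sqa.create_engine('sqlite://')                    # throwaway in-memory DB
pd.DataFrame({'x': range(10)}).to_sql('t', _engine, index=False)

_data = pd.read_sql('SELECT * FROM t', _engine, chunksize=4)
print(types.GeneratorType == type(_data))                   # True -> IReader streams it
```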

@@ -11,7 +11,8 @@ import sys
 # from transport.common import Reader, Writer
 from datetime import datetime
+def template():
+    return {'dbname':'database','doc':'document','username':'username','password':'password','url':'url-with-port'}
 class Couch:
     """
     This class is a wrapper for read/write against couchdb. The class captures common operations for read/write.

@@ -20,6 +20,9 @@ import re
 from multiprocessing import Lock, RLock
 from transport.common import IEncoder
+def template():
+    return {'provider':'mongodb','host':'localhost','port':27017,'db':'db-name','collection':'collection-name','username':'username','password':'password','mechanism':'SCRAM-SHA-256'}
 class Mongo :
     lock = RLock()
     """

@@ -4,6 +4,10 @@ This file is a wrapper around pandas built-in functionalities to handle characte
 import pandas as pd
 import numpy as np
 import os
+def template():
+    return {'path':None,'delimiter':None}
 class File :
     def __init__(self,**params):
         """

@@ -7,6 +7,8 @@ import requests
 from io import StringIO
 import pandas as pd
+def template():
+    return {'url':None,'headers':{'key':'value'}}
 class Reader:
     """

@@ -17,6 +17,10 @@ import sys
 # from common import Reader, Writer
 import json
 from multiprocessing import RLock
+def template():
+    return {'port':5672,'host':'localhost','queue':None,'vhost':None,'username':None,'password':None}
 class MessageQueue:
     """
     This class hierarchy is designed to handle interactions with a queue server using pika framework (our tests are based on rabbitmq)

@@ -1,11 +1,14 @@
 """
 This file encapsulates common operations associated with SQL databases via SQLAlchemy
+@ENT:
+    - To support streaming (with generators) we use the parameter chunksize, which enables streaming
 """
 import sqlalchemy as sqa
 from sqlalchemy import text , MetaData, inspect
 import pandas as pd
+def template():
+    return {'host':'localhost','database':'database','table':'table'}
 class Base:
     def __init__(self,**_args):
@@ -20,6 +23,7 @@ class Base:
             _uri,_kwargs = _uri
         self._engine= sqa.create_engine(_uri,**_kwargs,future=True)
+        self._chunksize = int(_args['chunksize']) if 'chunksize' in _args else None
     def _set_uri(self,**_args) :
         """
         :provider provider
@@ -78,7 +82,12 @@ class Base:
         @TODO: Execution of stored procedures
         """
         if sql.strip().lower().startswith('select') or sql.strip().lower().startswith('with') or sql.strip().startswith('show'):
-            return pd.read_sql(sql,self._engine)
+            if not self._chunksize:
+                return pd.read_sql(sql,self._engine)
+            else:
+                return pd.read_sql(sql,self._engine,chunksize=self._chunksize)
         else:
             _handler = self._engine.connect()
             _handler.execute(text(sql.strip()))
@@ -132,9 +141,14 @@ class BaseReader(SQLBase):
         if self._schema and type(self._schema) == str :
             _table = f'{self._schema}.{_table}'
         sql = f'SELECT * FROM {_table}'
+        if 'chunksize' in _args :
+            self._chunksize = int(_args['chunksize'])
         return self.apply(sql)
 class BaseWriter (SQLBase):
     """
     This class implements SQLAlchemy support for Writting to a data-store (RDBMS)
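
With the BaseReader change above, chunksize can also be supplied per read() call rather than only at construction. A minimal sketch, assuming a sqlite3 source as listed in the README components (provider label and paths are placeholders):

```python
import transport

reader = transport.get.reader(provider='sqlite3', database='/tmp/demo.db3', table='logs')
_all = reader.read()                          # no chunksize: a single DataFrame
for _frame in reader.read(chunksize=5000):    # chunksize: a generator of DataFrames
    print(_frame.shape)
# note: per the diff, the chunksize persists on the instance after this call
```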

@@ -3,6 +3,9 @@ This module implements the handler for duckdb (in memory or not)
 """
 from transport.sql.common import Base, BaseReader, BaseWriter
+def template ():
+    return {'database':'path-to-database','table':'table'}
 class Duck :
     def __init__(self,**_args):
         #

@@ -1,8 +1,11 @@
 """
 This file implements support for mysql and maria db (with drivers mysql+mysql)
 """
-from transport.sql.common import BaseReader, BaseWriter
+from transport.sql.common import BaseReader, BaseWriter, template as _template
 # import mysql.connector as my
+def template ():
+    return dict(_template(),**{'port':3306})
 class MYSQL:
     def get_provider(self):
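
The import-and-extend pattern above repeats for netezza, postgresql, and sqlserver below: each provider `template()` merges the shared SQL template with provider-specific defaults via `dict(base, **overrides)`. The merge in isolation:

```python
# dict(base, **overrides) copies base and lets the overrides win on collisions
_base = {'host': 'localhost', 'database': 'database', 'table': 'table'}  # sql.common template()
print(dict(_base, **{'port': 3306}))
# {'host': 'localhost', 'database': 'database', 'table': 'table', 'port': 3306}
```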

@@ -1,5 +1,8 @@
 import nzpy as nz
-from transport.sql.common import BaseReader, BaseWriter
+from transport.sql.common import BaseReader, BaseWriter , template as _template
+def template ():
+    return dict(_template(),**{'port':5480,'chunksize':10000})
 class Netezza:
     def get_provider(self):

@@ -1,7 +1,10 @@
-from transport.sql.common import BaseReader , BaseWriter
+from transport.sql.common import BaseReader , BaseWriter, template as _template
 from psycopg2.extensions import register_adapter, AsIs
 import numpy as np
+def template ():
+    return dict(_template(),**{'port':5432,'chunksize':10000})
 register_adapter(np.int64, AsIs)

@@ -2,6 +2,8 @@ import sqlalchemy
 import pandas as pd
 from transport.sql.common import Base, BaseReader, BaseWriter
 from multiprocessing import RLock
+def template():
+    return {'database':'path-to-database','table':'table'}
 class SQLite3 :
     lock = RLock()
     def __init__(self,**_args):

@@ -3,7 +3,11 @@ Handling Microsoft SQL Server via pymssql driver/connector
 """
 import sqlalchemy
 import pandas as pd
-from transport.sql.common import Base, BaseReader, BaseWriter
+from transport.sql.common import Base, BaseReader, BaseWriter, template as _template
+def template ():
+    return dict(_template(),**{'port':1433})
 class MsSQLServer:

@@ -3,6 +3,8 @@ import pandas as pd
 from .. sql.common import BaseReader , BaseWriter
 import sqlalchemy as sqa
+def template():
+    return {'host':'localhost','port':8047,'ssl':False,'table':None,'database':None}
 class Drill :
     __template = {'host':None,'port':None,'ssl':None,'table':None,'database':None}
     def __init__(self,**_args):

@@ -11,6 +11,10 @@ from pyspark.sql.types import *
 from pyspark.sql.functions import col, to_date, to_timestamp
 import copy
+def template():
+    return {'catalog':None,'database':None,'table':None}
 class Iceberg :
     def __init__(self,**_args):
         """
