diff --git a/bin/transport b/bin/transport
index 37aa82c..c350958 100755
--- a/bin/transport
+++ b/bin/transport
@@ -53,6 +53,8 @@ def wait(jobs):
     while jobs :
         jobs = [thread for thread in jobs if thread.is_alive()]
         time.sleep(1)
+def job (_args):
+    pass
 @app_e.command(name="run")
 def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
     index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed"),
@@ -177,7 +179,25 @@ def register (label:Annotated[str,typer.Argument(help="unique label that will be
     except Exception as e:
         _msg = f"""{TIMES_MARK} {e}"""
         print (_msg)
-
+@app_r.command(name="template")
+def template(name:Annotated[str,typer.Argument(help="database technology provider" ) ]):
+    """
+    This function will generate a template entry for the registry (content of an auth file)
+    """
+    #
+    # retrieve the provider and display the template if it has one
+    for _module in ['sql','cloud','warehouse','nosql','other'] :
+        ref = getattr(transport,_module) if hasattr(transport,_module) else None
+        _entry = {}
+        if ref :
+            if hasattr(ref,name) :
+                _pointer = getattr(ref,name)
+                _entry = dict({'provider':name},**_pointer.template()) if hasattr(_pointer,'template') else {}
+                break
+    #
+    #
+    print ( json.dumps(_entry))
+    pass
 @app_r.command(name="list")
 def register_list ():
     """
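The lookup performed by the new `template` command can be reproduced in a few lines. A minimal sketch, assuming the `transport` package exposes its provider modules (e.g. `transport.sql.mysql`) as attributes, with `mysql` as the example provider:

```python
import json
import transport

_name = 'mysql'                                   # example provider
_entry = {}
for _module in ['sql','cloud','warehouse','nosql','other']:
    ref = getattr(transport, _module, None)       # e.g. transport.sql
    if ref and hasattr(ref, _name):
        _pointer = getattr(ref, _name)            # e.g. transport.sql.mysql
        if hasattr(_pointer, 'template'):
            # merge the provider name into the module-level template()
            _entry = dict({'provider': _name}, **_pointer.template())
        break
print(json.dumps(_entry))
# expected, given the templates added below:
# {"provider": "mysql", "host": "localhost", "database": "database", "table": "table", "port": 3306}
```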
diff --git a/notebooks/postgresql.ipynb b/notebooks/postgresql.ipynb
index 85f4322..057f20b 100644
--- a/notebooks/postgresql.ipynb
+++ b/notebooks/postgresql.ipynb
@@ -14,7 +14,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 1,
+   "execution_count": null,
    "metadata": {},
    "outputs": [
     {
@@ -58,7 +58,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 2,
+   "execution_count": null,
    "metadata": {},
    "outputs": [
     {
@@ -103,7 +103,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 4,
+   "execution_count": null,
    "metadata": {},
    "outputs": [
     {
@@ -131,16 +131,28 @@
    ]
   },
   {
-   "cell_type": "code",
-   "execution_count": null,
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "#### Streaming Large Volumes of Data\n",
+    "\n",
+    "For large volumes of data, it is recommended to stream the data by passing **chunksize** as a parameter\n",
+    "\n",
+    "1. in the **read** method\n",
+    "2. or **transport.get.reader(\*\*...,chunksize=1000)**\n",
+    "\n",
+    "Use streaming because, with large volumes of data, some databases limit the volume of data allowed in a single transaction in order to guarantee **data integrity**"
+   ]
+  },
+  {
+   "cell_type": "markdown",
    "metadata": {},
-   "outputs": [],
    "source": []
   }
  ],
 "metadata": {
  "kernelspec": {
-   "display_name": "Python 3",
+   "display_name": "python (3.10.12)",
   "language": "python",
   "name": "python3"
  },
@@ -154,7 +166,7 @@
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
-  "version": "3.9.7"
+  "version": "3.10.12"
  }
 },
 "nbformat": 4,
diff --git a/transport/cloud/bigquery.py b/transport/cloud/bigquery.py
index cf2e3d6..09910de 100644
--- a/transport/cloud/bigquery.py
+++ b/transport/cloud/bigquery.py
@@ -14,6 +14,9 @@ import numpy as np
 import time
 
 MAX_CHUNK = 2000000
+def template ():
+    return {'provider':'bigquery','private_key':'path-to-key','dataset':'name-of-dataset','table':'table','chunksize':MAX_CHUNK}
+
 class BigQuery:
     __template__= {"private_key":None,"dataset":None,"table":None}
     def __init__(self,**_args):
@@ -24,6 +27,7 @@ class BigQuery:
         self.dtypes = _args['dtypes'] if 'dtypes' in _args else None
         self.table = _args['table'] if 'table' in _args else None
         self.client = bq.Client.from_service_account_json(self.path)
+        self._chunksize = int(_args['chunksize']) if 'chunksize' in _args else None
     def meta(self,**_args):
         """
         This function returns meta data for a given table or query with dataset/table properly formatted
@@ -81,7 +85,14 @@ class Reader (BigQuery):
             SQL += " LIMIT "+str(_args['limit'])
         if (':dataset' in SQL or ':DATASET' in SQL) and self.dataset:
             SQL = SQL.replace(':dataset',self.dataset).replace(':DATASET',self.dataset)
-        _info = {'credentials':self.credentials,'dialect':'standard'}
+        _info = {'credentials':self.credentials,'dialect':'standard'}
+        #
+        # @Ent-Feature : adding streaming capability here
+        #
+        if 'chunksize' in _args :
+            self._chunksize = int(_args['chunksize'])
+        if self._chunksize :
+            _info['chunksize'] = self._chunksize
         return pd_gbq.read_gbq(SQL,**_info) if SQL else None
         # return self.client.query(SQL).to_dataframe() if SQL else None
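A hedged usage sketch of the streaming read path the notebook describes and that the provider changes below enable; all connection values are placeholders (keys follow the postgresql template), and `chunksize` may be given either at construction time or to `read()`:

```python
import transport

# placeholder credentials, matching the postgresql template keys
reader = transport.get.reader(provider='postgresql', host='localhost', port=5432,
                              database='demo', table='logs')
for _chunk in reader.read(chunksize=1000):   # generator of DataFrames, <=1000 rows each
    print(_chunk.shape)
```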
diff --git a/transport/cloud/databricks.py b/transport/cloud/databricks.py
index a5fa4c0..7190136 100644
--- a/transport/cloud/databricks.py
+++ b/transport/cloud/databricks.py
@@ -17,6 +17,8 @@ import sqlalchemy
 # from transport.common import Reader,Writer
 import pandas as pd
 
+def template ():
+    return {'provider':'databricks','host':'fqn-host','token':'token','cluster_path':'path-of-cluster','catalog':'name-of-catalog','database':'schema-or-database','table':'table','chunksize':10000}
 
 class Bricks:
     """
@@ -42,6 +44,7 @@ class Bricks:
         _uri = f'''databricks+connector://token:{_token}@{_host}?http_path={_cluster_path}&catalog={_catalog}&schema={self._schema}'''
 
         self._engine = sqlalchemy.create_engine (_uri)
+        self._chunksize = int(_args['chunksize']) if 'chunksize' in _args else None
         pass
     def meta(self,**_args):
         table = _args['table'] if 'table' in _args else self._table
@@ -64,7 +67,14 @@ class Bricks:
     def apply(self,_sql):
         try:
             if _sql.lower().startswith('select') :
-                return pd.read_sql(_sql,self._engine)
+                #
+                # @ENT-Feature: adding streaming functions/variables
+
+                if not self._chunksize :
+                    return pd.read_sql(_sql,self._engine)
+                else:
+                    return pd.read_sql(_sql,self._engine,chunksize=self._chunksize)
+
         except Exception as e:
             pass
 
@@ -84,7 +94,10 @@ class Reader(Bricks):
             sql = f'SELECT * FROM {table}'
         if limit :
             sql = sql + f' LIMIT {limit}'
-
+        #
+        # @ENT-Feature: adding streaming functions/variables
+        if 'chunksize' in _args :
+            self._chunksize = int(_args['chunksize'])
         if 'sql' in _args or 'table' in _args :
             return self.apply(sql)
         else:
diff --git a/transport/cloud/nextcloud.py b/transport/cloud/nextcloud.py
index 16a37af..5b5ed46 100644
--- a/transport/cloud/nextcloud.py
+++ b/transport/cloud/nextcloud.py
@@ -8,7 +8,8 @@ import pandas as pd
 from io import StringIO
 import json
 import nextcloud_client as nextcloud
-
+def template():
+    return {"url":None,"token":None,"uid":None,"file":None}
 class Nextcloud :
     __template__={"url":None,"token":None,"uid":None,"file":None}
     def __init__(self,**_args):
diff --git a/transport/cloud/s3.py b/transport/cloud/s3.py
index 4d7c5b9..a61bd05 100644
--- a/transport/cloud/s3.py
+++ b/transport/cloud/s3.py
@@ -20,6 +20,9 @@ from io import StringIO
 import pandas as pd
 import json
 
+def template():
+    return {'access_key':'access-key','secret_key':'secret-key','region':'region','bucket':'name-of-bucket','file':'file-name','chunksize':10000}
+
 class s3 :
     """
     @TODO: Implement a search function for a file given a bucket??
@@ -38,6 +41,7 @@ class s3 :
             self._bucket_name = args['bucket']
             self._file_name = args['file']
             self._region = args['region']
+            self._chunksize = int(args['chunksize']) if 'chunksize' in args else None
         except Exception as e :
             print (e)
         pass
@@ -89,7 +93,10 @@ class Reader(s3) :
         if not _stream :
             return None
         if _object['ContentType'] in ['text/csv'] :
-            return pd.read_csv(StringIO(str(_stream).replace("\\n","\n").replace("\\r","").replace("\'","")))
+            if not self._chunksize :
+                return pd.read_csv(StringIO(str(_stream).replace("\\n","\n").replace("\\r","").replace("\'","")))
+            else:
+                return pd.read_csv(StringIO(str(_stream).replace("\\n","\n").replace("\\r","").replace("\'","")),chunksize=self._chunksize)
         else:
             return _stream
diff --git a/transport/iowrapper.py b/transport/iowrapper.py
index 54daf13..4b47e99 100644
--- a/transport/iowrapper.py
+++ b/transport/iowrapper.py
@@ -163,6 +163,7 @@ class IReader(IO):
         _objectName = '.'.join([self._agent.__class__.__module__,self._agent.__class__.__name__])
 
         if types.GeneratorType == type(_data):
+            return self._stream(_data)
             # if self._plugins :
             #     return self._stream(_data)
@@ -278,7 +279,7 @@ class IETL(BaseIO) :
                 if _item['type'] in ['INTEGER','BIGINT','INT'] :
                     _column = _item['name']
                     _data[_column] = _data[_column].copy().fillna(0).astype(np.int64)
-            writer.write(_data,schema=_schema)
+            writer.write(_data)
         except Exception as e:
             _action = 'post-error'
             _input['error'] = str(e)
diff --git a/transport/nosql/couchdb.py b/transport/nosql/couchdb.py
index 9449947..072a59a 100644
--- a/transport/nosql/couchdb.py
+++ b/transport/nosql/couchdb.py
@@ -11,7 +11,8 @@ import sys
 # from transport.common import Reader, Writer
 from datetime import datetime
 
-
+def template():
+    return {'dbname':'database','doc':'document','username':'username','password':'password','url':'url-with-port'}
 class Couch:
     """
     This class is a wrapper for read/write against couchdb. The class captures common operations for read/write.
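The `types.GeneratorType` check in `IReader` (iowrapper.py above) is what makes the chunked reads work end to end: pandas returns a generator from `read_sql` when `chunksize` is set. A self-contained illustration using SQLite, with a hypothetical `logs` table:

```python
import types
import pandas as pd
import sqlalchemy as sqa

_engine = sqa.create_engine('sqlite:///:memory:')
pd.DataFrame({'x': range(10)}).to_sql('logs', _engine, index=False)

# with chunksize set, read_sql yields DataFrames instead of returning one
_data = pd.read_sql('SELECT * FROM logs', _engine, chunksize=4)
print(type(_data) == types.GeneratorType)    # True -> IReader routes it to _stream()
for _frame in _data:
    print(len(_frame))                       # 4, 4, 2
```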
diff --git a/transport/nosql/mongodb.py b/transport/nosql/mongodb.py
index 4c216bd..ec6d1c5 100644
--- a/transport/nosql/mongodb.py
+++ b/transport/nosql/mongodb.py
@@ -20,6 +20,9 @@ import re
 from multiprocessing import Lock, RLock
 from transport.common import IEncoder
 
+def template():
+    return {'provider':'mongodb','host':'localhost','port':27017,'db':'db-name','collection':'collection-name','username':'username','password':'password','mechanism':'SCRAM-SHA-256'}
+
 class Mongo :
     lock = RLock()
     """
diff --git a/transport/other/files.py b/transport/other/files.py
index 6389012..6e18721 100644
--- a/transport/other/files.py
+++ b/transport/other/files.py
@@ -4,6 +4,10 @@ This file is a wrapper around pandas built-in functionalities to handle characte
 import pandas as pd
 import numpy as np
 import os
+
+def template():
+    return {'path':None,'delimiter':None}
+
 class File :
     def __init__(self,**params):
         """
diff --git a/transport/other/http.py b/transport/other/http.py
index d92e334..bed93f5 100644
--- a/transport/other/http.py
+++ b/transport/other/http.py
@@ -7,6 +7,8 @@ import requests
 from io import StringIO
 import pandas as pd
 
+def template():
+    return {'url':None,'headers':{'key':'value'}}
 class Reader:
     """
diff --git a/transport/other/rabbitmq.py b/transport/other/rabbitmq.py
index f56800d..0fa2b0c 100644
--- a/transport/other/rabbitmq.py
+++ b/transport/other/rabbitmq.py
@@ -17,6 +17,10 @@ import sys
 # from common import Reader, Writer
 import json
 from multiprocessing import RLock
+
+def template():
+    return {'port':5672,'host':'localhost','queue':None,'vhost':None,'username':None,'password':None}
+
 class MessageQueue:
     """
     This class hierarchy is designed to handle interactions with a queue server using pika framework (our tests are based on rabbitmq)
diff --git a/transport/sql/common.py b/transport/sql/common.py
index 048f80e..b3b8712 100644
--- a/transport/sql/common.py
+++ b/transport/sql/common.py
@@ -1,11 +1,14 @@
 """
 This file encapsulates common operations associated with SQL databases via SQLAlchemy
-
+@ENT:
+    - To support streaming (with generators), we use the parameter chunksize, which enables streaming
 """
 import sqlalchemy as sqa
 from sqlalchemy import text , MetaData, inspect
 import pandas as pd
 
+def template():
+    return {'host':'localhost','database':'database','table':'table'}
 class Base:
     def __init__(self,**_args):
@@ -20,6 +23,7 @@ class Base:
             _uri,_kwargs = _uri
         self._engine= sqa.create_engine(_uri,**_kwargs,future=True)
+        self._chunksize = int(_args['chunksize']) if 'chunksize' in _args else None
     def _set_uri(self,**_args) :
         """
         :provider provider
@@ -78,7 +82,12 @@ class Base:
         @TODO: Execution of stored procedures
         """
         if sql.strip().lower().startswith('select') or sql.strip().lower().startswith('with') or sql.strip().startswith('show'):
-            return pd.read_sql(sql,self._engine)
+
+            if not self._chunksize:
+                return pd.read_sql(sql,self._engine)
+            else:
+                return pd.read_sql(sql,self._engine,chunksize=self._chunksize)
+
         else:
             _handler = self._engine.connect()
             _handler.execute(text(sql.strip()))
@@ -132,7 +141,12 @@ class BaseReader(SQLBase):
             if self._schema and type(self._schema) == str :
                 _table = f'{self._schema}.{_table}'
             sql = f'SELECT * FROM {_table}'
+        if 'chunksize' in _args :
+            self._chunksize = int(_args['chunksize'])
+
+        return self.apply(sql)
+
 
 class BaseWriter (SQLBase):
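Since `Base.__init__` now honors `chunksize`, a streaming default can also be set in the auth/config entry itself rather than per call. A sketch with placeholder values (keys follow the SQL templates in this patch):

```python
import transport

_auth = {
    'provider': 'postgresql',          # placeholder values throughout
    'host': 'localhost', 'port': 5432,
    'database': 'demo', 'table': 'logs',
    'chunksize': 10000,                # picked up by Base.__init__ -> self._chunksize
}
reader = transport.get.reader(**_auth) # apply()/read() now stream by default
```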
diff --git a/transport/sql/duckdb.py b/transport/sql/duckdb.py
index 97fb3fa..c7d095f 100644
--- a/transport/sql/duckdb.py
+++ b/transport/sql/duckdb.py
@@ -3,6 +3,9 @@ This module implements the handler for duckdb (in memory or not)
 """
 from transport.sql.common import Base, BaseReader, BaseWriter
 
+def template ():
+    return {'database':'path-to-database','table':'table'}
+
 class Duck :
     def __init__(self,**_args):
         #
diff --git a/transport/sql/mysql.py b/transport/sql/mysql.py
index 320eb68..0e89ac0 100644
--- a/transport/sql/mysql.py
+++ b/transport/sql/mysql.py
@@ -1,8 +1,11 @@
 """
 This file implements support for mysql and MariaDB (with drivers mysql+mysql)
 """
-from transport.sql.common import BaseReader, BaseWriter
+from transport.sql.common import BaseReader, BaseWriter, template as _template
 # import mysql.connector as my
+def template ():
+    return dict(_template(),**{'port':3306})
+
 class MYSQL:
 
     def get_provider(self):
diff --git a/transport/sql/netezza.py b/transport/sql/netezza.py
index 6d53164..e39aa6a 100644
--- a/transport/sql/netezza.py
+++ b/transport/sql/netezza.py
@@ -1,5 +1,8 @@
 import nzpy as nz
-from transport.sql.common import BaseReader, BaseWriter
+from transport.sql.common import BaseReader, BaseWriter , template as _template
+
+def template ():
+    return dict(_template(),**{'port':5480,'chunksize':10000})
 
 class Netezza:
     def get_provider(self):
diff --git a/transport/sql/postgresql.py b/transport/sql/postgresql.py
index 0831291..238a21c 100644
--- a/transport/sql/postgresql.py
+++ b/transport/sql/postgresql.py
@@ -1,7 +1,10 @@
-from transport.sql.common import BaseReader , BaseWriter
+from transport.sql.common import BaseReader , BaseWriter, template as _template
+
 from psycopg2.extensions import register_adapter, AsIs
 import numpy as np
 
+def template ():
+    return dict(_template(),**{'port':5432,'chunksize':10000})
 register_adapter(np.int64, AsIs)
diff --git a/transport/sql/sqlite3.py b/transport/sql/sqlite3.py
index d71441e..0816123 100644
--- a/transport/sql/sqlite3.py
+++ b/transport/sql/sqlite3.py
@@ -2,6 +2,8 @@ import sqlalchemy
 import pandas as pd
 from transport.sql.common import Base, BaseReader, BaseWriter
 from multiprocessing import RLock
+def template():
+    return {'database':'path-to-database','table':'table'}
 class SQLite3 :
     lock = RLock()
     def __init__(self,**_args):
diff --git a/transport/sql/sqlserver.py b/transport/sql/sqlserver.py
index 6cc9e38..70b88f4 100644
--- a/transport/sql/sqlserver.py
+++ b/transport/sql/sqlserver.py
@@ -3,7 +3,11 @@ Handling Microsoft SQL Server via pymssql driver/connector
 """
 import sqlalchemy
 import pandas as pd
-from transport.sql.common import Base, BaseReader, BaseWriter
+from transport.sql.common import Base, BaseReader, BaseWriter, template as _template
+
+def template ():
+    return dict(_template(),**{'port':1433})
+
 class MsSQLServer:
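The provider modules above all derive their templates from the shared SQL template via `dict(_template(), **overrides)`, where keyword-expansion lets the overrides win. For postgresql the merge evaluates to:

```python
# template() from transport/sql/common.py
_base = {'host': 'localhost', 'database': 'database', 'table': 'table'}
# dict(_template(), **{'port': 5432, 'chunksize': 10000}) from postgresql.py
print(dict(_base, **{'port': 5432, 'chunksize': 10000}))
# {'host': 'localhost', 'database': 'database', 'table': 'table', 'port': 5432, 'chunksize': 10000}
```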
diff --git a/transport/warehouse/drill.py b/transport/warehouse/drill.py
index 71f0e64..d2afbbd 100644
--- a/transport/warehouse/drill.py
+++ b/transport/warehouse/drill.py
@@ -3,6 +3,8 @@ import pandas as pd
 from ..sql.common import BaseReader , BaseWriter
 import sqlalchemy as sqa
 
+def template():
+    return {'host':'localhost','port':8047,'ssl':False,'table':None,'database':None}
 class Drill :
     __template = {'host':None,'port':None,'ssl':None,'table':None,'database':None}
     def __init__(self,**_args):
diff --git a/transport/warehouse/iceberg.py b/transport/warehouse/iceberg.py
index 3def181..aa6f5d3 100644
--- a/transport/warehouse/iceberg.py
+++ b/transport/warehouse/iceberg.py
@@ -11,6 +11,10 @@ from pyspark.sql.types import *
 from pyspark.sql.functions import col, to_date, to_timestamp
 import copy
 
+def template():
+    return {'catalog':None,'database':None,'table':None}
+
 class Iceberg :
     def __init__(self,**_args):
         """