Compare commits


60 Commits
v2.2.0 ... main

Author SHA1 Message Date
Steve L. Nyemba db923e8ec1 Merge pull request 'v2.4' (#5) from v2.4 into main · 5 days ago
Steve Nyemba 9d00a459eb version update, etl bug fixes · 5 days ago
Steve Nyemba b80c076ec9 bug fixes: ETL creating a template functions · 5 days ago
Steve L. Nyemba 9bf19be8e9 Merge pull request 'forogot info file' (#4) from v2.4 into main · 3 weeks ago
Steve Nyemba 8381f4cbc0 forogot info file · 3 weeks ago
Steve L. Nyemba 2054a8b095 Merge pull request 'bug fix: refactored info folder' (#3) from v2.4 into main · 3 weeks ago
Steve Nyemba 7aec155334 bug fix: refactored info folder · 3 weeks ago
Steve L. Nyemba 3267f93331 Merge pull request 'v2.4' (#2) from v2.4 into main · 2 months ago
Steve Nyemba a065ac8f12 bug fixes (indentation) · 2 months ago
Steve L. Nyemba 72ab0d1708 Merge pull request 'bug fix & added windows runner' (#1) from v2.4 into main · 2 months ago
Steve Nyemba 2021abd1da windows runner · 2 months ago
Steve Nyemba 977aa91045 bug fix & added windows runner · 2 months ago
Steve Nyemba d154ac3cd0 bug fix: · 5 months ago
Steve Nyemba e0e48a3d02 bug fix: installer & registry · 6 months ago
Steve Nyemba aba887ec29 bug fix: crash on concurrency · 6 months ago
Steve Nyemba 570f2294b9 bug fix with registry · 6 months ago
Steve Nyemba e3efc70c01 upgrade pyproject.toml, bug fix with registry · 6 months ago
Steve Nyemba 6ffc7ed7b5 bug fix · 6 months ago
Steve Nyemba 3025e6571b bug fix · 6 months ago
Steve Nyemba a42ee59129 bug fix: logger · 7 months ago
Steve Nyemba b1975d6a42 bug fix: missing table (sql) · 9 months ago
Steve Nyemba c7a5d42f42 bug fix: dependency · 9 months ago
Steve Nyemba 2eee726191 bug fix: version update · 9 months ago
Steve Nyemba bf32c54cd4 bug fix: setup file · 10 months ago
Steve Nyemba 4fbf2d495a adding dependency plugin-ix · 10 months ago
Steve Nyemba 73fa9d90a9 adding plugin handler (enhancement) · 10 months ago
Steve Nyemba fce888606c bug fix: double entries (yikes) · 10 months ago
Steve Nyemba 2d359db5fa bug fix & version update, using schemas read/write · 10 months ago
Steve Nyemba cdeebd3ce4 bug fix & version update, using schemas read/write · 10 months ago
Steve Nyemba c1bc167b7f bug fix & version update · 10 months ago
Steve Nyemba d8dd50ab47 bug fix · 10 months ago
Steve Nyemba a0b0a8a26f verison update · 10 months ago
Steve Nyemba 97c5ae6fb3 bug fix: duckdb readonly, version update with edition · 10 months ago
Steve Nyemba a6da232d5f bug fix ... · 10 months ago
Steve Nyemba e7df1e967f bug fix sqlalchemy dispose connection · 11 months ago
Steve Nyemba a4b4a453bb bug fix: etl & logger · 11 months ago
Steve Nyemba a022bdf92f bug fix: etl & logger · 11 months ago
Steve Nyemba 93537095a4 bug fix: etl & logger · 11 months ago
Steve Nyemba 329a575f89 bug fix: issue with sqlalchemy & python 3.12 · 11 months ago
Steve Nyemba 0dbb0a38e5 bug fix: issue with sqlalchemy & python 3.12 · 11 months ago
Steve Nyemba 6637405898 bug fix: issue with sqlalchemy & python 3.12 · 11 months ago
Steve Nyemba e82145690b bug fix: version · 12 months ago
Steve Nyemba 5c423205c5 bug fixes and enhancements, iceberg casting, typer parameters, etl throtling · 12 months ago
Steve Nyemba b2a2e49858 bug fix · 1 year ago
Steve Nyemba 0fd29207cf bug fixes ... · 1 year ago
Steve Nyemba 5ee0186e58 bug fixes ... · 1 year ago
Steve Nyemba 541ae41786 bug fix · 1 year ago
Steve Nyemba dc9329218a bug fixes: read, with source that accepts an sql query · 1 year ago
Steve Nyemba 990bb343a4 bug fixes: read, with source that accepts an sql query · 1 year ago
Steve Nyemba bcf25a4e27 bug fixes: drill & iceberg, etl · 1 year ago
Steve Nyemba a2ab60660e bug fix · 1 year ago
Steve Nyemba 92db89daaf bug fix · 1 year ago
Steve Nyemba a28848194a bug fixes: stream, drill,iceberg · 1 year ago
Steve Nyemba 5dbe541025 adding templates to the class hierarchies, helps with wizard · 1 year ago
Steve Nyemba ea1cb7b1bb bug fixes with ETL and added properties to perform parameter validation and provide input template · 1 year ago
Steve Nyemba 8904c7184a warehouse support, plugin registry and streaming · 1 year ago
Steve Nyemba 4e97b32530 feat: streaming support on reads · 1 year ago
Steve Nyemba a1cf78a889 bug fixes: drill inheritance and met data function · 1 year ago
Steve Nyemba 685aac7d6b bug fix: write when table doesn't exist · 1 year ago
Steve Nyemba 07be81bace adding warehouse support (iceberg) · 1 year ago


@@ -13,20 +13,28 @@ Data transport is a simple framework that:
 ## Installation
-Within the virtual environment perform the following :
+Within the virtual environment perform the following (the following will install everything):
-pip install git+https://github.com/lnyemba/data-transport.git
+pip install data-transport[all]@git+https://github.com/lnyemba/data-transport.git
-Options to install components in square brackets
+Options to install components in square brackets are **nosql**; **cloud**; **other** and **warehouse**
-pip install data-transport[nosql,cloud,warehouse,all]@git+https://github.com/lnyemba/data-transport.git
+pip install data-transport[nosql,cloud,other, warehouse,all]@git+https://github.com/lnyemba/data-transport.git
+The components available:
+0. sql by default netezza; mysql; postgresql; duckdb; sqlite3; sqlserver
+1. nosql mongodb/ferretdb; couchdb
+2. cloud s3; bigquery; databricks
+3. other files; http; rabbitmq
+4. warehouse apache drill; apache iceberg
 ## Additional features
-- In addition to read/write, there is support for functions for pre/post processing
+- Reads are separated from writes to avoid accidental writes.
+- Streaming (for large volumes of data) by specifying chunksize
 - CLI interface to add to registry, run ETL
-- scales and integrates into shared environments like apache zeppelin; jupyterhub; SageMaker; ...
+- Implements best-pracices for collaborative environments like apache zeppelin; jupyterhub; SageMaker; ...
 ## Learn More
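
A minimal usage sketch of the features listed above, assuming a PostgreSQL source; the provider constant, connection parameters and table names are placeholders to adjust for your environment:

    import transport
    from transport import providers

    # readers and writers are separate objects (avoids accidental writes)
    reader = transport.get.reader(provider=providers.POSTGRESQL,
                                  host='localhost', database='demo', table='friends')
    df = reader.read()                       # whole table as a pandas DataFrame

    # streaming for large volumes: chunksize turns read() into a generator of segments
    for segment in reader.read(chunksize=10000):
        print(segment.shape)

    writer = transport.get.writer(provider=providers.POSTGRESQL,
                                  host='localhost', database='demo', table='friends_copy')
    writer.write(df)                         # appends by default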

@ -53,10 +53,8 @@ def wait(jobs):
while jobs : while jobs :
jobs = [thread for thread in jobs if thread.is_alive()] jobs = [thread for thread in jobs if thread.is_alive()]
time.sleep(1) time.sleep(1)
# def wait (jobs): def job (_args):
# while jobs : pass
# jobs = [pthread for pthread in jobs if pthread.is_alive()]
@app_e.command(name="run") @app_e.command(name="run")
def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")], def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed"), index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed"),
@ -159,6 +157,10 @@ def initregistry (email:Annotated[str,typer.Argument(help="email")],
_msg = f"{TIMES_MARK} {e}" _msg = f"{TIMES_MARK} {e}"
print (_msg) print (_msg)
print () print ()
@app_r.command(name="add") @app_r.command(name="add")
def register (label:Annotated[str,typer.Argument(help="unique label that will be used to load the parameters of the database")], def register (label:Annotated[str,typer.Argument(help="unique label that will be used to load the parameters of the database")],
auth_file:Annotated[str,typer.Argument(help="path of the auth_file")], auth_file:Annotated[str,typer.Argument(help="path of the auth_file")],
@ -177,8 +179,36 @@ def register (label:Annotated[str,typer.Argument(help="unique label that will be
except Exception as e: except Exception as e:
_msg = f"""{TIMES_MARK} {e}""" _msg = f"""{TIMES_MARK} {e}"""
print (_msg) print (_msg)
@app_r.command(name="template")
def template(name:Annotated[str,typer.Argument(help="database technology provider" ) ]):
"""
This function will generate a template entry for the registry (content of an auth file)
"""
#
# retrieve the provider and display the template if it has one
for _module in ['sql','cloud','warehouse','nosql','other'] :
ref = getattr(transport,_module) if hasattr(transport,_module) else None
_entry = {}
if ref :
if hasattr(ref,name) :
_pointer = getattr(ref,name)
_entry = dict({'provider':name},**_pointer.template()) if hasattr(_pointer,'template') else {}
break
#
#
print ( json.dumps(_entry))
pass pass
@app_r.command(name="list")
def register_list ():
"""
This function will list existing registry entries and basic information {label,vendor}
"""
# print (transport.registry.DATA)
_reg = transport.registry.DATA
_data = [{'label':key,'provider':_reg[key]['provider']} for key in _reg if 'provider' in _reg[key]]
_data = pd.DataFrame(_data)
print (_data)
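
A sketch of how the registry commands above fit together; the `registry` sub-command name and the label-based lookup are assumptions based on the typer groups and the commented logger lookup in iowrapper:

    import transport

    # assuming app_r is mounted as the `registry` sub-command:
    #   transport registry template postgresql
    #     -> prints something like {"provider": "postgresql", "host": "localhost",
    #        "database": "database", "table": "table", "port": 5432, "chunksize": 10000}
    #   transport registry add mypg /path/to/auth-file.json
    #   transport registry list
    #
    # once a label is registered it can stand in for explicit parameters:
    reader = transport.get.reader(label='mypg')
    print(reader.read().shape)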
@app_x.command(name='add') @app_x.command(name='add')
def register_plugs ( def register_plugs (
alias:Annotated[str,typer.Argument(help="unique function name within a file")], alias:Annotated[str,typer.Argument(help="unique function name within a file")],

@ -0,0 +1,2 @@
cd /D "%~dp0"
python transport %1 %2 %3 %4 %5 %6

@ -1,8 +1,8 @@
__app_name__ = 'data-transport' __app_name__ = 'data-transport'
__author__ = 'The Phi Technology' __author__ = 'The Phi Technology'
__version__= '2.2.22' __version__= '2.4.30'
__edition__= 'enterprise'
__email__ = "info@the-phi.com" __email__ = "info@the-phi.com"
__edition__= 'community'
__license__=f""" __license__=f"""
Copyright 2010 - 2024, Steve L. Nyemba Copyright 2010 - 2024, Steve L. Nyemba
@ -20,4 +20,6 @@ __whatsnew__=f"""version {__version__},
3. support for streaming data, important to use this with large volumes of data 3. support for streaming data, important to use this with large volumes of data
""" """

@ -1,138 +0,0 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Writing to Apache Iceberg\n",
"\n",
"1. Insure you have a Google Bigquery service account key on disk\n",
"2. The service key location is set as an environment variable **BQ_KEY**\n",
"3. The dataset will be automatically created within the project associated with the service key\n",
"\n",
"The cell below creates a dataframe that will be stored within Google Bigquery"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['data transport version ', '2.4.0']\n"
]
}
],
"source": [
"#\n",
"# Writing to Google Bigquery database\n",
"#\n",
"import transport\n",
"from transport import providers\n",
"import pandas as pd\n",
"import os\n",
"\n",
"PRIVATE_KEY = os.environ['BQ_KEY'] #-- location of the service key\n",
"DATASET = 'demo'\n",
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
"# bqw = transport.get.writer(provider=providers.ICEBERG,catalog='mz',database='edw.mz',table='friends')\n",
"bqw = transport.get.writer(provider=providers.ICEBERG,table='edw.mz.friends')\n",
"bqw.write(_data,if_exists='replace') #-- default is append\n",
"print (['data transport version ', transport.__version__])\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Reading from Google Bigquery\n",
"\n",
"The cell below reads the data that has been written by the cell above and computes the average age within a Google Bigquery (simple query). \n",
"\n",
"- Basic read of the designated table (friends) created above\n",
"- Execute an aggregate SQL against the table\n",
"\n",
"**NOTE**\n",
"\n",
"By design **read** object are separated from **write** objects in order to avoid accidental writes to the database.\n",
"Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" name age\n",
"0 James Bond 55\n",
"1 Steve Rogers 150\n",
"2 Steve Nyemba 44\n",
"--------- STATISTICS ------------\n"
]
}
],
"source": [
"\n",
"import transport\n",
"from transport import providers\n",
"import os\n",
"PRIVATE_KEY=os.environ['BQ_KEY']\n",
"pgr = transport.get.reader(provider=providers.ICEBERG,database='edw.mz')\n",
"_df = pgr.read(table='friends')\n",
"_query = 'SELECT COUNT(*) _counts, AVG(age) from friends'\n",
"_sdf = pgr.read(sql=_query)\n",
"print (_df)\n",
"print ('--------- STATISTICS ------------')\n",
"# print (_sdf)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"An **auth-file** is a file that contains database parameters used to access the database. \n",
"For code in shared environments, we recommend \n",
"\n",
"1. Having the **auth-file** stored on disk \n",
"2. and the location of the file is set to an environment variable.\n",
"\n",
"To generate a template of the **auth-file** open the **file generator wizard** found at visit https://healthcareio.the-phi.com/data-transport"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

@ -14,7 +14,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 1, "execution_count": null,
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
@ -58,7 +58,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 2, "execution_count": null,
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
@ -103,7 +103,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 4, "execution_count": null,
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
@ -131,16 +131,28 @@
] ]
}, },
{ {
"cell_type": "code", "cell_type": "markdown",
"execution_count": null, "metadata": {},
"source": [
"#### Streaming Large Volumes of Data\n",
"\n",
"It is recommended for large volumes of data to stream the data using **chunksize** as a parameter \n",
"\n",
"1. in the **read** method \n",
"2. or **transport.get.reader(\\*\\*...,chunksize=1000)**\n",
"\n",
"Use streaming because with large volumes of data some databases limit the volume of data for a single transaction in order to efficiently guarantee maintain **data integrity**"
]
},
{
"cell_type": "markdown",
"metadata": {}, "metadata": {},
"outputs": [],
"source": [] "source": []
} }
], ],
"metadata": { "metadata": {
"kernelspec": { "kernelspec": {
"display_name": "Python 3", "display_name": "python (3.10.12)",
"language": "python", "language": "python",
"name": "python3" "name": "python3"
}, },
@ -154,7 +166,7 @@
"name": "python", "name": "python",
"nbconvert_exporter": "python", "nbconvert_exporter": "python",
"pygments_lexer": "ipython3", "pygments_lexer": "ipython3",
"version": "3.9.7" "version": "3.10.12"
} }
}, },
"nbformat": 4, "nbformat": 4,

@ -39,14 +39,14 @@ Homepage = "https://healthcareio.the-phi.com/git/code/transport.git"
[tool.setuptools] [tool.setuptools]
include-package-data = true include-package-data = true
zip-safe = false zip-safe = false
script-files = ["bin/transport"] script-files = ["bin/transport","bin/transport.cmd"]
[tool.setuptools.packages.find] [tool.setuptools.packages.find]
include = ["info","info.*", "transport", "transport.*"] include = [ "transport", "transport.*"]
[tool.setuptools.dynamic] [tool.setuptools.dynamic]
version = {attr = "info.__version__"} version = {attr = "transport.info.__version__"}
#authors = {attr = "meta.__author__"} #authors = {attr = "transport.__author__"}
# If you have a info.py file, you might also want to include the author dynamically: # If you have a info.py file, you might also want to include the author dynamically:
# [tool.setuptools.dynamic] # [tool.setuptools.dynamic]

@ -38,11 +38,10 @@ except Exception as e:
other = {} other = {}
import pandas as pd import pandas as pd
import json import json
import os import os
from info import __version__,__author__,__email__,__license__,__app_name__,__whatsnew__,__edition__ from transport.info import __version__,__author__,__email__,__license__,__app_name__,__whatsnew__,__edition__
from transport.iowrapper import IWriter, IReader, IETL from transport.iowrapper import IWriter, IReader, IETL
from transport.plugins import PluginLoader from transport.plugins import PluginLoader
from transport import providers from transport import providers

@ -14,7 +14,11 @@ import numpy as np
import time import time
MAX_CHUNK = 2000000 MAX_CHUNK = 2000000
def template ():
return {'provider':'bigquery','private_key':'path-to-key','dataset':'name-of-dataset','table':'table','chunksize':MAX_CHUNK}
class BigQuery: class BigQuery:
__template__= {"private_key":None,"dataset":None,"table":None}
def __init__(self,**_args): def __init__(self,**_args):
path = _args['service_key'] if 'service_key' in _args else _args['private_key'] path = _args['service_key'] if 'service_key' in _args else _args['private_key']
self.credentials = service_account.Credentials.from_service_account_file(path) self.credentials = service_account.Credentials.from_service_account_file(path)
@ -23,6 +27,7 @@ class BigQuery:
self.dtypes = _args['dtypes'] if 'dtypes' in _args else None self.dtypes = _args['dtypes'] if 'dtypes' in _args else None
self.table = _args['table'] if 'table' in _args else None self.table = _args['table'] if 'table' in _args else None
self.client = bq.Client.from_service_account_json(self.path) self.client = bq.Client.from_service_account_json(self.path)
self._chunksize = int(_args['chunksize']) if 'chunksize' in _args else None
def meta(self,**_args): def meta(self,**_args):
""" """
This function returns meta data for a given table or query with dataset/table properly formatted This function returns meta data for a given table or query with dataset/table properly formatted
@ -81,6 +86,13 @@ class Reader (BigQuery):
if (':dataset' in SQL or ':DATASET' in SQL) and self.dataset: if (':dataset' in SQL or ':DATASET' in SQL) and self.dataset:
SQL = SQL.replace(':dataset',self.dataset).replace(':DATASET',self.dataset) SQL = SQL.replace(':dataset',self.dataset).replace(':DATASET',self.dataset)
_info = {'credentials':self.credentials,'dialect':'standard'} _info = {'credentials':self.credentials,'dialect':'standard'}
#
# @Ent-Feature : adding streaming capability here
#
if 'chunksize' in _args :
self._chunksize = int(_args['chunksize'])
if self._chunksize :
_info['chunksize'] = self._chunksize
return pd_gbq.read_gbq(SQL,**_info) if SQL else None return pd_gbq.read_gbq(SQL,**_info) if SQL else None
# return self.client.query(SQL).to_dataframe() if SQL else None # return self.client.query(SQL).to_dataframe() if SQL else None
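
A read sketch against the wrapper above, assuming the provider constant is providers.BIGQUERY and that read() accepts table= and sql= as in the notebooks; BQ_KEY follows the notebook convention of pointing at a service-account key file:

    import os
    import transport
    from transport import providers

    reader = transport.get.reader(provider=providers.BIGQUERY,
                                  private_key=os.environ['BQ_KEY'],
                                  dataset='demo')
    _df  = reader.read(table='friends')                                  # full table
    _agg = reader.read(sql='SELECT AVG(age) AS avg_age FROM :dataset.friends')
    # chunksize can also be passed to read() to forward a chunked read to
    # pandas-gbq, per the streaming change above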

@ -17,6 +17,8 @@ import sqlalchemy
# from transport.common import Reader,Writer # from transport.common import Reader,Writer
import pandas as pd import pandas as pd
def template ():
return {'provider':'databricks','host':'fqn-host','token':'token','cluster_path':'path-of-cluster','catalog':'name-of-catalog','database':'schema-or-database','table':'table','chunksize':10000}
class Bricks: class Bricks:
""" """
@ -26,6 +28,7 @@ class Bricks:
:cluster_path :cluster_path
:table :table
""" """
__template__ = {"host":None,"token":None,"cluster_path":None,"catalog":None,"schema":None}
def __init__(self,**_args): def __init__(self,**_args):
_host = _args['host'] _host = _args['host']
_token= _args['token'] _token= _args['token']
@ -41,6 +44,7 @@ class Bricks:
_uri = f'''databricks+connector://token:{_token}@{_host}?http_path={_cluster_path}&catalog={_catalog}&schema={self._schema}''' _uri = f'''databricks+connector://token:{_token}@{_host}?http_path={_cluster_path}&catalog={_catalog}&schema={self._schema}'''
self._engine = sqlalchemy.create_engine (_uri) self._engine = sqlalchemy.create_engine (_uri)
self._chunksize = int(_args['chunksize']) if 'chunksize' in _args else None
pass pass
def meta(self,**_args): def meta(self,**_args):
table = _args['table'] if 'table' in _args else self._table table = _args['table'] if 'table' in _args else self._table
@ -63,7 +67,14 @@ class Bricks:
def apply(self,_sql): def apply(self,_sql):
try: try:
if _sql.lower().startswith('select') : if _sql.lower().startswith('select') :
#
# @ENT-Feature: adding streaming functions/variables
if not self._chunksize :
return pd.read_sql(_sql,self._engine) return pd.read_sql(_sql,self._engine)
else:
return pd.read_sql(_sql,self._engine,chunksize=self._chunksize)
except Exception as e: except Exception as e:
pass pass
@ -83,7 +94,10 @@ class Reader(Bricks):
sql = f'SELECT * FROM {table}' sql = f'SELECT * FROM {table}'
if limit : if limit :
sql = sql + f' LIMIT {limit}' sql = sql + f' LIMIT {limit}'
#
# @ENT-Feature: adding streaming functions/variables
if 'chunksize' in _args :
self._chunksize = int(_args['chunksize'])
if 'sql' in _args or 'table' in _args : if 'sql' in _args or 'table' in _args :
return self.apply(sql) return self.apply(sql)
else: else:

@ -8,8 +8,10 @@ import pandas as pd
from io import StringIO from io import StringIO
import json import json
import nextcloud_client as nextcloud import nextcloud_client as nextcloud
def template():
return {"url":None,"token":None,"uid":None,"file":None}
class Nextcloud : class Nextcloud :
__template__={"url":None,"token":None,"uid":None,"file":None}
def __init__(self,**_args): def __init__(self,**_args):
pass pass
self._delimiter = None self._delimiter = None

@ -20,10 +20,14 @@ from io import StringIO
import pandas as pd import pandas as pd
import json import json
def template():
return {'access_key':'access-key','secret_key':'secret-key','region':'region','bucket':'name-of-bucket','file':'file-name','chunksize':10000}
class s3 : class s3 :
""" """
@TODO: Implement a search function for a file given a bucket?? @TODO: Implement a search function for a file given a bucket??
""" """
__template__={"access_key":None,"secret_key":None,"bucket":None,"file":None,"region":None}
def __init__(self,**args) : def __init__(self,**args) :
""" """
This function will extract a file or set of files from s3 bucket provided This function will extract a file or set of files from s3 bucket provided
@ -37,6 +41,7 @@ class s3 :
self._bucket_name = args['bucket'] self._bucket_name = args['bucket']
self._file_name = args['file'] self._file_name = args['file']
self._region = args['region'] self._region = args['region']
self._chunksize = int(args['chunksize']) if 'chunksize' in args else None
except Exception as e : except Exception as e :
print (e) print (e)
pass pass
@ -88,7 +93,10 @@ class Reader(s3) :
if not _stream : if not _stream :
return None return None
if _object['ContentType'] in ['text/csv'] : if _object['ContentType'] in ['text/csv'] :
if not self._chunksize :
return pd.read_csv(StringIO(str(_stream).replace("\\n","\n").replace("\\r","").replace("\'",""))) return pd.read_csv(StringIO(str(_stream).replace("\\n","\n").replace("\\r","").replace("\'","")))
else:
return pd.read_csv(StringIO(str(_stream).replace("\\n","\n").replace("\\r","").replace("\'","")),chunksize=self._chunksize)
else: else:
return _stream return _stream

@ -1,19 +0,0 @@
"""
This file will be intended to handle duckdb database
"""
import duckdb
from transport.common import Reader,Writer
class Duck(Reader):
def __init__(self,**_args):
super().__init__(**_args)
self._path = None if 'path' not in _args else _args['path']
self._handler = duckdb.connect() if not self._path else duckdb.connect(self._path)
class DuckReader(Duck) :
def __init__(self,**_args):
super().__init__(**_args)
def read(self,**_args) :
pass

@ -0,0 +1,25 @@
__app_name__ = 'data-transport'
__author__ = 'Steve L. Nyemba'
__version__= '2.4.34'
__edition__= 'enterprise'
__email__ = "info@the-phi.com"
__license__=f"""
Copyright 2010 - 2024, Steve L. Nyemba
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the Software), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED AS IS, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
__whatsnew__=f"""version {__version__},
1. Added support for read/write logs as well as plugins (when applied)
2. Bug fix with duckdb (adding readonly) for readers because there are issues with threads & processes
3. support for streaming data, important to use this with large volumes of data
"""

@ -5,42 +5,113 @@ NOTE: Plugins are converted to a pipeline, so we apply a pipeline when reading o
- upon initialization we will load plugins - upon initialization we will load plugins
- on read/write we apply a pipeline (if passed as an argument) - on read/write we apply a pipeline (if passed as an argument)
""" """
from transport.plugins import Plugin, PluginLoader from transport.plugins import PluginLoader
import transport import transport
from transport import providers from transport import providers
from multiprocessing import Process from multiprocessing import Process, RLock
import time import time
import types
from . import registry
from datetime import datetime
import pandas as pd
import numpy as np
import os
import sys
import itertools
import json
import plugin_ix import plugin_ix
class IO: class BaseIO :
def __init__(self,**_args):
self._logger = _args['logger'] if 'logger' in _args else None
self._logTable = 'logs' if 'logTable' not in _args else _args['logTable']
def setLogger(self,_logger):
self._logger = _logger
def log (self,**_args):
if self._logger :
_date = str(datetime.now())
_data = dict({'pid':os.getpid(),'date':_date[:10],'time':_date[11:19]},**_args)
for key in _data :
if type(_data[key]) == list :
_data[key] = [_item.__name__ if type(_item).__name__== 'function' else _item for _item in _data[key]]
_data[key] = str(_data[key]) if type(_data[key]) not in [list,dict] else json.dumps(_data[key])
self._logger.write(pd.DataFrame([_data])) #,table=self._logTable)
class IO(BaseIO):
""" """
Base wrapper class for read/write and support for logs Base wrapper class for read/write and support for logs
""" """
def __init__(self,**_args): def __init__(self,**_args):
_agent = _args['agent']
plugins = _args['plugins'] if 'plugins' in _args else None
#
# We need to initialize the logger here ...
#
super().__init__(**_args)
_agent = _args['agent']
plugins = _args['plugins'] if 'plugins' else None
# _logger = _args['logger'] if 'logger' in _args else None
# self._logger = _logger if not type(_agent) in [IReader,IWriter] else _agent._logger #transport.get.writer(label='logger') #if registry.has('logger') else None
# if not _logger and hasattr(_agent,'_logger') :
# self._logger = getattr(_agent,'_logger')
self._agent = _agent self._agent = _agent
# self._ixloader = plugin_ix.Loader () #-- must indicate where the plugin registry file is _date = _date = str(datetime.now())
self._ixloader = plugin_ix.Loader (registry=plugin_ix.Registry(folder=transport.registry.REGISTRY_PATH)) self._ixloader = plugin_ix.Loader (registry=plugin_ix.Registry(folder=transport.registry.REGISTRY_PATH))
# self._logTable = 'logs' #'_'.join(['logs',_date[:10]+_date[11:19]]).replace(':','').replace('-','_')
if plugins : if plugins :
self.init_plugins(plugins) self.init_plugins(plugins)
# def setLogger(self,_logger):
# self._logger = _logger
# def log (self,**_args):
# if self._logger :
# _date = str(datetime.now())
# _data = dict({'pid':os.getpid(),'date':_date[:10],'time':_date[11:19]},**_args)
# for key in _data :
# if type(_data[key]) == list :
# _data[key] = [_item.__name__ if type(_item).__name__== 'function' else _item for _item in _data[key]]
# _data[key] = str(_data[key]) if type(_data[key]) not in [list,dict] else json.dumps(_data[key])
# self._logger.write(pd.DataFrame([_data])) #,table=self._logTable)
# def _init_plugins(self,_items):
# """
# This function will load pipelined functions as a plugin loader
# """
# registry.plugins.init()
# self._plugins = PluginLoader(registry=registry.plugins)
# [self._plugins.set(_name) for _name in _items]
# self.log(action='init-plugins',object=self.getClassName(self),input =[_name for _name in _items])
# # if 'path' in _args and 'names' in _args :
# # self._plugins = PluginLoader(**_args)
# # else:
# # self._plugins = PluginLoader(registry=registry.plugins)
# # [self._plugins.set(_pointer) for _pointer in _args]
# #
# # @TODO: We should have a way to log what plugins are loaded and ready to use
def meta (self,**_args): def meta (self,**_args):
if hasattr(self._agent,'meta') : if hasattr(self._agent,'meta') :
return self._agent.meta(**_args) return self._agent.meta(**_args)
return [] return []
def getClassName (self,_object):
return '.'.join([_object.__class__.__module__,_object.__class__.__name__])
def close(self): def close(self):
if hasattr(self._agent,'close') : if hasattr(self._agent,'close') :
self._agent.close() self._agent.close()
# def apply(self): def apply(self):
# """ """
# applying pre/post conditions given a pipeline expression applying pre/post conditions given a pipeline expression
# """ """
# for _pointer in self._plugins : for _pointer in self._plugins :
# _data = _pointer(_data) _data = _pointer(_data)
time.sleep(1)
def apply(self,_query): def apply(self,_query):
if hasattr(self._agent,'apply') : if hasattr(self._agent,'apply') :
return self._agent.apply(_query) return self._agent.apply(_query)
@ -62,70 +133,158 @@ class IReader(IO):
""" """
def __init__(self,**_args): def __init__(self,**_args):
super().__init__(**_args) super().__init__(**_args)
self._args = _args['args']if 'args' in _args else None
def _stream (self,_data ):
# self.log(action='streaming',object=self._agent._engine.name, input= type(_data).__name__)
_shape = []
for _segment in _data :
_shape += list(_segment.shape)
if self._plugins :
# yield self._plugins.apply(_segment,self.log)
yield self._ixloader.visitor(_data,self.log)
else:
yield _segment
_objectName = '.'.join([self._agent.__class__.__module__,self._agent.__class__.__name__])
_input = {'shape':_shape}
if hasattr(self._agent,'_table') :
_input['table'] = self._agent._table
self.log(action='streaming',object=_objectName, input= _input)
def read(self,**_args): def read(self,**_args):
if 'plugins' in _args : if 'plugins' in _args :
self.init_plugins(_args['plugins']) self.init_plugins(_args['plugins'])
if self._args :
_data = self._agent.read(**self._args)
else:
_data = self._agent.read(**_args) _data = self._agent.read(**_args)
# if self._plugins and self._plugins.ratio() > 0 :
# _data = self._plugins.apply(_data)
#
# output data
# _objectName = '.'.join([self._agent.__class__.__module__,self._agent.__class__.__name__])
# applying the the design pattern if types.GeneratorType == type(_data):
return self._stream(_data)
# if self._plugins :
# return self._stream(_data)
# else:
# _count = 0
# for _segment in _data :
# _count += 1
# yield _segment
# self.log(action='streaming',object=_objectName, input= {'segments':_count})
# return _data
elif type(_data) == pd.DataFrame :
_shape = _data.shape #[0,0] if not _data.shape[] else list(_data.shape)
_input = {'shape':_shape}
if hasattr(self._agent,'_table') :
_input['table'] = self._agent._table
self.log(action='read',object=_objectName, input=_input)
_data = self._ixloader.visitor(_data) _data = self._ixloader.visitor(_data)
return _data return _data
class IWriter(IO): class IWriter(IO):
def __init__(self,**_args): #_agent,pipeline=None): lock = RLock()
super().__init__(**_args) #_agent,pipeline) def __init__(self,**_args):
super().__init__(**_args)
def write(self,_data,**_args): def write(self,_data,**_args):
# if 'plugins' in _args :
# self._init_plugins(_args['plugins'])
if 'plugins' in _args : if 'plugins' in _args :
self.init_plugins(_args['plugins']) self._init_plugins(_args['plugins'])
# if self._plugins and self._plugins.ratio() > 0 :
# _logs = []
# _data = self._plugins.apply(_data,_logs,self.log)
self._ixloader.visitor(_data) # [self.log(**_item) for _item in _logs]
try:
# IWriter.lock.acquire()
_data = self._ixloader.visitor(_data)
self._agent.write(_data,**_args) self._agent.write(_data,**_args)
finally:
# IWriter.lock.release()
pass
# #
# The ETL object in its simplest form is an aggregation of read/write objects # The ETL object in its simplest form is an aggregation of read/write objects
# @TODO: ETL can/should aggregate a writer as a plugin and apply it as a process # @TODO: ETL can/should aggregate a writer as a plugin and apply it as a process
class IETL(IReader) : class IETL(BaseIO) :
""" """
This class performs an ETL operation by ineriting a read and adding writes as pipeline functions This class performs an ETL operation by ineriting a read and adding writes as pipeline functions
""" """
def __init__(self,**_args): def __init__(self,**_args):
super().__init__(agent=transport.get.reader(**_args['source']),plugins=None)
if 'target' in _args: super().__init__()
self._source = _args['source']
self._targets= _args['target'] if type(_args['target']) == list else [_args['target']] self._targets= _args['target'] if type(_args['target']) == list else [_args['target']]
else:
self._targets = []
self.jobs = []
# #
# If the parent is already multiprocessing # ETL Initialization, we should provide some measure of context ...
self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess'] #
def read(self,**_args):
_data = super().read(**_args)
_schema = super().meta()
for _kwargs in self._targets :
if _schema :
_kwargs['schema'] = _schema
self.post(_data,**_kwargs)
# def run(self) :
# """
# We should apply the etl here, if we are in multiprocessing mode
# """
# return self.read()
def run(self,**_args):
# _data = super().read(**_args) if not self._sourceArgs else super().read(**self._sourceArgs)
# self._targets = [transport.get.writer(**_kwargs) for _kwargs in self._targets]
_reader = transport.get.reader(**self._source)
if hasattr(_reader,'_logger') :
self.setLogger(_reader._logger)
self.log(action='init-etl',input={'source':self._source,'target':self._targets})
_data = _reader.read(**self._source['args'])if 'args' in self._source else _reader.read()
_reader.close()
_writers = [transport.get.writer(**_kwargs) for _kwargs in self._targets]
# _schema = [] if not getattr(_reader._agent,'_table') else _reader.meta()
_schema = [] if not hasattr(_reader._agent,'_table') else _reader.meta()
if types.GeneratorType == type(_data):
_index = 0
for _segment in _data :
_index += 1
for _writer in _writers :
self.post(_segment,writer=_writer,index=_index,schema=_schema)
time.sleep(1)
else:
for _writer in _writers :
self.post(_data,writer=_writer,schema=_schema)
# pass
return _data return _data
def run(self) : # return _data
return self.read()
def post (self,_data,**_args) : def post (self,_data,**_args) :
""" """
This function returns an instance of a process that will perform the write operation This function returns an instance of a process that will perform the write operation
:_args parameters associated with writer object :_args parameters associated with writer object
""" """
writer = transport.get.writer(**_args) #writer = transport.get.writer(**_args)
if 'schema' in _args : _input = {}
writer.write(_data,schema=_args['schema']) try:
else: _action = 'post'
_shape = dict(zip(['rows','columns'],_data.shape))
_index = _args['index'] if 'index' in _args else 0
writer = _args['writer']
_schema= _args['schema']
#
# -- things to log
_input = {'shape':_shape,'segment':_index}
if hasattr(writer._agent,'_table'):
_input['table'] = writer._agent._table
for _item in _schema :
if _item['type'] in ['INTEGER','BIGINT','INT'] :
_column = _item['name']
_data[_column] = _data[_column].copy().fillna(0).astype(np.int64)
writer.write(_data) writer.write(_data)
writer.close() except Exception as e:
_action = 'post-error'
_input['error'] = str(e)
print ([e])
pass
self.log(action=_action,object=writer._agent.__module__, input= _input)
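
To make the IETL flow above concrete, a hypothetical configuration and how it would be run; the source/target keys mirror IETL.__init__, while the providers, paths and tables are placeholders:

    import transport
    from transport.iowrapper import IETL

    _config = {
        "source": {"provider": "postgresql", "database": "demo", "table": "friends"},
        "target": [
            {"provider": "duckdb", "database": "/tmp/demo.db", "table": "friends"}
        ]
    }
    etl = IETL(**_config)
    etl.run()   # reads the source once, then posts each frame/segment to every target

The etl `run` CLI command above takes a path to a configuration file that, judging from IETL, holds entries of this shape (one per job, selectable with --index).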

@ -11,7 +11,8 @@ import sys
# from transport.common import Reader, Writer # from transport.common import Reader, Writer
from datetime import datetime from datetime import datetime
def template():
return {'dbname':'database','doc':'document','username':'username','password':'password','url':'url-with-port'}
class Couch: class Couch:
""" """
This class is a wrapper for read/write against couchdb. The class captures common operations for read/write. This class is a wrapper for read/write against couchdb. The class captures common operations for read/write.
@ -19,6 +20,7 @@ class Couch:
@param doc user id involved @param doc user id involved
@param dbname database name (target) @param dbname database name (target)
""" """
__template__={"url":None,"doc":None,"dbname":None,"username":None,"password":None}
def __init__(self,**args): def __init__(self,**args):
url = args['url'] if 'url' in args else 'http://localhost:5984' url = args['url'] if 'url' in args else 'http://localhost:5984'
self._id = args['doc'] self._id = args['doc']

@ -20,11 +20,15 @@ import re
from multiprocessing import Lock, RLock from multiprocessing import Lock, RLock
from transport.common import IEncoder from transport.common import IEncoder
def template():
return {'provider':'mongodb','host':'localhost','port':27017,'db':'db-name','collection':'collection-name','username':'username','password':'password','mechanism':'SCRAM-SHA-256'}
class Mongo : class Mongo :
lock = RLock() lock = RLock()
""" """
Basic mongodb functions are captured here Basic mongodb functions are captured here
""" """
__template__={"db":None,"collection":None,"host":None,"port":None,"username":None,"password":None}
def __init__(self,**args): def __init__(self,**args):
""" """
:dbname database name/identifier :dbname database name/identifier

@ -9,7 +9,7 @@ import numpy as np
import pandas as pd import pandas as pd
class Writer : class Writer :
lock = Lock() lock = None
_queue = {'default':queue.Queue()} _queue = {'default':queue.Queue()}
def __init__(self,**_args): def __init__(self,**_args):
self._cache = {} self._cache = {}

@ -4,6 +4,10 @@ This file is a wrapper around pandas built-in functionalities to handle characte
import pandas as pd import pandas as pd
import numpy as np import numpy as np
import os import os
def template():
return {'path':None,'delimiter':None}
class File : class File :
def __init__(self,**params): def __init__(self,**params):
""" """
@ -12,7 +16,7 @@ class File :
""" """
self.path = params['path'] if 'path' in params else None self.path = params['path'] if 'path' in params else None
self.delimiter = params['delimiter'] if 'delimiter' in params else ',' self.delimiter = params['delimiter'] if 'delimiter' in params else ','
self._chunksize = None if 'chunksize' not in params else int(params['chunksize'])
def isready(self): def isready(self):
return os.path.exists(self.path) return os.path.exists(self.path)
def meta(self,**_args): def meta(self,**_args):
@ -26,11 +30,19 @@ class Reader (File):
def __init__(self,**_args): def __init__(self,**_args):
super().__init__(**_args) super().__init__(**_args)
def _stream(self,path) :
reader = pd.read_csv(path,sep=self.delimiter,chunksize=self._chunksize,low_memory=False)
for segment in reader :
yield segment
def read(self,**args): def read(self,**args):
_path = self.path if 'path' not in args else args['path'] _path = self.path if 'path' not in args else args['path']
_delimiter = self.delimiter if 'delimiter' not in args else args['delimiter'] _delimiter = self.delimiter if 'delimiter' not in args else args['delimiter']
return pd.read_csv(_path,delimiter=self.delimiter)
_df = pd.read_csv(_path,sep=self.delimiter) if not self._chunksize else self._stream(_path)
if 'query' in args :
_query = args['query']
_df = _df.query(_query)
return _df
def stream(self,**args): def stream(self,**args):
raise Exception ("streaming needs to be implemented") raise Exception ("streaming needs to be implemented")
class Writer (File): class Writer (File):
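
A sketch of the file reader with the new query filter and chunked streaming, assuming the provider constant is providers.FILE; path and column names are placeholders, and query/chunksize are shown separately since the filter applies to a DataFrame:

    import transport
    from transport import providers

    reader = transport.get.reader(provider=providers.FILE, path='/tmp/friends.csv')
    adults = reader.read(query='age > 21')          # filtered via DataFrame.query

    stream = transport.get.reader(provider=providers.FILE,
                                  path='/tmp/friends.csv', chunksize=5000)
    for segment in stream.read():                   # generator of DataFrame chunks
        print(segment.shape)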

@ -7,6 +7,8 @@ import requests
from io import StringIO from io import StringIO
import pandas as pd import pandas as pd
def template():
return {'url':None,'headers':{'key':'value'}}
class Reader: class Reader:
""" """

@ -17,6 +17,10 @@ import sys
# from common import Reader, Writer # from common import Reader, Writer
import json import json
from multiprocessing import RLock from multiprocessing import RLock
def template():
return {'port':5672,'host':'localhost','queue':None,'vhost':None,'username':None,'password':None}
class MessageQueue: class MessageQueue:
""" """
This class hierarchy is designed to handle interactions with a queue server using pika framework (our tests are based on rabbitmq) This class hierarchy is designed to handle interactions with a queue server using pika framework (our tests are based on rabbitmq)

@ -59,9 +59,6 @@ class PluginLoader :
pass pass
def load (self,**_args): def load (self,**_args):
"""
This function loads a plugin
"""
self._modules = {} self._modules = {}
self._names = [] self._names = []
path = _args ['path'] path = _args ['path']

@ -1,6 +1,6 @@
import os import os
import json import json
from info import __version__ from transport.info import __version__
import copy import copy
import transport import transport
import importlib import importlib
@ -12,6 +12,7 @@ from io import StringIO
This class manages data from the registry and allows (read only) This class manages data from the registry and allows (read only)
@TODO: add property to the DATA attribute @TODO: add property to the DATA attribute
""" """
if 'HOME' in os.environ : if 'HOME' in os.environ :
REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport']) REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
else: else:
@ -25,7 +26,6 @@ if 'DATA_TRANSPORT_REGISTRY_PATH' in os.environ :
REGISTRY_FILE= 'transport-registry.json' REGISTRY_FILE= 'transport-registry.json'
DATA = {} DATA = {}
def isloaded (): def isloaded ():
return DATA not in [{},None] return DATA not in [{},None]
def exists (path=REGISTRY_PATH,_file=REGISTRY_FILE) : def exists (path=REGISTRY_PATH,_file=REGISTRY_FILE) :

@ -3,7 +3,7 @@ This namespace/package wrap the sql functionalities for a certain data-stores
- netezza, postgresql, mysql and sqlite - netezza, postgresql, mysql and sqlite
- mariadb, redshift (also included) - mariadb, redshift (also included)
""" """
from . import postgresql, mysql, netezza, sqlite, sqlserver, duckdb from . import postgresql, mysql, netezza, sqlite3, sqlserver, duckdb
# #
@ -11,7 +11,7 @@ from . import postgresql, mysql, netezza, sqlite, sqlserver, duckdb
# #
mariadb = mysql mariadb = mysql
redshift = postgresql redshift = postgresql
sqlite3 = sqlite # sqlite3 = sqlite
# from transport import sql # from transport import sql

@ -1,11 +1,14 @@
""" """
This file encapsulates common operations associated with SQL databases via SQLAlchemy This file encapsulates common operations associated with SQL databases via SQLAlchemy
@ENT:
- To support streaming (with generators) we the parameter chunksize which essentially enables streaming
""" """
import sqlalchemy as sqa import sqlalchemy as sqa
from sqlalchemy import text , MetaData, inspect from sqlalchemy import text , MetaData, inspect
import pandas as pd import pandas as pd
def template():
return {'host':'localhost','database':'database','table':'table'}
class Base: class Base:
def __init__(self,**_args): def __init__(self,**_args):
@ -20,6 +23,7 @@ class Base:
_uri,_kwargs = _uri _uri,_kwargs = _uri
self._engine= sqa.create_engine(_uri,**_kwargs,future=True) self._engine= sqa.create_engine(_uri,**_kwargs,future=True)
self._chunksize = int(_args['chunksize']) if 'chunksize' in _args else None
def _set_uri(self,**_args) : def _set_uri(self,**_args) :
""" """
:provider provider :provider provider
@ -67,6 +71,7 @@ class Base:
# else: # else:
return [] return []
def has(self,**_args): def has(self,**_args):
return self.meta(**_args) return self.meta(**_args)
def apply(self,sql): def apply(self,sql):
@ -78,10 +83,14 @@ class Base:
""" """
if sql.strip().lower().startswith('select') or sql.strip().lower().startswith('with') or sql.strip().startswith('show'): if sql.strip().lower().startswith('select') or sql.strip().lower().startswith('with') or sql.strip().startswith('show'):
if not self._chunksize:
return pd.read_sql(sql,self._engine) return pd.read_sql(sql,self._engine)
else:
return pd.read_sql(sql,self._engine,chunksize=self._chunksize)
else: else:
_handler = self._engine.connect() _handler = self._engine.connect()
_handler.execute(text(sql)) _handler.execute(text(sql.strip()))
_handler.commit () _handler.commit ()
_handler.close() _handler.close()
return None return None
@ -132,18 +141,21 @@ class BaseReader(SQLBase):
if self._schema and type(self._schema) == str : if self._schema and type(self._schema) == str :
_table = f'{self._schema}.{_table}' _table = f'{self._schema}.{_table}'
sql = f'SELECT * FROM {_table}' sql = f'SELECT * FROM {_table}'
if 'chunksize' in _args :
self._chunksize = int(_args['chunksize'])
return self.apply(sql) return self.apply(sql)
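
A sketch of how the chunksize plumbing above surfaces for a SQL provider; connection parameters are placeholders:

    import transport
    from transport import providers

    reader = transport.get.reader(provider=providers.POSTGRESQL,
                                  host='localhost', database='demo', table='friends')
    counts = reader.apply('SELECT COUNT(*) AS n FROM friends')   # plain pd.read_sql

    # with chunksize, apply()/read() switch to pd.read_sql(..., chunksize=...)
    # and hand back a generator of DataFrame segments
    for segment in reader.read(chunksize=50000):
        print(len(segment))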
class BaseWriter (SQLBase): class BaseWriter (SQLBase):
""" """
This class implements SQLAlchemy support for Writting to a data-store (RDBMS) This class implements SQLAlchemy support for Writting to a data-store (RDBMS)
""" """
def __init__(self,**_args): def __init__(self,**_args):
super().__init__(**_args) super().__init__(**_args)
def write(self,_data,**_args): def write(self,_data,**_args):
if type(_data) == dict : if type(_data) == dict :
_df = pd.DataFrame([_data]) _df = pd.DataFrame([_data])
elif type(_data) == list : elif type(_data) == list :

@ -3,6 +3,9 @@ This module implements the handler for duckdb (in memory or not)
""" """
from transport.sql.common import Base, BaseReader, BaseWriter from transport.sql.common import Base, BaseReader, BaseWriter
def template ():
return {'database':'path-to-database','table':'table'}
class Duck : class Duck :
def __init__(self,**_args): def __init__(self,**_args):
# #

@ -1,8 +1,11 @@
""" """
This file implements support for mysql and maria db (with drivers mysql+mysql) This file implements support for mysql and maria db (with drivers mysql+mysql)
""" """
from transport.sql.common import BaseReader, BaseWriter from transport.sql.common import BaseReader, BaseWriter, template as _template
# import mysql.connector as my # import mysql.connector as my
def template ():
return dict(_template(),**{'port':3306})
class MYSQL: class MYSQL:
def get_provider(self): def get_provider(self):

@ -1,5 +1,8 @@
import nzpy as nz import nzpy as nz
from transport.sql.common import BaseReader, BaseWriter from transport.sql.common import BaseReader, BaseWriter , template as _template
def template ():
return dict(_template(),**{'port':5480,'chunksize':10000})
class Netezza: class Netezza:
def get_provider(self): def get_provider(self):

@ -1,7 +1,10 @@
from transport.sql.common import BaseReader , BaseWriter from transport.sql.common import BaseReader , BaseWriter, template as _template
from psycopg2.extensions import register_adapter, AsIs from psycopg2.extensions import register_adapter, AsIs
import numpy as np import numpy as np
def template ():
return dict(_template(),**{'port':5432,'chunksize':10000})
register_adapter(np.int64, AsIs) register_adapter(np.int64, AsIs)

@ -1,7 +1,11 @@
import sqlalchemy import sqlalchemy
import pandas as pd import pandas as pd
from transport.sql.common import Base, BaseReader, BaseWriter from transport.sql.common import Base, BaseReader, BaseWriter
class SQLite (BaseReader): from multiprocessing import RLock
def template():
return {'database':'path-to-database','table':'table'}
class SQLite3 :
lock = RLock()
def __init__(self,**_args): def __init__(self,**_args):
super().__init__(**_args) super().__init__(**_args)
if 'path' in _args : if 'path' in _args :
@ -12,7 +16,7 @@ class SQLite (BaseReader):
path = self._database path = self._database
return f'sqlite:///{path}' # ensure this is the correct path for the sqlite file. return f'sqlite:///{path}' # ensure this is the correct path for the sqlite file.
class Reader(SQLite,BaseReader): class Reader(SQLite3,BaseReader):
def __init__(self,**_args): def __init__(self,**_args):
super().__init__(**_args) super().__init__(**_args)
# def read(self,**_args): # def read(self,**_args):
@ -20,6 +24,12 @@ class Reader(SQLite,BaseReader):
# return pd.read_sql(sql,self._engine) # return pd.read_sql(sql,self._engine)
class Writer (SQLite,BaseWriter): class Writer (SQLite3,BaseWriter):
def __init__(self,**_args): def __init__(self,**_args):
super().__init__(**_args) super().__init__(**_args)
def write(self,_data,**_kwargs):
try:
SQLite3.lock.acquire()
super().write(_data,**_kwargs)
finally:
SQLite3.lock.release()

@ -3,10 +3,15 @@ Handling Microsoft SQL Server via pymssql driver/connector
""" """
import sqlalchemy import sqlalchemy
import pandas as pd import pandas as pd
from transport.sql.common import Base, BaseReader, BaseWriter from transport.sql.common import Base, BaseReader, BaseWriter, template as _template
def template ():
return dict(_template(),**{'port':1433})
class MsSQLServer: class MsSQLServer:
def __init__(self,**_args) : def __init__(self,**_args) :
super().__init__(**_args) super().__init__(**_args)
pass pass

@ -3,6 +3,8 @@ import pandas as pd
from .. sql.common import BaseReader , BaseWriter from .. sql.common import BaseReader , BaseWriter
import sqlalchemy as sqa import sqlalchemy as sqa
def template():
return {'host':'localhost','port':8047,'ssl':False,'table':None,'database':None}
class Drill : class Drill :
__template = {'host':None,'port':None,'ssl':None,'table':None,'database':None} __template = {'host':None,'port':None,'ssl':None,'table':None,'database':None}
def __init__(self,**_args): def __init__(self,**_args):

@ -11,6 +11,10 @@ from pyspark.sql.types import *
from pyspark.sql.functions import col, to_date, to_timestamp from pyspark.sql.functions import col, to_date, to_timestamp
import copy import copy
def template():
return {'catalog':None,'database':None,'table':None}
class Iceberg : class Iceberg :
def __init__(self,**_args): def __init__(self,**_args):
""" """
