Compare commits


31 Commits
main ... v2.2.0

Author SHA1 Message Date
Steve Nyemba 4c2efc2892 documentation ... readme (5 months ago)
Steve Nyemba a31481e196 fix (6 months ago)
Steve Nyemba 89d762f39a bug fixes: conditional imports (6 months ago)
Steve Nyemba 6e753a1fcd bug fixes (6 months ago)
Steve Nyemba 18c54d7664 bug fixes (6 months ago)
Steve Nyemba f06d26f9b6 bug fixes: installer & imports (6 months ago)
Steve Nyemba be10ae17d7 bug fixes: installer & registry (6 months ago)
Steve Nyemba befdf453f5 bug fix: crash with etl & process (6 months ago)
Steve Nyemba fbdb4a4931 bug fix: registry and emails (6 months ago)
Steve Nyemba 6e1c420952 project file specification (6 months ago)
Steve Nyemba 66d881fdda upgrade pyproject.toml, bug fix with registry (6 months ago)
Steve Nyemba de4e065ca6 bug fix with newer setuptools (6 months ago)
Steve Nyemba e035f5eba0 windows bug fix, environment variable (6 months ago)
Steve Nyemba 6f8019f582 bug fix (6 months ago)
Steve Nyemba b0cd0b85dc bug fix: logger issue (7 months ago)
Steve Nyemba 4b34c746ae bug fix: missing table (9 months ago)
Steve Nyemba 0977ad1b18 setup fixes (10 months ago)
Steve Nyemba 98ef8a848e bug fixes and dependencies (10 months ago)
Steve Nyemba 469c6f89a2 fixes with plugin handler (10 months ago)
Steve Nyemba dd10f6db78 bug fix: version & cli (10 months ago)
Steve Nyemba dad2956a8c version update (10 months ago)
Steve Nyemba eaa2b99a2d bug fix: schema (postgresql) construct (10 months ago)
Steve Nyemba a1b5f2743c bug fixes ... (10 months ago)
Steve Nyemba afa442ea8d versioning update edition (10 months ago)
Steve Nyemba 30645e46bd bug fix: readonly for duckdb (10 months ago)
Steve Nyemba cdf783143e ... (10 months ago)
Steve Nyemba 1a8112f152 adding iceberg notebook (11 months ago)
Steve Nyemba 49ebd4a432 bug fix: close & etl (11 months ago)
Steve Nyemba c3627586b3 fix: refactor cli switches (12 months ago)
Steve Nyemba 2a72de4cd6 bug fixes: registry and handling cli parameters as well as adding warehousing (12 months ago)
Steve Nyemba d0e655e7e3 update, community edition baseline (1 year ago)

@ -13,28 +13,20 @@ Data transport is a simple framework that:
## Installation
Within the virtual environment perform the following (the following will install everything):
Within the virtual environment perform the following :
pip install data-transport[all]@git+https://github.com/lnyemba/data-transport.git
pip install git+https://github.com/lnyemba/data-transport.git
Options to install components in square brackets are **nosql**; **cloud**; **other** and **warehouse**
Options to install components in square brackets
pip install data-transport[nosql,cloud,other, warehouse,all]@git+https://github.com/lnyemba/data-transport.git
pip install data-transport[nosql,cloud,warehouse,all]@git+https://github.com/lnyemba/data-transport.git
The components available:
0. sql (default): netezza; mysql; postgresql; duckdb; sqlite3; sqlserver
1. nosql: mongodb/ferretdb; couchdb
2. cloud: s3; bigquery; databricks
3. other: files; http; rabbitmq
4. warehouse: apache drill; apache iceberg
## Additional features
- Reads are separated from writes to avoid accidental writes (see the sketch below).
- Streaming (for large volumes of data) by specifying chunksize
- In addition to read/write, support for pre/post-processing functions
- CLI interface to add entries to the registry and run ETL jobs
- Implements best-practices for collaborative environments like apache zeppelin; jupyterhub; SageMaker; ...
- scales and integrates into shared environments like apache zeppelin; jupyterhub; SageMaker; ...
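The bullets above can be summarized in a short sketch. This is not code from the repository: the provider constant, database, and table names are illustrative, and credentials would normally come from the registry or an auth-file rather than being written inline.

```python
#
# Hedged sketch: readers and writers are separate objects by design.
import transport
from transport import providers

reader = transport.get.reader(provider=providers.POSTGRESQL, database='demo')
_df = reader.read(table='friends')         # returns a pandas DataFrame
reader.close()

writer = transport.get.writer(provider=providers.POSTGRESQL,
                              database='demo', table='friends_copy')
writer.write(_df)                          # writes only exist on writer objects
writer.close()
```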
## Learn More

@ -53,8 +53,10 @@ def wait(jobs):
while jobs :
jobs = [thread for thread in jobs if thread.is_alive()]
time.sleep(1)
def job (_args):
pass
# def wait (jobs):
# while jobs :
# jobs = [pthread for pthread in jobs if pthread.is_alive()]
@app_e.command(name="run")
def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed"),
@ -157,10 +159,6 @@ def initregistry (email:Annotated[str,typer.Argument(help="email")],
_msg = f"{TIMES_MARK} {e}"
print (_msg)
print ()
@app_r.command(name="add")
def register (label:Annotated[str,typer.Argument(help="unique label that will be used to load the parameters of the database")],
auth_file:Annotated[str,typer.Argument(help="path of the auth_file")],
@ -179,36 +177,8 @@ def register (label:Annotated[str,typer.Argument(help="unique label that will be
except Exception as e:
_msg = f"""{TIMES_MARK} {e}"""
print (_msg)
@app_r.command(name="template")
def template(name:Annotated[str,typer.Argument(help="database technology provider" ) ]):
"""
This function will generate a template entry for the registry (content of an auth file)
"""
#
# retrieve the provider and display the template if it has one
for _module in ['sql','cloud','warehouse','nosql','other'] :
ref = getattr(transport,_module) if hasattr(transport,_module) else None
_entry = {}
if ref :
if hasattr(ref,name) :
_pointer = getattr(ref,name)
_entry = dict({'provider':name},**_pointer.template()) if hasattr(_pointer,'template') else {}
break
#
#
print ( json.dumps(_entry))
pass
@app_r.command(name="list")
def register_list ():
"""
This function will list existing registry entries and basic information {label,vendor}
"""
# print (transport.registry.DATA)
_reg = transport.registry.DATA
_data = [{'label':key,'provider':_reg[key]['provider']} for key in _reg if 'provider' in _reg[key]]
_data = pd.DataFrame(_data)
print (_data)
pass
@app_x.command(name='add')
def register_plugs (
alias:Annotated[str,typer.Argument(help="unique function name within a file")],

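The registry `template` command shown above walks the provider namespaces and calls each module's `template()` function. For reference, the same lookup can be sketched directly in Python; the provider name `postgresql` is only an example, and whether a given module exposes `template()` depends on which side of this comparison is installed.

```python
#
# Sketch mirroring the CLI "template" command: find the provider module and
# print its auth-file template as JSON ('postgresql' is an illustrative name).
import json
import transport

def build_template(name: str) -> dict:
    for _module in ['sql', 'cloud', 'warehouse', 'nosql', 'other']:
        ref = getattr(transport, _module, None)
        if ref and hasattr(ref, name):
            _pointer = getattr(ref, name)
            if hasattr(_pointer, 'template'):
                return dict({'provider': name}, **_pointer.template())
    return {}

print(json.dumps(build_template('postgresql')))
```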
@ -1,2 +0,0 @@
cd /D "%~dp0"
python transport %1 %2 %3 %4 %5 %6

@ -1,8 +1,8 @@
__app_name__ = 'data-transport'
__author__ = 'The Phi Technology'
__version__= '2.4.30'
__edition__= 'enterprise'
__version__= '2.2.22'
__email__ = "info@the-phi.com"
__edition__= 'community'
__license__=f"""
Copyright 2010 - 2024, Steve L. Nyemba
@ -20,6 +20,4 @@ __whatsnew__=f"""version {__version__},
3. support for streaming data, important to use this with large volumes of data
"""

@ -0,0 +1,138 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Writing to Apache Iceberg\n",
"\n",
"1. Insure you have a Google Bigquery service account key on disk\n",
"2. The service key location is set as an environment variable **BQ_KEY**\n",
"3. The dataset will be automatically created within the project associated with the service key\n",
"\n",
"The cell below creates a dataframe that will be stored within Google Bigquery"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['data transport version ', '2.4.0']\n"
]
}
],
"source": [
"#\n",
"# Writing to Google Bigquery database\n",
"#\n",
"import transport\n",
"from transport import providers\n",
"import pandas as pd\n",
"import os\n",
"\n",
"PRIVATE_KEY = os.environ['BQ_KEY'] #-- location of the service key\n",
"DATASET = 'demo'\n",
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
"# bqw = transport.get.writer(provider=providers.ICEBERG,catalog='mz',database='edw.mz',table='friends')\n",
"bqw = transport.get.writer(provider=providers.ICEBERG,table='edw.mz.friends')\n",
"bqw.write(_data,if_exists='replace') #-- default is append\n",
"print (['data transport version ', transport.__version__])\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Reading from Google Bigquery\n",
"\n",
"The cell below reads the data that has been written by the cell above and computes the average age within a Google Bigquery (simple query). \n",
"\n",
"- Basic read of the designated table (friends) created above\n",
"- Execute an aggregate SQL against the table\n",
"\n",
"**NOTE**\n",
"\n",
"By design **read** object are separated from **write** objects in order to avoid accidental writes to the database.\n",
"Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" name age\n",
"0 James Bond 55\n",
"1 Steve Rogers 150\n",
"2 Steve Nyemba 44\n",
"--------- STATISTICS ------------\n"
]
}
],
"source": [
"\n",
"import transport\n",
"from transport import providers\n",
"import os\n",
"PRIVATE_KEY=os.environ['BQ_KEY']\n",
"pgr = transport.get.reader(provider=providers.ICEBERG,database='edw.mz')\n",
"_df = pgr.read(table='friends')\n",
"_query = 'SELECT COUNT(*) _counts, AVG(age) from friends'\n",
"_sdf = pgr.read(sql=_query)\n",
"print (_df)\n",
"print ('--------- STATISTICS ------------')\n",
"# print (_sdf)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"An **auth-file** is a file that contains database parameters used to access the database. \n",
"For code in shared environments, we recommend \n",
"\n",
"1. Having the **auth-file** stored on disk \n",
"2. and the location of the file is set to an environment variable.\n",
"\n",
"To generate a template of the **auth-file** open the **file generator wizard** found at visit https://healthcareio.the-phi.com/data-transport"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

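The auth-file recommendation in the notebook above can be sketched as follows. The `AUTH_FILE` variable name and the file contents are illustrative; the actual keys depend on the provider, and a template can be generated with the wizard linked above (or the registry `template` command where available).

```python
#
# Hedged sketch: keep connection parameters in an auth-file on disk and point
# an environment variable (here AUTH_FILE, an illustrative name) at it.
import json
import os
import transport

with open(os.environ['AUTH_FILE']) as f:
    _auth = json.load(f)    # e.g. {"provider": "postgresql", "database": "demo", ...}

reader = transport.get.reader(**_auth)
_df = reader.read(table='friends')
reader.close()
```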
@ -14,7 +14,7 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 1,
"metadata": {},
"outputs": [
{
@ -58,7 +58,7 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 2,
"metadata": {},
"outputs": [
{
@ -103,7 +103,7 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 4,
"metadata": {},
"outputs": [
{
@ -131,28 +131,16 @@
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Streaming Large Volumes of Data\n",
"\n",
"It is recommended for large volumes of data to stream the data using **chunksize** as a parameter \n",
"\n",
"1. in the **read** method \n",
"2. or **transport.get.reader(\\*\\*...,chunksize=1000)**\n",
"\n",
"Use streaming because with large volumes of data some databases limit the volume of data for a single transaction in order to efficiently guarantee maintain **data integrity**"
]
},
{
"cell_type": "markdown",
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "python (3.10.12)",
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
@ -166,7 +154,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
"version": "3.9.7"
}
},
"nbformat": 4,

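The streaming cell removed from the notebook above amounts to passing `chunksize` and iterating the result. A hedged sketch follows; the provider, table name, and chunk size are illustrative, and chunksize support differs between the two sides of this comparison (it is tied to the `_chunksize` handling removed elsewhere in this diff).

```python
#
# Hedged sketch: stream a large table in chunks instead of loading it at once.
# Provider, table name, and the 5000 chunk size are illustrative.
import transport
from transport import providers

reader = transport.get.reader(provider=providers.POSTGRESQL,
                              database='demo', chunksize=5000)
rows = 0
for segment in reader.read(table='big_table'):   # yields DataFrame segments
    rows += segment.shape[0]
reader.close()
print(rows)
```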
@ -39,14 +39,14 @@ Homepage = "https://healthcareio.the-phi.com/git/code/transport.git"
[tool.setuptools]
include-package-data = true
zip-safe = false
script-files = ["bin/transport","bin/transport.cmd"]
script-files = ["bin/transport"]
[tool.setuptools.packages.find]
include = [ "transport", "transport.*"]
include = ["info","info.*", "transport", "transport.*"]
[tool.setuptools.dynamic]
version = {attr = "transport.info.__version__"}
#authors = {attr = "transport.__author__"}
version = {attr = "info.__version__"}
#authors = {attr = "meta.__author__"}
# If you have a info.py file, you might also want to include the author dynamically:
# [tool.setuptools.dynamic]

@ -31,17 +31,18 @@ except Exception as e:
try:
from transport import warehouse
except Exception as e:
warehouse= {}
warehouse = {}
try:
from transport import other
except Exception as e:
other = {}
import pandas as pd
import json
import os
from transport.info import __version__,__author__,__email__,__license__,__app_name__,__whatsnew__,__edition__
from info import __version__,__author__,__email__,__license__,__app_name__,__whatsnew__,__edition__
from transport.iowrapper import IWriter, IReader, IETL
from transport.plugins import PluginLoader
from transport import providers

@ -14,11 +14,7 @@ import numpy as np
import time
MAX_CHUNK = 2000000
def template ():
return {'provider':'bigquery','private_key':'path-to-key','dataset':'name-of-dataset','table':'table','chunksize':MAX_CHUNK}
class BigQuery:
__template__= {"private_key":None,"dataset":None,"table":None}
def __init__(self,**_args):
path = _args['service_key'] if 'service_key' in _args else _args['private_key']
self.credentials = service_account.Credentials.from_service_account_file(path)
@ -27,7 +23,6 @@ class BigQuery:
self.dtypes = _args['dtypes'] if 'dtypes' in _args else None
self.table = _args['table'] if 'table' in _args else None
self.client = bq.Client.from_service_account_json(self.path)
self._chunksize = int(_args['chunksize']) if 'chunksize' in _args else None
def meta(self,**_args):
"""
This function returns meta data for a given table or query with dataset/table properly formatted
@ -86,13 +81,6 @@ class Reader (BigQuery):
if (':dataset' in SQL or ':DATASET' in SQL) and self.dataset:
SQL = SQL.replace(':dataset',self.dataset).replace(':DATASET',self.dataset)
_info = {'credentials':self.credentials,'dialect':'standard'}
#
# @Ent-Feature : adding streaming capability here
#
if 'chunksize' in _args :
self._chunksize = int(_args['chunksize'])
if self._chunksize :
_info['chunksize'] = self._chunksize
return pd_gbq.read_gbq(SQL,**_info) if SQL else None
# return self.client.query(SQL).to_dataframe() if SQL else None

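For context on the BigQuery parameters listed in the `template()` above, a hedged usage sketch: the `BQ_KEY` environment variable mirrors the notebook earlier in this diff, the dataset name and query are illustrative, and `providers.BIGQUERY` is assumed to be the matching provider constant.

```python
#
# Hedged sketch: a BigQuery reader built from the parameters named in the
# template above; dataset and query are illustrative.
import os
import transport
from transport import providers

reader = transport.get.reader(provider=providers.BIGQUERY,
                              private_key=os.environ['BQ_KEY'],
                              dataset='demo')
_df = reader.read(sql='SELECT name, AVG(age) AS age FROM :dataset.friends GROUP BY name')
reader.close()
```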
@ -17,8 +17,6 @@ import sqlalchemy
# from transport.common import Reader,Writer
import pandas as pd
def template ():
return {'provider':'databricks','host':'fqn-host','token':'token','cluster_path':'path-of-cluster','catalog':'name-of-catalog','database':'schema-or-database','table':'table','chunksize':10000}
class Bricks:
"""
@ -28,7 +26,6 @@ class Bricks:
:cluster_path
:table
"""
__template__ = {"host":None,"token":None,"cluster_path":None,"catalog":None,"schema":None}
def __init__(self,**_args):
_host = _args['host']
_token= _args['token']
@ -44,7 +41,6 @@ class Bricks:
_uri = f'''databricks+connector://token:{_token}@{_host}?http_path={_cluster_path}&catalog={_catalog}&schema={self._schema}'''
self._engine = sqlalchemy.create_engine (_uri)
self._chunksize = int(_args['chunksize']) if 'chunksize' in _args else None
pass
def meta(self,**_args):
table = _args['table'] if 'table' in _args else self._table
@ -67,14 +63,7 @@ class Bricks:
def apply(self,_sql):
try:
if _sql.lower().startswith('select') :
#
# @ENT-Feature: adding streaming functions/variables
if not self._chunksize :
return pd.read_sql(_sql,self._engine)
else:
return pd.read_sql(_sql,self._engine,chunksize=self._chunksize)
return pd.read_sql(_sql,self._engine)
except Exception as e:
pass
@ -94,10 +83,7 @@ class Reader(Bricks):
sql = f'SELECT * FROM {table}'
if limit :
sql = sql + f' LIMIT {limit}'
#
# @ENT-Feature: adding streaming functions/variables
if 'chunksize' in _args :
self._chunksize = int(_args['chunksize'])
if 'sql' in _args or 'table' in _args :
return self.apply(sql)
else:

@ -8,10 +8,8 @@ import pandas as pd
from io import StringIO
import json
import nextcloud_client as nextcloud
def template():
return {"url":None,"token":None,"uid":None,"file":None}
class Nextcloud :
__template__={"url":None,"token":None,"uid":None,"file":None}
def __init__(self,**_args):
pass
self._delimiter = None

@ -20,14 +20,10 @@ from io import StringIO
import pandas as pd
import json
def template():
return {'access_key':'access-key','secret_key':'secret-key','region':'region','bucket':'name-of-bucket','file':'file-name','chunksize':10000}
class s3 :
"""
@TODO: Implement a search function for a file given a bucket??
"""
__template__={"access_key":None,"secret_key":None,"bucket":None,"file":None,"region":None}
def __init__(self,**args) :
"""
This function will extract a file or set of files from s3 bucket provided
@ -41,7 +37,6 @@ class s3 :
self._bucket_name = args['bucket']
self._file_name = args['file']
self._region = args['region']
self._chunksize = int(args['chunksize']) if 'chunksize' in args else None
except Exception as e :
print (e)
pass
@ -93,10 +88,7 @@ class Reader(s3) :
if not _stream :
return None
if _object['ContentType'] in ['text/csv'] :
if not self._chunksize :
return pd.read_csv(StringIO(str(_stream).replace("\\n","\n").replace("\\r","").replace("\'","")))
else:
return pd.read_csv(StringIO(str(_stream).replace("\\n","\n").replace("\\r","").replace("\'","")),chunksize=self._chunksize)
return pd.read_csv(StringIO(str(_stream).replace("\\n","\n").replace("\\r","").replace("\'","")))
else:
return _stream

@ -0,0 +1,19 @@
"""
This file is intended to handle the duckdb database
"""
import duckdb
from transport.common import Reader,Writer
class Duck(Reader):
def __init__(self,**_args):
super().__init__(**_args)
self._path = None if 'path' not in _args else _args['path']
self._handler = duckdb.connect() if not self._path else duckdb.connect(self._path)
class DuckReader(Duck) :
def __init__(self,**_args):
super().__init__(**_args)
def read(self,**_args) :
pass

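`DuckReader.read` above is still a stub in this revision. One possible completion, offered purely as an illustration and not as the repository's implementation, would run a query on the connection created in `Duck` and return a DataFrame:

```python
# Illustrative completion of DuckReader.read (not part of the diff above):
# run the SQL (or a SELECT * over the given table) and return a DataFrame.
def read(self, **_args):
    _sql = _args['sql'] if 'sql' in _args else f"SELECT * FROM {_args['table']}"
    return self._handler.execute(_sql).fetchdf()   # duckdb result -> pandas DataFrame
```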
@ -1,25 +0,0 @@
__app_name__ = 'data-transport'
__author__ = 'Steve L. Nyemba'
__version__= '2.4.34'
__edition__= 'enterprise'
__email__ = "info@the-phi.com"
__license__=f"""
Copyright 2010 - 2024, Steve L. Nyemba
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the Software), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED AS IS, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
__whatsnew__=f"""version {__version__},
1. Added support for read/write logs as well as plugins (when applied)
2. Bug fix with duckdb (adding readonly) for readers because there are issues with threads & processes
3. support for streaming data, important to use this with large volumes of data
"""

@ -5,113 +5,42 @@ NOTE: Plugins are converted to a pipeline, so we apply a pipeline when reading o
- upon initialization we will load plugins
- on read/write we apply a pipeline (if passed as an argument)
"""
from transport.plugins import PluginLoader
from transport.plugins import Plugin, PluginLoader
import transport
from transport import providers
from multiprocessing import Process, RLock
from multiprocessing import Process
import time
import types
from . import registry
from datetime import datetime
import pandas as pd
import numpy as np
import os
import sys
import itertools
import json
import plugin_ix
class BaseIO :
def __init__(self,**_args):
self._logger = _args['logger'] if 'logger' in _args else None
self._logTable = 'logs' if 'logTable' not in _args else _args['logTable']
def setLogger(self,_logger):
self._logger = _logger
def log (self,**_args):
if self._logger :
_date = str(datetime.now())
_data = dict({'pid':os.getpid(),'date':_date[:10],'time':_date[11:19]},**_args)
for key in _data :
if type(_data[key]) == list :
_data[key] = [_item.__name__ if type(_item).__name__== 'function' else _item for _item in _data[key]]
_data[key] = str(_data[key]) if type(_data[key]) not in [list,dict] else json.dumps(_data[key])
self._logger.write(pd.DataFrame([_data])) #,table=self._logTable)
class IO(BaseIO):
class IO:
"""
Base wrapper class for read/write and support for logs
"""
def __init__(self,**_args):
#
# We need to initialize the logger here ...
#
super().__init__(**_args)
_agent = _args['agent']
plugins = _args['plugins'] if 'plugins' else None
# _logger = _args['logger'] if 'logger' in _args else None
# self._logger = _logger if not type(_agent) in [IReader,IWriter] else _agent._logger #transport.get.writer(label='logger') #if registry.has('logger') else None
# if not _logger and hasattr(_agent,'_logger') :
# self._logger = getattr(_agent,'_logger')
plugins = _args['plugins'] if 'plugins' in _args else None
self._agent = _agent
_date = _date = str(datetime.now())
# self._ixloader = plugin_ix.Loader () #-- must indicate where the plugin registry file is
self._ixloader = plugin_ix.Loader (registry=plugin_ix.Registry(folder=transport.registry.REGISTRY_PATH))
# self._logTable = 'logs' #'_'.join(['logs',_date[:10]+_date[11:19]]).replace(':','').replace('-','_')
if plugins :
self.init_plugins(plugins)
# def setLogger(self,_logger):
# self._logger = _logger
# def log (self,**_args):
# if self._logger :
# _date = str(datetime.now())
# _data = dict({'pid':os.getpid(),'date':_date[:10],'time':_date[11:19]},**_args)
# for key in _data :
# if type(_data[key]) == list :
# _data[key] = [_item.__name__ if type(_item).__name__== 'function' else _item for _item in _data[key]]
# _data[key] = str(_data[key]) if type(_data[key]) not in [list,dict] else json.dumps(_data[key])
# self._logger.write(pd.DataFrame([_data])) #,table=self._logTable)
# def _init_plugins(self,_items):
# """
# This function will load pipelined functions as a plugin loader
# """
# registry.plugins.init()
# self._plugins = PluginLoader(registry=registry.plugins)
# [self._plugins.set(_name) for _name in _items]
# self.log(action='init-plugins',object=self.getClassName(self),input =[_name for _name in _items])
# # if 'path' in _args and 'names' in _args :
# # self._plugins = PluginLoader(**_args)
# # else:
# # self._plugins = PluginLoader(registry=registry.plugins)
# # [self._plugins.set(_pointer) for _pointer in _args]
# #
# # @TODO: We should have a way to log what plugins are loaded and ready to use
def meta (self,**_args):
if hasattr(self._agent,'meta') :
return self._agent.meta(**_args)
return []
def getClassName (self,_object):
return '.'.join([_object.__class__.__module__,_object.__class__.__name__])
def close(self):
if hasattr(self._agent,'close') :
self._agent.close()
def apply(self):
"""
applying pre/post conditions given a pipeline expression
"""
for _pointer in self._plugins :
_data = _pointer(_data)
time.sleep(1)
# def apply(self):
# """
# applying pre/post conditions given a pipeline expression
# """
# for _pointer in self._plugins :
# _data = _pointer(_data)
def apply(self,_query):
if hasattr(self._agent,'apply') :
return self._agent.apply(_query)
@ -133,158 +62,70 @@ class IReader(IO):
"""
def __init__(self,**_args):
super().__init__(**_args)
self._args = _args['args']if 'args' in _args else None
def _stream (self,_data ):
# self.log(action='streaming',object=self._agent._engine.name, input= type(_data).__name__)
_shape = []
for _segment in _data :
_shape += list(_segment.shape)
if self._plugins :
# yield self._plugins.apply(_segment,self.log)
yield self._ixloader.visitor(_data,self.log)
else:
yield _segment
_objectName = '.'.join([self._agent.__class__.__module__,self._agent.__class__.__name__])
_input = {'shape':_shape}
if hasattr(self._agent,'_table') :
_input['table'] = self._agent._table
self.log(action='streaming',object=_objectName, input= _input)
def read(self,**_args):
if 'plugins' in _args :
self.init_plugins(_args['plugins'])
if self._args :
_data = self._agent.read(**self._args)
else:
_data = self._agent.read(**_args)
_objectName = '.'.join([self._agent.__class__.__module__,self._agent.__class__.__name__])
if types.GeneratorType == type(_data):
return self._stream(_data)
# if self._plugins :
# return self._stream(_data)
# else:
# _count = 0
# for _segment in _data :
# _count += 1
# yield _segment
# self.log(action='streaming',object=_objectName, input= {'segments':_count})
# return _data
elif type(_data) == pd.DataFrame :
_shape = _data.shape #[0,0] if not _data.shape[] else list(_data.shape)
_input = {'shape':_shape}
if hasattr(self._agent,'_table') :
_input['table'] = self._agent._table
self.log(action='read',object=_objectName, input=_input)
_data = self._ixloader.visitor(_data)
return _data
_data = self._agent.read(**_args)
# if self._plugins and self._plugins.ratio() > 0 :
# _data = self._plugins.apply(_data)
#
# output data
#
# applying the the design pattern
_data = self._ixloader.visitor(_data)
return _data
class IWriter(IO):
lock = RLock()
def __init__(self,**_args):
super().__init__(**_args)
def __init__(self,**_args): #_agent,pipeline=None):
super().__init__(**_args) #_agent,pipeline)
def write(self,_data,**_args):
# if 'plugins' in _args :
# self._init_plugins(_args['plugins'])
if 'plugins' in _args :
self._init_plugins(_args['plugins'])
# if self._plugins and self._plugins.ratio() > 0 :
# _logs = []
# _data = self._plugins.apply(_data,_logs,self.log)
self.init_plugins(_args['plugins'])
# [self.log(**_item) for _item in _logs]
try:
# IWriter.lock.acquire()
_data = self._ixloader.visitor(_data)
self._agent.write(_data,**_args)
finally:
# IWriter.lock.release()
pass
self._ixloader.visitor(_data)
self._agent.write(_data,**_args)
#
# The ETL object in its simplest form is an aggregation of read/write objects
# @TODO: ETL can/should aggregate a writer as a plugin and apply it as a process
class IETL(BaseIO) :
class IETL(IReader) :
"""
This class performs an ETL operation by inheriting a reader and adding writes as pipeline functions
"""
def __init__(self,**_args):
super().__init__()
self._source = _args['source']
self._targets= _args['target'] if type(_args['target']) == list else [_args['target']]
#
# ETL Initialization, we should provide some measure of context ...
#
# def run(self) :
# """
# We should apply the etl here, if we are in multiprocessing mode
# """
# return self.read()
def run(self,**_args):
# _data = super().read(**_args) if not self._sourceArgs else super().read(**self._sourceArgs)
# self._targets = [transport.get.writer(**_kwargs) for _kwargs in self._targets]
_reader = transport.get.reader(**self._source)
if hasattr(_reader,'_logger') :
self.setLogger(_reader._logger)
self.log(action='init-etl',input={'source':self._source,'target':self._targets})
_data = _reader.read(**self._source['args'])if 'args' in self._source else _reader.read()
_reader.close()
_writers = [transport.get.writer(**_kwargs) for _kwargs in self._targets]
# _schema = [] if not getattr(_reader._agent,'_table') else _reader.meta()
_schema = [] if not hasattr(_reader._agent,'_table') else _reader.meta()
if types.GeneratorType == type(_data):
_index = 0
for _segment in _data :
_index += 1
for _writer in _writers :
self.post(_segment,writer=_writer,index=_index,schema=_schema)
time.sleep(1)
super().__init__(agent=transport.get.reader(**_args['source']),plugins=None)
if 'target' in _args:
self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
else:
self._targets = []
self.jobs = []
#
# If the parent is already multiprocessing
self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
def read(self,**_args):
_data = super().read(**_args)
_schema = super().meta()
for _kwargs in self._targets :
if _schema :
_kwargs['schema'] = _schema
self.post(_data,**_kwargs)
for _writer in _writers :
self.post(_data,writer=_writer,schema=_schema)
# pass
return _data
# return _data
def run(self) :
return self.read()
def post (self,_data,**_args) :
"""
This function returns an instance of a process that will perform the write operation
:_args parameters associated with writer object
"""
#writer = transport.get.writer(**_args)
_input = {}
try:
_action = 'post'
_shape = dict(zip(['rows','columns'],_data.shape))
_index = _args['index'] if 'index' in _args else 0
writer = _args['writer']
_schema= _args['schema']
#
# -- things to log
_input = {'shape':_shape,'segment':_index}
if hasattr(writer._agent,'_table'):
_input['table'] = writer._agent._table
for _item in _schema :
if _item['type'] in ['INTEGER','BIGINT','INT'] :
_column = _item['name']
_data[_column] = _data[_column].copy().fillna(0).astype(np.int64)
writer = transport.get.writer(**_args)
if 'schema' in _args :
writer.write(_data,schema=_args['schema'])
else:
writer.write(_data)
except Exception as e:
_action = 'post-error'
_input['error'] = str(e)
print ([e])
pass
self.log(action=_action,object=writer._agent.__module__, input= _input)
writer.close()

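Based on the `IETL` constructor and `run()` shown above (both sides of the diff accept `source` and `target`), a minimal sketch of driving an ETL step programmatically; the provider entries are placeholders and the exact keys depend on the providers used.

```python
#
# Hedged sketch: IETL aggregates one reader and one or more writers.
# The source/target parameters below are placeholders.
import transport
from transport.iowrapper import IETL

_etl = IETL(source={'provider': 'sqlite', 'database': '/tmp/in.db', 'table': 'friends'},
            target=[{'provider': 'postgresql', 'database': 'demo', 'table': 'friends'}])
_etl.run()   # reads from the source and posts the data to every target
```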
@ -11,8 +11,7 @@ import sys
# from transport.common import Reader, Writer
from datetime import datetime
def template():
return {'dbname':'database','doc':'document','username':'username','password':'password','url':'url-with-port'}
class Couch:
"""
This class is a wrapper for read/write against couchdb; it captures the common operations for both.
@ -20,7 +19,6 @@ class Couch:
@param doc user id involved
@param dbname database name (target)
"""
__template__={"url":None,"doc":None,"dbname":None,"username":None,"password":None}
def __init__(self,**args):
url = args['url'] if 'url' in args else 'http://localhost:5984'
self._id = args['doc']

@ -20,15 +20,11 @@ import re
from multiprocessing import Lock, RLock
from transport.common import IEncoder
def template():
return {'provider':'mongodb','host':'localhost','port':27017,'db':'db-name','collection':'collection-name','username':'username','password':'password','mechanism':'SCRAM-SHA-256'}
class Mongo :
lock = RLock()
"""
Basic mongodb functions are captured here
"""
__template__={"db":None,"collection":None,"host":None,"port":None,"username":None,"password":None}
def __init__(self,**args):
"""
:dbname database name/identifier

@ -9,7 +9,7 @@ import numpy as np
import pandas as pd
class Writer :
lock = None
lock = Lock()
_queue = {'default':queue.Queue()}
def __init__(self,**_args):
self._cache = {}

@ -4,10 +4,6 @@ This file is a wrapper around pandas built-in functionalities to handle characte
import pandas as pd
import numpy as np
import os
def template():
return {'path':None,'delimiter':None}
class File :
def __init__(self,**params):
"""
@ -16,7 +12,7 @@ class File :
"""
self.path = params['path'] if 'path' in params else None
self.delimiter = params['delimiter'] if 'delimiter' in params else ','
self._chunksize = None if 'chunksize' not in params else int(params['chunksize'])
def isready(self):
return os.path.exists(self.path)
def meta(self,**_args):
@ -30,19 +26,11 @@ class Reader (File):
def __init__(self,**_args):
super().__init__(**_args)
def _stream(self,path) :
reader = pd.read_csv(path,sep=self.delimiter,chunksize=self._chunksize,low_memory=False)
for segment in reader :
yield segment
def read(self,**args):
_path = self.path if 'path' not in args else args['path']
_delimiter = self.delimiter if 'delimiter' not in args else args['delimiter']
_df = pd.read_csv(_path,sep=self.delimiter) if not self._chunksize else self._stream(_path)
if 'query' in args :
_query = args['query']
_df = _df.query(_query)
return _df
return pd.read_csv(_path,delimiter=self.delimiter)
def stream(self,**args):
raise Exception ("streaming needs to be implemented")
class Writer (File):

@ -7,8 +7,6 @@ import requests
from io import StringIO
import pandas as pd
def template():
return {'url':None,'headers':{'key':'value'}}
class Reader:
"""

@ -17,10 +17,6 @@ import sys
# from common import Reader, Writer
import json
from multiprocessing import RLock
def template():
return {'port':5672,'host':'localhost','queue':None,'vhost':None,'username':None,'password':None}
class MessageQueue:
"""
This class hierarchy is designed to handle interactions with a queue server using the pika framework (our tests are based on rabbitmq)

@ -59,6 +59,9 @@ class PluginLoader :
pass
def load (self,**_args):
"""
This function loads a plugin
"""
self._modules = {}
self._names = []
path = _args ['path']

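`PluginLoader.load` above takes a path and a list of function names. A hedged sketch of what such a plugin file could contain, assuming (as `iowrapper` applies plugins to data) that each function takes a DataFrame and returns a DataFrame; the file and function names are illustrative, and the registration/decorator mechanism (the diff references both PluginLoader and plugin_ix) is not shown here.

```python
#
# Illustrative plugin module (e.g. myplugins.py): each function receives a
# DataFrame and returns the transformed DataFrame.
import pandas as pd

def clean_names(_data: pd.DataFrame) -> pd.DataFrame:
    _data.columns = [name.strip().lower() for name in _data.columns]
    return _data

def adults_only(_data: pd.DataFrame) -> pd.DataFrame:
    return _data[_data['age'] >= 18]
```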
@ -1,6 +1,6 @@
import os
import json
from transport.info import __version__
from info import __version__
import copy
import transport
import importlib
@ -12,7 +12,6 @@ from io import StringIO
This class manages data from the registry and allows (read only)
@TODO: add property to the DATA attribute
"""
if 'HOME' in os.environ :
REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
else:
@ -26,6 +25,7 @@ if 'DATA_TRANSPORT_REGISTRY_PATH' in os.environ :
REGISTRY_FILE= 'transport-registry.json'
DATA = {}
def isloaded ():
return DATA not in [{},None]
def exists (path=REGISTRY_PATH,_file=REGISTRY_FILE) :

@ -3,7 +3,7 @@ This namespace/package wrap the sql functionalities for a certain data-stores
- netezza, postgresql, mysql and sqlite
- mariadb, redshift (also included)
"""
from . import postgresql, mysql, netezza, sqlite3, sqlserver, duckdb
from . import postgresql, mysql, netezza, sqlite, sqlserver, duckdb
#
@ -11,7 +11,7 @@ from . import postgresql, mysql, netezza, sqlite3, sqlserver, duckdb
#
mariadb = mysql
redshift = postgresql
# sqlite3 = sqlite
sqlite3 = sqlite
# from transport import sql

@ -1,14 +1,11 @@
"""
This file encapsulates common operations associated with SQL databases via SQLAlchemy
@ENT:
- To support streaming (with generators) we use the parameter chunksize, which essentially enables streaming
"""
import sqlalchemy as sqa
from sqlalchemy import text , MetaData, inspect
import pandas as pd
def template():
return {'host':'localhost','database':'database','table':'table'}
class Base:
def __init__(self,**_args):
@ -23,7 +20,6 @@ class Base:
_uri,_kwargs = _uri
self._engine= sqa.create_engine(_uri,**_kwargs,future=True)
self._chunksize = int(_args['chunksize']) if 'chunksize' in _args else None
def _set_uri(self,**_args) :
"""
:provider provider
@ -71,7 +67,6 @@ class Base:
# else:
return []
def has(self,**_args):
return self.meta(**_args)
def apply(self,sql):
@ -83,14 +78,10 @@ class Base:
"""
if sql.strip().lower().startswith('select') or sql.strip().lower().startswith('with') or sql.strip().startswith('show'):
if not self._chunksize:
return pd.read_sql(sql,self._engine)
else:
return pd.read_sql(sql,self._engine,chunksize=self._chunksize)
return pd.read_sql(sql,self._engine)
else:
_handler = self._engine.connect()
_handler.execute(text(sql.strip()))
_handler.execute(text(sql))
_handler.commit ()
_handler.close()
return None
@ -141,21 +132,18 @@ class BaseReader(SQLBase):
if self._schema and type(self._schema) == str :
_table = f'{self._schema}.{_table}'
sql = f'SELECT * FROM {_table}'
if 'chunksize' in _args :
self._chunksize = int(_args['chunksize'])
return self.apply(sql)
class BaseWriter (SQLBase):
"""
This class implements SQLAlchemy support for writing to a data-store (RDBMS)
"""
def __init__(self,**_args):
super().__init__(**_args)
def write(self,_data,**_args):
if type(_data) == dict :
_df = pd.DataFrame([_data])
elif type(_data) == list :

@ -3,9 +3,6 @@ This module implements the handler for duckdb (in memory or not)
"""
from transport.sql.common import Base, BaseReader, BaseWriter
def template ():
return {'database':'path-to-database','table':'table'}
class Duck :
def __init__(self,**_args):
#

@ -1,11 +1,8 @@
"""
This file implements support for mysql and maria db (with drivers mysql+mysql)
"""
from transport.sql.common import BaseReader, BaseWriter, template as _template
from transport.sql.common import BaseReader, BaseWriter
# import mysql.connector as my
def template ():
return dict(_template(),**{'port':3306})
class MYSQL:
def get_provider(self):

@ -1,8 +1,5 @@
import nzpy as nz
from transport.sql.common import BaseReader, BaseWriter , template as _template
def template ():
return dict(_template(),**{'port':5480,'chunksize':10000})
from transport.sql.common import BaseReader, BaseWriter
class Netezza:
def get_provider(self):

@ -1,10 +1,7 @@
from transport.sql.common import BaseReader , BaseWriter, template as _template
from transport.sql.common import BaseReader , BaseWriter
from psycopg2.extensions import register_adapter, AsIs
import numpy as np
def template ():
return dict(_template(),**{'port':5432,'chunksize':10000})
register_adapter(np.int64, AsIs)

@ -1,11 +1,7 @@
import sqlalchemy
import pandas as pd
from transport.sql.common import Base, BaseReader, BaseWriter
from multiprocessing import RLock
def template():
return {'database':'path-to-database','table':'table'}
class SQLite3 :
lock = RLock()
class SQLite (BaseReader):
def __init__(self,**_args):
super().__init__(**_args)
if 'path' in _args :
@ -16,7 +12,7 @@ class SQLite3 :
path = self._database
return f'sqlite:///{path}' # ensure this is the correct path for the sqlite file.
class Reader(SQLite3,BaseReader):
class Reader(SQLite,BaseReader):
def __init__(self,**_args):
super().__init__(**_args)
# def read(self,**_args):
@ -24,12 +20,6 @@ class Reader(SQLite3,BaseReader):
# return pd.read_sql(sql,self._engine)
class Writer (SQLite3,BaseWriter):
class Writer (SQLite,BaseWriter):
def __init__(self,**_args):
super().__init__(**_args)
def write(self,_data,**_kwargs):
try:
SQLite3.lock.acquire()
super().write(_data,**_kwargs)
finally:
SQLite3.lock.release()

@ -3,15 +3,10 @@ Handling Microsoft SQL Server via pymssql driver/connector
"""
import sqlalchemy
import pandas as pd
from transport.sql.common import Base, BaseReader, BaseWriter, template as _template
def template ():
return dict(_template(),**{'port':1433})
from transport.sql.common import Base, BaseReader, BaseWriter
class MsSQLServer:
def __init__(self,**_args) :
super().__init__(**_args)
pass

@ -3,8 +3,6 @@ import pandas as pd
from .. sql.common import BaseReader , BaseWriter
import sqlalchemy as sqa
def template():
return {'host':'localhost','port':8047,'ssl':False,'table':None,'database':None}
class Drill :
__template = {'host':None,'port':None,'ssl':None,'table':None,'database':None}
def __init__(self,**_args):

@ -11,10 +11,6 @@ from pyspark.sql.types import *
from pyspark.sql.functions import col, to_date, to_timestamp
import copy
def template():
return {'catalog':None,'database':None,'table':None}
class Iceberg :
def __init__(self,**_args):
"""
