Compare commits

..

No commits in common. 'master' and 'v2.0' have entirely different histories.
master ... v2.0

@ -8,7 +8,7 @@ Mostly data scientists that don't really care about the underlying database and
1. Familiarity with **pandas data-frames**
2. Connectivity **drivers** are included
3. Reading/Writing data from various sources
3. Mining data from various sources
4. Useful for data migrations or **ETL**
@ -18,20 +18,6 @@ Within the virtual environment perform the following :
pip install git+https://github.com/lnyemba/data-transport.git
## Features
- read/write from over a dozen databases
- run ETL jobs seamlessly
- scales and integrates into shared environments like apache zeppelin; jupyterhub; SageMaker; ...
## What's new
Unlike older versions 2.0 and under, we focus on collaborative environments like jupyter-x servers; apache zeppelin:
1. Simpler syntax to create reader or writer
2. auth-file registry that can be referenced using a label
3. duckdb support
## Learn More

@ -13,6 +13,29 @@ The above copyright notice and this permission notice shall be included in all c
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Usage :
transport help -- will print this page
transport move <path> [index]
<path> path to the configuration file
<index> optional index within the configuration file
e.g: configuration file (JSON formatted)
- single source to a single target
{"source":{"provider":"http","url":"https://cdn.wsform.com/wp-content/uploads/2020/06/agreement.csv"}
"target":{"provider":"sqlite3","path":"transport-demo.sqlite","table":"agreement"}
}
- single source to multiple targets
{
"source":{"provider":"http","url":"https://cdn.wsform.com/wp-content/uploads/2020/06/agreement.csv"},
"target":[
{"provider":"sqlite3","path":"transport-demo.sqlite","table":"agreement},
{"provider":"mongodb","db":"transport-demo","collection":"agreement"}
]
}
"""
import pandas as pd
import numpy as np
@ -21,31 +44,15 @@ import sys
import transport
import time
from multiprocessing import Process
import typer
import os
import transport
# from transport import etl
from transport.iowrapper import IETL
from transport import etl
# from transport import providers
import typer
from typing_extensions import Annotated
from typing import Optional
import time
from termcolor import colored
from enum import Enum
from rich import print
import plugin_ix as pix
app = typer.Typer()
app_e = typer.Typer() #-- handles etl (run, generate)
app_x = typer.Typer() #-- handles plugins (list,add, test)
app_i = typer.Typer() #-- handles information (version, license)
app_r = typer.Typer() #-- handles registry
REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
REGISTRY_FILE= 'transport-registry.json'
CHECK_MARK = '[ [green]\u2713[/green] ]' #' '.join(['[',colored(u'\u2713', 'green'),']'])
TIMES_MARK= '[ [red]\u2717[/red] ]' #' '.join(['[',colored(u'\u2717','red'),']'])
# @app.command()
def help() :
print (__doc__)
@ -53,51 +60,30 @@ def wait(jobs):
while jobs :
jobs = [thread for thread in jobs if thread.is_alive()]
time.sleep(1)
def wait (jobs):
while jobs :
jobs = [pthread for pthread in jobs if pthread.is_alive()]
@app_e.command(name="run")
def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
index:int = typer.Option(default= None, help="index of the item of interest, otherwise everything in the file will be processed"),
batch:int = typer.Option(default=5, help="The number of parallel processes to run at once")
):
@app.command(name="apply")
def apply (path,index=None):
"""
This function applies data transport ETL feature to read data from one source to write it one or several others
This function applies data transport from one source to one or several others
:path path of the configuration file
:index index of the _item of interest (otherwise everything will be processed)
"""
# _proxy = lambda _object: _object.write(_object.read())
_proxy = lambda _object: _object.write(_object.read())
if os.path.exists(path):
file = open(path)
_config = json.loads (file.read() )
file.close()
if index is not None:
_config = [_config[ int(index)]]
jobs = []
for _args in _config :
# pthread = etl.instance(**_args) #-- automatically starts the process
def bootup ():
_worker = IETL(**_args)
_worker.run()
pthread = Process(target=bootup)
pthread.start()
jobs.append(pthread)
if len(jobs) == batch :
wait(jobs)
jobs = []
if jobs :
wait (jobs)
#
# @TODO: Log the number of processes started and estfrom transport impfrom transport impimated time
# while jobs :
# jobs = [pthread for pthread in jobs if pthread.is_alive()]
# time.sleep(1)
#
# @TODO: Log the job termination here ...
@app_i.command(name="supported")
def supported (format:Annotated[str,typer.Argument(help="format of the output, supported formats are (list,table,json)")]="table") :
if index :
_config = _config[ int(index)]
etl.instance(**_config)
else:
etl.instance(config=_config)
@app.command(name="providers")
def supported (format:str="table") :
"""
This function will print supported database technologies
This function will print supported providers and their associated classifications
"""
_df = (transport.supported())
if format in ['list','json'] :
@ -105,27 +91,12 @@ def supported (format:Annotated[str,typer.Argument(help="format of the output, s
else:
print (_df)
print ()
@app_i.command(name="version")
def version ():
"""
This function will return the version of the data-transport
"""
print()
print (f'[bold] {transport.__app_name__} ,[blue] {transport.__edition__} edition [/blue], version {transport.__version__}[/bold]')
print ()
@app_i.command(name="license")
def info():
"""
This function will display version and license information
"""
print()
print (f'[bold] {transport.__app_name__} ,{transport.__edition__}, version {transport.__version__}[/bold]')
print ()
print (transport.__license__)
@app_e.command()
def generate (path:Annotated[str,typer.Argument(help="path of the ETL configuration file template (name included)")]):
@app.command()
def version():
print (transport.version.__version__)
@app.command()
def generate (path:str):
"""
This function will generate a configuration template to give a sense of how to create one
"""
@ -133,115 +104,44 @@ def generate (path:Annotated[str,typer.Argument(help="path of the ETL configurat
{
"source":{"provider":"http","url":"https://raw.githubusercontent.com/codeforamerica/ohana-api/master/data/sample-csv/addresses.csv"},
"target":
[{"provider":"files","path":"addresses.csv","delimiter":","},{"provider":"sqlite3","database":"sample.db3","table":"addresses"}]
[{"provider":"file","path":"addresses.csv","delimiter":"csv"},{"provider":"sqlite","database":"sample.db3","table":"addresses"}]
}
]
file = open(path,'w')
file.write(json.dumps(_config))
file.close()
print (f"""{CHECK_MARK} Successfully generated a template ETL file at [bold]{path}[/bold]""" )
print ("""NOTE: Each line (source or target) is the content of an auth-file""")
@app_r.command(name="reset")
def initregistry (email:Annotated[str,typer.Argument(help="email")],
path:str=typer.Option(default=REGISTRY_PATH,help="path or location of the configuration file"),
override:bool=typer.Option(default=False,help="override existing configuration or not")):
"""
This functiion will initialize the data-transport registry and have both application and calling code loading the database parameters by a label
"""
try:
transport.registry.init(email=email, path=path, override=override)
_msg = f"""{CHECK_MARK} Successfully wrote configuration to [bold]{path}[/bold] from [bold]{email}[/bold]"""
except Exception as e:
_msg = f"{TIMES_MARK} {e}"
print (_msg)
print ()
@app_r.command(name="add")
def register (label:Annotated[str,typer.Argument(help="unique label that will be used to load the parameters of the database")],
auth_file:Annotated[str,typer.Argument(help="path of the auth_file")],
default:bool=typer.Option(default=False,help="set the auth_file as default"),
path:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry file")):
"""
This function add a database label for a given auth-file. which allows access to the database using a label of your choice.
"""
try:
if transport.registry.exists(path) :
transport.registry.set(label=label,auth_file=auth_file, default=default, path=path)
_msg = f"""{CHECK_MARK} Successfully added label [bold]"{label}"[/bold] to data-transport registry"""
else:
_msg = f"""{TIMES_MARK} Registry is not initialized, please initialize the registry (check help)"""
except Exception as e:
_msg = f"""{TIMES_MARK} {e}"""
print (_msg)
pass
@app_x.command(name='add')
def register_plugs (
alias:Annotated[str,typer.Argument(help="unique function name within a file")],
path:Annotated[str,typer.Argument(help="path of the python file, that contains functions")],
folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry folder"),
):
"""
This function will register a file and the functions within we are interested in using
"""
if ',' in alias :
alias = [_name.strip() for _name in alias.split(',') if _name.strip() != '' ]
else:
alias = [alias.strip()]
_pregistry = pix.Registry(folder=folder,plugin_folder='plugins/code')
_log = _pregistry.set(path,alias)
# transport.registry.plugins.init()
# _log = transport.registry.plugins.add(alias,path)
_mark = TIMES_MARK if not _log else CHECK_MARK
_msg = f"""Could NOT add the [bold]{alias}[/bold]to the registry""" if not _log else f""" successfully added {alias}, {_log} functions registered"""
print (f"""{_mark} {_msg}""")
@app_x.command(name="list")
def registry_list (folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport configuration folder")):
"""
This function will list all the plugins (python functions/files) that are registered and can be reused
"""
_pregistry = pix.Registry(folder=folder)
_df = _pregistry.stats()
if _df.empty :
print (f"{TIMES_MARK} registry at {folder} is not ready")
else:
print (_df)
@app_x.command ("has")
def registry_has (alias:Annotated[str,typer.Argument(help="alias of a function function@file or file.function")],
folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry file")) :
_pregistry = pix.Registry(folder=folder)
if _pregistry.has(alias) :
_msg = f"{CHECK_MARK} {alias} was [bold] found [/bold] in registry "
else:
_msg = f"{TIMES_MARK} {alias} was [bold] NOT found [/bold] in registry "
print (_msg)
@app_x.command(name="test")
def registry_test (alias:Annotated[str,typer.Argument(help="alias of a function function@file or file.function")],
folder:str=typer.Option(default=REGISTRY_PATH,help="path of the data-transport registry folder")) :
_pregistry = pix.Registry(folder=folder)
"""
This function allows to test syntax for a plugin i.e in terms of alias@function
"""
# _item = transport.registry.plugins.has(key=key)
_pointer = _pregistry.get(alias) if _pregistry.has(alias) else None
if _pointer:
print (f"""{CHECK_MARK} successfully loaded [bold] {alias}[/bold] found in {folder}""")
else:
print (f"{TIMES_MARK} unable to load {alias}. Make sure it is registered")
app.add_typer(app_e,name='etl',help="This function will run etl or generate a template etl configuration file")
app.add_typer(app_r,name='registry',help='This function allows labeling database access information')
app.add_typer(app_i,name="info",help="This function will print either license or supported database technologies")
app.add_typer(app_x, name="plugins",help="This function enables add/list/test of plugins in the registry")
@app.command()
def usage():
print (__doc__)
if __name__ == '__main__' :
app()
# #
# # Load information from the file ...
# if 'help' in SYS_ARGS :
# print (__doc__)
# else:
# try:
# _info = json.loads(open(SYS_ARGS['config']).read())
# if 'index' in SYS_ARGS :
# _index = int(SYS_ARGS['index'])
# _info = [_item for _item in _info if _info.index(_item) == _index]
# pass
# elif 'id' in SYS_ARGS :
# _info = [_item for _item in _info if 'id' in _item and _item['id'] == SYS_ARGS['id']]
# procs = 1 if 'procs' not in SYS_ARGS else int(SYS_ARGS['procs'])
# jobs = transport.factory.instance(provider='etl',info=_info,procs=procs)
# print ([len(jobs),' Jobs are running'])
# N = len(jobs)
# while jobs :
# x = len(jobs)
# jobs = [_job for _job in jobs if _job.is_alive()]
# if x != len(jobs) :
# print ([len(jobs),'... jobs still running'])
# time.sleep(1)
# print ([N,' Finished running'])
# except Exception as e:
# print (e)

@ -1,9 +1,8 @@
__app_name__ = 'data-transport'
__author__ = 'The Phi Technology'
__version__= '2.2.22'
__email__ = "info@the-phi.com"
__edition__= 'community'
__license__=f"""
__version__= '2.0.2'
__license__="""
Copyright 2010 - 2024, Steve L. Nyemba
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the Software), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
@ -12,12 +11,4 @@ The above copyright notice and this permission notice shall be included in all c
THE SOFTWARE IS PROVIDED AS IS, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
__whatsnew__=f"""version {__version__},
1. Added support for read/write logs as well as plugins (when applied)
2. Bug fix with duckdb (adding readonly) for readers because there are issues with threads & processes
3. support for streaming data, important to use this with large volumes of data
"""

@ -15,21 +15,21 @@
},
{
"cell_type": "code",
"execution_count": 3,
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"100%|██████████| 1/1 [00:00<00:00, 10106.76it/s]\n"
"100%|██████████| 1/1 [00:00<00:00, 5440.08it/s]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"['data transport version ', '2.0.4']\n"
"['data transport version ', '2.0.0']\n"
]
}
],
@ -45,7 +45,7 @@
"PRIVATE_KEY = os.environ['BQ_KEY'] #-- location of the service key\n",
"DATASET = 'demo'\n",
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
"bqw = transport.get.writer(provider=providers.BIGQUERY,dataset=DATASET,table='friends',private_key=PRIVATE_KEY)\n",
"bqw = transport.factory.instance(provider=providers.BIGQUERY,dataset=DATASET,table='friends',context='write',private_key=PRIVATE_KEY)\n",
"bqw.write(_data,if_exists='replace') #-- default is append\n",
"print (['data transport version ', transport.__version__])\n"
]
@ -63,8 +63,7 @@
"\n",
"**NOTE**\n",
"\n",
"By design **read** object are separated from **write** objects in order to avoid accidental writes to the database.\n",
"Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
"It is possible to use **transport.factory.instance** or **transport.instance** they are the same. It allows the maintainers to know that we used a factory design pattern."
]
},
{
@ -94,7 +93,7 @@
"from transport import providers\n",
"import os\n",
"PRIVATE_KEY=os.environ['BQ_KEY']\n",
"pgr = transport.get.reader(provider=providers.BIGQUERY,dataset='demo',table='friends',private_key=PRIVATE_KEY)\n",
"pgr = transport.instance(provider=providers.BIGQUERY,dataset='demo',table='friends',private_key=PRIVATE_KEY)\n",
"_df = pgr.read()\n",
"_query = 'SELECT COUNT(*) _counts, AVG(age) from demo.friends'\n",
"_sdf = pgr.read(sql=_query)\n",
@ -107,13 +106,35 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"An **auth-file** is a file that contains database parameters used to access the database. \n",
"For code in shared environments, we recommend \n",
"The cell bellow show the content of an auth_file, in this case if the dataset/table in question is not to be shared then you can use auth_file with information associated with the parameters.\n",
"\n",
"**NOTE**:\n",
"\n",
"1. Having the **auth-file** stored on disk \n",
"2. and the location of the file is set to an environment variable.\n",
"The auth_file is intended to be **JSON** formatted"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'dataset': 'demo', 'table': 'friends'}"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"\n",
"To generate a template of the **auth-file** open the **file generator wizard** found at visit https://healthcareio.the-phi.com/data-transport"
"{\n",
" \n",
" \"dataset\":\"demo\",\"table\":\"friends\"\n",
"}"
]
},
{

@ -1,188 +0,0 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Extract Transform Load (ETL) from Code\n",
"\n",
"The example below reads data from an http source (github) and will copy the data to a csv file and to a database. This example illustrates the one-to-many ETL features.\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>id</th>\n",
" <th>location_id</th>\n",
" <th>address_1</th>\n",
" <th>address_2</th>\n",
" <th>city</th>\n",
" <th>state_province</th>\n",
" <th>postal_code</th>\n",
" <th>country</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" <td>2600 Middlefield Road</td>\n",
" <td>NaN</td>\n",
" <td>Redwood City</td>\n",
" <td>CA</td>\n",
" <td>94063</td>\n",
" <td>US</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>2</td>\n",
" <td>2</td>\n",
" <td>24 Second Avenue</td>\n",
" <td>NaN</td>\n",
" <td>San Mateo</td>\n",
" <td>CA</td>\n",
" <td>94401</td>\n",
" <td>US</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>3</td>\n",
" <td>3</td>\n",
" <td>24 Second Avenue</td>\n",
" <td>NaN</td>\n",
" <td>San Mateo</td>\n",
" <td>CA</td>\n",
" <td>94403</td>\n",
" <td>US</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>4</td>\n",
" <td>4</td>\n",
" <td>24 Second Avenue</td>\n",
" <td>NaN</td>\n",
" <td>San Mateo</td>\n",
" <td>CA</td>\n",
" <td>94401</td>\n",
" <td>US</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>5</td>\n",
" <td>5</td>\n",
" <td>24 Second Avenue</td>\n",
" <td>NaN</td>\n",
" <td>San Mateo</td>\n",
" <td>CA</td>\n",
" <td>94401</td>\n",
" <td>US</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" id location_id address_1 address_2 city \\\n",
"0 1 1 2600 Middlefield Road NaN Redwood City \n",
"1 2 2 24 Second Avenue NaN San Mateo \n",
"2 3 3 24 Second Avenue NaN San Mateo \n",
"3 4 4 24 Second Avenue NaN San Mateo \n",
"4 5 5 24 Second Avenue NaN San Mateo \n",
"\n",
" state_province postal_code country \n",
"0 CA 94063 US \n",
"1 CA 94401 US \n",
"2 CA 94403 US \n",
"3 CA 94401 US \n",
"4 CA 94401 US "
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"#\n",
"# Writing to Google Bigquery database\n",
"#\n",
"import transport\n",
"from transport import providers\n",
"import pandas as pd\n",
"import os\n",
"\n",
"#\n",
"#\n",
"source = {\"provider\": \"http\", \"url\": \"https://raw.githubusercontent.com/codeforamerica/ohana-api/master/data/sample-csv/addresses.csv\"}\n",
"target = [{\"provider\": \"files\", \"path\": \"addresses.csv\", \"delimiter\": \",\"}, {\"provider\": \"sqlite\", \"database\": \"sample.db3\", \"table\": \"addresses\"}]\n",
"\n",
"_handler = transport.get.etl (source=source,target=target)\n",
"_data = _handler.read() #-- all etl begins with data being read\n",
"_data.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Extract Transform Load (ETL) from CLI\n",
"\n",
"The documentation for this is available at https://healthcareio.the-phi.com/data-transport \"Docs\" -> \"Terminal CLI\"\n",
"\n",
"The entire process is documented including how to generate an ETL configuration file."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

@ -1,138 +0,0 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Writing to Apache Iceberg\n",
"\n",
"1. Insure you have a Google Bigquery service account key on disk\n",
"2. The service key location is set as an environment variable **BQ_KEY**\n",
"3. The dataset will be automatically created within the project associated with the service key\n",
"\n",
"The cell below creates a dataframe that will be stored within Google Bigquery"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['data transport version ', '2.4.0']\n"
]
}
],
"source": [
"#\n",
"# Writing to Google Bigquery database\n",
"#\n",
"import transport\n",
"from transport import providers\n",
"import pandas as pd\n",
"import os\n",
"\n",
"PRIVATE_KEY = os.environ['BQ_KEY'] #-- location of the service key\n",
"DATASET = 'demo'\n",
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
"# bqw = transport.get.writer(provider=providers.ICEBERG,catalog='mz',database='edw.mz',table='friends')\n",
"bqw = transport.get.writer(provider=providers.ICEBERG,table='edw.mz.friends')\n",
"bqw.write(_data,if_exists='replace') #-- default is append\n",
"print (['data transport version ', transport.__version__])\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Reading from Google Bigquery\n",
"\n",
"The cell below reads the data that has been written by the cell above and computes the average age within a Google Bigquery (simple query). \n",
"\n",
"- Basic read of the designated table (friends) created above\n",
"- Execute an aggregate SQL against the table\n",
"\n",
"**NOTE**\n",
"\n",
"By design **read** object are separated from **write** objects in order to avoid accidental writes to the database.\n",
"Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" name age\n",
"0 James Bond 55\n",
"1 Steve Rogers 150\n",
"2 Steve Nyemba 44\n",
"--------- STATISTICS ------------\n"
]
}
],
"source": [
"\n",
"import transport\n",
"from transport import providers\n",
"import os\n",
"PRIVATE_KEY=os.environ['BQ_KEY']\n",
"pgr = transport.get.reader(provider=providers.ICEBERG,database='edw.mz')\n",
"_df = pgr.read(table='friends')\n",
"_query = 'SELECT COUNT(*) _counts, AVG(age) from friends'\n",
"_sdf = pgr.read(sql=_query)\n",
"print (_df)\n",
"print ('--------- STATISTICS ------------')\n",
"# print (_sdf)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"An **auth-file** is a file that contains database parameters used to access the database. \n",
"For code in shared environments, we recommend \n",
"\n",
"1. Having the **auth-file** stored on disk \n",
"2. and the location of the file is set to an environment variable.\n",
"\n",
"To generate a template of the **auth-file** open the **file generator wizard** found at visit https://healthcareio.the-phi.com/data-transport"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

@ -11,14 +11,14 @@
},
{
"cell_type": "code",
"execution_count": 4,
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2.0.4\n"
"2.0.0\n"
]
}
],
@ -30,7 +30,7 @@
"from transport import providers\n",
"import pandas as pd\n",
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
"mgw = transport.get.writer(provider=providers.MONGODB,db='demo',collection='friends')\n",
"mgw = transport.factory.instance(provider=providers.MONGODB,db='demo',collection='friends',context='write')\n",
"mgw.write(_data)\n",
"print (transport.__version__)"
]
@ -48,13 +48,12 @@
"\n",
"**NOTE**\n",
"\n",
"By design **read** object are separated from **write** objects in order to avoid accidental writes to the database.\n",
"Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
"It is possible to use **transport.factory.instance** or **transport.instance** they are the same. It allows the maintainers to know that we used a factory design pattern."
]
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": 4,
"metadata": {},
"outputs": [
{
@ -74,7 +73,7 @@
"\n",
"import transport\n",
"from transport import providers\n",
"mgr = transport.get.reader(provider=providers.MONGODB,db='foo',collection='friends')\n",
"mgr = transport.instance(provider=providers.MONGODB,db='foo',collection='friends')\n",
"_df = mgr.read()\n",
"PIPELINE = [{\"$group\":{\"_id\":0,\"_counts\":{\"$sum\":1}, \"_mean\":{\"$avg\":\"$age\"}}}]\n",
"_sdf = mgr.read(aggregate='friends',pipeline=PIPELINE)\n",
@ -87,13 +86,41 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"An **auth-file** is a file that contains database parameters used to access the database. \n",
"For code in shared environments, we recommend \n",
"The cell bellow show the content of an auth_file, in this case if the dataset/table in question is not to be shared then you can use auth_file with information associated with the parameters.\n",
"\n",
"1. Having the **auth-file** stored on disk \n",
"2. and the location of the file is set to an environment variable.\n",
"**NOTE**:\n",
"\n",
"To generate a template of the **auth-file** open the **file generator wizard** found at visit https://healthcareio.the-phi.com/data-transport"
"The auth_file is intended to be **JSON** formatted"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'host': 'klingon.io',\n",
" 'port': 27017,\n",
" 'username': 'me',\n",
" 'password': 'foobar',\n",
" 'db': 'foo',\n",
" 'collection': 'friends',\n",
" 'authSource': '<authdb>',\n",
" 'mechamism': '<SCRAM-SHA-256|MONGODB-CR|SCRAM-SHA-1>'}"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"{\n",
" \"host\":\"klingon.io\",\"port\":27017,\"username\":\"me\",\"password\":\"foobar\",\"db\":\"foo\",\"collection\":\"friends\",\n",
" \"authSource\":\"<authdb>\",\"mechamism\":\"<SCRAM-SHA-256|MONGODB-CR|SCRAM-SHA-1>\"\n",
"}"
]
},
{

@ -17,9 +17,17 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 1,
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['data transport version ', '2.0.0']\n"
]
}
],
"source": [
"#\n",
"# Writing to Google Bigquery database\n",
@ -33,7 +41,7 @@
"MSSQL_AUTH_FILE= os.sep.join([AUTH_FOLDER,'mssql.json'])\n",
"\n",
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
"msw = transport.get.writer(provider=providers.MSSQL,table='friends',auth_file=MSSQL_AUTH_FILE)\n",
"msw = transport.factory.instance(provider=providers.MSSQL,table='friends',context='write',auth_file=MSSQL_AUTH_FILE)\n",
"msw.write(_data,if_exists='replace') #-- default is append\n",
"print (['data transport version ', transport.__version__])\n"
]
@ -51,15 +59,30 @@
"\n",
"**NOTE**\n",
"\n",
"By design **read** object are separated from **write** objects in order to avoid accidental writes to the database.\n",
"Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
"It is possible to use **transport.factory.instance** or **transport.instance** they are the same. It allows the maintainers to know that we used a factory design pattern."
]
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 5,
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" name age\n",
"0 James Bond 55\n",
"1 Steve Rogers 150\n",
"2 Steve Nyemba 44\n",
"\n",
"--------- STATISTICS ------------\n",
"\n",
" _counts \n",
"0 3 83\n"
]
}
],
"source": [
"\n",
"import transport\n",
@ -68,7 +91,7 @@
"AUTH_FOLDER = os.environ['DT_AUTH_FOLDER'] #-- location of the service key\n",
"MSSQL_AUTH_FILE= os.sep.join([AUTH_FOLDER,'mssql.json'])\n",
"\n",
"msr = transport.get.reader(provider=providers.MSSQL,table='friends',auth_file=MSSQL_AUTH_FILE)\n",
"msr = transport.instance(provider=providers.MSSQL,table='friends',auth_file=MSSQL_AUTH_FILE)\n",
"_df = msr.read()\n",
"_query = 'SELECT COUNT(*) _counts, AVG(age) from friends'\n",
"_sdf = msr.read(sql=_query)\n",
@ -81,31 +104,25 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"An **auth-file** is a file that contains database parameters used to access the database. \n",
"For code in shared environments, we recommend \n",
"The cell bellow show the content of an auth_file, in this case if the dataset/table in question is not to be shared then you can use auth_file with information associated with the parameters.\n",
"\n",
"1. Having the **auth-file** stored on disk \n",
"2. and the location of the file is set to an environment variable.\n",
"**NOTE**:\n",
"\n",
"To generate a template of the **auth-file** open the **file generator wizard** found at visit https://healthcareio.the-phi.com/data-transport"
"The auth_file is intended to be **JSON** formatted"
]
},
{
"cell_type": "code",
"execution_count": 1,
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'provider': 'sqlserver',\n",
" 'dataset': 'demo',\n",
" 'table': 'friends',\n",
" 'username': '<username>',\n",
" 'password': '<password>'}"
"{'dataset': 'demo', 'table': 'friends'}"
]
},
"execution_count": 1,
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
@ -113,17 +130,10 @@
"source": [
"\n",
"{\n",
" \"provider\":\"sqlserver\",\n",
" \n",
" \"dataset\":\"demo\",\"table\":\"friends\",\"username\":\"<username>\",\"password\":\"<password>\"\n",
"}"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {

@ -14,14 +14,14 @@
},
{
"cell_type": "code",
"execution_count": 3,
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2.0.4\n"
"2.0.0\n"
]
}
],
@ -33,7 +33,7 @@
"from transport import providers\n",
"import pandas as pd\n",
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
"myw = transport.get.writer(provider=providers.MYSQL,database='demo',table='friends',auth_file=\"/home/steve/auth-mysql.json\")\n",
"myw = transport.factory.instance(provider=providers.MYSQL,database='demo',table='friends',context='write',auth_file=\"/home/steve/auth-mysql.json\")\n",
"myw.write(_data,if_exists='replace') #-- default is append\n",
"print (transport.__version__)"
]
@ -51,13 +51,12 @@
"\n",
"**NOTE**\n",
"\n",
"By design **read** object are separated from **write** objects in order to avoid accidental writes to the database.\n",
"Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
"It is possible to use **transport.factory.instance** or **transport.instance** they are the same. It allows the maintainers to know that we used a factory design pattern."
]
},
{
"cell_type": "code",
"execution_count": 4,
"execution_count": 9,
"metadata": {},
"outputs": [
{
@ -69,7 +68,7 @@
"1 Steve Rogers 150\n",
"2 Steve Nyemba 44\n",
"--------- STATISTICS ------------\n",
" _counts AVG(age)\n",
" _counts avg\n",
"0 3 83.0\n"
]
}
@ -78,7 +77,7 @@
"\n",
"import transport\n",
"from transport import providers\n",
"myr = transport.get.reader(provider=providers.MYSQL,database='demo',table='friends',auth_file='/home/steve/auth-mysql.json')\n",
"myr = transport.instance(provider=providers.POSTGRESQL,database='demo',table='friends',auth_file='/home/steve/auth-mysql.json')\n",
"_df = myr.read()\n",
"_query = 'SELECT COUNT(*) _counts, AVG(age) from friends'\n",
"_sdf = myr.read(sql=_query)\n",
@ -91,18 +90,16 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"An **auth-file** is a file that contains database parameters used to access the database. \n",
"For code in shared environments, we recommend \n",
"The cell bellow show the content of an auth_file, in this case if the dataset/table in question is not to be shared then you can use auth_file with information associated with the parameters.\n",
"\n",
"1. Having the **auth-file** stored on disk \n",
"2. and the location of the file is set to an environment variable.\n",
"**NOTE**:\n",
"\n",
"To generate a template of the **auth-file** open the **file generator wizard** found at visit https://healthcareio.the-phi.com/data-transport"
"The auth_file is intended to be **JSON** formatted"
]
},
{
"cell_type": "code",
"execution_count": 5,
"execution_count": 1,
"metadata": {},
"outputs": [
{
@ -112,29 +109,21 @@
" 'port': 3306,\n",
" 'username': 'me',\n",
" 'password': 'foobar',\n",
" 'provider': 'mysql',\n",
" 'database': 'demo',\n",
" 'table': 'friends'}"
]
},
"execution_count": 5,
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"{\n",
" \"host\":\"klingon.io\",\"port\":3306,\"username\":\"me\",\"password\":\"foobar\", \"provider\":\"mysql\",\n",
" \"host\":\"klingon.io\",\"port\":3306,\"username\":\"me\",\"password\":\"foobar\",\n",
" \"database\":\"demo\",\"table\":\"friends\"\n",
"}"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {

@ -1,149 +0,0 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Writing data-transport plugins\n",
"\n",
"The data-transport plugins are designed to automate pre/post processing i.e\n",
"\n",
" - Read -> Post processing\n",
" - Write-> Pre processing\n",
" \n",
"In this example we will assume, data and write both pre/post processing to any supported infrastructure. We will equally show how to specify the plugins within a configuration file"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"#\n",
"# Writing to Google Bigquery database\n",
"#\n",
"import transport\n",
"from transport import providers\n",
"import pandas as pd\n",
"import os\n",
"import shutil\n",
"#\n",
"#\n",
"\n",
"DATABASE = '/home/steve/tmp/demo.db3'\n",
"if os.path.exists(DATABASE) :\n",
" os.remove(DATABASE)\n",
"#\n",
"# \n",
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
"litew = transport.get.writer(provider=providers.SQLITE,database=DATABASE)\n",
"litew.write(_data,table='friends')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Reading from SQLite\n",
"\n",
"The cell below reads the data that has been written by the cell above and computes the average age from a plugin function we will write. \n",
"\n",
"- Basic read of the designated table (friends) created above\n",
"- Read with pipeline functions defined in code\n",
"\n",
"**NOTE**\n",
"\n",
"It is possible to use **transport.factory.instance** or **transport.instance** or **transport.get.<[reader|writer]>** they are the same. It allows the maintainers to know that we used a factory design pattern."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" name age\n",
"0 James Bond 55\n",
"1 Steve Rogers 150\n",
"2 Steve Nyemba 44\n",
"\n",
"\n",
" name age autoinc\n",
"0 James Bond 5.5 0\n",
"1 Steve Rogers 15.0 1\n",
"2 Steve Nyemba 4.4 2\n"
]
}
],
"source": [
"\n",
"import transport\n",
"from transport import providers\n",
"import os\n",
"import numpy as np\n",
"def _autoincrement (_data,**kwargs) :\n",
" \"\"\"\n",
" This function will add an autoincrement field to the table\n",
" \"\"\"\n",
" _data['autoinc'] = np.arange(_data.shape[0])\n",
" \n",
" return _data\n",
"def reduce(_data,**_args) :\n",
" \"\"\"\n",
" This function will reduce the age of the data frame\n",
" \"\"\"\n",
" _data.age /= 10\n",
" return _data\n",
"reader = transport.get.reader(provider=providers.SQLITE,database=DATABASE,table='friends')\n",
"#\n",
"# basic read of the data created in the first cell\n",
"_df = reader.read()\n",
"print (_df)\n",
"print ()\n",
"print()\n",
"#\n",
"# read of the data with pipeline function provided to alter the database\n",
"print (reader.read(pipeline=[_autoincrement,reduce]))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The parameters for instianciating a transport object (reader or writer) can be found at [data-transport home](https://healthcareio.the-phi.com/data-transport)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

@ -14,14 +14,14 @@
},
{
"cell_type": "code",
"execution_count": 1,
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2.0.4\n"
"2.0.0\n"
]
}
],
@ -33,7 +33,7 @@
"from transport import providers\n",
"import pandas as pd\n",
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
"pgw = transport.get.writer(provider=providers.POSTGRESQL,database='demo',table='friends')\n",
"pgw = transport.factory.instance(provider=providers.POSTGRESQL,database='demo',table='friends',context='write')\n",
"pgw.write(_data,if_exists='replace') #-- default is append\n",
"print (transport.__version__)"
]
@ -49,16 +49,14 @@
"- Basic read of the designated table (friends) created above\n",
"- Execute an aggregate SQL against the table\n",
"\n",
"\n",
"**NOTE**\n",
"\n",
"By design **read** object are separated from **write** objects in order to avoid accidental writes to the database.\n",
"Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
"It is possible to use **transport.factory.instance** or **transport.instance** they are the same. It allows the maintainers to know that we used a factory design pattern."
]
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": 6,
"metadata": {},
"outputs": [
{
@ -79,7 +77,7 @@
"\n",
"import transport\n",
"from transport import providers\n",
"pgr = transport.get.reader(provider=providers.POSTGRESQL,database='demo',table='friends')\n",
"pgr = transport.instance(provider=providers.POSTGRESQL,database='demo',table='friends')\n",
"_df = pgr.read()\n",
"_query = 'SELECT COUNT(*) _counts, AVG(age) from friends'\n",
"_sdf = pgr.read(sql=_query)\n",
@ -92,18 +90,16 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"An **auth-file** is a file that contains database parameters used to access the database. \n",
"For code in shared environments, we recommend \n",
"The cell bellow show the content of an auth_file, in this case if the dataset/table in question is not to be shared then you can use auth_file with information associated with the parameters.\n",
"\n",
"1. Having the **auth-file** stored on disk \n",
"2. and the location of the file is set to an environment variable.\n",
"**NOTE**:\n",
"\n",
"To generate a template of the **auth-file** open the **file generator wizard** found at visit https://healthcareio.the-phi.com/data-transport"
"The auth_file is intended to be **JSON** formatted"
]
},
{
"cell_type": "code",
"execution_count": 4,
"execution_count": 1,
"metadata": {},
"outputs": [
{
@ -113,19 +109,18 @@
" 'port': 5432,\n",
" 'username': 'me',\n",
" 'password': 'foobar',\n",
" 'provider': 'postgresql',\n",
" 'database': 'demo',\n",
" 'table': 'friends'}"
]
},
"execution_count": 4,
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"{\n",
" \"host\":\"klingon.io\",\"port\":5432,\"username\":\"me\",\"password\":\"foobar\", \"provider\":\"postgresql\",\n",
" \"host\":\"klingon.io\",\"port\":5432,\"username\":\"me\",\"password\":\"foobar\",\n",
" \"database\":\"demo\",\"table\":\"friends\"\n",
"}"
]

@ -1,131 +0,0 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Writing to AWS S3\n",
"\n",
"We have setup our demo environment with the label **aws** passed to reference our s3 access_key and secret_key and file (called friends.csv). In the cell below we will write the data to our aws s3 bucket named **com.phi.demo**"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2.2.1\n"
]
}
],
"source": [
"#\n",
"# Writing to mongodb database\n",
"#\n",
"import transport\n",
"from transport import providers\n",
"import pandas as pd\n",
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
"mgw = transport.get.writer(label='aws')\n",
"mgw.write(_data)\n",
"print (transport.__version__)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Reading from AWS S3\n",
"\n",
"The cell below reads the data that has been written by the cell above and computes the average age within a mongodb pipeline. The code in the background executes an aggregation using\n",
"\n",
"- Basic read of the designated file **friends.csv**\n",
"- Compute average age using standard pandas functions\n",
"\n",
"**NOTE**\n",
"\n",
"By design **read** object are separated from **write** objects in order to avoid accidental writes to the database.\n",
"Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" bname age\n",
"0 James Bond 55\n",
"1 Steve Rogers 150\n",
"2 Steve Nyemba 44\n",
"--------- STATISTICS ------------\n",
"83.0\n"
]
}
],
"source": [
"\n",
"import transport\n",
"from transport import providers\n",
"import pandas as pd\n",
"\n",
"def cast(stream) :\n",
" print (stream)\n",
" return pd.DataFrame(str(stream))\n",
"mgr = transport.get.reader(label='aws')\n",
"_df = mgr.read()\n",
"print (_df)\n",
"print ('--------- STATISTICS ------------')\n",
"print (_df.age.mean())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"An **auth-file** is a file that contains database parameters used to access the database. \n",
"For code in shared environments, we recommend \n",
"\n",
"1. Having the **auth-file** stored on disk \n",
"2. and the location of the file is set to an environment variable.\n",
"\n",
"To generate a template of the **auth-file** open the **file generator wizard** found at visit https://healthcareio.the-phi.com/data-transport"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

@ -18,7 +18,7 @@
"name": "stdout",
"output_type": "stream",
"text": [
"2.0.4\n"
"2.0.0\n"
]
}
],
@ -30,7 +30,7 @@
"from transport import providers\n",
"import pandas as pd\n",
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
"sqw = transport.get.writer(provider=providers.SQLITE,database='/home/steve/demo.db3',table='friends')\n",
"sqw = transport.factory.instance(provider=providers.SQLITE,database='/home/steve/demo.db3',table='friends',context='write')\n",
"sqw.write(_data,if_exists='replace') #-- default is append\n",
"print (transport.__version__)"
]
@ -46,11 +46,9 @@
"- Basic read of the designated table (friends) created above\n",
"- Execute an aggregate SQL against the table\n",
"\n",
"\n",
"**NOTE**\n",
"\n",
"By design **read** object are separated from **write** objects in order to avoid accidental writes to the database.\n",
"Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
"It is possible to use **transport.factory.instance** or **transport.instance** they are the same. It allows the maintainers to know that we used a factory design pattern."
]
},
{
@ -76,10 +74,10 @@
"\n",
"import transport\n",
"from transport import providers\n",
"sqr = transport.get.reader(provider=providers.SQLITE,database='/home/steve/demo.db3',table='friends')\n",
"_df = sqr.read()\n",
"pgr = transport.instance(provider=providers.SQLITE,database='/home/steve/demo.db3',table='friends')\n",
"_df = pgr.read()\n",
"_query = 'SELECT COUNT(*) _counts, AVG(age) from friends'\n",
"_sdf = sqr.read(sql=_query)\n",
"_sdf = pgr.read(sql=_query)\n",
"print (_df)\n",
"print ('--------- STATISTICS ------------')\n",
"print (_sdf)"
@ -89,13 +87,11 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"An **auth-file** is a file that contains database parameters used to access the database. \n",
"For code in shared environments, we recommend \n",
"The cell bellow show the content of an auth_file, in this case if the dataset/table in question is not to be shared then you can use auth_file with information associated with the parameters.\n",
"\n",
"1. Having the **auth-file** stored on disk \n",
"2. and the location of the file is set to an environment variable.\n",
"**NOTE**:\n",
"\n",
"To generate a template of the **auth-file** open the **file generator wizard** found at visit https://healthcareio.the-phi.com/data-transport"
"The auth_file is intended to be **JSON** formatted. This is an overkill for SQLite ;-)"
]
},
{

@ -1,62 +0,0 @@
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "data-transport"
dynamic = ["version"]
authors = [
{name="Steve L. Nyemba" , email = "info@the-phi.com"},
]
description = ""
readme = "README.md"
license = {text = "LICENSE"}
keywords = ["mongodb","duckdb","couchdb","rabbitmq","file","read","write","s3","sqlite"]
classifiers = [
"License :: OSI Approved :: MIT License",
"Topic :: Utilities",
]
dependencies = [
"termcolor","sqlalchemy", "aiosqlite","duckdb-engine",
"typer","pandas","numpy","sqlalchemy","pyarrow",
"plugin-ix@git+https://github.com/lnyemba/plugins-ix"
]
[project.optional-dependencies]
sql = ["mysql-connector-python","psycopg2-binary","nzpy","pymssql","duckdb-engine","aiosqlite"]
nosql = ["pymongo","cloudant"]
cloud = ["pandas-gbq","google-cloud-bigquery","google-cloud-bigquery-storage", "databricks-sqlalchemy","pyncclient","boto3","boto","botocore"]
warehouse = ["pydrill","pyspark","sqlalchemy_drill"]
rabbitmq = ["pika"]
sqlite = ["aiosqlite"]
aws3 = ["boto3","boto","botocore"]
nextcloud = ["pyncclient"]
mongodb = ["pymongo"]
netezza = ["nzpy"]
mysql = ["mysql-connector-python"]
postgresql = ["psycopg2-binary"]
sqlserver = ["pymssql"]
http = ["flask-session"]
all = ["mysql-connector-python","psycopg2-binary","nzpy","pymssql","duckdb-engine","aiosqlite","pymongo","cloudant","pandas-gbq","google-cloud-bigquery","google-cloud-bigquery-storage", "databricks-sqlalchemy","pyncclient","boto3","boto","botocore","pydrill","pyspark","sqlalchemy_drill", "pika","aiosqlite","boto3","boto","botocore", "pyncclient"]
[project.urls]
Homepage = "https://healthcareio.the-phi.com/git/code/transport.git"
#[project.scripts]
#transport = "transport:main"
[tool.setuptools]
include-package-data = true
zip-safe = false
script-files = ["bin/transport"]
[tool.setuptools.packages.find]
include = ["info","info.*", "transport", "transport.*"]
[tool.setuptools.dynamic]
version = {attr = "info.__version__"}
#authors = {attr = "meta.__author__"}
# If you have a info.py file, you might also want to include the author dynamically:
# [tool.setuptools.dynamic]
# version = {attr = "info.__version__"}
# authors = {attr = "info.__author__"}

@ -0,0 +1,31 @@
"""
This is a build file for the
"""
from setuptools import setup, find_packages
import os
import sys
# from version import __version__,__author__
from info import __version__, __author__
# __author__ = 'The Phi Technology'
# __version__= '1.8.0'
def read(fname):
return open(os.path.join(os.path.dirname(__file__), fname)).read()
args = {
"name":"data-transport",
"version":__version__,
"author":__author__,"author_email":"info@the-phi.com",
"license":"MIT",
# "packages":["transport","info","transport/sql"]},
"packages": find_packages(include=['info','transport', 'transport.*'])}
args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite']
args["install_requires"] = ['pyncclient','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql']
args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git"
args['scripts'] = ['bin/transport']
# if sys.version_info[0] == 2 :
# args['use_2to3'] = True
# args['use_2to3_exclude_fixers']=['lib2to3.fixes.fix_import']
setup(**args)

@ -18,74 +18,40 @@ Source Code is available under MIT License:
"""
import numpy as np
from transport import sql, nosql, cloud, other, warehouse
from transport import sql, nosql, cloud, other
import pandas as pd
import json
import os
from info import __version__,__author__,__email__,__license__,__app_name__,__whatsnew__,__edition__
from transport.iowrapper import IWriter, IReader, IETL
from info import __version__,__author__
from transport.iowrapper import IWriter, IReader
from transport.plugins import PluginLoader
from transport import providers
import copy
from transport import registry
from transport.plugins import Plugin
PROVIDERS = {}
PROVIDERS = {}
def init():
global PROVIDERS
for _module in [cloud,sql,nosql,other,warehouse] :
for _module in [cloud,sql,nosql,other] :
for _provider_name in dir(_module) :
if _provider_name.startswith('__') or _provider_name == 'common':
if _provider_name.startswith('__') :
continue
PROVIDERS[_provider_name] = {'module':getattr(_module,_provider_name),'type':_module.__name__}
#
# loading the registry
if not registry.isloaded() :
registry.load()
# def _getauthfile (path) :
# f = open(path)
# _object = json.loads(f.read())
# f.close()
# return _object
def instance (**_args):
"""
This function returns an object of to read or write from a supported database provider/vendor
@provider provider
@context read/write (default is read)
@auth_file: Optional if the database information provided is in a file. Useful for not sharing passwords
kwargs These are arguments that are provider/vendor specific
type:
read: true|false (default true)
auth_file
"""
global PROVIDERS
if 'auth_file' in _args:
if os.path.exists(_args['auth_file']) :
#
# @TODO: add encryption module and decryption to enable this to be secure
#
f = open(_args['auth_file'])
#_args = dict (_args,** json.loads(f.read()) )
#
# we overrite file parameters with arguments passed
_args = dict (json.loads(f.read()),**_args )
_args = dict (_args,** json.loads(f.read()) )
f.close()
else:
filename = _args['auth_file']
raise Exception(f" {filename} was not found or is invalid")
if 'provider' not in _args and 'auth_file' not in _args :
if not registry.isloaded () :
if ('path' in _args and registry.exists(_args['path'] )) or registry.exists():
registry.load() if 'path' not in _args else registry.load(_args['path'])
_info = {}
if 'label' in _args and registry.isloaded():
_info = registry.get(_args['label'])
else:
_info = registry.get()
if _info :
_args = dict(_info,**_args) #-- we can override the registry parameters with our own arguments
if 'provider' in _args and _args['provider'] in PROVIDERS :
if _args['provider'] in PROVIDERS :
_info = PROVIDERS[_args['provider']]
_module = _info['module']
if 'context' in _args :
@ -96,105 +62,22 @@ def instance (**_args):
_agent = _pointer (**_args)
#
loader = None
if 'plugins' in _args :
_params = _args['plugins']
#
# @TODO:
# define a logger object here that will used by the wrapper
# this would allow us to know what the data-transport is doing and where/how it fails
#
# if 'plugins' in _args :
# _params = _args['plugins']
if 'path' in _params and 'names' in _params :
loader = PluginLoader(**_params)
elif type(_params) == list:
loader = PluginLoader()
for _delegate in _params :
loader.set(_delegate)
# if 'path' in _params and 'names' in _params :
# loader = PluginLoader(**_params)
# elif type(_params) == list:
# loader = PluginLoader()
# for _delegate in _params :
# loader.set(_delegate)
_plugins = None if 'plugins' not in _args else _args['plugins']
return IReader(_agent,loader) if _context == 'read' else IWriter(_agent,loader)
# if registry.has('logger') :
# _kwa = registry.get('logger')
# _lmodule = getPROVIDERS[_kwa['provider']]
if ( ('label' in _args and _args['label'] != 'logger') and registry.has('logger')):
#
# We did not request label called logger, so we are setting up a logger if it is specified in the registry
#
_kwargs = registry.get('logger')
_kwargs['context'] = 'write'
_kwargs['table'] =_module.__name__.split('.')[-1]+'_logs'
# _logger = instance(**_kwargs)
_module = PROVIDERS[_kwargs['provider']]['module']
_logger = getattr(_module,'Writer')
_logger = _logger(**_kwargs)
else:
_logger = None
_kwargs = {'agent':_agent,'plugins':_plugins,'logger':_logger}
if 'args' in _args :
_kwargs['args'] = _args['args']
# _datatransport = IReader(_agent,_plugins,_logger) if _context == 'read' else IWriter(_agent,_plugins,_logger)
_datatransport = IReader(**_kwargs) if _context == 'read' else IWriter(**_kwargs)
return _datatransport
else:
#
# We can handle the case for an ETL object
#
raise Exception ("Missing or Unknown provider")
pass
class get :
"""
This class is just a wrapper to make the interface (API) more conversational and easy to understand
"""
@staticmethod
def reader (**_args):
if not _args or ('provider' not in _args and 'label' not in _args):
_args['label'] = 'default'
_args['context'] = 'read'
# return instance(**_args)
# _args['logger'] = instance(**{'label':'logger','context':'write','table':'logs'})
_handler = instance(**_args)
# _handler.setLogger(get.logger())
return _handler
@staticmethod
def writer(**_args):
"""
This function is a wrapper that will return a writer to a database. It disambiguates the interface
"""
if not _args or ('provider' not in _args and 'label' not in _args):
_args['label'] = 'default'
_args['context'] = 'write'
# _args['logger'] = instance(**{'label':'logger','context':'write','table':'logs'})
_handler = instance(**_args)
#
# Implementing logging with the 'eat-your-own-dog-food' approach
# Using dependency injection to set the logger (problem with imports)
#
# _handler.setLogger(get.logger())
return _handler
@staticmethod
def logger ():
if registry.has('logger') :
_args = registry.get('logger')
_args['context'] = 'write'
return instance(**_args)
return None
@staticmethod
def etl (**_args):
if 'source' in _args and 'target' in _args :
return IETL(**_args)
else:
raise Exception ("Malformed input found, object must have both 'source' and 'target' attributes")
def supported ():
_info = {}
for _provider in PROVIDERS :

@ -3,13 +3,10 @@ Data Transport - 1.0
Steve L. Nyemba, The Phi Technology LLC
This file is a wrapper around s3 bucket provided by AWS for reading and writing content
TODO:
- Address limitations that will properly read csv if it is stored with content type text/csv
"""
from datetime import datetime
import boto3
# from boto.s3.connection import S3Connection, OrdinaryCallingFormat
import boto
from boto.s3.connection import S3Connection, OrdinaryCallingFormat
import numpy as np
import botocore
from smart_open import smart_open
@ -17,7 +14,6 @@ import sys
import json
from io import StringIO
import pandas as pd
import json
class s3 :
@ -33,37 +29,46 @@ class s3 :
@param filter filename or filtering elements
"""
try:
self._client = boto3.client('s3',aws_access_key_id=args['access_key'],aws_secret_access_key=args['secret_key'],region_name=args['region'])
self._bucket_name = args['bucket']
self._file_name = args['file']
self._region = args['region']
self.s3 = S3Connection(args['access_key'],args['secret_key'],calling_format=OrdinaryCallingFormat())
self.bucket = self.s3.get_bucket(args['bucket'].strip(),validate=False) if 'bucket' in args else None
# self.path = args['path']
self.filter = args['filter'] if 'filter' in args else None
self.filename = args['file'] if 'file' in args else None
self.bucket_name = args['bucket'] if 'bucket' in args else None
except Exception as e :
self.s3 = None
self.bucket = None
print (e)
pass
def has(self,**_args):
_found = None
try:
if 'file' in _args and 'bucket' in _args:
_found = self.meta(**_args)
elif 'bucket' in _args and not 'file' in _args:
_found = self._client.list_objects(Bucket=_args['bucket'])
elif 'file' in _args and not 'bucket' in _args :
_found = self.meta(bucket=self._bucket_name,file = _args['file'])
except Exception as e:
_found = None
pass
return type(_found) == dict
def meta(self,**args):
"""
This function will return information either about the file in a given bucket
:name name of the bucket
"""
_bucket = self._bucket_name if 'bucket' not in args else args['bucket']
_file = self._file_name if 'file' not in args else args['file']
_data = self._client.get_object(Bucket=_bucket,Key=_file)
return _data['ResponseMetadata']
def close(self):
self._client.close()
info = self.list(**args)
[item.open() for item in info]
return [{"name":item.name,"size":item.size} for item in info]
def list(self,**args):
"""
This function will list the content of a bucket, the bucket must be provided by the name
:name name of the bucket
"""
return list(self.s3.get_bucket(args['name']).list())
def buckets(self):
#
# This function will return all buckets, not sure why but it should be used cautiously
# based on why the s3 infrastructure is used
#
return [item.name for item in self.s3.get_all_buckets()]
# def buckets(self):
pass
# """
# This function is a wrapper around the bucket list of buckets for s3
# """
# return self.s3.get_all_buckets()
class Reader(s3) :
"""
@ -72,66 +77,51 @@ class Reader(s3) :
- stream content if file is Not None
@TODO: support read from all buckets, think about it
"""
def __init__(self,**_args) :
super().__init__(**_args)
def _stream(self,**_args):
"""
At this point we should stream a file from a given bucket
"""
_object = self._client.get_object(Bucket=_args['bucket'],Key=_args['file'])
_stream = None
def __init__(self,**args) :
s3.__init__(self,**args)
def files(self):
r = []
try:
_stream = _object['Body'].read()
return [item.name for item in self.bucket if item.size > 0]
except Exception as e:
pass
if not _stream :
return None
if _object['ContentType'] in ['text/csv'] :
return pd.read_csv(StringIO(str(_stream).replace("\\n","\n").replace("\\r","").replace("\'","")))
return r
def stream(self,limit=-1):
"""
At this point we should stream a file from a given bucket
"""
key = self.bucket.get_key(self.filename.strip())
if key is None :
yield None
else:
return _stream
count = 0
with smart_open(key) as remote_file:
for line in remote_file:
if count == limit and limit > 0 :
break
yield line
count += 1
def read(self,**args) :
_name = self._file_name if 'file' not in args else args['file']
_bucket = args['bucket'] if 'bucket' in args else self._bucket_name
return self._stream(bucket=_bucket,file=_name)
if self.filename is None :
#
# returning the list of files because no one file was specified.
return self.files()
else:
limit = args['size'] if 'size' in args else -1
return self.stream(limit)
class Writer(s3) :
"""
def __init__(self,**args) :
s3.__init__(self,**args)
def mkdir(self,name):
"""
def __init__(self,**_args) :
super().__init__(**_args)
#
#
if not self.has(bucket=self._bucket_name) :
self.make_bucket(self._bucket_name)
def make_bucket(self,bucket_name):
"""
This function will create a folder in a bucket,It is best that the bucket is organized as a namespace
This function will create a folder in a bucket
:name name of the folder
"""
self._client.create_bucket(Bucket=bucket_name,CreateBucketConfiguration={'LocationConstraint': self._region})
def write(self,_data,**_args):
"""
This function will write the data to the s3 bucket, files can be either csv, or json formatted files
"""
content = 'text/plain'
if type(_data) == pd.DataFrame :
_stream = _data.to_csv(index=False)
content = 'text/csv'
elif type(_data) == dict :
_stream = json.dumps(_data)
content = 'application/json'
else:
_stream = _data
file = StringIO(_stream)
bucket = self._bucket_name if 'bucket' not in _args else _args['bucket']
file_name = self._file_name if 'file' not in _args else _args['file']
self._client.put_object(Bucket=bucket, Key = file_name, Body=_stream,ContentType=content)
self.s3.put_object(Bucket=self.bucket_name,key=(name+'/'))
def write(self,content):
file = StringIO(content.decode("utf8"))
self.s3.upload_fileobj(file,self.bucket_name,self.filename)
pass

@ -1,19 +0,0 @@
"""
This file will be intended to handle duckdb database
"""
import duckdb
from transport.common import Reader,Writer
class Duck(Reader):
def __init__(self,**_args):
super().__init__(**_args)
self._path = None if 'path' not in _args else _args['path']
self._handler = duckdb.connect() if not self._path else duckdb.connect(self._path)
class DuckReader(Duck) :
def __init__(self,**_args):
super().__init__(**_args)
def read(self,**_args) :
pass

@ -39,22 +39,22 @@ import os
from multiprocessing import Process
# SYS_ARGS = {}
# if len(sys.argv) > 1:
# N = len(sys.argv)
# for i in range(1,N):
# value = None
# if sys.argv[i].startswith('--'):
# key = sys.argv[i][2:] #.replace('-','')
# SYS_ARGS[key] = 1
# if i + 1 < N:
# value = sys.argv[i + 1] = sys.argv[i+1].strip()
# if key and value and not value.startswith('--'):
# SYS_ARGS[key] = value
# i += 2
SYS_ARGS = {}
if len(sys.argv) > 1:
N = len(sys.argv)
for i in range(1,N):
value = None
if sys.argv[i].startswith('--'):
key = sys.argv[i][2:] #.replace('-','')
SYS_ARGS[key] = 1
if i + 1 < N:
value = sys.argv[i + 1] = sys.argv[i+1].strip()
if key and value and not value.startswith('--'):
SYS_ARGS[key] = value
i += 2
class Transporter(Process):
"""
The transporter (Jason Stathem) moves data from one persistant store to another
@ -74,72 +74,81 @@ class Transporter(Process):
#
# Let's insure we can support multiple targets
self._target = [self._target] if type(self._target) != list else self._target
pass
def run(self):
_reader = transport.get.etl(source=self._source,target=self._target)
pass
def read(self,**_args):
"""
This function
"""
_reader = transport.factory.instance(**self._source)
#
# If arguments are provided then a query is to be executed (not just a table dump)
if 'cmd' in self._source or 'query' in self._source :
_query = self._source['cmd'] if 'cmd' in self._source else self._source['query']
return _reader.read(**_query)
else:
return _reader.read()
# return _reader.read() if 'query' not in self._source else _reader.read(**self._source['query'])
def _delegate_write(self,_data,**_args):
"""
This function will write a data-frame to a designated data-store, The function is built around a delegation design pattern
:data data-frame or object to be written
"""
if _data.shape[0] > 0 :
for _target in self._target :
if 'write' not in _target :
_target['context'] = 'write'
# _target['lock'] = True
else:
# _target['write']['lock'] = True
pass
_writer = transport.factory.instance(**_target)
_writer.write(_data,**_args)
if hasattr(_writer,'close') :
_writer.close()
def write(self,_df,**_args):
"""
"""
SEGMENT_COUNT = 6
MAX_ROWS = 1000000
# _df = self.read()
_segments = np.array_split(np.arange(_df.shape[0]),SEGMENT_COUNT) if _df.shape[0] > MAX_ROWS else np.array( [np.arange(_df.shape[0])])
# _index = 0
# def _read(self,**_args):
# """
# This function
# """
# _reader = transport.factory.instance(**self._source)
# #
# # If arguments are provided then a query is to be executed (not just a table dump)
# if 'cmd' in self._source or 'query' in self._source :
# _query = self._source['cmd'] if 'cmd' in self._source else self._source['query']
# return _reader.read(**_query)
# else:
# return _reader.read()
# # return _reader.read() if 'query' not in self._source else _reader.read(**self._source['query'])
# def _delegate_write(self,_data,**_args):
# """
# This function will write a data-frame to a designated data-store, The function is built around a delegation design pattern
# :data data-frame or object to be written
# """
# if _data.shape[0] > 0 :
# for _target in self._target :
# if 'write' not in _target :
# _target['context'] = 'write'
# # _target['lock'] = True
# else:
# # _target['write']['lock'] = True
# pass
# _writer = transport.factory.instance(**_target)
# _writer.write(_data,**_args)
# if hasattr(_writer,'close') :
# _writer.close()
# def write(self,_df,**_args):
# """
# """
# SEGMENT_COUNT = 6
# MAX_ROWS = 1000000
# # _df = self.read()
# _segments = np.array_split(np.arange(_df.shape[0]),SEGMENT_COUNT) if _df.shape[0] > MAX_ROWS else np.array( [np.arange(_df.shape[0])])
# # _index = 0
# for _indexes in _segments :
# _fwd_args = {} if not _args else _args
# self._delegate_write(_df.iloc[_indexes],**_fwd_args)
# time.sleep(1)
# #
# # @TODO: Perhaps consider writing up each segment in a thread/process (speeds things up?)
# pass
for _indexes in _segments :
_fwd_args = {} if not _args else _args
self._delegate_write(_df.iloc[_indexes],**_fwd_args)
time.sleep(1)
#
# @TODO: Perhaps consider writing up each segment in a thread/process (speeds things up?)
pass
def instance(**_args):
pthread = Transporter (**_args)
pthread.start()
return pthread
_proxy = lambda _agent: _agent.write(_agent.read())
if 'source' in _args and 'target' in _args :
_agent = Transporter(**_args)
_proxy(_agent)
else:
_config = _args['config']
_items = [Transporter(**_item) for _item in _config ]
_MAX_JOBS = 5
_items = np.array_split(_items,_MAX_JOBS)
for _batch in _items :
jobs = []
for _item in _batch :
thread = Process(target=_proxy,args = (_item,))
thread.start()
jobs.append(thread)
while jobs :
jobs = [thread for thread in jobs if thread.is_alive()]
time.sleep(1)
pass
# class Post(Process):
# def __init__(self,**args):

@ -1,32 +1,14 @@
"""
This class is a wrapper around read/write classes of cloud,sql,nosql,other packages
The wrapper allows for application of plugins as pre-post conditions.
NOTE: Plugins are converted to a pipeline, so we apply a pipeline when reading or writing:
- upon initialization we will load plugins
- on read/write we apply a pipeline (if passed as an argument)
The wrapper allows for application of plugins as pre-post conditions
"""
from transport.plugins import Plugin, PluginLoader
import transport
from transport import providers
from multiprocessing import Process
import time
import plugin_ix
class IO:
"""
Base wrapper class for read/write and support for logs
Base wrapper class for read/write
"""
def __init__(self,**_args):
_agent = _args['agent']
plugins = _args['plugins'] if 'plugins' in _args else None
def __init__(self,_agent,plugins):
self._agent = _agent
# self._ixloader = plugin_ix.Loader () #-- must indicate where the plugin registry file is
self._ixloader = plugin_ix.Loader (registry=plugin_ix.Registry(folder=transport.registry.REGISTRY_PATH))
if plugins :
self.init_plugins(plugins)
self._plugins = plugins
def meta (self,**_args):
if hasattr(self._agent,'meta') :
return self._agent.meta(**_args)
@ -35,97 +17,31 @@ class IO:
def close(self):
if hasattr(self._agent,'close') :
self._agent.close()
# def apply(self):
# """
# applying pre/post conditions given a pipeline expression
# """
# for _pointer in self._plugins :
# _data = _pointer(_data)
def apply(self):
"""
applying pre/post conditions given a pipeline expression
"""
for _pointer in self._plugins :
_data = _pointer(_data)
def apply(self,_query):
if hasattr(self._agent,'apply') :
return self._agent.apply(_query)
return None
def submit(self,_query):
return self.delegate('submit',_query)
def delegate(self,_name,_query):
if hasattr(self._agent,_name) :
pointer = getattr(self._agent,_name)
return pointer(_query)
return None
def init_plugins(self,plugins):
for _ref in plugins :
self._ixloader.set(_ref)
class IReader(IO):
"""
This is a wrapper for read functionalities
"""
def __init__(self,**_args):
super().__init__(**_args)
def __init__(self,_agent,pipeline=None):
super().__init__(_agent,pipeline)
def read(self,**_args):
if 'plugins' in _args :
self.init_plugins(_args['plugins'])
_data = self._agent.read(**_args)
# if self._plugins and self._plugins.ratio() > 0 :
# _data = self._plugins.apply(_data)
if self._plugins and self._plugins.ratio() > 0 :
_data = self._plugins.apply(_data)
#
# output data
#
# applying the the design pattern
_data = self._ixloader.visitor(_data)
return _data
class IWriter(IO):
def __init__(self,**_args): #_agent,pipeline=None):
super().__init__(**_args) #_agent,pipeline)
def __init__(self,_agent,pipeline=None):
super().__init__(_agent,pipeline)
def write(self,_data,**_args):
# if 'plugins' in _args :
# self._init_plugins(_args['plugins'])
if 'plugins' in _args :
self.init_plugins(_args['plugins'])
if self._plugins and self._plugins.ratio() > 0 :
_data = self._plugins.apply(_data)
self._ixloader.visitor(_data)
self._agent.write(_data,**_args)
#
# The ETL object in its simplest form is an aggregation of read/write objects
# @TODO: ETL can/should aggregate a writer as a plugin and apply it as a process
class IETL(IReader) :
"""
This class performs an ETL operation by ineriting a read and adding writes as pipeline functions
"""
def __init__(self,**_args):
super().__init__(agent=transport.get.reader(**_args['source']),plugins=None)
if 'target' in _args:
self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
else:
self._targets = []
self.jobs = []
#
# If the parent is already multiprocessing
self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
def read(self,**_args):
_data = super().read(**_args)
_schema = super().meta()
for _kwargs in self._targets :
if _schema :
_kwargs['schema'] = _schema
self.post(_data,**_kwargs)
return _data
def run(self) :
return self.read()
def post (self,_data,**_args) :
"""
This function returns an instance of a process that will perform the write operation
:_args parameters associated with writer object
"""
writer = transport.get.writer(**_args)
if 'schema' in _args :
writer.write(_data,schema=_args['schema'])
else:
writer.write(_data)
writer.close()

@ -33,8 +33,6 @@ class Mongo :
:password password for current user
"""
self.host = 'localhost' if 'host' not in args else args['host']
if ':' not in self.host and 'port' in args :
self.host = ':'.join([self.host,str(args['port'])])
self.mechanism= 'SCRAM-SHA-256' if 'mechanism' not in args else args['mechanism']
# authSource=(args['authSource'] if 'authSource' in args else self.dbname)
self._lock = False if 'lock' not in args else args['lock']

@ -1 +1 @@
from . import files, http, rabbitmq, callback, files, console
from . import files, http, rabbitmq, callback, files

@ -1,7 +1,3 @@
"""
This module uses callback architectural style as a writer to enable user-defined code to handle the output of a reader
The intent is to allow users to have control over the output of data to handle things like logging, encryption/decryption and other
"""
import queue
from threading import Thread, Lock
# from transport.common import Reader,Writer

@ -1,6 +1,3 @@
"""
This class uses classback pattern to allow output to be printed to the console (debugging)
"""
from . import callback

@ -53,8 +53,8 @@ class Writer (File):
"""
try:
_delim = self.delimiter if 'delimiter' not in _args else _args['delimiter']
_path = self.path if 'path' not in _args else _args['path']
_delim = self._delimiter if 'delimiter' not in _args else _args['delimiter']
_path = self._path if 'path' not in _args else _args['path']
_mode = self._mode if 'mode' not in _args else _args['mode']
info.to_csv(_path,index=False,sep=_delim)
@ -62,7 +62,6 @@ class Writer (File):
except Exception as e:
#
# Not sure what should be done here ...
print (e)
pass
finally:
# DiskWriter.THREAD_LOCK.release()

@ -11,10 +11,8 @@ import importlib as IL
import importlib.util
import sys
import os
import pandas as pd
import time
class Plugin :
class plugin :
"""
Implementing function decorator for data-transport plugins (post-pre)-processing
"""
@ -24,78 +22,66 @@ class Plugin :
:mode restrict to reader/writer
:about tell what the function is about
"""
self._name = _args['name'] if 'name' in _args else None
self._version = _args['version'] if 'version' in _args else '0.1'
self._doc = _args['doc'] if 'doc' in _args else "N/A"
self._name = _args['name']
self._about = _args['about']
self._mode = _args['mode'] if 'mode' in _args else 'rw'
def __call__(self,pointer,**kwargs):
def wrapper(_args,**kwargs):
return pointer(_args,**kwargs)
def __call__(self,pointer):
def wrapper(_args):
return pointer(_args)
#
# @TODO:
# add attributes to the wrapper object
#
self._name = pointer.__name__ if not self._name else self._name
setattr(wrapper,'transport',True)
setattr(wrapper,'name',self._name)
setattr(wrapper,'version',self._version)
setattr(wrapper,'doc',self._doc)
setattr(wrapper,'mode',self._mode)
setattr(wrapper,'about',self._about)
return wrapper
class PluginLoader :
"""
This class is intended to load a plugin and make it available and assess the quality of the developed plugin
"""
def __init__(self,**_args):
"""
:path location of the plugin (should be a single file)
:_names of functions to load
"""
# _names = _args['names'] if 'names' in _args else None
# path = _args['path'] if 'path' in _args else None
# self._names = _names if type(_names) == list else [_names]
self._modules = {}
self._names = []
self._registry = _args['registry']
pass
def load (self,**_args):
"""
This function loads a plugin
"""
_names = _args['names'] if 'names' in _args else None
path = _args['path'] if 'path' in _args else None
self._names = _names if type(_names) == list else [_names]
self._modules = {}
self._names = []
path = _args ['path']
if os.path.exists(path) :
_alias = path.split(os.sep)[-1]
spec = importlib.util.spec_from_file_location(_alias, path)
if path and os.path.exists(path) and _names:
for _name in self._names :
spec = importlib.util.spec_from_file_location('private', path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module) #--loads it into sys.modules
for _name in dir(module) :
if hasattr(module,_name) :
if self.isplugin(module,_name) :
self._module[_name] = getattr(module,_name)
# self._names [_name]
def format (self,**_args):
uri = _args['alias'],_args['name']
# def set(self,_pointer) :
def set(self,_key) :
self._modules[_name] = getattr(module,_name)
else:
print ([f'Found {_name}', 'not plugin'])
else:
#
# @TODO: We should log this somewhere some how
print (['skipping ',_name, hasattr(module,_name)])
pass
else:
#
# Initialization is empty
self._names = []
pass
def set(self,_pointer) :
"""
This function will set a pointer to the list of modules to be called
This should be used within the context of using the framework as a library
"""
if type(_key).__name__ == 'function':
#
# The pointer is in the code provided by the user and loaded in memory
#
_pointer = _key
_key = 'inline@'+_key.__name__
# self._names.append(_key.__name__)
else:
_pointer = self._registry.get(key=_key)
if _pointer :
self._modules[_key] = _pointer
self._names.append(_key)
_name = _pointer.__name__
self._modules[_name] = _pointer
self._names.append(_name)
def isplugin(self,module,name):
"""
This function determines if a module is a recognized plugin
@ -115,36 +101,17 @@ class PluginLoader :
return _name in self._modules
def ratio (self):
"""
This functiion determines how many modules loaded vs unloaded given the list of names
how many modules loaded vs unloaded given the list of names
"""
_n = len(self._names)
return len(set(self._modules.keys()) & set (self._names)) / _n
def apply(self,_data,_logger=[]):
_input= {}
def apply(self,_data):
for _name in self._modules :
try:
_input = {'action':'plugin','object':_name,'input':{'status':'PASS'}}
_pointer = self._modules[_name]
if type(_data) == list :
_data = pd.DataFrame(_data)
_brow,_bcol = list(_data.shape)
#
# @TODO: add exception handling
_data = _pointer(_data)
_input['input']['shape'] = {'rows-dropped':_brow - _data.shape[0]}
except Exception as e:
_input['input']['status'] = 'FAILED'
print (e)
time.sleep(1)
if _logger:
try:
_logger(**_input)
except Exception as e:
pass
return _data
# def apply(self,_data,_name):
# """

@ -10,11 +10,8 @@ HTTP='http'
BIGQUERY ='bigquery'
FILE = 'file'
ETL = 'etl'
SQLITE = 'sqlite3'
SQLITE = 'sqlite'
SQLITE3= 'sqlite3'
DUCKDB = 'duckdb'
REDSHIFT = 'redshift'
NETEZZA = 'netezza'
MYSQL = 'mysql'
@ -44,9 +41,6 @@ PGSQL = POSTGRESQL
AWS_S3 = 's3'
RABBIT = RABBITMQ
ICEBERG='iceberg'
APACHE_ICEBERG = 'iceberg'
DRILL = 'drill'
APACHE_DRILL = 'drill'
# QLISTENER = 'qlistener'

@ -1,114 +0,0 @@
import os
import json
from info import __version__
import copy
import transport
import importlib
import importlib.util
import shutil
from io import StringIO
"""
This class manages data from the registry and allows (read only)
@TODO: add property to the DATA attribute
"""
if 'HOME' in os.environ :
REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
else:
REGISTRY_PATH=os.sep.join([os.environ['USERPROFILE'],'.data-transport'])
#
# This path can be overriden by an environment variable ...
#
if 'DATA_TRANSPORT_REGISTRY_PATH' in os.environ :
REGISTRY_PATH = os.environ['DATA_TRANSPORT_REGISTRY_PATH']
REGISTRY_FILE= 'transport-registry.json'
DATA = {}
def isloaded ():
return DATA not in [{},None]
def exists (path=REGISTRY_PATH,_file=REGISTRY_FILE) :
"""
This function determines if there is a registry at all
"""
p = os.path.exists(path)
q = os.path.exists( os.sep.join([path,_file]))
return p and q
def load (_path=REGISTRY_PATH,_file=REGISTRY_FILE):
global DATA
if exists(_path) :
path = os.sep.join([_path,_file])
f = open(path)
DATA = json.loads(f.read())
f.close()
def init (email,path=REGISTRY_PATH,override=False,_file=REGISTRY_FILE):
"""
Initializing the registry and will raise an exception in the advent of an issue
"""
p = '@' in email
q = False if '.' not in email else email.split('.')[-1] in ['edu','com','io','ai','org']
if p and q :
_config = {"email":email,'version':__version__}
if not os.path.exists(path):
os.makedirs(path)
filename = os.sep.join([path,_file])
if not os.path.exists(filename) or override == True :
f = open(filename,'w')
f.write( json.dumps(_config))
f.close()
# _msg = f"""{CHECK_MARK} Successfully wrote configuration to {path} from {email}"""
else:
raise Exception (f"""Unable to write configuration, Please check parameters (or help) and try again""")
else:
raise Exception (f"""Invalid Input, {email} is not well formatted, provide an email with adequate format""")
def lookup (label):
global DATA
return label in DATA
has = lookup
def get (label='default') :
global DATA
return copy.copy(DATA[label]) if label in DATA else {}
def set (label, auth_file, default=False,path=REGISTRY_PATH) :
"""
This function will add a label (auth-file data) into the registry and can set it as the default
"""
if label == 'default' :
raise Exception ("""Invalid label name provided, please change the label name and use the switch""")
reg_file = os.sep.join([path,REGISTRY_FILE])
if os.path.exists(path) and os.path.exists(reg_file):
if type(auth_file) == str and os.path.exists (auth_file) :
f = open(auth_file)
elif type(auth_file) == StringIO:
f = auth_file
_info = json.loads(f.read())
f.close()
f = open(reg_file)
_config = json.loads(f.read())
f.close()
#
# set the proposed label
_object = transport.factory.instance(**_info)
if _object :
_config[label] = _info
if default :
_config['default'] = _info
#
# now we need to write this to the location
f = open(reg_file,'w')
f.write(json.dumps(_config))
f.close()
else:
raise Exception( f"""Unable to load file locate at {path},\nLearn how to generate auth-file with wizard found at https://healthcareio.the-phi.com/data-transport""")
pass
else:
pass
pass

@ -3,7 +3,7 @@ This namespace/package wrap the sql functionalities for a certain data-stores
- netezza, postgresql, mysql and sqlite
- mariadb, redshift (also included)
"""
from . import postgresql, mysql, netezza, sqlite, sqlserver, duckdb
from . import postgresql, mysql, netezza, sqlite, sqlserver
#

@ -3,8 +3,6 @@ This file encapsulates common operations associated with SQL databases via SQLAl
"""
import sqlalchemy as sqa
from sqlalchemy import text , MetaData, inspect
import pandas as pd
class Base:
@ -13,13 +11,7 @@ class Base:
self._port = None
self._database = _args['database']
self._table = _args['table'] if 'table' in _args else None
_uri = self._get_uri(**_args)
if type(_uri) == str :
self._engine= sqa.create_engine(_uri,future=True)
else:
_uri,_kwargs = _uri
self._engine= sqa.create_engine(_uri,**_kwargs,future=True)
self._engine= sqa.create_engine(self._get_uri(**_args),future=True)
def _set_uri(self,**_args) :
"""
:provider provider
@ -40,33 +32,21 @@ class Base:
:table optional name of the table (can be fully qualified)
"""
_table = self._table if 'table' not in _args else _args['table']
_map = {'TINYINT':'INTEGER','BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
_schema = []
# if _table :
# if sqa.__version__.startswith('1.') :
# _handler = sqa.MetaData(bind=self._engine)
# _handler.reflect()
# else:
# #
# # sqlalchemy's version 2.+
# _handler = sqa.MetaData()
# _handler.reflect(bind=self._engine)
# #
# # Let us extract the schema with the native types
# _map = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
# _schema = [{"name":_attr.name,"type":_map.get(str(_attr.type),str(_attr.type))} for _attr in _handler.tables[_table].columns]
#
try:
if _table :
_inspector = inspect(self._engine)
_columns = _inspector.get_columns(_table)
_schema = [{'name':column['name'],'type':_map.get(str(column['type']),str(column['type'])) } for column in _columns]
if sqa.__version__.startswith('1.') :
_handler = sqa.MetaData(bind=self._engine)
_handler.reflect()
else:
#
# sqlalchemy's version 2.+
_handler = sqa.MetaData()
_handler.reflect(bind=self._engine)
#
# Let us extract the schema with the native types
_map = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
_schema = [{"name":_attr.name,"type":_map.get(str(_attr.type),str(_attr.type))} for _attr in _handler.tables[_table].columns]
return _schema
except Exception as e:
pass
# else:
return []
def has(self,**_args):
return self.meta(**_args)
def apply(self,sql):
@ -76,20 +56,11 @@ class Base:
@TODO: Execution of stored procedures
"""
if sql.strip().lower().startswith('select') or sql.strip().lower().startswith('with') or sql.strip().startswith('show'):
return pd.read_sql(sql,self._engine)
else:
_handler = self._engine.connect()
_handler.execute(text(sql))
_handler.commit ()
_handler.close()
return None
return pd.read_sql(sql,self._engine) if sql.lower().startswith('select') or sql.lower().startswith('with') else None
class SQLBase(Base):
def __init__(self,**_args):
super().__init__(**_args)
self._schema = _args.get('schema',None)
def get_provider(self):
raise Exception ("Provider Needs to be set ...")
def get_default_port(self) :
@ -113,11 +84,7 @@ class SQLBase(Base):
# _uri = [_item.strip() for _item in _uri if _item.strip()]
# return '/'.join(_uri)
return f'{_provider}://{_host}/{_database}' if _account == '' else f'{_provider}://{_account}{_host}/{_database}'
def close(self,) :
try:
self._engine.dispose()
except :
pass
class BaseReader(SQLBase):
def __init__(self,**_args):
super().__init__(**_args)
@ -129,8 +96,6 @@ class BaseReader(SQLBase):
sql = _args['sql']
else:
_table = _args['table'] if 'table' in _args else self._table
if self._schema and type(self._schema) == str :
_table = f'{self._schema}.{_table}'
sql = f'SELECT * FROM {_table}'
return self.apply(sql)
@ -141,11 +106,9 @@ class BaseWriter (SQLBase):
"""
def __init__(self,**_args):
super().__init__(**_args)
def write(self,_data,**_args):
if type(_data) == dict :
_df = pd.DataFrame([_data])
_df = pd.DataFrame(_data)
elif type(_data) == list :
_df = pd.DataFrame(_data)
else:
@ -162,8 +125,5 @@ class BaseWriter (SQLBase):
# _mode['schema'] = _args['schema']
# if 'if_exists' in _args :
# _mode['if_exists'] = _args['if_exists']
if 'schema' in _args and type(_args['schema']) == str:
self._schema = _args.get('schema',None)
if self._schema :
_mode['schema'] = self._schema
_df.to_sql(_table,self._engine,**_mode)

@ -1,26 +0,0 @@
"""
This module implements the handler for duckdb (in memory or not)
"""
from transport.sql.common import Base, BaseReader, BaseWriter
class Duck :
def __init__(self,**_args):
#
# duckdb with none as database will operate as an in-memory database
#
self.database = _args['database'] if 'database' in _args else ''
def get_provider(self):
return "duckdb"
def _get_uri(self,**_args):
return f"""duckdb:///{self.database}"""
class Reader(Duck,BaseReader) :
def __init__(self,**_args):
Duck.__init__(self,**_args)
BaseReader.__init__(self,**_args)
def _get_uri(self,**_args):
return super()._get_uri(**_args),{'connect_args':{'read_only':True}}
class Writer(Duck,BaseWriter):
def __init__(self,**_args):
Duck.__init__(self,**_args)
BaseWriter.__init__(self,**_args)

@ -1,7 +0,0 @@
"""
This namespace/package is intended to handle read/writes against data warehouse solutions like :
- apache iceberg
- clickhouse (...)
"""
from . import iceberg, drill

@ -1,55 +0,0 @@
import sqlalchemy
import pandas as pd
from .. sql.common import BaseReader , BaseWriter
import sqlalchemy as sqa
class Drill :
__template = {'host':None,'port':None,'ssl':None,'table':None,'database':None}
def __init__(self,**_args):
self._host = _args['host'] if 'host' in _args else 'localhost'
self._port = _args['port'] if 'port' in _args else self.get_default_port()
self._ssl = False if 'ssl' not in _args else _args['ssl']
self._table = _args['table'] if 'table' in _args else None
if self._table and '.' in self._table :
_seg = self._table.split('.')
if len(_seg) > 2 :
self._schema,self._database = _seg[:2]
else:
self._database=_args['database']
self._schema = self._database.split('.')[0]
def _get_uri(self,**_args):
return f'drill+sadrill://{self._host}:{self._port}/{self._database}?use_ssl={self._ssl}'
def get_provider(self):
return "drill+sadrill"
def get_default_port(self):
return "8047"
def meta(self,**_args):
_table = _args['table'] if 'table' in _args else self._table
if '.' in _table :
_schema = _table.split('.')[:2]
_schema = '.'.join(_schema)
_table = _table.split('.')[-1]
else:
_schema = self._schema
# _sql = f"select COLUMN_NAME AS name, CASE WHEN DATA_TYPE ='CHARACTER VARYING' THEN 'CHAR ( 125 )' ELSE DATA_TYPE END AS type from INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA='{_schema}' and TABLE_NAME='{_table}'"
_sql = f"select COLUMN_NAME AS name, CASE WHEN DATA_TYPE ='CHARACTER VARYING' THEN 'CHAR ( '||COLUMN_SIZE||' )' ELSE DATA_TYPE END AS type from INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA='{_schema}' and TABLE_NAME='{_table}'"
try:
_df = pd.read_sql(_sql,self._engine)
return _df.to_dict(orient='records')
except Exception as e:
print (e)
pass
return []
class Reader (Drill,BaseReader) :
def __init__(self,**_args):
super().__init__(**_args)
self._chunksize = 0 if 'chunksize' not in _args else _args['chunksize']
self._engine= sqa.create_engine(self._get_uri(),future=True)
class Writer(Drill,BaseWriter):
def __init__(self,**_args):
super().__init__(self,**_args)

@ -1,151 +0,0 @@
"""
dependency:
- spark and SPARK_HOME environment variable must be set
NOTE:
When using streaming option, insure that it is inline with default (1000 rows) or increase it in spark-defaults.conf
"""
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import col, to_date, to_timestamp
import copy
class Iceberg :
def __init__(self,**_args):
"""
providing catalog meta information (you must get this from apache iceberg)
"""
#
# Turning off logging (it's annoying & un-professional)
#
# _spconf = SparkContext()
# _spconf.setLogLevel("ERROR")
#
# @TODO:
# Make arrangements for additional configuration elements
#
self._session = SparkSession.builder.appName("data-transport").getOrCreate()
self._session.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
# self._session.sparkContext.setLogLevel("ERROR")
self._catalog = self._session.catalog
self._table = _args['table'] if 'table' in _args else None
if 'catalog' in _args :
#
# Let us set the default catalog
self._catalog.setCurrentCatalog(_args['catalog'])
else:
# No current catalog has been set ...
pass
if 'database' in _args :
self._database = _args['database']
self._catalog.setCurrentDatabase(self._database)
else:
#
# Should we set the default as the first one if available ?
#
pass
self._catalogName = self._catalog.currentCatalog()
self._databaseName = self._catalog.currentDatabase()
def meta (self,**_args) :
"""
This function should return the schema of a table (only)
"""
_schema = []
try:
_table = _args['table'] if 'table' in _args else self._table
_tableName = self._getPrefix(**_args) + f".{_table}"
_tmp = self._session.table(_tableName).schema
_schema = _tmp.jsonValue()['fields']
for _item in _schema :
del _item['nullable'],_item['metadata']
except Exception as e:
pass
return _schema
def _getPrefix (self,**_args):
_catName = self._catalogName if 'catalog' not in _args else _args['catalog']
_datName = self._databaseName if 'database' not in _args else _args['database']
return '.'.join([_catName,_datName])
def apply(self,_query):
"""
sql query/command to run against apache iceberg
"""
return self._session.sql(_query).toPandas()
def has (self,**_args):
try:
_prefix = self._getPrefix(**_args)
if _prefix.endswith('.') :
return False
return _args['table'] in [_item.name for _item in self._catalog.listTables(_prefix)]
except Exception as e:
print (e)
return False
def close(self):
self._session.stop()
class Reader(Iceberg) :
def __init__(self,**_args):
super().__init__(**_args)
def read(self,**_args):
_table = self._table
_prefix = self._getPrefix(**_args)
if 'table' in _args or _table:
_table = _args['table'] if 'table' in _args else _table
_table = _prefix + f'.{_table}'
return self._session.table(_table).toPandas()
else:
sql = _args['sql']
return self._session.sql(sql).toPandas()
pass
class Writer (Iceberg):
"""
Writing data to an Apache Iceberg data warehouse (using pyspark)
"""
def __init__(self,**_args):
super().__init__(**_args)
self._mode = 'append' if 'mode' not in _args else _args['mode']
self._table = None if 'table' not in _args else _args['table']
def format (self,_schema) :
_iceSchema = StructType([])
_map = {'integer':IntegerType(),'float':DoubleType(),'double':DoubleType(),'date':DateType(),
'timestamp':TimestampType(),'datetime':TimestampType(),'string':StringType(),'varchar':StringType()}
for _item in _schema :
_name = _item['name']
_type = _item['type'].lower()
if _type not in _map :
_iceType = StringType()
else:
_iceType = _map[_type]
_iceSchema.add (StructField(_name,_iceType,True))
return _iceSchema if len(_iceSchema) else []
def write(self,_data,**_args):
_prefix = self._getPrefix(**_args)
if 'table' not in _args and not self._table :
raise Exception (f"Table Name should be specified for catalog/database {_prefix}")
_schema = self.format(_args['schema']) if 'schema' in _args else []
if not _schema :
rdd = self._session.createDataFrame(_data,verifySchema=False)
else :
rdd = self._session.createDataFrame(_data,schema=_schema,verifySchema=True)
_mode = self._mode if 'mode' not in _args else _args['mode']
_table = self._table if 'table' not in _args else _args['table']
# print (_data.shape,_mode,_table)
if not self._session.catalog.tableExists(_table):
# # @TODO:
# # add partitioning information here
rdd.writeTo(_table).using('iceberg').create()
# # _mode = 'overwrite'
# # rdd.write.format('iceberg').mode(_mode).saveAsTable(_table)
else:
# rdd.writeTo(_table).append()
# # _table = f'{_prefix}.{_table}'
rdd.coalesce(10).write.format('iceberg').mode('append').save(_table)
Loading…
Cancel
Save