Compare commits
No commits in common. 'master' and 'v2.0.4' have entirely different histories.
@ -1,149 +0,0 @@
|
|||||||
{
|
|
||||||
"cells": [
|
|
||||||
{
|
|
||||||
"cell_type": "markdown",
|
|
||||||
"metadata": {},
|
|
||||||
"source": [
|
|
||||||
"#### Writing data-transport plugins\n",
|
|
||||||
"\n",
|
|
||||||
"The data-transport plugins are designed to automate pre/post processing i.e\n",
|
|
||||||
"\n",
|
|
||||||
" - Read -> Post processing\n",
|
|
||||||
" - Write-> Pre processing\n",
|
|
||||||
" \n",
|
|
||||||
"In this example we will assume, data and write both pre/post processing to any supported infrastructure. We will equally show how to specify the plugins within a configuration file"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": 1,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"#\n",
|
|
||||||
"# Writing to Google Bigquery database\n",
|
|
||||||
"#\n",
|
|
||||||
"import transport\n",
|
|
||||||
"from transport import providers\n",
|
|
||||||
"import pandas as pd\n",
|
|
||||||
"import os\n",
|
|
||||||
"import shutil\n",
|
|
||||||
"#\n",
|
|
||||||
"#\n",
|
|
||||||
"\n",
|
|
||||||
"DATABASE = '/home/steve/tmp/demo.db3'\n",
|
|
||||||
"if os.path.exists(DATABASE) :\n",
|
|
||||||
" os.remove(DATABASE)\n",
|
|
||||||
"#\n",
|
|
||||||
"# \n",
|
|
||||||
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
|
|
||||||
"litew = transport.get.writer(provider=providers.SQLITE,database=DATABASE)\n",
|
|
||||||
"litew.write(_data,table='friends')"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "markdown",
|
|
||||||
"metadata": {},
|
|
||||||
"source": [
|
|
||||||
"#### Reading from SQLite\n",
|
|
||||||
"\n",
|
|
||||||
"The cell below reads the data that has been written by the cell above and computes the average age from a plugin function we will write. \n",
|
|
||||||
"\n",
|
|
||||||
"- Basic read of the designated table (friends) created above\n",
|
|
||||||
"- Read with pipeline functions defined in code\n",
|
|
||||||
"\n",
|
|
||||||
"**NOTE**\n",
|
|
||||||
"\n",
|
|
||||||
"It is possible to use **transport.factory.instance** or **transport.instance** or **transport.get.<[reader|writer]>** they are the same. It allows the maintainers to know that we used a factory design pattern."
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": 4,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [
|
|
||||||
{
|
|
||||||
"name": "stdout",
|
|
||||||
"output_type": "stream",
|
|
||||||
"text": [
|
|
||||||
" name age\n",
|
|
||||||
"0 James Bond 55\n",
|
|
||||||
"1 Steve Rogers 150\n",
|
|
||||||
"2 Steve Nyemba 44\n",
|
|
||||||
"\n",
|
|
||||||
"\n",
|
|
||||||
" name age autoinc\n",
|
|
||||||
"0 James Bond 5.5 0\n",
|
|
||||||
"1 Steve Rogers 15.0 1\n",
|
|
||||||
"2 Steve Nyemba 4.4 2\n"
|
|
||||||
]
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"source": [
|
|
||||||
"\n",
|
|
||||||
"import transport\n",
|
|
||||||
"from transport import providers\n",
|
|
||||||
"import os\n",
|
|
||||||
"import numpy as np\n",
|
|
||||||
"def _autoincrement (_data,**kwargs) :\n",
|
|
||||||
" \"\"\"\n",
|
|
||||||
" This function will add an autoincrement field to the table\n",
|
|
||||||
" \"\"\"\n",
|
|
||||||
" _data['autoinc'] = np.arange(_data.shape[0])\n",
|
|
||||||
" \n",
|
|
||||||
" return _data\n",
|
|
||||||
"def reduce(_data,**_args) :\n",
|
|
||||||
" \"\"\"\n",
|
|
||||||
" This function will reduce the age of the data frame\n",
|
|
||||||
" \"\"\"\n",
|
|
||||||
" _data.age /= 10\n",
|
|
||||||
" return _data\n",
|
|
||||||
"reader = transport.get.reader(provider=providers.SQLITE,database=DATABASE,table='friends')\n",
|
|
||||||
"#\n",
|
|
||||||
"# basic read of the data created in the first cell\n",
|
|
||||||
"_df = reader.read()\n",
|
|
||||||
"print (_df)\n",
|
|
||||||
"print ()\n",
|
|
||||||
"print()\n",
|
|
||||||
"#\n",
|
|
||||||
"# read of the data with pipeline function provided to alter the database\n",
|
|
||||||
"print (reader.read(pipeline=[_autoincrement,reduce]))"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "markdown",
|
|
||||||
"metadata": {},
|
|
||||||
"source": [
|
|
||||||
"The parameters for instianciating a transport object (reader or writer) can be found at [data-transport home](https://healthcareio.the-phi.com/data-transport)\n"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": []
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"metadata": {
|
|
||||||
"kernelspec": {
|
|
||||||
"display_name": "Python 3",
|
|
||||||
"language": "python",
|
|
||||||
"name": "python3"
|
|
||||||
},
|
|
||||||
"language_info": {
|
|
||||||
"codemirror_mode": {
|
|
||||||
"name": "ipython",
|
|
||||||
"version": 3
|
|
||||||
},
|
|
||||||
"file_extension": ".py",
|
|
||||||
"mimetype": "text/x-python",
|
|
||||||
"name": "python",
|
|
||||||
"nbconvert_exporter": "python",
|
|
||||||
"pygments_lexer": "ipython3",
|
|
||||||
"version": "3.9.7"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"nbformat": 4,
|
|
||||||
"nbformat_minor": 2
|
|
||||||
}
|
|
@ -1,131 +0,0 @@
|
|||||||
{
|
|
||||||
"cells": [
|
|
||||||
{
|
|
||||||
"cell_type": "markdown",
|
|
||||||
"metadata": {},
|
|
||||||
"source": [
|
|
||||||
"#### Writing to AWS S3\n",
|
|
||||||
"\n",
|
|
||||||
"We have setup our demo environment with the label **aws** passed to reference our s3 access_key and secret_key and file (called friends.csv). In the cell below we will write the data to our aws s3 bucket named **com.phi.demo**"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": 1,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [
|
|
||||||
{
|
|
||||||
"name": "stdout",
|
|
||||||
"output_type": "stream",
|
|
||||||
"text": [
|
|
||||||
"2.2.1\n"
|
|
||||||
]
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"source": [
|
|
||||||
"#\n",
|
|
||||||
"# Writing to mongodb database\n",
|
|
||||||
"#\n",
|
|
||||||
"import transport\n",
|
|
||||||
"from transport import providers\n",
|
|
||||||
"import pandas as pd\n",
|
|
||||||
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
|
|
||||||
"mgw = transport.get.writer(label='aws')\n",
|
|
||||||
"mgw.write(_data)\n",
|
|
||||||
"print (transport.__version__)"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "markdown",
|
|
||||||
"metadata": {},
|
|
||||||
"source": [
|
|
||||||
"#### Reading from AWS S3\n",
|
|
||||||
"\n",
|
|
||||||
"The cell below reads the data that has been written by the cell above and computes the average age within a mongodb pipeline. The code in the background executes an aggregation using\n",
|
|
||||||
"\n",
|
|
||||||
"- Basic read of the designated file **friends.csv**\n",
|
|
||||||
"- Compute average age using standard pandas functions\n",
|
|
||||||
"\n",
|
|
||||||
"**NOTE**\n",
|
|
||||||
"\n",
|
|
||||||
"By design **read** object are separated from **write** objects in order to avoid accidental writes to the database.\n",
|
|
||||||
"Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": 2,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [
|
|
||||||
{
|
|
||||||
"name": "stdout",
|
|
||||||
"output_type": "stream",
|
|
||||||
"text": [
|
|
||||||
" bname age\n",
|
|
||||||
"0 James Bond 55\n",
|
|
||||||
"1 Steve Rogers 150\n",
|
|
||||||
"2 Steve Nyemba 44\n",
|
|
||||||
"--------- STATISTICS ------------\n",
|
|
||||||
"83.0\n"
|
|
||||||
]
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"source": [
|
|
||||||
"\n",
|
|
||||||
"import transport\n",
|
|
||||||
"from transport import providers\n",
|
|
||||||
"import pandas as pd\n",
|
|
||||||
"\n",
|
|
||||||
"def cast(stream) :\n",
|
|
||||||
" print (stream)\n",
|
|
||||||
" return pd.DataFrame(str(stream))\n",
|
|
||||||
"mgr = transport.get.reader(label='aws')\n",
|
|
||||||
"_df = mgr.read()\n",
|
|
||||||
"print (_df)\n",
|
|
||||||
"print ('--------- STATISTICS ------------')\n",
|
|
||||||
"print (_df.age.mean())"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "markdown",
|
|
||||||
"metadata": {},
|
|
||||||
"source": [
|
|
||||||
"An **auth-file** is a file that contains database parameters used to access the database. \n",
|
|
||||||
"For code in shared environments, we recommend \n",
|
|
||||||
"\n",
|
|
||||||
"1. Having the **auth-file** stored on disk \n",
|
|
||||||
"2. and the location of the file is set to an environment variable.\n",
|
|
||||||
"\n",
|
|
||||||
"To generate a template of the **auth-file** open the **file generator wizard** found at visit https://healthcareio.the-phi.com/data-transport"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": []
|
|
||||||
}
|
|
||||||
],
|
|
||||||
"metadata": {
|
|
||||||
"kernelspec": {
|
|
||||||
"display_name": "Python 3",
|
|
||||||
"language": "python",
|
|
||||||
"name": "python3"
|
|
||||||
},
|
|
||||||
"language_info": {
|
|
||||||
"codemirror_mode": {
|
|
||||||
"name": "ipython",
|
|
||||||
"version": 3
|
|
||||||
},
|
|
||||||
"file_extension": ".py",
|
|
||||||
"mimetype": "text/x-python",
|
|
||||||
"name": "python",
|
|
||||||
"nbconvert_exporter": "python",
|
|
||||||
"pygments_lexer": "ipython3",
|
|
||||||
"version": "3.9.7"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"nbformat": 4,
|
|
||||||
"nbformat_minor": 2
|
|
||||||
}
|
|
@ -1,19 +0,0 @@
|
|||||||
"""
|
|
||||||
This file will be intended to handle duckdb database
|
|
||||||
"""
|
|
||||||
|
|
||||||
import duckdb
|
|
||||||
from transport.common import Reader,Writer
|
|
||||||
|
|
||||||
class Duck(Reader):
|
|
||||||
def __init__(self,**_args):
|
|
||||||
super().__init__(**_args)
|
|
||||||
self._path = None if 'path' not in _args else _args['path']
|
|
||||||
self._handler = duckdb.connect() if not self._path else duckdb.connect(self._path)
|
|
||||||
|
|
||||||
|
|
||||||
class DuckReader(Duck) :
|
|
||||||
def __init__(self,**_args):
|
|
||||||
super().__init__(**_args)
|
|
||||||
def read(self,**_args) :
|
|
||||||
pass
|
|
@ -1 +1 @@
|
|||||||
from . import files, http, rabbitmq, callback, files, console
|
from . import files, http, rabbitmq, callback, files
|
@ -1,24 +0,0 @@
|
|||||||
"""
|
|
||||||
This module implements the handler for duckdb (in memory or not)
|
|
||||||
"""
|
|
||||||
from transport.sql.common import Base, BaseReader, BaseWriter
|
|
||||||
|
|
||||||
class Duck :
|
|
||||||
def __init__(self,**_args):
|
|
||||||
#
|
|
||||||
# duckdb with none as database will operate as an in-memory database
|
|
||||||
#
|
|
||||||
self.database = _args['database'] if 'database' in _args else ''
|
|
||||||
def get_provider(self):
|
|
||||||
return "duckdb"
|
|
||||||
|
|
||||||
def _get_uri(self,**_args):
|
|
||||||
return f"""duckdb:///{self.database}"""
|
|
||||||
class Reader(Duck,BaseReader) :
|
|
||||||
def __init__(self,**_args):
|
|
||||||
Duck.__init__(self,**_args)
|
|
||||||
BaseReader.__init__(self,**_args)
|
|
||||||
class Writer(Duck,BaseWriter):
|
|
||||||
def __init__(self,**_args):
|
|
||||||
Duck.__init__(self,**_args)
|
|
||||||
BaseWriter.__init__(self,**_args)
|
|
Loading…
Reference in new issue