Compare commits
No commits in common. 'master' and 'nextcloud' have entirely different histories.
@ -1,21 +0,0 @@
|
||||
#
# Package metadata for the data-transport project.
# These dunder attributes are read by packaging tooling and exposed by the
# library itself (e.g. ``transport.__version__``) to report version info.
#
__app_name__ = 'data-transport'
__author__ = 'The Phi Technology'
__version__ = '2.2.6'
__email__ = "info@the-phi.com"
# Plain string literal: the previous ``f"""`` prefix was unnecessary because
# the license text contains no interpolation placeholders.
__license__ = """
Copyright 2010 - 2024, Steve L. Nyemba

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
# Release notes for the current version; interpolates __version__ above.
__whatsnew__ = f"""version {__version__}, focuses on collaborative environments like jupyter-base servers (apache zeppelin; jupyter notebook, jupyterlab, jupyterhub)

1. simpler syntax to create readers/writers
2. auth-file registry that can be referenced using a label
3. duckdb support
"""
# Explicit public API so ``from <pkg> import *`` exposes the metadata
# dunders (star-import skips underscore-prefixed names by default).
__all__ = ['__app_name__', '__author__', '__version__', '__email__',
           '__license__', '__whatsnew__']
|
@ -1,148 +0,0 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Writing to Google Bigquery\n",
|
||||
"\n",
|
||||
"1. Ensure you have a Google Bigquery service account key on disk\n",
|
||||
"2. The service key location is set as an environment variable **BQ_KEY**\n",
|
||||
"3. The dataset will be automatically created within the project associated with the service key\n",
|
||||
"\n",
|
||||
"The cell below creates a dataframe that will be stored within Google Bigquery"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 3,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stderr",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"100%|██████████| 1/1 [00:00<00:00, 10106.76it/s]\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"['data transport version ', '2.0.4']\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"#\n",
|
||||
"# Writing to Google Bigquery database\n",
|
||||
"#\n",
|
||||
"import transport\n",
|
||||
"from transport import providers\n",
|
||||
"import pandas as pd\n",
|
||||
"import os\n",
|
||||
"\n",
|
||||
"PRIVATE_KEY = os.environ['BQ_KEY'] #-- location of the service key\n",
|
||||
"DATASET = 'demo'\n",
|
||||
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
|
||||
"bqw = transport.get.writer(provider=providers.BIGQUERY,dataset=DATASET,table='friends',private_key=PRIVATE_KEY)\n",
|
||||
"bqw.write(_data,if_exists='replace') #-- default is append\n",
|
||||
"print (['data transport version ', transport.__version__])\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Reading from Google Bigquery\n",
|
||||
"\n",
|
||||
"The cell below reads the data that has been written by the cell above and computes the average age within a Google Bigquery (simple query). \n",
|
||||
"\n",
|
||||
"- Basic read of the designated table (friends) created above\n",
|
||||
"- Execute an aggregate SQL against the table\n",
|
||||
"\n",
|
||||
"**NOTE**\n",
|
||||
"\n",
|
||||
"By design **read** objects are separated from **write** objects in order to avoid accidental writes to the database.\n",
|
||||
"Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 2,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"Downloading: 100%|\u001b[32m██████████\u001b[0m|\n",
|
||||
"Downloading: 100%|\u001b[32m██████████\u001b[0m|\n",
|
||||
" name age\n",
|
||||
"0 James Bond 55\n",
|
||||
"1 Steve Rogers 150\n",
|
||||
"2 Steve Nyemba 44\n",
|
||||
"--------- STATISTICS ------------\n",
|
||||
" _counts f0_\n",
|
||||
"0 3 83.0\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"\n",
|
||||
"import transport\n",
|
||||
"from transport import providers\n",
|
||||
"import os\n",
|
||||
"PRIVATE_KEY=os.environ['BQ_KEY']\n",
|
||||
"pgr = transport.get.reader(provider=providers.BIGQUERY,dataset='demo',table='friends',private_key=PRIVATE_KEY)\n",
|
||||
"_df = pgr.read()\n",
|
||||
"_query = 'SELECT COUNT(*) _counts, AVG(age) from demo.friends'\n",
|
||||
"_sdf = pgr.read(sql=_query)\n",
|
||||
"print (_df)\n",
|
||||
"print ('--------- STATISTICS ------------')\n",
|
||||
"print (_sdf)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"An **auth-file** is a file that contains database parameters used to access the database. \n",
|
||||
"For code in shared environments, we recommend \n",
|
||||
"\n",
|
||||
"1. Having the **auth-file** stored on disk \n",
|
||||
"2. and the location of the file is set to an environment variable.\n",
|
||||
"\n",
|
||||
"To generate a template of the **auth-file** open the **file generator wizard** found at https://healthcareio.the-phi.com/data-transport"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.9.7"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 2
|
||||
}
|
@ -1,188 +0,0 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Extract Transform Load (ETL) from Code\n",
|
||||
"\n",
|
||||
"The example below reads data from an http source (github) and will copy the data to a csv file and to a database. This example illustrates the one-to-many ETL features.\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 2,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"data": {
|
||||
"text/html": [
|
||||
"<div>\n",
|
||||
"<style scoped>\n",
|
||||
" .dataframe tbody tr th:only-of-type {\n",
|
||||
" vertical-align: middle;\n",
|
||||
" }\n",
|
||||
"\n",
|
||||
" .dataframe tbody tr th {\n",
|
||||
" vertical-align: top;\n",
|
||||
" }\n",
|
||||
"\n",
|
||||
" .dataframe thead th {\n",
|
||||
" text-align: right;\n",
|
||||
" }\n",
|
||||
"</style>\n",
|
||||
"<table border=\"1\" class=\"dataframe\">\n",
|
||||
" <thead>\n",
|
||||
" <tr style=\"text-align: right;\">\n",
|
||||
" <th></th>\n",
|
||||
" <th>id</th>\n",
|
||||
" <th>location_id</th>\n",
|
||||
" <th>address_1</th>\n",
|
||||
" <th>address_2</th>\n",
|
||||
" <th>city</th>\n",
|
||||
" <th>state_province</th>\n",
|
||||
" <th>postal_code</th>\n",
|
||||
" <th>country</th>\n",
|
||||
" </tr>\n",
|
||||
" </thead>\n",
|
||||
" <tbody>\n",
|
||||
" <tr>\n",
|
||||
" <th>0</th>\n",
|
||||
" <td>1</td>\n",
|
||||
" <td>1</td>\n",
|
||||
" <td>2600 Middlefield Road</td>\n",
|
||||
" <td>NaN</td>\n",
|
||||
" <td>Redwood City</td>\n",
|
||||
" <td>CA</td>\n",
|
||||
" <td>94063</td>\n",
|
||||
" <td>US</td>\n",
|
||||
" </tr>\n",
|
||||
" <tr>\n",
|
||||
" <th>1</th>\n",
|
||||
" <td>2</td>\n",
|
||||
" <td>2</td>\n",
|
||||
" <td>24 Second Avenue</td>\n",
|
||||
" <td>NaN</td>\n",
|
||||
" <td>San Mateo</td>\n",
|
||||
" <td>CA</td>\n",
|
||||
" <td>94401</td>\n",
|
||||
" <td>US</td>\n",
|
||||
" </tr>\n",
|
||||
" <tr>\n",
|
||||
" <th>2</th>\n",
|
||||
" <td>3</td>\n",
|
||||
" <td>3</td>\n",
|
||||
" <td>24 Second Avenue</td>\n",
|
||||
" <td>NaN</td>\n",
|
||||
" <td>San Mateo</td>\n",
|
||||
" <td>CA</td>\n",
|
||||
" <td>94403</td>\n",
|
||||
" <td>US</td>\n",
|
||||
" </tr>\n",
|
||||
" <tr>\n",
|
||||
" <th>3</th>\n",
|
||||
" <td>4</td>\n",
|
||||
" <td>4</td>\n",
|
||||
" <td>24 Second Avenue</td>\n",
|
||||
" <td>NaN</td>\n",
|
||||
" <td>San Mateo</td>\n",
|
||||
" <td>CA</td>\n",
|
||||
" <td>94401</td>\n",
|
||||
" <td>US</td>\n",
|
||||
" </tr>\n",
|
||||
" <tr>\n",
|
||||
" <th>4</th>\n",
|
||||
" <td>5</td>\n",
|
||||
" <td>5</td>\n",
|
||||
" <td>24 Second Avenue</td>\n",
|
||||
" <td>NaN</td>\n",
|
||||
" <td>San Mateo</td>\n",
|
||||
" <td>CA</td>\n",
|
||||
" <td>94401</td>\n",
|
||||
" <td>US</td>\n",
|
||||
" </tr>\n",
|
||||
" </tbody>\n",
|
||||
"</table>\n",
|
||||
"</div>"
|
||||
],
|
||||
"text/plain": [
|
||||
" id location_id address_1 address_2 city \\\n",
|
||||
"0 1 1 2600 Middlefield Road NaN Redwood City \n",
|
||||
"1 2 2 24 Second Avenue NaN San Mateo \n",
|
||||
"2 3 3 24 Second Avenue NaN San Mateo \n",
|
||||
"3 4 4 24 Second Avenue NaN San Mateo \n",
|
||||
"4 5 5 24 Second Avenue NaN San Mateo \n",
|
||||
"\n",
|
||||
" state_province postal_code country \n",
|
||||
"0 CA 94063 US \n",
|
||||
"1 CA 94401 US \n",
|
||||
"2 CA 94403 US \n",
|
||||
"3 CA 94401 US \n",
|
||||
"4 CA 94401 US "
|
||||
]
|
||||
},
|
||||
"execution_count": 2,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"#\n",
|
||||
"# Writing to Google Bigquery database\n",
|
||||
"#\n",
|
||||
"import transport\n",
|
||||
"from transport import providers\n",
|
||||
"import pandas as pd\n",
|
||||
"import os\n",
|
||||
"\n",
|
||||
"#\n",
|
||||
"#\n",
|
||||
"source = {\"provider\": \"http\", \"url\": \"https://raw.githubusercontent.com/codeforamerica/ohana-api/master/data/sample-csv/addresses.csv\"}\n",
|
||||
"target = [{\"provider\": \"files\", \"path\": \"addresses.csv\", \"delimiter\": \",\"}, {\"provider\": \"sqlite\", \"database\": \"sample.db3\", \"table\": \"addresses\"}]\n",
|
||||
"\n",
|
||||
"_handler = transport.get.etl (source=source,target=target)\n",
|
||||
"_data = _handler.read() #-- all etl begins with data being read\n",
|
||||
"_data.head()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Extract Transform Load (ETL) from CLI\n",
|
||||
"\n",
|
||||
"The documentation for this is available at https://healthcareio.the-phi.com/data-transport \"Docs\" -> \"Terminal CLI\"\n",
|
||||
"\n",
|
||||
"The entire process is documented including how to generate an ETL configuration file."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.9.7"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 2
|
||||
}
|
@ -1,128 +0,0 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Writing to mongodb\n",
|
||||
"\n",
|
||||
"Ensure mongodb is actually installed on the system. The cell below creates a dataframe that will be stored within mongodb"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 4,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"2.0.4\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"#\n",
|
||||
"# Writing to mongodb database\n",
|
||||
"#\n",
|
||||
"import transport\n",
|
||||
"from transport import providers\n",
|
||||
"import pandas as pd\n",
|
||||
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
|
||||
"mgw = transport.get.writer(provider=providers.MONGODB,db='demo',collection='friends')\n",
|
||||
"mgw.write(_data)\n",
|
||||
"print (transport.__version__)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Reading from mongodb\n",
|
||||
"\n",
|
||||
"The cell below reads the data that has been written by the cell above and computes the average age within a mongodb pipeline. The code in the background executes an aggregation using **db.runCommand**\n",
|
||||
"\n",
|
||||
"- Basic read of the designated collection **find=\\<collection>**\n",
|
||||
"- Executing an aggregate pipeline against a collection **aggregate=\\<collection>**\n",
|
||||
"\n",
|
||||
"**NOTE**\n",
|
||||
"\n",
|
||||
"By design **read** objects are separated from **write** objects in order to avoid accidental writes to the database.\n",
|
||||
"Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 2,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
" name age\n",
|
||||
"0 James Bond 55\n",
|
||||
"1 Steve Rogers 150\n",
|
||||
"--------- STATISTICS ------------\n",
|
||||
" _id _counts _mean\n",
|
||||
"0 0 2 102.5\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"\n",
|
||||
"import transport\n",
|
||||
"from transport import providers\n",
|
||||
"mgr = transport.get.reader(provider=providers.MONGODB,db='foo',collection='friends')\n",
|
||||
"_df = mgr.read()\n",
|
||||
"PIPELINE = [{\"$group\":{\"_id\":0,\"_counts\":{\"$sum\":1}, \"_mean\":{\"$avg\":\"$age\"}}}]\n",
|
||||
"_sdf = mgr.read(aggregate='friends',pipeline=PIPELINE)\n",
|
||||
"print (_df)\n",
|
||||
"print ('--------- STATISTICS ------------')\n",
|
||||
"print (_sdf)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"An **auth-file** is a file that contains database parameters used to access the database. \n",
|
||||
"For code in shared environments, we recommend \n",
|
||||
"\n",
|
||||
"1. Having the **auth-file** stored on disk \n",
|
||||
"2. and the location of the file is set to an environment variable.\n",
|
||||
"\n",
|
||||
"To generate a template of the **auth-file** open the **file generator wizard** found at https://healthcareio.the-phi.com/data-transport"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.9.7"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 2
|
||||
}
|
@ -1,150 +0,0 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Writing to Microsoft SQLServer\n",
|
||||
"\n",
|
||||
"1. Ensure the Microsoft SQL Server is installed and you have access, i.e. account information\n",
|
||||
"2. The target database must be created before hand.\n",
|
||||
"3. We created an authentication file that will contain user account and location of the database\n",
|
||||
"\n",
|
||||
"The cell below creates a dataframe that will be stored in a Microsoft SQL Server database.\n",
|
||||
"\n",
|
||||
"**NOTE** This was not tested with a cloud instance"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"#\n",
|
||||
"# Writing to Microsoft SQL Server database\n",
|
||||
"#\n",
|
||||
"import transport\n",
|
||||
"from transport import providers\n",
|
||||
"import pandas as pd\n",
|
||||
"import os\n",
|
||||
"\n",
|
||||
"AUTH_FOLDER = os.environ['DT_AUTH_FOLDER'] #-- location of the service key\n",
|
||||
"MSSQL_AUTH_FILE= os.sep.join([AUTH_FOLDER,'mssql.json'])\n",
|
||||
"\n",
|
||||
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
|
||||
"msw = transport.get.writer(provider=providers.MSSQL,table='friends',auth_file=MSSQL_AUTH_FILE)\n",
|
||||
"msw.write(_data,if_exists='replace') #-- default is append\n",
|
||||
"print (['data transport version ', transport.__version__])\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Reading from Microsoft SQL Server database\n",
|
||||
"\n",
|
||||
"The cell below reads the data that has been written by the cell above and computes the average age within an MS SQL Server (simple query). \n",
|
||||
"\n",
|
||||
"- Basic read of the designated table (friends) created above\n",
|
||||
"- Execute an aggregate SQL against the table\n",
|
||||
"\n",
|
||||
"**NOTE**\n",
|
||||
"\n",
|
||||
"By design **read** objects are separated from **write** objects in order to avoid accidental writes to the database.\n",
|
||||
"Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"\n",
|
||||
"import transport\n",
|
||||
"from transport import providers\n",
|
||||
"import os\n",
|
||||
"AUTH_FOLDER = os.environ['DT_AUTH_FOLDER'] #-- location of the service key\n",
|
||||
"MSSQL_AUTH_FILE= os.sep.join([AUTH_FOLDER,'mssql.json'])\n",
|
||||
"\n",
|
||||
"msr = transport.get.reader(provider=providers.MSSQL,table='friends',auth_file=MSSQL_AUTH_FILE)\n",
|
||||
"_df = msr.read()\n",
|
||||
"_query = 'SELECT COUNT(*) _counts, AVG(age) from friends'\n",
|
||||
"_sdf = msr.read(sql=_query)\n",
|
||||
"print (_df)\n",
|
||||
"print ('\\n--------- STATISTICS ------------\\n')\n",
|
||||
"print (_sdf)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"An **auth-file** is a file that contains database parameters used to access the database. \n",
|
||||
"For code in shared environments, we recommend \n",
|
||||
"\n",
|
||||
"1. Having the **auth-file** stored on disk \n",
|
||||
"2. and the location of the file is set to an environment variable.\n",
|
||||
"\n",
|
||||
"To generate a template of the **auth-file** open the **file generator wizard** found at https://healthcareio.the-phi.com/data-transport"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 1,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"data": {
|
||||
"text/plain": [
|
||||
"{'provider': 'sqlserver',\n",
|
||||
" 'dataset': 'demo',\n",
|
||||
" 'table': 'friends',\n",
|
||||
" 'username': '<username>',\n",
|
||||
" 'password': '<password>'}"
|
||||
]
|
||||
},
|
||||
"execution_count": 1,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"\n",
|
||||
"{\n",
|
||||
" \"provider\":\"sqlserver\",\n",
|
||||
" \"dataset\":\"demo\",\"table\":\"friends\",\"username\":\"<username>\",\"password\":\"<password>\"\n",
|
||||
"}"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.9.7"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 2
|
||||
}
|
@ -1,161 +0,0 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Writing to MySQL\n",
|
||||
"\n",
|
||||
"1. Ensure MySQL is actually installed on the system, \n",
|
||||
"2. There is a database called demo created on the said system\n",
|
||||
"\n",
|
||||
"The cell below creates a dataframe that will be stored within MySQL"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 3,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"2.0.4\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"#\n",
|
||||
"# Writing to MySQL database\n",
|
||||
"#\n",
|
||||
"import transport\n",
|
||||
"from transport import providers\n",
|
||||
"import pandas as pd\n",
|
||||
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
|
||||
"myw = transport.get.writer(provider=providers.MYSQL,database='demo',table='friends',auth_file=\"/home/steve/auth-mysql.json\")\n",
|
||||
"myw.write(_data,if_exists='replace') #-- default is append\n",
|
||||
"print (transport.__version__)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Reading from MySQL\n",
|
||||
"\n",
|
||||
"The cell below reads the data that has been written by the cell above and computes the average age within a MySQL (simple query). \n",
|
||||
"\n",
|
||||
"- Basic read of the designated table (friends) created above\n",
|
||||
"- Execute an aggregate SQL against the table\n",
|
||||
"\n",
|
||||
"**NOTE**\n",
|
||||
"\n",
|
||||
"By design **read** objects are separated from **write** objects in order to avoid accidental writes to the database.\n",
|
||||
"Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 4,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
" name age\n",
|
||||
"0 James Bond 55\n",
|
||||
"1 Steve Rogers 150\n",
|
||||
"2 Steve Nyemba 44\n",
|
||||
"--------- STATISTICS ------------\n",
|
||||
" _counts AVG(age)\n",
|
||||
"0 3 83.0\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"\n",
|
||||
"import transport\n",
|
||||
"from transport import providers\n",
|
||||
"myr = transport.get.reader(provider=providers.MYSQL,database='demo',table='friends',auth_file='/home/steve/auth-mysql.json')\n",
|
||||
"_df = myr.read()\n",
|
||||
"_query = 'SELECT COUNT(*) _counts, AVG(age) from friends'\n",
|
||||
"_sdf = myr.read(sql=_query)\n",
|
||||
"print (_df)\n",
|
||||
"print ('--------- STATISTICS ------------')\n",
|
||||
"print (_sdf)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"An **auth-file** is a file that contains database parameters used to access the database. \n",
|
||||
"For code in shared environments, we recommend \n",
|
||||
"\n",
|
||||
"1. Having the **auth-file** stored on disk \n",
|
||||
"2. and the location of the file is set to an environment variable.\n",
|
||||
"\n",
|
||||
"To generate a template of the **auth-file** open the **file generator wizard** found at https://healthcareio.the-phi.com/data-transport"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 5,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"data": {
|
||||
"text/plain": [
|
||||
"{'host': 'klingon.io',\n",
|
||||
" 'port': 3306,\n",
|
||||
" 'username': 'me',\n",
|
||||
" 'password': 'foobar',\n",
|
||||
" 'provider': 'mysql',\n",
|
||||
" 'database': 'demo',\n",
|
||||
" 'table': 'friends'}"
|
||||
]
|
||||
},
|
||||
"execution_count": 5,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"{\n",
|
||||
" \"host\":\"klingon.io\",\"port\":3306,\"username\":\"me\",\"password\":\"foobar\", \"provider\":\"mysql\",\n",
|
||||
" \"database\":\"demo\",\"table\":\"friends\"\n",
|
||||
"}"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.9.7"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 2
|
||||
}
|
@ -1,149 +0,0 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Writing data-transport plugins\n",
|
||||
"\n",
|
||||
"The data-transport plugins are designed to automate pre/post processing i.e\n",
|
||||
"\n",
|
||||
" - Read -> Post processing\n",
|
||||
" - Write-> Pre processing\n",
|
||||
" \n",
|
||||
"In this example we will read data and apply both pre/post processing steps against any supported infrastructure. We will equally show how to specify the plugins within a configuration file"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 1,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"#\n",
|
||||
"# Writing to a SQLite database\n",
|
||||
"#\n",
|
||||
"import transport\n",
|
||||
"from transport import providers\n",
|
||||
"import pandas as pd\n",
|
||||
"import os\n",
|
||||
"import shutil\n",
|
||||
"#\n",
|
||||
"#\n",
|
||||
"\n",
|
||||
"DATABASE = '/home/steve/tmp/demo.db3'\n",
|
||||
"if os.path.exists(DATABASE) :\n",
|
||||
" os.remove(DATABASE)\n",
|
||||
"#\n",
|
||||
"# \n",
|
||||
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
|
||||
"litew = transport.get.writer(provider=providers.SQLITE,database=DATABASE)\n",
|
||||
"litew.write(_data,table='friends')"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Reading from SQLite\n",
|
||||
"\n",
|
||||
"The cell below reads the data that has been written by the cell above and computes the average age from a plugin function we will write. \n",
|
||||
"\n",
|
||||
"- Basic read of the designated table (friends) created above\n",
|
||||
"- Read with pipeline functions defined in code\n",
|
||||
"\n",
|
||||
"**NOTE**\n",
|
||||
"\n",
|
||||
"It is possible to use **transport.factory.instance** or **transport.instance** or **transport.get.<[reader|writer]>** they are the same. It allows the maintainers to know that we used a factory design pattern."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 4,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
" name age\n",
|
||||
"0 James Bond 55\n",
|
||||
"1 Steve Rogers 150\n",
|
||||
"2 Steve Nyemba 44\n",
|
||||
"\n",
|
||||
"\n",
|
||||
" name age autoinc\n",
|
||||
"0 James Bond 5.5 0\n",
|
||||
"1 Steve Rogers 15.0 1\n",
|
||||
"2 Steve Nyemba 4.4 2\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"\n",
|
||||
"import transport\n",
|
||||
"from transport import providers\n",
|
||||
"import os\n",
|
||||
"import numpy as np\n",
|
||||
"def _autoincrement (_data,**kwargs) :\n",
|
||||
" \"\"\"\n",
|
||||
" This function will add an autoincrement field to the table\n",
|
||||
" \"\"\"\n",
|
||||
" _data['autoinc'] = np.arange(_data.shape[0])\n",
|
||||
" \n",
|
||||
" return _data\n",
|
||||
"def reduce(_data,**_args) :\n",
|
||||
" \"\"\"\n",
|
||||
" This function will reduce the age of the data frame\n",
|
||||
" \"\"\"\n",
|
||||
" _data.age /= 10\n",
|
||||
" return _data\n",
|
||||
"reader = transport.get.reader(provider=providers.SQLITE,database=DATABASE,table='friends')\n",
|
||||
"#\n",
|
||||
"# basic read of the data created in the first cell\n",
|
||||
"_df = reader.read()\n",
|
||||
"print (_df)\n",
|
||||
"print ()\n",
|
||||
"print()\n",
|
||||
"#\n",
|
||||
"# read of the data with pipeline function provided to alter the database\n",
|
||||
"print (reader.read(pipeline=[_autoincrement,reduce]))"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"The parameters for instantiating a transport object (reader or writer) can be found at [data-transport home](https://healthcareio.the-phi.com/data-transport)\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.9.7"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 2
|
||||
}
|
@ -1,162 +0,0 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Writing to PostgreSQL\n",
|
||||
"\n",
|
||||
"1. Insure PostgreSQL is actually installed on the system, \n",
|
||||
"2. There is a database called demo created on the said system\n",
|
||||
"\n",
|
||||
"The cell below creates a dataframe that will be stored within postgreSQL"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 1,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"2.0.4\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"#\n",
|
||||
"# Writing to PostgreSQL database\n",
|
||||
"#\n",
|
||||
"import transport\n",
|
||||
"from transport import providers\n",
|
||||
"import pandas as pd\n",
|
||||
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
|
||||
"pgw = transport.get.writer(provider=providers.POSTGRESQL,database='demo',table='friends')\n",
|
||||
"pgw.write(_data,if_exists='replace') #-- default is append\n",
|
||||
"print (transport.__version__)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Reading from PostgreSQL\n",
|
||||
"\n",
|
||||
"The cell below reads the data that has been written by the cell above and computes the average age within a PostgreSQL (simple query). \n",
|
||||
"\n",
|
||||
"- Basic read of the designated table (friends) created above\n",
|
||||
"- Execute an aggregate SQL against the table\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"**NOTE**\n",
|
||||
"\n",
|
||||
"By design **read** object are separated from **write** objects in order to avoid accidental writes to the database.\n",
|
||||
"Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 2,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
" name age\n",
|
||||
"0 James Bond 55\n",
|
||||
"1 Steve Rogers 150\n",
|
||||
"2 Steve Nyemba 44\n",
|
||||
"--------- STATISTICS ------------\n",
|
||||
" _counts avg\n",
|
||||
"0 3 83.0\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"\n",
|
||||
"import transport\n",
|
||||
"from transport import providers\n",
|
||||
"pgr = transport.get.reader(provider=providers.POSTGRESQL,database='demo',table='friends')\n",
|
||||
"_df = pgr.read()\n",
|
||||
"_query = 'SELECT COUNT(*) _counts, AVG(age) from friends'\n",
|
||||
"_sdf = pgr.read(sql=_query)\n",
|
||||
"print (_df)\n",
|
||||
"print ('--------- STATISTICS ------------')\n",
|
||||
"print (_sdf)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"An **auth-file** is a file that contains database parameters used to access the database. \n",
|
||||
"For code in shared environments, we recommend \n",
|
||||
"\n",
|
||||
"1. Having the **auth-file** stored on disk \n",
|
||||
"2. and the location of the file is set to an environment variable.\n",
|
||||
"\n",
|
||||
"To generate a template of the **auth-file** open the **file generator wizard** found at visit https://healthcareio.the-phi.com/data-transport"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 4,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"data": {
|
||||
"text/plain": [
|
||||
"{'host': 'klingon.io',\n",
|
||||
" 'port': 5432,\n",
|
||||
" 'username': 'me',\n",
|
||||
" 'password': 'foobar',\n",
|
||||
" 'provider': 'postgresql',\n",
|
||||
" 'database': 'demo',\n",
|
||||
" 'table': 'friends'}"
|
||||
]
|
||||
},
|
||||
"execution_count": 4,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"{\n",
|
||||
" \"host\":\"klingon.io\",\"port\":5432,\"username\":\"me\",\"password\":\"foobar\", \"provider\":\"postgresql\",\n",
|
||||
" \"database\":\"demo\",\"table\":\"friends\"\n",
|
||||
"}"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.9.7"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 2
|
||||
}
|
@ -1,131 +0,0 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Writing to AWS S3\n",
|
||||
"\n",
|
||||
"We have setup our demo environment with the label **aws** passed to reference our s3 access_key and secret_key and file (called friends.csv). In the cell below we will write the data to our aws s3 bucket named **com.phi.demo**"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 1,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"2.2.1\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"#\n",
|
||||
"# Writing to mongodb database\n",
|
||||
"#\n",
|
||||
"import transport\n",
|
||||
"from transport import providers\n",
|
||||
"import pandas as pd\n",
|
||||
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
|
||||
"mgw = transport.get.writer(label='aws')\n",
|
||||
"mgw.write(_data)\n",
|
||||
"print (transport.__version__)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Reading from AWS S3\n",
|
||||
"\n",
|
||||
"The cell below reads the data that has been written by the cell above and computes the average age within a mongodb pipeline. The code in the background executes an aggregation using\n",
|
||||
"\n",
|
||||
"- Basic read of the designated file **friends.csv**\n",
|
||||
"- Compute average age using standard pandas functions\n",
|
||||
"\n",
|
||||
"**NOTE**\n",
|
||||
"\n",
|
||||
"By design **read** object are separated from **write** objects in order to avoid accidental writes to the database.\n",
|
||||
"Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 2,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
" bname age\n",
|
||||
"0 James Bond 55\n",
|
||||
"1 Steve Rogers 150\n",
|
||||
"2 Steve Nyemba 44\n",
|
||||
"--------- STATISTICS ------------\n",
|
||||
"83.0\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"\n",
|
||||
"import transport\n",
|
||||
"from transport import providers\n",
|
||||
"import pandas as pd\n",
|
||||
"\n",
|
||||
"def cast(stream) :\n",
|
||||
" print (stream)\n",
|
||||
" return pd.DataFrame(str(stream))\n",
|
||||
"mgr = transport.get.reader(label='aws')\n",
|
||||
"_df = mgr.read()\n",
|
||||
"print (_df)\n",
|
||||
"print ('--------- STATISTICS ------------')\n",
|
||||
"print (_df.age.mean())"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"An **auth-file** is a file that contains database parameters used to access the database. \n",
|
||||
"For code in shared environments, we recommend \n",
|
||||
"\n",
|
||||
"1. Having the **auth-file** stored on disk \n",
|
||||
"2. and the location of the file is set to an environment variable.\n",
|
||||
"\n",
|
||||
"To generate a template of the **auth-file** open the **file generator wizard** found at visit https://healthcareio.the-phi.com/data-transport"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.9.7"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 2
|
||||
}
|
@ -1,143 +0,0 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Writing to SQLite3+\n",
|
||||
"\n",
|
||||
"The requirements to get started are minimal (actually none). The cell below creates a dataframe that will be stored within SQLite 3+"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 1,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"2.0.4\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"#\n",
|
||||
"# Writing to PostgreSQL database\n",
|
||||
"#\n",
|
||||
"import transport\n",
|
||||
"from transport import providers\n",
|
||||
"import pandas as pd\n",
|
||||
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
|
||||
"sqw = transport.get.writer(provider=providers.SQLITE,database='/home/steve/demo.db3',table='friends')\n",
|
||||
"sqw.write(_data,if_exists='replace') #-- default is append\n",
|
||||
"print (transport.__version__)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Reading from SQLite3+\n",
|
||||
"\n",
|
||||
"The cell below reads the data that has been written by the cell above and computes the average age within a PostgreSQL (simple query). \n",
|
||||
"\n",
|
||||
"- Basic read of the designated table (friends) created above\n",
|
||||
"- Execute an aggregate SQL against the table\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"**NOTE**\n",
|
||||
"\n",
|
||||
"By design **read** object are separated from **write** objects in order to avoid accidental writes to the database.\n",
|
||||
"Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 2,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
" name age\n",
|
||||
"0 James Bond 55\n",
|
||||
"1 Steve Rogers 150\n",
|
||||
"2 Steve Nyemba 44\n",
|
||||
"--------- STATISTICS ------------\n",
|
||||
" _counts AVG(age)\n",
|
||||
"0 3 83.0\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"\n",
|
||||
"import transport\n",
|
||||
"from transport import providers\n",
|
||||
"sqr = transport.get.reader(provider=providers.SQLITE,database='/home/steve/demo.db3',table='friends')\n",
|
||||
"_df = sqr.read()\n",
|
||||
"_query = 'SELECT COUNT(*) _counts, AVG(age) from friends'\n",
|
||||
"_sdf = sqr.read(sql=_query)\n",
|
||||
"print (_df)\n",
|
||||
"print ('--------- STATISTICS ------------')\n",
|
||||
"print (_sdf)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"An **auth-file** is a file that contains database parameters used to access the database. \n",
|
||||
"For code in shared environments, we recommend \n",
|
||||
"\n",
|
||||
"1. Having the **auth-file** stored on disk \n",
|
||||
"2. and the location of the file is set to an environment variable.\n",
|
||||
"\n",
|
||||
"To generate a template of the **auth-file** open the **file generator wizard** found at visit https://healthcareio.the-phi.com/data-transport"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 5,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"\n",
|
||||
"{\n",
|
||||
" \"provider\":\"sqlite\",\n",
|
||||
" \"database\":\"/home/steve/demo.db3\",\"table\":\"friends\"\n",
|
||||
"}\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.9.7"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 2
|
||||
}
|
@ -1,6 +0,0 @@
|
||||
"""
|
||||
Steve L. Nyemba, nyemba@gmail.com
|
||||
This namespace implements support for cloud databases databricks,bigquery ...
|
||||
"""
|
||||
from . import bigquery, databricks, nextcloud, s3
|
||||
|
@ -1,159 +0,0 @@
|
||||
"""
|
||||
Implementing support for google's bigquery
|
||||
- cloud.bigquery.Read
|
||||
- cloud.bigquery.Write
|
||||
"""
|
||||
import json
|
||||
from google.oauth2 import service_account
|
||||
from google.cloud import bigquery as bq
|
||||
|
||||
from multiprocessing import Lock, RLock
|
||||
import pandas as pd
|
||||
import pandas_gbq as pd_gbq
|
||||
import numpy as np
|
||||
import time
|
||||
|
||||
MAX_CHUNK = 2000000
|
||||
class BigQuery:
    """
    Shared base for the BigQuery reader/writer: loads service-account
    credentials and holds connection attributes (dataset, table, client).
    """
    def __init__(self,**_args):
        # Path to the service-account JSON key; 'service_key' takes precedence,
        # otherwise 'private_key' must be present (KeyError when neither is given).
        path = _args['service_key'] if 'service_key' in _args else _args['private_key']
        self.credentials = service_account.Credentials.from_service_account_file(path)
        self.dataset = _args['dataset'] if 'dataset' in _args else None
        self.path = path
        self.dtypes = _args['dtypes'] if 'dtypes' in _args else None
        self.table = _args['table'] if 'table' in _args else None
        self.client = bq.Client.from_service_account_json(self.path)
    def meta(self,**_args):
        """
        This function returns meta data for a given table or query with dataset/table properly formatted
        :param table    name of the table WITHOUT including dataset
        :param sql      sql query to be pulled,
        """
        table = _args['table'] if 'table' in _args else self.table

        try:
            if table :
                _dataset = self.dataset if 'dataset' not in _args else _args['dataset']
                # column names/types are pulled from INFORMATION_SCHEMA for the dataset
                sql = f"""SELECT column_name as name, data_type as type FROM {_dataset}.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '{table}' """
                _info = {'credentials':self.credentials,'dialect':'standard'}
                return pd_gbq.read_gbq(sql,**_info).to_dict(orient='records')
                # return self.read(sql=sql).to_dict(orient='records')
                # ref = self.client.dataset(self.dataset).table(table)

                # _schema = self.client.get_table(ref).schema
                # return [{"name":_item.name,"type":_item.field_type,"description":( "" if not hasattr(_item,"description") else _item.description )} for _item in _schema]
            else :
                return []
        except Exception as e:
            # best-effort: a failed metadata lookup is reported as "no columns"
            return []
    def has(self,**_args):
        # True when meta() yields at least one column record for the table
        found = False
        try:
            _has = self.meta(**_args)
            found = _has is not None and len(_has) > 0
        except Exception as e:
            pass
        return found
|
||||
class Reader (BigQuery):
    """
    Read-side of the BigQuery wrapper; executes queries via pandas_gbq and
    returns DataFrames.
    """
    def __init__(self,**_args):

        super().__init__(**_args)
    def apply(self,sql):
        """Alias for read(sql=...)."""
        return self.read(sql=sql)

    def read(self,**_args):
        """
        Run the provided sql (or a SELECT * of the configured table) and
        return the result as a DataFrame; None when no query can be built.
        """
        if 'sql' in _args :
            _sql = _args['sql']
        else:
            _sql = None
            _table = _args.get('table', self.table)
            if _table:
                # fully-qualified names are quoted as-is; bare names get the
                # :dataset placeholder which is substituted below
                _name = f"`{_table}`" if '.' in _table else f"`:dataset.{_table}`"
                _sql = f"SELECT * FROM {_name} "
        if not _sql :
            return None
        if 'limit' in _args:
            _sql = _sql + " LIMIT " + str(_args['limit'])
        if self.dataset and (':dataset' in _sql or ':DATASET' in _sql):
            _sql = _sql.replace(':dataset', self.dataset).replace(':DATASET', self.dataset)
        return pd_gbq.read_gbq(_sql, credentials=self.credentials, dialect='standard')
        # return self.client.query(_sql).to_dataframe()
|
||||
|
||||
class Writer (BigQuery):
    """
    This class implements support for writing against bigquery.

    Writes are chunked through pandas_gbq and can be serialized across
    processes via a shared re-entrant lock ('lock' at construction or call time).
    """
    lock = RLock()
    def __init__(self,**_args):
        super().__init__(**_args)

        # when True every write is guarded by the class-level RLock
        self.parallel = False if 'lock' not in _args else _args['lock']
        self.table = _args['table'] if 'table' in _args else None
        self.mode = {'if_exists':'append','chunksize':900000,'destination_table':self.table,'credentials':self.credentials}
        self._chunks = 1 if 'chunks' not in _args else int(_args['chunks'])
        self._location = 'US' if 'location' not in _args else _args['location']
    def write(self,_data,**_args) :
        """
        This function will perform a write to bigquery
        :_data  data-frame to be written to bigquery
        """
        # BUG FIX: the lock used to be acquired when 'lock' was in _args but
        # released only when self.parallel was set, leaking the acquisition.
        # Track the acquisition explicitly so acquire/release always pair up.
        _locked = False
        try:
            if self.parallel or 'lock' in _args :
                Writer.lock.acquire()
                _locked = True
            _args['table'] = self.table if 'table' not in _args else _args['table']
            self._write(_data,**_args)
        finally:
            if _locked :
                Writer.lock.release()
    def submit(self,_sql):
        """
        Write the output of a massive query to a given table, biquery will handle this as a job
        This function will return the job identifier
        """
        _config = bq.QueryJobConfig()
        _config.destination = self.client.dataset(self.dataset).table(self.table)
        _config.allow_large_results = True
        # _config.write_disposition = bq.bq_consts.WRITE_APPEND
        _config.dry_run = False
        # _config.priority = 'BATCH'
        _resp = self.client.query(_sql,location=self._location,job_config=_config)
        return _resp.job_id
    def status (self,_id):
        """Return the job object (and thus its status) for a submitted job id."""
        return self.client.get_job(_id,location=self._location)
    def _write(self,_info,**_args) :
        """
        Normalize the payload (list-of-dicts or DataFrame) and push it to
        bigquery in self._chunks slices via pandas_gbq.
        """
        _df = None
        if type(_info) in [list,pd.DataFrame] :
            if type(_info) == list :
                _df = pd.DataFrame(_info)
            elif type(_info) == pd.DataFrame :
                _df = _info

            # bare table names are qualified with the configured dataset
            if '.' not in _args['table'] :
                self.mode['destination_table'] = '.'.join([self.dataset,_args['table']])
            else:

                self.mode['destination_table'] = _args['table'].strip()
            if 'schema' in _args :
                self.mode['table_schema'] = _args['schema']
            #
            # Let us adjust the chunking here
            if 'if_exists' in _args :
                self.mode['if_exists'] = _args['if_exists']
            # very large frames are split into 10 chunks unless the caller chose a count
            self._chunks = 10 if _df.shape[0] > MAX_CHUNK and self._chunks == 1 else self._chunks
            _indexes = np.array_split(np.arange(_df.shape[0]),self._chunks)
            for i in _indexes :
                pd_gbq.to_gbq(_df.iloc[i],**self.mode)
                time.sleep(1)
        pass
|
@ -1,137 +0,0 @@
|
||||
"""
|
||||
Data Transport - 1.0
|
||||
Steve L. Nyemba, The Phi Technology LLC
|
||||
|
||||
This file is a wrapper around s3 bucket provided by AWS for reading and writing content
|
||||
TODO:
|
||||
- Address limitations that will properly read csv if it is stored with content type text/csv
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
import boto3
|
||||
# from boto.s3.connection import S3Connection, OrdinaryCallingFormat
|
||||
import numpy as np
|
||||
import botocore
|
||||
from smart_open import smart_open
|
||||
import sys
|
||||
|
||||
import json
|
||||
from io import StringIO
|
||||
import pandas as pd
|
||||
import json
|
||||
|
||||
class s3 :
    """
    Base wrapper around a boto3 s3 client: holds the default bucket/file
    and provides existence/metadata helpers.
    @TODO: Implement a search function for a file given a bucket??
    """
    def __init__(self,**args) :
        """
        This function will extract a file or set of files from s3 bucket provided
        @param access_key
        @param secret_key
        @param path   location of the file
        @param filter filename or filtering elements
        """
        try:
            self._client = boto3.client('s3',aws_access_key_id=args['access_key'],aws_secret_access_key=args['secret_key'],region_name=args['region'])
            self._bucket_name = args['bucket']
            self._file_name = args['file']
            self._region = args['region']
        except Exception as e :
            # best-effort construction: configuration errors are printed, not raised
            print (e)
            pass
    def has(self,**_args):
        # True when the requested bucket (and optionally file) can be described
        _found = None
        try:
            if 'file' in _args and 'bucket' in _args:
                _found = self.meta(**_args)
            elif 'bucket' in _args and not 'file' in _args:
                _found = self._client.list_objects(Bucket=_args['bucket'])
            elif 'file' in _args and not 'bucket' in _args :
                _found = self.meta(bucket=self._bucket_name,file = _args['file'])
        except Exception as e:
            _found = None
            pass
        return type(_found) == dict
    def meta(self,**args):
        """
        This function will return information either about the file in a given bucket
        :name name of the bucket
        """
        _bucket = self._bucket_name if 'bucket' not in args else args['bucket']
        _file = self._file_name if 'file' not in args else args['file']
        _data = self._client.get_object(Bucket=_bucket,Key=_file)
        return _data['ResponseMetadata']
    def close(self):
        # release the underlying boto3 client resources
        self._client.close()
|
||||
|
||||
class Reader(s3) :
    """
    Read-side of the s3 wrapper.

    Because s3 contains buckets and files, reading streams the object's body
    and, when the object is stored as text/csv, parses it into a DataFrame;
    any other content type is returned as the raw payload.
    @TODO: support read from all buckets, think about it
    """
    def __init__(self,**_args) :
        super().__init__(**_args)

    def _stream(self,**_args):
        """
        Pull the body of one object from the given bucket and decode it.
        """
        _object = self._client.get_object(Bucket=_args['bucket'],Key=_args['file'])
        _payload = None
        try:
            _payload = _object['Body'].read()
        except Exception as e:
            pass
        if not _payload :
            return None
        if _object['ContentType'] in ['text/csv'] :
            # normalize the escaped content before handing it to pandas
            _text = str(_payload).replace("\\n","\n").replace("\\r","").replace("\'","")
            return pd.read_csv(StringIO(_text))
        return _payload

    def read(self,**args) :
        """Read the configured (or per-call provided) file from the bucket."""
        _file = args['file'] if 'file' in args else self._file_name
        _bucket = args['bucket'] if 'bucket' in args else self._bucket_name
        return self._stream(bucket=_bucket,file=_file)
|
||||
|
||||
|
||||
class Writer(s3) :
    """
    Write-side of the s3 wrapper: serializes DataFrames (csv), dicts (json)
    or raw text and uploads them to the configured bucket.
    """
    def __init__(self,**_args) :
        super().__init__(**_args)
        #
        # make sure the target bucket exists before any write is attempted
        if not self.has(bucket=self._bucket_name) :
            self.make_bucket(self._bucket_name)
    def make_bucket(self,bucket_name):
        """
        This function will create a folder in a bucket,It is best that the bucket is organized as a namespace
        :name name of the folder
        """

        self._client.create_bucket(Bucket=bucket_name,CreateBucketConfiguration={'LocationConstraint': self._region})
    def write(self,_data,**_args):
        """
        This function will write the data to the s3 bucket, files can be either csv, or json formatted files
        :_data  DataFrame, dict, or raw string to be uploaded
        """
        content = 'text/plain'
        if type(_data) == pd.DataFrame :
            _stream = _data.to_csv(index=False)
            content = 'text/csv'
        elif type(_data) == dict :
            _stream = json.dumps(_data)
            content = 'application/json'
        else:
            _stream = _data
        # BUG FIX: an unused StringIO wrapper used to be created here; the
        # client is handed the serialized string directly, so it was dead code.
        bucket = self._bucket_name if 'bucket' not in _args else _args['bucket']
        file_name = self._file_name if 'file' not in _args else _args['file']
        self._client.put_object(Bucket=bucket, Key = file_name, Body=_stream,ContentType=content)
        pass
|
||||
|
@ -1,18 +1,138 @@
|
||||
import json
|
||||
"""
|
||||
Data Transport - 1.0
|
||||
Steve L. Nyemba, The Phi Technology LLC
|
||||
|
||||
This module is designed to serve as a wrapper to a set of supported data stores :
|
||||
- couchdb
|
||||
- mongodb
|
||||
- Files (character delimited)
|
||||
- Queues (Rabbmitmq)
|
||||
- Session (Flask)
|
||||
- s3
|
||||
The supported operations are read/write and providing meta data to the calling code
|
||||
Requirements :
|
||||
pymongo
|
||||
boto
|
||||
couldant
|
||||
@TODO:
|
||||
Enable read/writing to multiple reads/writes
|
||||
"""
|
||||
__author__ = 'The Phi Technology'
|
||||
import numpy as np
|
||||
from datetime import datetime
|
||||
|
||||
class IEncoder (json.JSONEncoder):
    """
    JSON encoder that serializes numpy scalars/arrays and datetime objects.
    """
    def default (self,object):
        # BUG FIX: the original compared type(object) == np.integer/np.floating.
        # Those are abstract base classes, so the comparison was never True for
        # concrete values (np.int64, np.float32, ...) and encoding raised
        # TypeError; isinstance() is required to match the numpy scalar types.
        if isinstance(object, np.integer) :
            return int(object)
        elif isinstance(object, np.floating):
            return float(object)
        elif isinstance(object, np.ndarray) :
            return object.tolist()
        elif isinstance(object, datetime) :
            # ISO-8601 string, e.g. "2020-01-01T00:00:00"
            return object.isoformat()
        else:
            return super(IEncoder,self).default(object)
|
||||
import json
|
||||
import importlib
|
||||
from multiprocessing import RLock
|
||||
import queue
|
||||
# import couch
|
||||
# import mongo
|
||||
|
||||
|
||||
class IO:
    """
    Base for all transport readers/writers; supports whitelisted runtime reconfiguration.
    """
    def init(self,**args):
        """
        Reconfigure the instance at runtime.

        Only attributes that already exist on the instance may be updated;
        unknown keys are silently ignored so the class shape stays predictable
        (adding new attributes requires sub-classing).
        """
        _allowed = set(vars(self))
        for _name, _value in args.items() :
            if _name in _allowed :
                setattr(self, _name, _value)
|
||||
class Reader (IO):
    """
    Abstract read-side contract of a data store; sub-classes must supply
    read() and meta().
    """
    def __init__(self):
        pass
    def meta(self,**_args):
        """
        This function is intended to return meta-data associated with what has just been read
        @return object of meta data information associated with the content of the store
        """
        raise Exception ("meta function needs to be implemented")
    def read(self,**args):
        """
        This function is intended to read the content of a store provided parameters to be used at the discretion of the subclass
        """
        raise Exception ("read function needs to be implemented")
|
||||
|
||||
|
||||
class Writer(IO):
    """
    Abstract write-side contract of a data store; keeps a small per-id log
    cache and provides a row-formatting helper.
    """
    def __init__(self):
        # log entries keyed by an identifier; "default" always exists
        self.cache = {"default":[]}
    def log(self,id="default",**args):
        """
        Record log information under the given identifier.

        BUG FIX: the original signature was log(self, **args) yet assigned
        self.cache[id], i.e. it keyed the cache on the *builtin* id function.
        The identifier is now an explicit parameter, mirroring meta().
        """
        self.cache[id] = args
    def meta (self,id="default",**args):
        """Return meta-data about what has been written; sub-class responsibility."""
        raise Exception ("meta function needs to be implemented")
    def format(self,row,xchar):
        """
        Format a row for output: xchar-join a list (newline-terminated),
        json-encode a dict, otherwise return the row unchanged.
        """
        if xchar is not None and isinstance(row,list):
            return xchar.join(row)+'\n'
        elif xchar is None and isinstance(row,dict):
            row = json.dumps(row)
        return row
    def write(self,**args):
        """
        This function will write content to a store given parameters to be used at the discretion of the sub-class
        """
        raise Exception ("write function needs to be implemented")

    def archive(self):
        """
        It is important to be able to archive data so as to insure that growth is controlled
        Nothing in nature grows indefinitely neither should data being handled.
        """
        raise Exception ("archive function needs to be implemented")
    def close(self):
        """
        This function will close the persistent storage connection/handler (no-op by default)
        """
        pass
|
||||
class ReadWriter(Reader,Writer) :
    """
    Convenience aggregate exposing both contracts on one object:
    read/meta from Reader and write/log/format from Writer.
    """
    pass
|
||||
# class Console(Writer):
|
||||
# lock = RLock()
|
||||
# def __init__(self,**_args):
|
||||
# self.lock = _args['lock'] if 'lock' in _args else False
|
||||
# self.info = self.write
|
||||
# self.debug = self.write
|
||||
# self.log = self.write
|
||||
# pass
|
||||
# def write (self,logs=None,**_args):
|
||||
# if self.lock :
|
||||
# Console.lock.acquire()
|
||||
# try:
|
||||
# _params = _args if logs is None and _args else logs
|
||||
# if type(_params) == list:
|
||||
# for row in _params :
|
||||
# print (row)
|
||||
# else:
|
||||
# print (_params)
|
||||
# except Exception as e :
|
||||
# print (e)
|
||||
# finally:
|
||||
# if self.lock :
|
||||
# Console.lock.release()
|
||||
|
||||
|
||||
"""
|
||||
@NOTE : Experimental !!
|
||||
"""
|
||||
class Proxy :
    """
    Forward read/write calls to a user-supplied callback.

    @NOTE : Experimental !!
    """
    def __init__(self,**_args):
        # callable provided by user code; invoked for both reads and writes
        self.callback = _args['callback']
    def read(self,**_args) :
        """
        Invoke the callback with the given parameters; if the callback cannot
        accept them, fall back to calling it with no arguments.
        """
        try:
            return self.callback(**_args)
        except Exception:
            return self.callback()
    def write(self,data,**_args):
        """Forward the payload (and any extra parameters) to the callback."""
        self.callback(data,**_args)
|
||||
|
@ -0,0 +1,244 @@
|
||||
import os
|
||||
import sys
|
||||
|
||||
|
||||
if sys.version_info[0] > 2 :
|
||||
from transport.common import Reader, Writer #, factory
|
||||
else:
|
||||
from common import Reader,Writer
|
||||
# import nujson as json
|
||||
import json
|
||||
# from threading import Lock
|
||||
import sqlite3
|
||||
import pandas as pd
|
||||
from multiprocessing import Lock
|
||||
class DiskReader(Reader) :
    """
    This class is designed to read data from disk (location on hard drive)
    @pre : isready() == True
    """

    def __init__(self,**params):
        """
        @param path         absolute path of the file to be read
        @param delimiter    field separator (default ',')
        """

        Reader.__init__(self)
        self.path = params['path'] if 'path' in params else None
        self.delimiter = params['delimiter'] if 'delimiter' in params else ','

    def isready(self):
        # the reader is usable only when the target file exists
        return os.path.exists(self.path)
    def meta(self,**_args):
        # no meta-data is tracked for plain files
        return []
    def read(self,**args):
        """
        Load the csv file into a DataFrame; path/delimiter may be overridden per call.
        """
        _path = self.path if 'path' not in args else args['path']
        _delimiter = self.delimiter if 'delimiter' not in args else args['delimiter']
        # BUG FIX: the per-call 'delimiter' override used to be computed but
        # ignored (self.delimiter was always passed); honor the override.
        return pd.read_csv(_path,delimiter=_delimiter)
    def stream(self,**args):
        """
        This function reads the rows from a designated location on disk
        @param size number of rows to be read, -1 suggests all rows
        """

        size = -1 if 'size' not in args else int(args['size'])
        # BUG FIX: mode 'rU' was removed in Python 3.11 -- plain text mode now.
        # BUG FIX: when a delimiter was set, each row used to be yielded twice
        # (once split, once raw); exactly one value per row is yielded now.
        # The context manager also guarantees the handle is closed even when
        # the generator is abandoned early.
        with open(self.path,'r') as f :
            i = 1
            for row in f:

                i += 1
                if size == i:
                    break
                if self.delimiter :
                    yield row.split(self.delimiter)
                else:
                    yield row
|
||||
class DiskWriter(Writer):

    """
    This function writes output to disk in a designated location. The function will write a text to a text file
    - If a delimiter is provided it will use that to generate a xchar-delimited file
    - If not then the object will be dumped as is
    """
    # serializes concurrent writes across processes
    THREAD_LOCK = Lock()
    def __init__(self,**params):
        """
        @param path         target file on disk
        @param delimiter    field separator handed to DataFrame.to_csv
        @param mode         file open mode, 'w' (overwrite) by default
        """
        super().__init__()
        self._path = params['path']
        self._delimiter = params['delimiter'] if 'delimiter' in params else None
        self._mode = 'w' if 'mode' not in params else params['mode']
    def write(self,info,**_args):
        """
        This function writes a record to a designated file
        @param  label   <passed|broken|fixed|stats>
        @param  row     row to be written
        """
        try:

            DiskWriter.THREAD_LOCK.acquire()

            _delim = self._delimiter if 'delimiter' not in _args else _args['delimiter']
            _path = self._path if 'path' not in _args else _args['path']
            _mode = self._mode if 'mode' not in _args else _args['mode']
            # BUG FIX: the resolved mode used to be computed but never passed
            # to to_csv (writes always used pandas' default overwrite); the
            # configured mode is honored now. The default remains 'w', so
            # callers that never set a mode see identical behavior.
            info.to_csv(_path,index=False,sep=_delim,mode=_mode)
            pass
        except Exception as e:
            #
            # Not sure what should be done here ...
            pass
        finally:
            DiskWriter.THREAD_LOCK.release()
|
||||
class SQLite :
    """
    Thin wrapper around a sqlite3 connection, shared by the reader and
    writer classes below.
    """
    def __init__(self, **_args):
        """
        :database / :path  location of the sqlite database file
        :fields            optional list of column names
        """
        # either 'database' or 'path' identifies the sqlite file
        self.path = _args['database'] if 'database' in _args else _args['path']
        self.conn = sqlite3.connect(self.path, isolation_level="IMMEDIATE")
        self.conn.row_factory = sqlite3.Row
        self.fields = _args['fields'] if 'fields' in _args else []

    def has(self, **_args):
        """
        Determine whether a table exists in the database.
        :table  name of the table to probe
        """
        found = False
        try:
            if 'table' in _args:
                _sql = "SELECT * FROM :table limit 1".replace(":table", _args['table'])
                found = pd.read_sql(_sql, self.conn).columns.size > 0
        except Exception as e:
            # a failing probe simply means the table is absent
            pass
        return found

    def close(self):
        """Close the underlying connection, reporting (not raising) failures."""
        try:
            self.conn.close()
        except Exception as e:
            print(e)

    def apply(self, sql):
        """
        Run a statement: SELECTs return a DataFrame, anything else is
        executed and committed (returns None).
        """
        try:
            if sql.lower().startswith('select'):
                return pd.read_sql(sql, self.conn)
            _cursor = self.conn.cursor()
            _cursor.execute(sql)
            _cursor.close()
            self.conn.commit()
        except Exception as e:
            print(e)
||||
class SQLiteReader (SQLite,DiskReader):
    """
    Reads rows from a sqlite table, optionally restricted by a filter,
    a field list and/or a row limit.
    """
    def __init__(self, **args):
        super().__init__(**args)
        self.table = args['table'] if 'table' in args else None

    def read(self, **args):
        """
        :sql     full SQL statement (takes precedence over everything else)
        :filter  WHERE-clause body applied against the configured table
        :fields  comma separated list of fields (defaults to *)
        :limit   maximum number of rows (string)
        """
        if 'sql' in args:
            sql = args['sql']
        elif 'filter' in args:
            # BUG FIX: this expression previously built a tuple (a stray
            # comma-separated sequence), making the .replace call fail with
            # an AttributeError; build a single string instead
            sql = " ".join(["SELECT :fields FROM", self.table, "WHERE (", args['filter'], ")"])
            sql = sql.replace(":fields", args['fields']) if 'fields' in args else sql.replace(":fields", "*")
        else:
            sql = ' '.join(['SELECT * FROM ', self.table])
        if 'limit' in args:
            sql = sql + " LIMIT " + args['limit']
        return pd.read_sql(sql, self.conn)

    def close(self):
        # best-effort close; the connection may already be gone
        try:
            self.conn.close()
        except Exception as e:
            pass
||||
class SQLiteWriter(SQLite,DiskWriter) :
    """
    Writes rows (a dict, a list of dicts or a pandas DataFrame) into a
    sqlite table, creating the table on first use when needed.
    """
    connection = None
    LOCK = Lock()

    def __init__(self, **args):
        """
        :path / :database  sqlite database file
        :table             target table
        :fields            optional list of column names
        """
        super().__init__(**args)
        self.table = args['table'] if 'table' in args else None
        # create the table upfront when the schema is already known
        if self.fields and not self.isready() and self.table:
            self.init(self.fields)
        SQLiteWriter.connection = self.conn

    def init(self, fields):
        """Create the target table from the given list of column names."""
        self.fields = fields
        sql = " ".join(["CREATE TABLE IF NOT EXISTS ", self.table, " (", ",".join(self.fields), ")"])
        cursor = self.conn.cursor()
        cursor.execute(sql)
        cursor.close()
        self.conn.commit()

    def isready(self):
        """Return True when the target table already exists."""
        try:
            sql = "SELECT count(*) FROM sqlite_master where name=':table'".replace(":table", self.table)
            cursor = self.conn.cursor()
            r = cursor.execute(sql).fetchall()
            cursor.close()
            return r[0][0] != 0
        except Exception as e:
            pass
        # BUG FIX: return a proper boolean instead of the integer 0
        return False

    def write(self, info, **_args):
        """
        Insert rows into the table; the table is created from the first
        record's keys when no field list was configured.

        :info  dict, list of dicts, or pandas DataFrame
        """
        if type(info) == dict:
            info = [info]
        elif type(info) == pd.DataFrame:
            info = info.fillna('')
            info = info.to_dict(orient='records')
        if not self.fields:
            _rec = info[0]
            self.init(list(_rec.keys()))

        SQLiteWriter.LOCK.acquire()
        try:
            # BUG FIX: values were previously spliced into the SQL text via
            # json.dumps, which breaks on embedded quotes and invites SQL
            # injection; use parameterized '?' placeholders instead
            sql = " ".join(["INSERT INTO ", self.table, "(", ",".join(self.fields), ")",
                            "VALUES (", ",".join(["?"] * len(self.fields)), ")"])
            for row in info:
                self.conn.execute(sql, [row.get(_name) for _name in self.fields])
            self.conn.commit()
        except Exception as e:
            print(e)
        finally:
            # BUG FIX: release in finally so an exception can never leave
            # the lock held
            SQLiteWriter.LOCK.release()
@ -1,19 +0,0 @@
|
||||
"""
|
||||
This file will be intended to handle duckdb database
|
||||
"""
|
||||
|
||||
import duckdb
|
||||
from transport.common import Reader,Writer
|
||||
|
||||
class Duck(Reader):
    # Base class holding a duckdb connection, shared by reader/writer subclasses.
    def __init__(self,**_args):
        """
        :path  optional path to a duckdb database file; when omitted an
               in-memory database is used
        """
        super().__init__(**_args)
        self._path = None if 'path' not in _args else _args['path']
        # no path -> transient in-memory database, otherwise open the file
        self._handler = duckdb.connect() if not self._path else duckdb.connect(self._path)
||||
class DuckReader(Duck) :
    # Reader against a duckdb database.
    def __init__(self,**_args):
        super().__init__(**_args)
    def read(self,**_args) :
        # NOTE(review): not implemented yet — placeholder for duckdb reads
        pass
@ -1,120 +0,0 @@
|
||||
"""
|
||||
This class is a wrapper around read/write classes of cloud,sql,nosql,other packages
|
||||
The wrapper allows for application of plugins as pre-post conditions.
|
||||
NOTE: Plugins are converted to a pipeline, so we apply a pipeline when reading or writing:
|
||||
- upon initialization we will load plugins
|
||||
- on read/write we apply a pipeline (if passed as an argument)
|
||||
"""
|
||||
from transport.plugins import plugin, PluginLoader
|
||||
import transport
|
||||
from transport import providers
|
||||
from multiprocessing import Process
|
||||
import time
|
||||
|
||||
|
||||
class IO:
    """
    Base wrapper class around a read/write agent; adds plugin (pipeline)
    support and generic delegation to the agent.
    """
    def __init__(self, _agent, plugins):
        """
        :_agent   underlying reader/writer instance being wrapped
        :plugins  plugin spec (list of callables or {path,names} dict) or None
        """
        self._agent = _agent
        if plugins:
            self._init_plugins(plugins)
        else:
            self._plugins = None

    def _init_plugins(self, _args):
        """
        Load pipelined functions into a PluginLoader.
        """
        if 'path' in _args and 'names' in _args:
            self._plugins = PluginLoader(**_args)
        else:
            self._plugins = PluginLoader()
            [self._plugins.set(_pointer) for _pointer in _args]
        #
        # @TODO: We should have a way to log what plugins are loaded and ready to use

    def meta(self, **_args):
        # delegate to the agent when it supports metadata, otherwise empty
        if hasattr(self._agent, 'meta'):
            return self._agent.meta(**_args)
        return []

    def close(self):
        if hasattr(self._agent, 'close'):
            self._agent.close()

    # BUG FIX: a broken duplicate apply(self) — it referenced an unbound
    # local `_data` and was shadowed by this definition anyway — was removed.
    def apply(self, _query):
        """Run a query through the agent when it supports apply()."""
        if hasattr(self._agent, 'apply'):
            return self._agent.apply(_query)
        return None

    def submit(self, _query):
        return self.delegate('submit', _query)

    def delegate(self, _name, _query):
        """Call the agent's method `_name` with `_query` when available."""
        if hasattr(self._agent, _name):
            pointer = getattr(self._agent, _name)
            return pointer(_query)
        return None
||||
class IReader(IO):
    """
    Wrapper adding plugin post-processing to an agent's read().
    """
    def __init__(self, _agent, pipeline=None):
        super().__init__(_agent, pipeline)

    def read(self, **_args):
        """Read through the agent, then run the plugin pipeline (if any)."""
        if 'plugins' in _args:
            self._init_plugins(_args['plugins'])
        _payload = self._agent.read(**_args)
        #
        # post-process the payload through the loaded plugins before output
        _has_pipeline = self._plugins is not None and self._plugins.ratio() > 0
        return self._plugins.apply(_payload) if _has_pipeline else _payload
||||
class IWriter(IO):
    """
    Wrapper adding plugin pre-processing to an agent's write().
    """
    def __init__(self, _agent, pipeline=None):
        super().__init__(_agent, pipeline)

    def write(self, _data, **_args):
        """Apply the plugin pipeline (if any), then delegate to the agent."""
        if 'plugins' in _args:
            self._init_plugins(_args['plugins'])
        _payload = _data
        if self._plugins is not None and self._plugins.ratio() > 0:
            _payload = self._plugins.apply(_payload)
        self._agent.write(_payload, **_args)
||||
|
||||
#
|
||||
# The ETL object in its simplest form is an aggregation of read/write objects
|
||||
# @TODO: ETL can/should aggregate a writer as a plugin and apply it as a process
|
||||
|
||||
class IETL(IReader) :
    """
    Performs an ETL operation by inheriting a read and replaying the
    payload against every configured write target.
    """
    def __init__(self, **_args):
        super().__init__(transport.get.reader(**_args['source']))
        _target = _args.get('target', [])
        # normalize the target spec to a list of writer parameter dicts
        self._targets = _target if type(_target) == list else [_target]
        self.jobs = []
        #
        # If the parent is already multiprocessing
        self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']

    def read(self, **_args):
        """Read from the source, then replicate the payload to each target."""
        _data = super().read(**_args)
        for _kwargs in self._targets:
            self.post(_data, **_kwargs)
        return _data

    def post(self, _data, **_args):
        """
        Perform the write operation against a single target.
        :_args  parameters associated with writer object
        """
        _writer = transport.get.writer(**_args)
        _writer.write(_data)
        _writer.close()
|
@ -1,12 +0,0 @@
|
||||
"""
|
||||
Steve L. Nyemba, nyemba@gmail.com
|
||||
This namespace implements support for cloud databases couchdb,mongodb, cloudant ...
|
||||
"""
|
||||
# from transport.nosql import couchdb
|
||||
# from transport.nosql import mongodb
|
||||
from . import mongodb
|
||||
from . import couchdb
|
||||
# import mongodb
|
||||
# import couchdb
|
||||
|
||||
cloudant = couchdb
|
@ -1 +0,0 @@
|
||||
# BUG FIX: `files` was imported twice in the same statement
from . import files, http, rabbitmq, callback, console
|
@ -1,10 +0,0 @@
|
||||
"""
|
||||
This class uses classback pattern to allow output to be printed to the console (debugging)
|
||||
"""
|
||||
from . import callback
|
||||
|
||||
|
||||
class Writer (callback.Writer):
    # Console writer: a callback writer whose callback is print, so every
    # payload handed to write() is echoed to stdout (debugging aid).
    def __init__(self,**_args):
        super().__init__(callback=print)
@ -1,69 +0,0 @@
|
||||
"""
|
||||
This file is a wrapper around pandas built-in functionalities to handle character delimited files
|
||||
"""
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
import os
|
||||
class File :
    """
    Base class holding the location and delimiter of a character
    delimited file on disk.
    """
    def __init__(self, **params):
        """
        :path       absolute path of the file to be read
        :delimiter  field separator, defaults to a comma
        """
        self.path = params.get('path')
        self.delimiter = params.get('delimiter', ',')

    def isready(self):
        # usable only when the file actually exists on disk
        return os.path.exists(self.path)

    def meta(self, **_args):
        # no metadata is tracked for plain files
        return []
||||
|
||||
class Reader (File):
    """
    This class is designed to read data from disk (location on hard drive)
    @pre : isready() == True
    """
    def __init__(self, **_args):
        super().__init__(**_args)

    def read(self, **args):
        """
        Load the file into a pandas DataFrame.

        :path       optional override of the path given at construction
        :delimiter  optional override of the delimiter given at construction
        """
        _path = self.path if 'path' not in args else args['path']
        _delimiter = self.delimiter if 'delimiter' not in args else args['delimiter']
        # BUG FIX: the delimiter override was previously computed but ignored
        return pd.read_csv(_path, delimiter=_delimiter)

    def stream(self, **args):
        raise Exception ("streaming needs to be implemented")
||||
class Writer (File):
    """
    Writes a pandas DataFrame to disk at the designated location.

    - If a delimiter is provided it is used to generate an xchar-delimited file
    - Otherwise the inherited default (comma) applies
    """
    def __init__(self, **_args):
        """
        :mode  file mode, 'w' (default) or 'a' to append
        """
        super().__init__(**_args)
        self._mode = 'w' if 'mode' not in _args else _args['mode']

    def write(self, info, **_args):
        """
        Write a DataFrame to the configured (or overridden) location.

        :delimiter  optional override of the delimiter
        :path       optional override of the path
        :mode       optional override of the file mode
        """
        try:
            _delim = self.delimiter if 'delimiter' not in _args else _args['delimiter']
            _path = self.path if 'path' not in _args else _args['path']
            _mode = self._mode if 'mode' not in _args else _args['mode']
            # BUG FIX: the mode was previously computed but never applied
            info.to_csv(_path, index=False, sep=_delim, mode=_mode)
        except Exception as e:
            #
            # best-effort write: report the failure without raising
            print (e)
@ -1,129 +0,0 @@
|
||||
"""
|
||||
The functions within are designed to load external files and apply functions against the data
|
||||
The plugins are applied as
|
||||
- post-processing if we are reading data
|
||||
- and pre-processing if we are writing data
|
||||
|
||||
The plugin will use a decorator to identify meaningful functions
|
||||
@TODO: This should work in tandem with loggin (otherwise we don't have visibility into what is going on)
|
||||
"""
|
||||
import importlib as IL
|
||||
import importlib.util
|
||||
import sys
|
||||
import os
|
||||
|
||||
class plugin :
    """
    Function decorator identifying data-transport plugins for
    (post|pre)-processing.
    """
    def __init__(self, **_args):
        """
        :name   name of the plugin
        :mode   restrict to reader/writer ('rw' by default)
        :about  tell what the function is about
        """
        self._name = _args['name']
        self._about = _args['about']
        self._mode = _args.get('mode', 'rw')

    def __call__(self, pointer, **kwargs):
        def wrapper(_args, **kwargs):
            return pointer(_args, **kwargs)
        #
        # tag the wrapper so PluginLoader can recognize it as a plugin
        wrapper.transport = True
        wrapper.name = self._name
        wrapper.mode = self._mode
        wrapper.about = self._about
        return wrapper
||||
|
||||
|
||||
class PluginLoader :
    """
    Loads plugin functions from a file (or directly-registered pointers)
    and exposes them as a processing pipeline.
    """
    def __init__(self, **_args):
        """
        :path   location of the plugin (should be a single file)
        :names  function names to load from that file
        """
        _names = _args['names'] if 'names' in _args else None
        path = _args['path'] if 'path' in _args else None
        self._modules = {}
        # BUG FIX: self._names used to be unconditionally reset to [] right
        # after being computed, so file-based plugins were never loaded
        self._names = [] if _names is None else (_names if type(_names) == list else [_names])
        if path and os.path.exists(path) and _names:
            for _name in self._names:
                spec = importlib.util.spec_from_file_location('private', path)
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)  # --loads it into sys.modules
                if hasattr(module, _name):
                    if self.isplugin(module, _name):
                        self._modules[_name] = getattr(module, _name)
                    else:
                        print ([f'Found {_name}', 'not plugin'])
                else:
                    #
                    # @TODO: We should log this somewhere some how
                    print (['skipping ', _name, hasattr(module, _name)])

    def set(self, _pointer):
        """
        Register a function pointer directly into the list of modules to be
        called; used when the framework is used as a library.
        """
        _name = _pointer.__name__
        self._modules[_name] = _pointer
        self._names.append(_name)

    def isplugin(self, module, name):
        """
        Determine if module.name is a recognized plugin, i.e. a function
        carrying the 'transport' marker set by the plugin decorator.
        :module  module object loaded from importlib
        :name    name of the function of interest
        """
        p = type(getattr(module, name)).__name__ == 'function'
        q = hasattr(getattr(module, name), 'transport')
        #
        # @TODO: add a generated key, and more indepth validation
        return p and q

    def has(self, _name):
        """Return True when the named plugin is loaded."""
        return _name in self._modules

    def ratio(self):
        """
        Fraction of requested names that were actually loaded.
        """
        _n = len(self._names)
        # BUG FIX: guard against division by zero when no names are registered
        if _n == 0:
            return 0
        return len(set(self._modules.keys()) & set(self._names)) / _n

    def apply(self, _data):
        """Run _data through every loaded plugin, in registration order."""
        for _name in self._modules:
            _pointer = self._modules[_name]
            #
            # @TODO: add exception handling
            _data = _pointer(_data)
        return _data
|
@ -0,0 +1,102 @@
|
||||
# from transport.common import Reader, Writer,Console #, factory
from transport import disk
import sqlite3
from transport import s3 as s3
from transport import rabbitmq as queue
from transport import couch as couch
from transport import mongo as mongo
from transport import sql as sql
from transport import etl as etl
from transport import qlistener
from transport import bricks
from transport import session
from transport import nextcloud
import psycopg2 as pg
import mysql.connector as my
from google.cloud import bigquery as bq
import nzpy as nz   #--- netezza drivers
import os

from transport.version import __version__

#
# Provider names — each name is defined exactly once
# (BUG FIX: a duplicate ETL = 'etl' assignment was removed)
POSTGRESQL = 'postgresql'
MONGODB = 'mongodb'
HTTP = 'http'
BIGQUERY = 'bigquery'
FILE = 'file'
ETL = 'etl'
SQLITE = 'sqlite'
SQLITE3 = 'sqlite'
REDSHIFT = 'redshift'
NETEZZA = 'netezza'
MYSQL = 'mysql+mysqlconnector'
RABBITMQ = 'rabbitmq'
MARIADB = 'mariadb'
COUCHDB = 'couch'
CONSOLE = 'console'
NEXTCLOUD = 'nextcloud'

#
# synonyms of the above
BQ = BIGQUERY
MONGO = MONGODB
FERRETDB = MONGODB
PG = POSTGRESQL
PSQL = POSTGRESQL
PGSQL = POSTGRESQL
S3 = 's3'
AWS_S3 = 's3'
RABBIT = RABBITMQ

QLISTENER = 'qlistener'
QUEUE = QLISTENER
DATABRICKS = 'databricks+connector'
DRIVERS = {PG: pg, REDSHIFT: pg, MYSQL: my, MARIADB: my, NETEZZA: nz, SQLITE: sqlite3}
CATEGORIES = {'sql': [NETEZZA, PG, MYSQL, REDSHIFT, SQLITE, MARIADB], 'nosql': [MONGODB, COUCHDB],
              'cloud': [NEXTCLOUD, S3, BIGQUERY, DATABRICKS], 'file': [FILE],
              'queue': [RABBIT, QLISTENER], 'memory': [CONSOLE, QUEUE], 'http': [HTTP]}

READ = {'sql': sql.SQLReader, 'nosql': {MONGODB: mongo.MongoReader, COUCHDB: couch.CouchReader},
        'cloud': {BIGQUERY: sql.BigQueryReader, DATABRICKS: bricks.BricksReader, NEXTCLOUD: nextcloud.NextcloudReader},
        'file': disk.DiskReader, 'queue': {RABBIT: queue.QueueReader, QLISTENER: qlistener.qListener},
        # 'cli':{CONSOLE:Console},'memory':{CONSOLE:Console},'http':session.HttpReader
        }
WRITE = {'sql': sql.SQLWriter, 'nosql': {MONGODB: mongo.MongoWriter, COUCHDB: couch.CouchWriter},
         'cloud': {BIGQUERY: sql.BigQueryWriter, DATABRICKS: bricks.BricksWriter, NEXTCLOUD: nextcloud.NextcloudWriter},
         'file': disk.DiskWriter, 'queue': {RABBIT: queue.QueueWriter, QLISTENER: qlistener.qListener},
         # 'cli':{CONSOLE:Console},
         # 'memory':{CONSOLE:Console}, 'http':session.HttpReader
         }
# SQL_PROVIDERS = [POSTGRESQL,MYSQL,NETEZZA,MARIADB,SQLITE]
PROVIDERS = {
    FILE: {'read': disk.DiskReader, 'write': disk.DiskWriter},
    SQLITE: {'read': disk.SQLiteReader, 'write': disk.SQLiteWriter, 'driver': sqlite3},
    'sqlite3': {'read': disk.SQLiteReader, 'write': disk.SQLiteWriter, 'driver': sqlite3},

    POSTGRESQL: {'read': sql.SQLReader, 'write': sql.SQLWriter, 'driver': pg, 'default': {'host': 'localhost', 'port': 5432}},
    NETEZZA: {'read': sql.SQLReader, 'write': sql.SQLWriter, 'driver': nz, 'default': {'port': 5480}},
    REDSHIFT: {'read': sql.SQLReader, 'write': sql.SQLWriter, 'driver': pg, 'default': {'host': 'localhost', 'port': 5432}},
    # BUG FIX: key was 'writer' (inconsistent with every other entry) and the
    # default port was 5432 (postgres); rabbitmq listens on 5672
    RABBITMQ: {'read': queue.QueueReader, 'write': queue.QueueWriter, 'context': queue.QueueListener, 'default': {'host': 'localhost', 'port': 5672}},

    MYSQL: {'read': sql.SQLReader, 'write': sql.SQLWriter, 'driver': my, 'default': {'host': 'localhost', 'port': 3306}},
    MARIADB: {'read': sql.SQLReader, 'write': sql.SQLWriter, 'driver': my, 'default': {'host': 'localhost', 'port': 3306}},

    S3: {'read': s3.s3Reader, 'write': s3.s3Writer},
    BIGQUERY: {'read': sql.BigQueryReader, 'write': sql.BigQueryWriter},
    DATABRICKS: {'read': bricks.BricksReader, 'write': bricks.BricksWriter},
    NEXTCLOUD: {'read': nextcloud.NextcloudReader, 'write': nextcloud.NextcloudWriter},

    QLISTENER: {'read': qlistener.qListener, 'write': qlistener.qListener, 'default': {'host': 'localhost', 'port': 5672}},
    CONSOLE: {'read': qlistener.Console, "write": qlistener.Console},
    HTTP: {'read': session.HttpReader, 'write': session.HttpWriter},

    MONGODB: {'read': mongo.MongoReader, 'write': mongo.MongoWriter, 'default': {'port': 27017, 'host': 'localhost'}},
    # BUG FIX: key was 'writer' — normalized to 'write' for consistency
    COUCHDB: {'read': couch.CouchReader, 'write': couch.CouchWriter, 'default': {'host': 'localhost', 'port': 5984}},
    ETL: {'read': etl.Transporter, 'write': etl.Transporter}
}
DEFAULT = {PG: {'host': 'localhost', 'port': 5432}, MYSQL: {'host': 'localhost', 'port': 3306}}
DEFAULT[MONGODB] = {'port': 27017, 'host': 'localhost'}
DEFAULT[REDSHIFT] = DEFAULT[PG]
DEFAULT[MARIADB] = DEFAULT[MYSQL]
DEFAULT[NETEZZA] = {'port': 5480}
|
@ -1,50 +0,0 @@
|
||||
"""
|
||||
This file is intended to aggregate all we can about the framework in terms of support
|
||||
"""
|
||||
|
||||
BIGQUERY='bigquery'
|
||||
|
||||
POSTGRESQL = 'postgresql'
|
||||
MONGODB = 'mongodb'
|
||||
HTTP='http'
|
||||
BIGQUERY ='bigquery'
|
||||
FILE = 'file'
|
||||
ETL = 'etl'
|
||||
|
||||
SQLITE = 'sqlite'
|
||||
SQLITE3= 'sqlite3'
|
||||
DUCKDB = 'duckdb'
|
||||
|
||||
REDSHIFT = 'redshift'
|
||||
NETEZZA = 'netezza'
|
||||
MYSQL = 'mysql'
|
||||
MARIADB= MYSQL
|
||||
|
||||
COUCHDB = 'couchdb'
|
||||
CONSOLE = 'console'
|
||||
ETL = 'etl'
|
||||
TRANSPORT = ETL
|
||||
NEXTCLOUD = 'nextcloud'
|
||||
S3 = 's3'
|
||||
CALLBACK = 'callback'
|
||||
CONSOLE = 'console'
|
||||
RABBITMQ = 'rabbitmq'
|
||||
DATABRICKS = 'databricks'
|
||||
MSSQL ='sqlserver'
|
||||
SQLSERVER ='sqlserver'
|
||||
|
||||
#
|
||||
# synonyms of the above
|
||||
BQ = BIGQUERY
|
||||
MONGO = MONGODB
|
||||
FERRETDB= MONGODB
|
||||
PG = POSTGRESQL
|
||||
PSQL = POSTGRESQL
|
||||
PGSQL = POSTGRESQL
|
||||
|
||||
AWS_S3 = 's3'
|
||||
RABBIT = RABBITMQ
|
||||
|
||||
|
||||
# QLISTENER = 'qlistener'
|
||||
|
@ -1,102 +0,0 @@
|
||||
import os
|
||||
import json
|
||||
from info import __version__
|
||||
import copy
|
||||
import transport
|
||||
|
||||
"""
|
||||
This class manages data from the registry and allows (read only)
|
||||
@TODO: add property to the DATA attribute
|
||||
"""
|
||||
|
||||
REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
|
||||
#
|
||||
# This path can be overriden by an environment variable ...
|
||||
#
|
||||
if 'DATA_TRANSPORT_REGISTRY_PATH' in os.environ :
|
||||
REGISTRY_PATH = os.environ['DATA_TRANSPORT_REGISTRY_PATH']
|
||||
REGISTRY_FILE= 'transport-registry.json'
|
||||
|
||||
DATA = {}
|
||||
|
||||
def isloaded ():
    """Return True when the registry has been loaded into DATA."""
    return not (DATA is None or DATA == {})
||||
def exists (path=REGISTRY_PATH) :
    """
    Determine whether a registry exists at all: both the registry folder
    and the registry file within it must be present.
    """
    _folder_ok = os.path.exists(path)
    _file_ok = os.path.exists(os.sep.join([path, REGISTRY_FILE]))
    return _folder_ok and _file_ok
||||
def load (_path=REGISTRY_PATH):
    """
    Load the registry file (if present) into the module-level DATA cache.
    """
    global DATA
    if exists(_path):
        path = os.sep.join([_path, REGISTRY_FILE])
        # BUG FIX: use a context manager so the handle is closed even when
        # json parsing raises
        with open(path) as f:
            DATA = json.loads(f.read())
||||
def init (email, path=REGISTRY_PATH, override=False):
    """
    Initializing the registry and will raise an exception in the advent of an issue.

    :email     registrant email; must contain '@' and a recognized TLD
    :path      registry folder (created when absent)
    :override  rewrite an existing registry file when True
    """
    p = '@' in email
    q = False if '.' not in email else email.split('.')[-1] in ['edu', 'com', 'io', 'ai', 'org']
    # guard clause keeps the happy path un-nested
    if not (p and q):
        raise Exception (f"""Invalid Input, {email} is not well formatted, provide an email with adequate format""")
    _config = {"email": email, 'version': __version__}
    if not os.path.exists(path):
        os.makedirs(path)
    filename = os.sep.join([path, REGISTRY_FILE])
    if not os.path.exists(filename) or override == True:
        # BUG FIX: context manager guarantees the file is flushed and closed
        with open(filename, 'w') as f:
            f.write(json.dumps(_config))
        # _msg = f"""{CHECK_MARK} Successfully wrote configuration to {path} from {email}"""
    else:
        raise Exception (f"""Unable to write configuration, Please check parameters (or help) and try again""")
||||
def lookup (label):
    """Return True when the given label is present in the registry."""
    global DATA
    found = label in DATA
    return found
||||
def get (label='default') :
    """Return a shallow copy of the registry entry for label ({} if absent)."""
    global DATA
    if label not in DATA:
        return {}
    return copy.copy(DATA[label])
||||
|
||||
def set (label, auth_file, default=False, path=REGISTRY_PATH) :
    """
    This function will add a label (auth-file data) into the registry and can set it as the default.

    :label      name under which the auth data is stored ('default' is reserved)
    :auth_file  json file containing provider parameters
    :default    also store the entry under 'default' when True
    :path       registry folder
    """
    if label == 'default':
        raise Exception ("""Invalid label name provided, please change the label name and use the switch""")
    reg_file = os.sep.join([path, REGISTRY_FILE])
    if os.path.exists(auth_file) and os.path.exists(path) and os.path.exists(reg_file):
        # BUG FIX: context managers so handles are closed even when json
        # parsing raises
        with open(auth_file) as f:
            _info = json.loads(f.read())
        with open(reg_file) as f:
            _config = json.loads(f.read())
        #
        # validate the proposed label by instantiating it before committing
        _object = transport.factory.instance(**_info)
        if _object:
            _config[label] = _info
            if default:
                _config['default'] = _info
            #
            # now we need to write this to the location
            with open(reg_file, 'w') as f:
                f.write(json.dumps(_config))
        else:
            raise Exception( f"""Unable to load file locate at {path},\nLearn how to generate auth-file with wizard found at https://healthcareio.the-phi.com/data-transport""")
    # NOTE(review): missing registry/auth files fail silently here — presumably
    # by design (best-effort); confirm whether an error should be raised instead
|
@ -0,0 +1,130 @@
|
||||
"""
|
||||
Data Transport - 1.0
|
||||
Steve L. Nyemba, The Phi Technology LLC
|
||||
|
||||
This file is a wrapper around s3 bucket provided by AWS for reading and writing content
|
||||
"""
|
||||
from datetime import datetime
|
||||
import boto
|
||||
from boto.s3.connection import S3Connection, OrdinaryCallingFormat
|
||||
import numpy as np
|
||||
import botocore
|
||||
from smart_open import smart_open
|
||||
import sys
|
||||
if sys.version_info[0] > 2 :
|
||||
from transport.common import Reader, Writer
|
||||
else:
|
||||
from common import Reader, Writer
|
||||
import json
|
||||
from io import StringIO
|
||||
import json
|
||||
|
||||
class s3 :
    """
    Base wrapper around an s3 connection (boto2 S3Connection).
    @TODO: Implement a search function for a file given a bucket??
    """
    def __init__(self,**args) :
        """
        This function will extract a file or set of files from s3 bucket provided
        @param access_key
        @param secret_key
        @param path	location of the file
        @param filter	filename or filtering elements
        """
        try:
            self.s3 = S3Connection(args['access_key'],args['secret_key'],calling_format=OrdinaryCallingFormat())
            # bucket is optional; validate=False avoids a round-trip on connect
            self.bucket = self.s3.get_bucket(args['bucket'].strip(),validate=False) if 'bucket' in args else None
            # self.path = args['path']
            self.filter = args['filter'] if 'filter' in args else None
            self.filename = args['file'] if 'file' in args else None
            self.bucket_name = args['bucket'] if 'bucket' in args else None
        except Exception as e :
            # connection failed: leave the object unusable but report why
            self.s3 = None
            self.bucket = None
            print (e)
    def meta(self,**args):
        """
        Return name/size pairs for every object in a bucket.
        :name name of the bucket
        """
        info = self.list(**args)
        # NOTE(review): item.open() appears intended to populate key metadata,
        # but the streams are never closed — confirm and consider closing them
        [item.open() for item in info]
        return [{"name":item.name,"size":item.size} for item in info]
    def list(self,**args):
        """
        This function will list the content of a bucket, the bucket must be provided by the name
        :name name of the bucket
        """
        return list(self.s3.get_bucket(args['name']).list())

    def buckets(self):
        #
        # This function will return all buckets, not sure why but it should be used cautiously
        # based on why the s3 infrastructure is used
        #
        return [item.name for item in self.s3.get_all_buckets()]

    # NOTE(review): everything below is dead code — the statements after the
    # return above were a superseded draft of buckets()
    # def buckets(self):
    pass
    # """
    # This function is a wrapper around the bucket list of buckets for s3
    # """
    # return self.s3.get_all_buckets()
||||
|
||||
class s3Reader(s3,Reader) :
    """
    Because s3 contains buckets and files, reading becomes a tricky proposition :
    - list files if file is None
    - stream content if file is Not None
    @TODO: support read from all buckets, think about it
    """
    def __init__(self,**args) :
        s3.__init__(self,**args)
    def files(self):
        # names of the non-empty objects in the configured bucket
        r = []
        try:
            return [item.name for item in self.bucket if item.size > 0]
        except Exception as e:
            # NOTE(review): failures are swallowed and an empty list returned
            pass
        return r
    def stream(self,limit=-1):
        """
        At this point we should stream a file from a given bucket
        @param limit maximum number of lines to yield; -1 streams everything
        """
        key = self.bucket.get_key(self.filename.strip())
        if key is None :
            # the requested file does not exist in the bucket
            yield None
        else:
            count = 0
            with smart_open(key) as remote_file:
                for line in remote_file:
                    if count == limit and limit > 0 :
                        break
                    yield line
                    count += 1
    def read(self,**args) :
        # dispatch: no filename configured -> listing, otherwise stream content
        if self.filename is None :
            #
            # returning the list of files because no one file was specified.
            return self.files()
        else:
            limit = args['size'] if 'size' in args else -1
            return self.stream(limit)
||||
|
||||
class s3Writer(s3,Writer) :
    """
    Write-side s3 handler: create "folders" (key prefixes) and upload
    content to the configured bucket/filename.
    """
    def __init__(self,**args) :
        s3.__init__(self,**args)
    def mkdir(self,name):
        """
        Create a folder (key prefix) in the current bucket.

        :name   name of the folder
        """
        # BUGFIX: boto3's put_object requires the capitalized ``Key``
        # argument; the lowercase ``key`` used previously is rejected by
        # the client's parameter validation.
        self.s3.put_object(Bucket=self.bucket_name,Key=(name+'/'))
    def write(self,content):
        """
        Upload ``content`` to the configured bucket/filename.

        :content  bytes (preferred) or text to upload
        """
        from io import BytesIO
        #
        # BUGFIX: upload_fileobj expects a binary file-like object; the old
        # code decoded the payload and wrapped it in StringIO, which boto3
        # cannot upload. Wrap the raw bytes instead (encoding text input).
        _stream = BytesIO(content if isinstance(content,bytes) else str(content).encode("utf-8"))
        self.s3.upload_fileobj(_stream,self.bucket_name,self.filename)
|
||||
|
@ -0,0 +1,507 @@
|
||||
"""
|
||||
This file is intended to perform read/writes against an SQL database such as PostgreSQL, Redshift, Mysql, MsSQL ...
|
||||
|
||||
LICENSE (MIT)
|
||||
Copyright 2016-2020, The Phi Technology LLC
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
|
||||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
||||
|
||||
@TODO:
|
||||
- Migrate SQLite to SQL hierarchy
|
||||
- Include Write in Chunks from pandas
|
||||
"""
|
||||
import psycopg2 as pg
|
||||
import mysql.connector as my
|
||||
import sys
|
||||
|
||||
import sqlalchemy
|
||||
if sys.version_info[0] > 2 :
|
||||
from transport.common import Reader, Writer #, factory
|
||||
else:
|
||||
from common import Reader,Writer
|
||||
import json
|
||||
from google.oauth2 import service_account
|
||||
from google.cloud import bigquery as bq
|
||||
from multiprocessing import Lock, RLock
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
import nzpy as nz #--- netezza drivers
|
||||
import sqlite3
|
||||
import copy
|
||||
import os
|
||||
|
||||
|
||||
class SQLRW :
    """
    Shared read/write plumbing for relational data-stores (PostgreSQL,
    Redshift, MySQL/MariaDB, Netezza, SQLite). Holds the DBAPI connection,
    an optional sqlalchemy engine, and common metadata helpers.
    """
    lock = RLock()          # class-wide lock used to serialize writes
    MAX_CHUNK = 2000000     # row count above which writes are auto-chunked
    DRIVERS  = {"postgresql":pg,"redshift":pg,"mysql":my,"mariadb":my,"netezza":nz}
    REFERENCE = {
        "netezza":{"port":5480,"handler":nz,"dtype":"VARCHAR(512)"},
        "postgresql":{"port":5432,"handler":pg,"dtype":"VARCHAR"},
        "redshift":{"port":5432,"handler":pg,"dtype":"VARCHAR"},
        # BUGFIX: MySQL/MariaDB listen on 3306 by default; 3360 was a typo.
        "mysql":{"port":3306,"handler":my,"dtype":"VARCHAR(256)"},
        "mariadb":{"port":3306,"handler":my,"dtype":"VARCHAR(256)"},
        }
    def __init__(self,**_args):
        """
        :db | database      database name (one of the two is required)
        :table              default table
        :fields             default projection for reads
        :schema             namespace/schema prefix
        :chunks             number of write batches (default 1)
        :provider           provider label (e.g. 'netezza')
        :host, :port        connection endpoint
        :lock               serialize writes through SQLRW.lock when truthy
        :username | user, :password     credentials
        :auth_file          JSON file whose entries override the above
        :driver             DBAPI module used to connect (required)
        :default            {'type': ...} default column dtype for make()
        :sqlalchemy         optional pre-built sqlalchemy engine
        """
        _info = {}
        _info['dbname'] = _args['db'] if 'db' in _args else _args['database']
        self.table = _args['table'] if 'table' in _args else None
        self.fields = _args['fields'] if 'fields' in _args else []
        self.schema = _args['schema'] if 'schema' in _args else ''
        self._chunks = 1 if 'chunks' not in _args else int(_args['chunks'])

        self._provider = _args['provider'] if 'provider' in _args else None
        _info['host'] = _args['host'] if 'host' in _args else ''
        _info['port'] = _args['port'] if 'port' in _args else ''

        self.lock = False if 'lock' not in _args else _args['lock']
        if 'username' in _args or 'user' in _args:
            key = 'username' if 'username' in _args else 'user'
            _info['user'] = _args[key]
            _info['password'] = _args['password'] if 'password' in _args else ''
        if 'auth_file' in _args :
            #
            # Credentials found in the auth-file take precedence over the
            # inline arguments.
            _auth = json.loads( open(_args['auth_file']).read() )
            key = 'username' if 'username' in _auth else 'user'
            _info['user'] = _auth[key]
            _info['password'] = _auth['password'] if 'password' in _auth else ''

            _info['host'] = _auth['host'] if 'host' in _auth else _info['host']
            _info['port'] = _auth['port'] if 'port' in _auth else _info['port']
            if 'database' in _auth:
                _info['dbname'] = _auth['database']
            self.table = _auth['table'] if 'table' in _auth else self.table
        #
        # Adapt the connection arguments to the driver's expectations.
        _handler = _args['driver'] #-- handler to the driver
        self._dtype = _args['default']['type'] if 'default' in _args and 'type' in _args['default'] else 'VARCHAR(256)'
        if _handler == nz :
            _info['database'] = _info['dbname']
            _info['securityLevel'] = 0
            del _info['dbname']
        if _handler == my :
            _info['database'] = _info['dbname']
            del _info['dbname']
        if _handler == sqlite3 :
            _info = {'path':_info['dbname'],'isolation_level':'IMMEDIATE'}
        if _handler != sqlite3 :
            self.conn = _handler.connect(**_info)
        else:
            self.conn = _handler.connect(_info['path'],isolation_level='IMMEDIATE')
        self._engine = _args['sqlalchemy'] if 'sqlalchemy' in _args else None
    def meta(self,**_args):
        """
        Return [{"name":..., "type":...}] for the given table. Requires the
        sqlalchemy engine; returns [] when it is absent or on any error.

        :table  optional table name (defaults to self.table)
        """
        schema = []
        try:
            if self._engine :
                table = _args['table'] if 'table' in _args else self.table
                if sqlalchemy.__version__.startswith('1.') :
                    _m = sqlalchemy.MetaData(bind=self._engine)
                    _m.reflect()
                else:
                    _m = sqlalchemy.MetaData()
                    _m.reflect(bind=self._engine)
                schema = [{"name":_attr.name,"type":str(_attr.type)} for _attr in _m.tables[table].columns]
                #
                # Normalize a few dialect-specific type names to generic labels
                _m = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
                for _item in schema :
                    if _item['type'] in _m :
                        _item['type'] = _m[_item['type']]

        except Exception as e:
            print (e)
            pass
        return schema
    def _tablename(self,name) :
        # Qualify the table with the schema unless it is already qualified.
        return self.schema +'.'+name if self.schema not in [None, ''] and '.' not in name else name
    def has(self,**_args):
        # Truthy (non-empty schema list) when the table exists.
        return self.meta(**_args)
    def isready(self):
        """
        Probe the default table; return its column list, or False on failure.
        """
        _sql = "SELECT * FROM :table LIMIT 1".replace(":table",self.table)
        try:
            # BUGFIX: self._engine is always assigned in __init__, so the old
            # hasattr(self,'_engine') test always chose the engine -- even
            # when it was None, handing pandas a null connection. Use the
            # engine only when it is actually set.
            _conn = self._engine if self._engine else self.conn
            return pd.read_sql(_sql,_conn).columns.tolist()
        except Exception as e:
            pass
        return False
    def apply(self,_sql):
        """
        This function applies a command and/or a query against the current relational data-store
        :param _sql insert/select statement
        @TODO: Store procedure calls
        """
        _out = None
        try:
            if _sql.lower().startswith('select') :
                _conn = self._engine if self._engine else self.conn
                return pd.read_sql(_sql,_conn)
            else:
                # Executing a command i.e no expected return values ...
                cursor = self.conn.cursor()
                cursor.execute(_sql)
                self.conn.commit()
        except Exception as e :
            print (e)
        finally:
            if not self._engine :
                self.conn.commit()
    def close(self):
        # Close the underlying DBAPI connection (best effort).
        try:
            self.conn.close()
        except Exception as error :
            print (error)
            pass
|
||||
class SQLReader(SQLRW,Reader) :
    """
    Read-side relational handler: builds a SELECT from the configured table
    and fields (or runs a caller-supplied query) through SQLRW.apply.
    """
    def __init__(self,**_args):
        super().__init__(**_args)

    def read(self,**_args):
        """
        :sql     optional query to execute verbatim
        :table   optional table (defaults to self.table)
        :filter  optional raw WHERE-clause fragment
        :fields  optional projection (defaults to self.fields or '*')
        :limit   optional row limit appended to the query

        NOTE(review): :filter and :fields are spliced into the SQL without
        escaping; callers must not pass untrusted input here.
        """
        if 'sql' in _args :
            _sql = (_args['sql'])
        else:
            if 'table' in _args :
                table = _args['table']
            else:
                table = self.table
            _sql = "SELECT :fields FROM "+self._tablename(table)
            if 'filter' in _args :
                _sql = _sql +" WHERE "+_args['filter']
            if 'fields' in _args :
                _fields = _args['fields']
            else:
                _fields = '*' if not self.fields else ",".join(self.fields)
            _sql = _sql.replace(":fields",_fields)
        #
        # At this point we have a query we can execute gracefully
        if 'limit' in _args :
            _sql = _sql + " LIMIT "+str(_args['limit'])
        return self.apply(_sql)
    def close(self) :
        # Close the underlying DBAPI connection (best effort).
        try:
            self.conn.close()
        except Exception as error :
            print (error)
            pass
|
||||
|
||||
class SQLWriter(SQLRW,Writer):
    """
    Write-side relational handler: creates tables and appends rows either
    through the sqlalchemy engine (DataFrame.to_sql) or via raw DBAPI
    executemany in chunked batches.
    """
    def __init__(self,**_args) :
        super().__init__(**_args)
        #
        # In the advent that data typing is difficult to determine we can inspect and perform a default case
        # This slows down the process but improves reliability of the data
        # NOTE: Proper data type should be set on the target system if their source is unclear.
        self._cast = False if 'cast' not in _args else _args['cast']

    def init(self,fields=None):
        # Set the field list used by the raw-DBAPI write path below.
        self.fields = fields;

    def make(self,**_args):
        """
        Create a table.

        :table   optional table name (defaults to self.table)
        :fields  column names, all typed with the default dtype; OR
        :schema  [{"name":..., "type":...}] column definitions
        :map     optional type-name translation applied to :schema entries
        """
        table = self._tablename(self.table) if 'table' not in _args else self._tablename(_args['table'])
        if 'fields' in _args :
            fields = _args['fields']
            sql = " ".join(["CREATE TABLE",table," (", ",".join([ name +' '+ self._dtype for name in fields]),")"])
        else:
            schema = _args['schema'] if 'schema' in _args else []
            _map = _args['map'] if 'map' in _args else {}
            sql = []
            for _item in schema :
                _type = _item['type']
                if _type in _map :
                    _type = _map[_type]
                sql = sql + [" " .join([_item['name'], ' ',_type])]
            sql = ",".join(sql)
            sql = ["CREATE TABLE ",table,"( ",sql," )"]
            sql = " ".join(sql)

        cursor = self.conn.cursor()
        try:
            cursor.execute(sql)
        except Exception as e :
            print (e)
            pass
        finally:
            self.conn.commit()
            pass
    def write(self,info,**_args):
        """
        :param info writes a list of data to a given set of fields
        :table      optional target table
        :schema     str -> namespace; list -> column definitions (the table
                    is created first when it does not already exist)
        """
        try:
            table = _args['table'] if 'table' in _args else self.table
            #
            # In SQL, schema can stand for namespace or the structure of a table
            # In case we have a list, we are likely dealing with table structure
            #
            if 'schema' in _args :
                if type(_args['schema']) == str :
                    self.schema = _args['schema'] if 'schema' in _args else self.schema
                elif type(_args['schema']) == list and len(_args['schema']) > 0 and not self.has(table=table):
                    #
                    # There is a messed up case when an empty array is passed (no table should be created)
                    #
                    self.make(table=table,schema=_args['schema'])
                    pass
            table = self._tablename(table)

            _sql = "INSERT INTO :table (:fields) VALUES (:values)".replace(":table",table)

            # Normalize the payload into a DataFrame (single dict == one row).
            if type(info) == list :
                _info = pd.DataFrame(info)
            elif type(info) == dict :
                _info = pd.DataFrame([info])
            else:
                _info = pd.DataFrame(info)

            if _info.shape[0] == 0 :
                # nothing to write
                return
            if self.lock :
                SQLRW.lock.acquire()
            #
            # we will adjust the chunks here in case we are not always sure of the
            if self._chunks == 1 and _info.shape[0] > SQLRW.MAX_CHUNK :
                self._chunks = 10
            _indexes = np.array_split(np.arange(_info.shape[0]),self._chunks)
            for i in _indexes :
                #
                # In case we have an invalid chunk ...
                if _info.iloc[i].shape[0] == 0 :
                    continue
                #
                # We are enabling writing by chunks/batches because some persistent layers have quotas or limitations on volume of data
                if self._engine is not None:
                    if self.schema in ['',None] :
                        rows = _info.iloc[i].to_sql(table,self._engine,if_exists='append',index=False)
                    else:
                        #
                        # Writing with schema information ...
                        rows = _info.iloc[i].to_sql(self.table,self._engine,schema=self.schema,if_exists='append',index=False)

                else:
                    # Raw DBAPI path; relies on self.fields being set (see init()).
                    _fields = ",".join(self.fields)
                    _sql = _sql.replace(":fields",_fields)
                    # netezza's driver uses qmark placeholders; the others use %s
                    values = ", ".join("?"*len(self.fields)) if self._provider == 'netezza' else ",".join(["%s" for name in self.fields])
                    _sql = _sql.replace(":values",values)
                    cursor = self.conn.cursor()
                    cursor.executemany(_sql,_info.iloc[i].values.tolist())
                    cursor.close()
        except Exception as e:
            print(e)
            pass
        finally:
            if self._engine is None :
                self.conn.commit()
            if self.lock :
                SQLRW.lock.release()
            pass
    def close(self):
        # Close the underlying DBAPI connection.
        try:
            self.conn.close()
        finally:
            pass
|
||||
class BigQuery:
    """
    Shared BigQuery plumbing: service-account credentials, default
    dataset/table, and schema inspection helpers.
    """
    def __init__(self,**_args):
        """
        :service_key | private_key  path to the service-account JSON key
        :dataset                    default dataset
        :dtypes                     optional dtype hints (stored, not used here)
        :table                      default table
        """
        path = _args['service_key'] if 'service_key' in _args else _args['private_key']
        self.credentials = service_account.Credentials.from_service_account_file(path)
        self.dataset = _args['dataset'] if 'dataset' in _args else None
        self.path = path
        self.dtypes = _args['dtypes'] if 'dtypes' in _args else None
        self.table = _args['table'] if 'table' in _args else None
        self.client = bq.Client.from_service_account_json(self.path)
    def meta(self,**_args):
        """
        This function returns meta data for a given table or query with dataset/table properly formatted
        :param table name of the table WITHOUT including dataset
        :param sql sql query to be pulled,

        Returns [] when no table is configured or on any error (best effort).
        """
        table = _args['table'] if 'table' in _args else self.table

        try:
            if table :
                _dataset = self.dataset if 'dataset' not in _args else _args['dataset']
                sql = f"""SELECT column_name as name, data_type as type FROM {_dataset}.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '{table}' """
                _info = {'credentials':self.credentials,'dialect':'standard'}
                return pd.read_gbq(sql,**_info).to_dict(orient='records')
            else :
                return []
        except Exception as e:
            # best effort: treat any failure as "no metadata available"
            return []
    def has(self,**_args):
        # True when meta() reports at least one column for the table.
        found = False
        try:
            _has = self.meta(**_args)
            found = _has is not None and len(_has) > 0
        except Exception as e:
            pass
        return found
|
||||
class BQReader(BigQuery,Reader) :
    """
    Read-side BigQuery handler: runs a caller-supplied query, or a
    SELECT * over the configured table, and returns a pandas DataFrame.
    """
    def __init__(self,**_args):
        super().__init__(**_args)
    def apply(self,sql):
        """Alias so the generic apply(sql) contract maps onto read(sql=...)."""
        return self.read(sql=sql)

    def read(self,**_args):
        """
        :sql    optional query to run as-is
        :table  optional table (defaults to self.table)
        :limit  optional row limit appended to the query
        """
        _query = None
        _target = self.table if 'table' not in _args else _args['table']
        if 'sql' in _args :
            _query = _args['sql']
        elif _target :
            # Fully-qualified names are back-quoted as-is; bare names get the
            # :dataset placeholder which is resolved just below.
            _name = "".join(["`",_target,"`"]) if '.' in _target else "".join(["`:dataset.",_target,"`"])
            _query = "SELECT * FROM :table ".replace(":table",_name)
        if not _query :
            return None
        if 'limit' in _args :
            _query = _query + " LIMIT " + str(_args['limit'])
        if self.dataset and (':dataset' in _query or ':DATASET' in _query) :
            _query = _query.replace(':dataset',self.dataset).replace(':DATASET',self.dataset)
        return pd.read_gbq(_query,credentials=self.credentials,dialect='standard')
|
||||
|
||||
|
||||
class BQWriter(BigQuery,Writer):
    """
    Write-side BigQuery handler: uploads DataFrames (or list/dict payloads)
    via pandas' to_gbq, optionally serialized through a class-wide lock.
    """
    lock = Lock()
    def __init__(self,**_args):
        """
        :lock    serialize writes through BQWriter.lock when truthy
        :table   default destination table
        :chunks  number of upload batches (default 1, auto-raised for huge frames)
        """
        super().__init__(**_args)

        self.parallel = False if 'lock' not in _args else _args['lock']
        self.table = _args['table'] if 'table' in _args else None
        self.mode = {'if_exists':'append','chunksize':900000,'destination_table':self.table,'credentials':self.credentials}
        self._chunks = 1 if 'chunks' not in _args else int(_args['chunks'])

    def write(self,_info,**_args) :
        """
        Thread-safe wrapper around _write; locks when the writer was built
        with lock=..., or the call itself passes lock=...
        """
        _locked = False
        try:
            if self.parallel or 'lock' in _args :
                BQWriter.lock.acquire()
                # BUGFIX: remember that we actually acquired the lock. The
                # old code acquired on (parallel or 'lock' in _args) but
                # released only on parallel, leaking the lock whenever the
                # per-call lock=... form was used.
                _locked = True
            _args['table'] = self.table if 'table' not in _args else _args['table']
            self._write(_info,**_args)
        finally:
            if _locked:
                BQWriter.lock.release()
    def _write(self,_info,**_args) :
        """
        Upload the payload; silently ignores types other than list/DataFrame.

        :table   destination table, qualified with self.dataset when bare
        :schema  optional table_schema forwarded to to_gbq
        """
        _df = None
        if type(_info) in [list,pd.DataFrame] :
            if type(_info) == list :
                _df = pd.DataFrame(_info)
            elif type(_info) == pd.DataFrame :
                _df = _info

            if '.' not in _args['table'] :
                self.mode['destination_table'] = '.'.join([self.dataset,_args['table']])
            else:
                self.mode['destination_table'] = _args['table'].strip()
            if 'schema' in _args :
                self.mode['table_schema'] = _args['schema']
            #
            # Auto-chunk very large frames so each upload stays within quota.
            # BUGFIX: the old code assigned to a misspelled attribute
            # (`_chunkks`), so the adjustment never took effect.
            self._chunks = 10 if _df.shape[0] > SQLRW.MAX_CHUNK and self._chunks == 1 else self._chunks
            _indexes = np.array_split(np.arange(_df.shape[0]),self._chunks)
            for i in _indexes :
                _df.iloc[i].to_gbq(**self.mode)
        pass
|
||||
#
|
||||
# Aliasing the big query classes allowing it to be backward compatible
|
||||
#
|
||||
# Backward-compatible aliases for the legacy class names.
BigQueryReader = BQReader
BigQueryWriter = BQWriter
|
@ -1,18 +0,0 @@
|
||||
"""
This namespace/package wraps the SQL functionality for several data-stores:
- netezza, postgresql, mysql and sqlite
- mariadb, redshift (supported through the aliases below)
"""
from . import postgresql, mysql, netezza, sqlite, sqlserver, duckdb


#
# Aliases supporting additional data-store providers that share a driver.
#
mariadb = mysql
redshift = postgresql
sqlite3 = sqlite
|
||||
|
@ -1,139 +0,0 @@
|
||||
"""
|
||||
This file encapsulates common operations associated with SQL databases via SQLAlchemy
|
||||
|
||||
"""
|
||||
import sqlalchemy as sqa
|
||||
from sqlalchemy import text
|
||||
|
||||
import pandas as pd
|
||||
|
||||
class Base:
    """
    Common SQLAlchemy plumbing: holds host/port/database/table and the
    engine built from the subclass-provided connection URI.
    """
    def __init__(self,**_args):
        """
        :host      defaults to 'localhost'
        :database  required database name
        :table     optional default table
        """
        self._host = _args['host'] if 'host' in _args else 'localhost'
        self._port = None
        self._database = _args['database']
        self._table = _args['table'] if 'table' in _args else None
        self._engine= sqa.create_engine(self._get_uri(**_args),future=True)
    def _set_uri(self,**_args) :
        """
        :provider provider
        :host host and port
        :account account user/pwd

        NOTE(review): this method computes values but never stores or
        returns them -- it appears to be dead code superseded by _get_uri.
        """
        _account = _args['account'] if 'account' in _args else None
        _host = _args['host']
        _provider = _args['provider'].replace(':','').replace('/','').strip()
    def _get_uri(self,**_args):
        """
        This function will return the formatted uri for the sqlAlchemy engine
        """
        raise Exception ("Function Needs to be implemented ")
    def meta (self,**_args):
        """
        This function returns the schema (table definition) of a given table
        :table optional name of the table (can be fully qualified)
        """
        _table = self._table if 'table' not in _args else _args['table']
        _schema = []
        if _table :
            if sqa.__version__.startswith('1.') :
                _handler = sqa.MetaData(bind=self._engine)
                _handler.reflect()
            else:
                #
                # sqlalchemy's version 2.+
                _handler = sqa.MetaData()
                _handler.reflect(bind=self._engine)
            #
            # Extract the schema, mapping dialect-specific types to generic labels
            _map = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
            _schema = [{"name":_attr.name,"type":_map.get(str(_attr.type),str(_attr.type))} for _attr in _handler.tables[_table].columns]
        return _schema
    def has(self,**_args):
        # Truthy (non-empty schema) when the table exists.
        return self.meta(**_args)
    def apply(self,sql):
        """
        Executing sql statement that returns query results (hence the restriction on sql and/or with)
        :sql SQL query to be executed

        @TODO: Execution of stored procedures
        """
        if sql.lower().startswith('select') or sql.lower().startswith('with') :
            return pd.read_sql(sql,self._engine)
        else:
            _handler = self._engine.connect()
            _handler.execute(text(sql))
            _handler.commit ()
            _handler.close()
        return None
|
||||
|
||||
class SQLBase(Base):
    """
    SQLAlchemy-backed base for concrete RDBMS handlers; subclasses supply
    the dialect (get_provider) and the default port (get_default_port).
    """
    def __init__(self,**_args):
        super().__init__(**_args)
    def get_provider(self):
        # Subclass responsibility: the sqlalchemy dialect(+driver) string.
        raise Exception ("Provider Needs to be set ...")
    def get_default_port(self) :
        # Subclass responsibility: the data-store's default port.
        raise Exception ("default port needs to be set")

    def _get_uri(self,**_args):
        """
        Assemble the sqlalchemy connection URI from host, port, optional
        credentials (:username/:password) and the configured database.
        """
        _port = self._port if self._port else self.get_default_port()
        _endpoint = f'{self._host}:{_port}'
        _provider = self.get_provider().replace(':','').replace('/','')
        if 'username' in _args :
            _creds = _args['username'] + ':' + _args['password'] + '@'
            return f'{_provider}://{_creds}{_endpoint}/{self._database}'
        return f'{_provider}://{_endpoint}/{self._database}'
|
||||
|
||||
class BaseReader(SQLBase):
    """
    Read-side SQLAlchemy handler: run an arbitrary query or a SELECT *
    over a table.
    """
    def __init__(self,**_args):
        super().__init__(**_args)
    def read(self,**_args):
        """
        :sql    optional query to run verbatim
        :table  optional table (defaults to the configured one)
        """
        if 'sql' in _args :
            _query = _args['sql']
        else:
            _target = _args['table'] if 'table' in _args else self._table
            _query = f'SELECT * FROM {_target}'
        return self.apply(_query)
|
||||
|
||||
|
||||
class BaseWriter (SQLBase):
    """
    This class implements SQLAlchemy support for Writting to a data-store (RDBMS)
    """
    def __init__(self,**_args):
        super().__init__(**_args)
    def write(self,_data,**_args):
        """
        Append rows to a table; accepts a dict, a list of records, or a
        DataFrame-like object.

        :table                          optional target table
        :if_exists, :index, :chunksize  optional overrides for DataFrame.to_sql
        """
        if type(_data) in [dict,list] :
            _df = pd.DataFrame(_data)
        else:
            # assume a DataFrame-like object; copy so the caller's frame is untouched
            _df = _data.copy()
        #
        # We are assuming we have a data-frame at this point
        #
        _target = _args['table'] if 'table' in _args else self._table
        _options = {'chunksize':2000000,'if_exists':'append','index':False}
        for _key in ('if_exists','index','chunksize') :
            if _key in _args :
                _options[_key] = _args[_key]
        _df.to_sql(_target,self._engine,**_options)
|
@ -1,24 +0,0 @@
|
||||
"""
|
||||
This module implements the handler for duckdb (in memory or not)
|
||||
"""
|
||||
from transport.sql.common import Base, BaseReader, BaseWriter
|
||||
|
||||
class Duck :
    """
    duckdb mixin: holds the database path and builds the duckdb URI.
    An empty database path makes duckdb operate as an in-memory database.
    """
    def __init__(self,**_args):
        self.database = _args.get('database','')
    def get_provider(self):
        """Return the sqlalchemy dialect name."""
        return "duckdb"

    def _get_uri(self,**_args):
        """Return the sqlalchemy connection URI for the configured database."""
        return f'duckdb:///{self.database}'
|
||||
class Reader(Duck,BaseReader) :
    # Read-side duckdb handler; both bases are initialized explicitly.
    def __init__(self,**_args):
        Duck.__init__(self,**_args)
        BaseReader.__init__(self,**_args)
|
||||
class Writer(Duck,BaseWriter):
    # Write-side duckdb handler; both bases are initialized explicitly.
    def __init__(self,**_args):
        Duck.__init__(self,**_args)
        BaseWriter.__init__(self,**_args)
|
@ -1,18 +0,0 @@
|
||||
"""
|
||||
This file implements support for mysql and maria db (with drivers mysql+mysql)
|
||||
"""
|
||||
from transport.sql.common import BaseReader, BaseWriter
|
||||
# import mysql.connector as my
|
||||
class MYSQL:
    """
    MySQL/MariaDB mixin (mysql-connector driver): supplies the dialect and
    default port used by SQLBase to assemble the connection URI.
    """
    def get_provider(self):
        """Return the sqlalchemy dialect+driver string."""
        return "mysql+mysqlconnector"
    def get_default_port(self):
        """Return MySQL's standard port (string, for URI assembly)."""
        return "3306"
|
||||
class Reader(MYSQL,BaseReader) :
    # Read-side mysql/mariadb handler; behavior comes entirely from the bases.
    def __init__(self,**_args):
        super().__init__(**_args)
|
||||
|
||||
class Writer(MYSQL,BaseWriter) :
    # Write-side mysql/mariadb handler; behavior comes entirely from the bases.
    def __init__(self,**_args):
        super().__init__(**_args)
|
@ -1,15 +0,0 @@
|
||||
import nzpy as nz
|
||||
from transport.sql.common import BaseReader, BaseWriter
|
||||
|
||||
class Netezza:
    """
    Netezza mixin (nzpy driver): supplies the dialect and default port used
    by SQLBase to assemble the connection URI.
    """
    def get_provider(self):
        """Return the sqlalchemy dialect+driver string."""
        return 'netezza+nzpy'
    def get_default_port(self):
        """Return Netezza's standard port (string, for URI assembly)."""
        return '5480'
|
||||
|
||||
class Reader(Netezza,BaseReader) :
    # Read-side netezza handler; behavior comes entirely from the bases.
    def __init__(self,**_args):
        super().__init__(**_args)
|
||||
class Writer(Netezza,BaseWriter):
    # Write-side netezza handler; behavior comes entirely from the bases.
    def __init__(self,**_args):
        super().__init__(**_args)
|
@ -1,22 +0,0 @@
|
||||
|
||||
from transport.sql.common import BaseReader , BaseWriter
|
||||
from psycopg2.extensions import register_adapter, AsIs
|
||||
import numpy as np
|
||||
|
||||
register_adapter(np.int64, AsIs)
|
||||
|
||||
class PG:
    """
    PostgreSQL mixin: supplies the dialect name and default port used by
    SQLBase to assemble the connection URI.
    """
    def __init__(self,**_args):
        super().__init__(**_args)
    def get_provider(self):
        """Return the sqlalchemy dialect name."""
        return "postgresql"

    def get_default_port(self):
        """Return PostgreSQL's standard port (string, for URI assembly)."""
        return "5432"
|
||||
class Reader(PG,BaseReader) :
    # Read-side postgresql/redshift handler; behavior comes from the bases.
    def __init__(self,**_args):
        super().__init__(**_args)
|
||||
class Writer(PG,BaseWriter):
    # Write-side postgresql/redshift handler; behavior comes from the bases.
    def __init__(self,**_args):
        super().__init__(**_args)
|
||||
|
@ -1,25 +0,0 @@
|
||||
import sqlalchemy
|
||||
import pandas as pd
|
||||
from transport.sql.common import Base, BaseReader, BaseWriter
|
||||
class SQLite (BaseReader):
    """
    sqlite mixin: resolves the database file path and builds the sqlite URI.

    NOTE(review): inheriting from BaseReader (rather than Base) means the
    Writer below also carries reader behavior through the MRO -- looks
    unintentional but is preserved here.
    """
    def __init__(self,**_args):
        # NOTE(review): super().__init__ creates the engine before the
        # 'path' override below is applied, so only 'database' affects the
        # URI at engine-creation time -- confirm against callers.
        super().__init__(**_args)
        if 'path' in _args :
            self._database = _args['path']
        if 'database' in _args :
            self._database = _args['database']
    def _get_uri(self,**_args):
        path = self._database
        return f'sqlite:///{path}' # ensure this is the correct path for the sqlite file.
|
||||
|
||||
class Reader(SQLite,BaseReader):
    # Read-side sqlite handler; behavior comes entirely from the bases.
    def __init__(self,**_args):
        super().__init__(**_args)
|
||||
|
||||
|
||||
class Writer (SQLite,BaseWriter):
    # Write-side sqlite handler; behavior comes entirely from the bases.
    def __init__(self,**_args):
        super().__init__(**_args)
|
@ -1,24 +0,0 @@
|
||||
"""
|
||||
Handling Microsoft SQL Server via pymssql driver/connector
|
||||
"""
|
||||
import sqlalchemy
|
||||
import pandas as pd
|
||||
from transport.sql.common import Base, BaseReader, BaseWriter
|
||||
|
||||
|
||||
class MsSQLServer:
    """
    Microsoft SQL Server mixin (pymssql driver): supplies the dialect and
    default port used by SQLBase to assemble the connection URI.
    """
    def __init__(self,**_args) :
        super().__init__(**_args)
    def get_provider(self):
        # URI shape: mssql+pymssql://user:password@hostname:port/dbname
        return "mssql+pymssql"
    def get_default_port(self):
        """Return SQL Server's standard port (string, for URI assembly)."""
        return "1433"
|
||||
class Reader (MsSQLServer,BaseReader):
    # Read-side SQL Server handler; behavior comes entirely from the bases.
    def __init__(self,**_args):
        super().__init__(**_args)
|
||||
|
||||
class Writer (MsSQLServer,BaseWriter):
    # Write-side SQL Server handler; behavior comes entirely from the bases.
    def __init__(self,**_args):
        super().__init__(**_args)
|
@ -0,0 +1,2 @@
|
||||
# Package metadata consumed by the packaging/setup tooling.
__author__ = 'The Phi Technology'
__version__= '1.9.2'
|
@ -0,0 +1 @@
|
||||
transport/version.py
|
Loading…
Reference in new issue