Compare commits
No commits in common. 'master' and 'v2.0' have entirely different histories.
@@ -1,2 +0,0 @@
cd /D "%~dp0"
python transport %1 %2 %3 %4 %5 %6
@@ -1,188 +0,0 @@
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Extract Transform Load (ETL) from Code\n",
    "\n",
    "The example below reads data from an HTTP source (GitHub) and copies the data to a CSV file and to a database. This example illustrates the one-to-many ETL features.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>id</th>\n",
       "      <th>location_id</th>\n",
       "      <th>address_1</th>\n",
       "      <th>address_2</th>\n",
       "      <th>city</th>\n",
       "      <th>state_province</th>\n",
       "      <th>postal_code</th>\n",
       "      <th>country</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>1</td>\n",
       "      <td>1</td>\n",
       "      <td>2600 Middlefield Road</td>\n",
       "      <td>NaN</td>\n",
       "      <td>Redwood City</td>\n",
       "      <td>CA</td>\n",
       "      <td>94063</td>\n",
       "      <td>US</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>2</td>\n",
       "      <td>2</td>\n",
       "      <td>24 Second Avenue</td>\n",
       "      <td>NaN</td>\n",
       "      <td>San Mateo</td>\n",
       "      <td>CA</td>\n",
       "      <td>94401</td>\n",
       "      <td>US</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>3</td>\n",
       "      <td>3</td>\n",
       "      <td>24 Second Avenue</td>\n",
       "      <td>NaN</td>\n",
       "      <td>San Mateo</td>\n",
       "      <td>CA</td>\n",
       "      <td>94403</td>\n",
       "      <td>US</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>4</td>\n",
       "      <td>4</td>\n",
       "      <td>24 Second Avenue</td>\n",
       "      <td>NaN</td>\n",
       "      <td>San Mateo</td>\n",
       "      <td>CA</td>\n",
       "      <td>94401</td>\n",
       "      <td>US</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>4</th>\n",
       "      <td>5</td>\n",
       "      <td>5</td>\n",
       "      <td>24 Second Avenue</td>\n",
       "      <td>NaN</td>\n",
       "      <td>San Mateo</td>\n",
       "      <td>CA</td>\n",
       "      <td>94401</td>\n",
       "      <td>US</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "   id  location_id              address_1 address_2          city  \\\n",
       "0   1            1  2600 Middlefield Road       NaN  Redwood City   \n",
       "1   2            2       24 Second Avenue       NaN     San Mateo   \n",
       "2   3            3       24 Second Avenue       NaN     San Mateo   \n",
       "3   4            4       24 Second Avenue       NaN     San Mateo   \n",
       "4   5            5       24 Second Avenue       NaN     San Mateo   \n",
       "\n",
       "  state_province postal_code country  \n",
       "0             CA       94063      US  \n",
       "1             CA       94401      US  \n",
       "2             CA       94403      US  \n",
       "3             CA       94401      US  \n",
       "4             CA       94401      US  "
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "#\n",
    "# One-to-many ETL: read from an HTTP source, write to a CSV file and a SQLite database\n",
    "#\n",
    "import transport\n",
    "from transport import providers\n",
    "import pandas as pd\n",
    "import os\n",
    "\n",
    "#\n",
    "#\n",
    "source = {\"provider\": \"http\", \"url\": \"https://raw.githubusercontent.com/codeforamerica/ohana-api/master/data/sample-csv/addresses.csv\"}\n",
    "target = [{\"provider\": \"files\", \"path\": \"addresses.csv\", \"delimiter\": \",\"}, {\"provider\": \"sqlite\", \"database\": \"sample.db3\", \"table\": \"addresses\"}]\n",
    "\n",
    "_handler = transport.get.etl(source=source, target=target)\n",
    "_data = _handler.read() #-- all ETL begins with data being read\n",
    "_data.head()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Extract Transform Load (ETL) from CLI\n",
    "\n",
    "The documentation for this is available at https://healthcareio.the-phi.com/data-transport under \"Docs\" -> \"Terminal CLI\".\n",
    "\n",
    "The entire process is documented, including how to generate an ETL configuration file (a sketch of such a file follows this notebook)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
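The CLI walkthrough above depends on an ETL configuration file that this diff does not show. A minimal sketch, assuming the file simply mirrors the `source`/`target` dictionaries from the code cell; the CLI flag in the comment is hypothetical, and the actual invocation is covered in the "Terminal CLI" docs linked above:

# persist the same one-to-many ETL shown in the notebook as a configuration file
import json

_config = {
    "source": {"provider": "http",
               "url": "https://raw.githubusercontent.com/codeforamerica/ohana-api/master/data/sample-csv/addresses.csv"},
    "target": [{"provider": "files", "path": "addresses.csv", "delimiter": ","},
               {"provider": "sqlite", "database": "sample.db3", "table": "addresses"}]
}
with open("etl-config.json", "w") as f:
    json.dump(_config, f, indent=2)
# hypothetical invocation -- consult "Docs" -> "Terminal CLI" for the actual flags:
#   transport etl --config etl-config.json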
@@ -1,138 +0,0 @@
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Writing to Apache Iceberg\n",
    "\n",
    "1. Ensure you have a Google Bigquery service account key on disk\n",
    "2. The service key location is set as an environment variable **BQ_KEY**\n",
    "3. The dataset will be automatically created within the project associated with the service key\n",
    "\n",
    "The cell below creates a dataframe that will be stored within Apache Iceberg"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['data transport version ', '2.4.0']\n"
     ]
    }
   ],
   "source": [
    "#\n",
    "# Writing to Apache Iceberg\n",
    "#\n",
    "import transport\n",
    "from transport import providers\n",
    "import pandas as pd\n",
    "import os\n",
    "\n",
    "PRIVATE_KEY = os.environ['BQ_KEY'] #-- location of the service key\n",
    "DATASET = 'demo'\n",
    "_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
    "# bqw = transport.get.writer(provider=providers.ICEBERG,catalog='mz',database='edw.mz',table='friends')\n",
    "bqw = transport.get.writer(provider=providers.ICEBERG,table='edw.mz.friends')\n",
    "bqw.write(_data,if_exists='replace') #-- default is append\n",
    "print (['data transport version ', transport.__version__])\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Reading from Apache Iceberg\n",
    "\n",
    "The cell below reads the data that has been written by the cell above and computes the average age with a simple SQL query. \n",
    "\n",
    "- Basic read of the designated table (friends) created above\n",
    "- Execute an aggregate SQL against the table\n",
    "\n",
    "**NOTE**\n",
    "\n",
    "By design, **read** objects are separated from **write** objects in order to avoid accidental writes to the database.\n",
    "Read objects are created with **transport.get.reader**, whereas write objects are created with **transport.get.writer**."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "           name  age\n",
      "0    James Bond   55\n",
      "1  Steve Rogers  150\n",
      "2  Steve Nyemba   44\n",
      "--------- STATISTICS ------------\n"
     ]
    }
   ],
   "source": [
    "\n",
    "import transport\n",
    "from transport import providers\n",
    "import os\n",
    "PRIVATE_KEY=os.environ['BQ_KEY']\n",
    "pgr = transport.get.reader(provider=providers.ICEBERG,database='edw.mz')\n",
    "_df = pgr.read(table='friends')\n",
    "_query = 'SELECT COUNT(*) _counts, AVG(age) from friends'\n",
    "_sdf = pgr.read(sql=_query)\n",
    "print (_df)\n",
    "print ('--------- STATISTICS ------------')\n",
    "# print (_sdf)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "An **auth-file** is a file that contains database parameters used to access the database. \n",
    "For code in shared environments, we recommend:\n",
    "\n",
    "1. Having the **auth-file** stored on disk\n",
    "2. Setting the location of the file in an environment variable\n",
    "\n",
    "To generate a template of the **auth-file**, open the **file generator wizard** found at https://healthcareio.the-phi.com/data-transport (a sketch of the read-via-environment pattern follows this notebook)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
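The auth-file recommendation in the last markdown cell can be made concrete. A minimal sketch; the environment-variable name used here (`DATA_TRANSPORT_AUTH_FILE`) is an assumption chosen for illustration, and only the store-on-disk/read-via-environment pattern comes from the notebook:

import json
import os
import transport

# the auth-file holds the database parameters; only its location lives in the environment
_path = os.environ['DATA_TRANSPORT_AUTH_FILE']   # hypothetical variable name
with open(_path) as f:
    _auth = json.load(f)
reader = transport.get.reader(**_auth)           # e.g. {"provider": ..., "database": ...}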
@@ -1,149 +0,0 @@
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Writing data-transport plugins\n",
    "\n",
    "The data-transport plugins are designed to automate pre/post processing, i.e.\n",
    "\n",
    "    - Read -> Post processing\n",
    "    - Write -> Pre processing\n",
    "   \n",
    "In this example we will read and write data, applying both pre- and post-processing, against any supported infrastructure. We will also show how to specify the plugins within a configuration file (a sketch of such a file follows this notebook)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "#\n",
    "# Writing to a SQLite database\n",
    "#\n",
    "import transport\n",
    "from transport import providers\n",
    "import pandas as pd\n",
    "import os\n",
    "import shutil\n",
    "#\n",
    "#\n",
    "\n",
    "DATABASE = '/home/steve/tmp/demo.db3'\n",
    "if os.path.exists(DATABASE) :\n",
    "    os.remove(DATABASE)\n",
    "#\n",
    "# \n",
    "_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
    "litew = transport.get.writer(provider=providers.SQLITE,database=DATABASE)\n",
    "litew.write(_data,table='friends')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Reading from SQLite\n",
    "\n",
    "The cell below reads the data that has been written by the cell above and computes the average age from a plugin function we will write. \n",
    "\n",
    "- Basic read of the designated table (friends) created above\n",
    "- Read with pipeline functions defined in code\n",
    "\n",
    "**NOTE**\n",
    "\n",
    "It is possible to use **transport.factory.instance**, **transport.instance**, or **transport.get.<[reader|writer]>**; they are the same. The naming lets the maintainers signal that a factory design pattern is used."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "           name  age\n",
      "0    James Bond   55\n",
      "1  Steve Rogers  150\n",
      "2  Steve Nyemba   44\n",
      "\n",
      "\n",
      "           name   age  autoinc\n",
      "0    James Bond   5.5        0\n",
      "1  Steve Rogers  15.0        1\n",
      "2  Steve Nyemba   4.4        2\n"
     ]
    }
   ],
   "source": [
    "\n",
    "import transport\n",
    "from transport import providers\n",
    "import os\n",
    "import numpy as np\n",
    "def _autoincrement (_data,**kwargs) :\n",
    "    \"\"\"\n",
    "    This function adds an autoincrement field to the data frame\n",
    "    \"\"\"\n",
    "    _data['autoinc'] = np.arange(_data.shape[0])\n",
    "    \n",
    "    return _data\n",
    "def reduce(_data,**_args) :\n",
    "    \"\"\"\n",
    "    This function scales down the age column of the data frame\n",
    "    \"\"\"\n",
    "    _data.age /= 10\n",
    "    return _data\n",
    "reader = transport.get.reader(provider=providers.SQLITE,database=DATABASE,table='friends')\n",
    "#\n",
    "# basic read of the data created in the first cell\n",
    "_df = reader.read()\n",
    "print (_df)\n",
    "print ()\n",
    "print()\n",
    "#\n",
    "# read of the data with pipeline functions provided to transform the returned data frame\n",
    "print (reader.read(pipeline=[_autoincrement,reduce]))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The parameters for instantiating a transport object (reader or writer) can be found at [data-transport home](https://healthcareio.the-phi.com/data-transport)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
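The first markdown cell promises plugins specified in a configuration file, but the notebook only passes callables in code. A hypothetical sketch of such a file, assuming a `plugins` key that names functions by module path; the key name and path syntax are assumptions, not the library's confirmed schema:

# hypothetical configuration entry; the 'plugins' key and module-path values are assumptions
_config = {
    "provider": "sqlite",
    "database": "/home/steve/tmp/demo.db3",
    "table": "friends",
    "plugins": ["myplugins._autoincrement", "myplugins.reduce"]   # applied as post-processing on read
}
# the in-code equivalent, confirmed in the cell above, passes the callables directly:
#   reader.read(pipeline=[_autoincrement, reduce])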
@@ -1,131 +0,0 @@
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Writing to AWS S3\n",
    "\n",
    "We have set up our demo environment with the label **aws**, which references our S3 access_key, secret_key, and file (called friends.csv). In the cell below we will write the data to our AWS S3 bucket named **com.phi.demo**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2.2.1\n"
     ]
    }
   ],
   "source": [
    "#\n",
    "# Writing to AWS S3\n",
    "#\n",
    "import transport\n",
    "from transport import providers\n",
    "import pandas as pd\n",
    "_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
    "mgw = transport.get.writer(label='aws')\n",
    "mgw.write(_data)\n",
    "print (transport.__version__)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Reading from AWS S3\n",
    "\n",
    "The cell below reads the data that has been written by the cell above and computes the average age:\n",
    "\n",
    "- Basic read of the designated file **friends.csv**\n",
    "- Compute average age using standard pandas functions\n",
    "\n",
    "**NOTE**\n",
    "\n",
    "By design, **read** objects are separated from **write** objects in order to avoid accidental writes to the database.\n",
    "Read objects are created with **transport.get.reader**, whereas write objects are created with **transport.get.writer**."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "          bname  age\n",
      "0    James Bond   55\n",
      "1  Steve Rogers  150\n",
      "2  Steve Nyemba   44\n",
      "--------- STATISTICS ------------\n",
      "83.0\n"
     ]
    }
   ],
   "source": [
    "\n",
    "import transport\n",
    "from transport import providers\n",
    "import pandas as pd\n",
    "\n",
    "def cast(stream) :\n",
    "    print (stream)\n",
    "    return pd.DataFrame(str(stream))\n",
    "mgr = transport.get.reader(label='aws')\n",
    "_df = mgr.read()\n",
    "print (_df)\n",
    "print ('--------- STATISTICS ------------')\n",
    "print (_df.age.mean())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "An **auth-file** is a file that contains database parameters used to access the database. \n",
    "For code in shared environments, we recommend:\n",
    "\n",
    "1. Having the **auth-file** stored on disk\n",
    "2. Setting the location of the file in an environment variable\n",
    "\n",
    "To generate a template of the **auth-file**, open the **file generator wizard** found at https://healthcareio.the-phi.com/data-transport"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
@@ -1,49 +0,0 @@
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "data-transport"
dynamic = ["version"]
authors = [
  {name = "Steve L. Nyemba", email = "info@the-phi.com"},
]
description = ""
readme = "README.md"
license = {text = "LICENSE"}
keywords = ["mongodb","duckdb","couchdb","rabbitmq","file","read","write","s3","sqlite"]
classifiers = [
  "License :: OSI Approved :: MIT License",
  "Topic :: Utilities",
]
dependencies = [
  "termcolor","sqlalchemy","aiosqlite","duckdb-engine",
  "mysql-connector-python","psycopg2-binary","nzpy","pymssql",
  "typer","pandas","numpy","pyarrow","smart-open",
  "plugin-ix@git+https://github.com/lnyemba/plugins-ix"
]
[project.optional-dependencies]
#sql = ["mysql-connector-python","psycopg2-binary","nzpy","pymssql","duckdb-engine","aiosqlite"]
nosql = ["pymongo","cloudant"]
cloud = ["boto","boto3","botocore","pyncclient","pandas-gbq","google-cloud-bigquery","google-cloud-bigquery-storage","databricks-sqlalchemy"]
warehouse = ["pydrill","pyspark","sqlalchemy_drill"]
other = ["pika","flask-session"]
all = ["pymongo","cloudant","pandas-gbq","google-cloud-bigquery","google-cloud-bigquery-storage","databricks-sqlalchemy","pyncclient","boto3","boto","botocore","pydrill","pyspark","sqlalchemy_drill","pika","aiosqlite"]

[project.urls]
Homepage = "https://healthcareio.the-phi.com/git/code/transport.git"

#[project.scripts]
#transport = "transport:main"

[tool.setuptools]
include-package-data = true
zip-safe = false
script-files = ["bin/transport","bin/transport.cmd"]

[tool.setuptools.packages.find]
include = ["transport", "transport.*"]

[tool.setuptools.dynamic]
version = {attr = "transport.info.__version__"}
#authors = {attr = "info.__author__"}
@@ -0,0 +1,31 @@
"""
This is a build file for the data-transport project
"""
from setuptools import setup, find_packages
import os
import sys
# from version import __version__,__author__
from info import __version__, __author__


# __author__ = 'The Phi Technology'
# __version__= '1.8.0'

def read(fname):
    return open(os.path.join(os.path.dirname(__file__), fname)).read()
args = {
    "name":"data-transport",
    "version":__version__,
    "author":__author__,"author_email":"info@the-phi.com",
    "license":"MIT",
    # "packages":["transport","info","transport/sql"]},

    "packages": find_packages(include=['info','transport', 'transport.*'])}
args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite']
args["install_requires"] = ['pyncclient','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','pymssql']
args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git"
args['scripts'] = ['bin/transport']
# if sys.version_info[0] == 2 :
#     args['use_2to3'] = True
#     args['use_2to3_exclude_fixers']=['lib2to3.fixes.fix_import']
setup(**args)
@@ -1,19 +0,0 @@
"""
This file is intended to handle the duckdb database
"""

import duckdb
from transport.common import Reader,Writer


class Duck(Reader):
    def __init__(self,**_args):
        super().__init__(**_args)
        self._path = None if 'path' not in _args else _args['path']
        self._handler = duckdb.connect() if not self._path else duckdb.connect(self._path)


class DuckReader(Duck) :
    def __init__(self,**_args):
        super().__init__(**_args)
    def read(self,**_args) :
        pass
@@ -1,23 +0,0 @@
__app_name__ = 'data-transport'
__author__ = 'The Phi Technology'
__version__= '2.2.42'
__email__ = "info@the-phi.com"
__edition__= 'community'
__license__=f"""
Copyright 2010 - 2024, Steve L. Nyemba

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

"""

__whatsnew__=f"""version {__version__},
    1. Added support for read/write logs as well as plugins (when applied)
    2. Bug fix with duckdb (adding read-only mode) for readers, because there are issues with threads & processes
    3. Support for streaming data, important when working with large volumes of data

"""
@@ -1 +1 @@
-from . import files, http, rabbitmq, callback, files, console
+from . import files, http, rabbitmq, callback, files
@@ -1,116 +0,0 @@
import os
import json
from transport.info import __version__
import copy
import transport
import importlib
import importlib.util
import shutil
from io import StringIO

"""
This module manages data from the registry and allows (read-only) access
@TODO: add property to the DATA attribute
"""
if 'HOME' in os.environ :
    REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
else:
    REGISTRY_PATH=os.sep.join([os.environ['USERPROFILE'],'.data-transport'])

#
# This path can be overridden by an environment variable ...
#
if 'DATA_TRANSPORT_REGISTRY_PATH' in os.environ :
    REGISTRY_PATH = os.environ['DATA_TRANSPORT_REGISTRY_PATH']
REGISTRY_FILE= 'transport-registry.json'
DATA = {}


def isloaded ():
    return DATA not in [{},None]
def exists (path=REGISTRY_PATH,_file=REGISTRY_FILE) :
    """
    This function determines if there is a registry at all
    """
    p = os.path.exists(path)
    q = os.path.exists( os.sep.join([path,_file]))

    return p and q
def load (_path=REGISTRY_PATH,_file=REGISTRY_FILE):
    global DATA

    if exists(_path) :
        path = os.sep.join([_path,_file])
        f = open(path)
        DATA = json.loads(f.read())
        f.close()
def init (email,path=REGISTRY_PATH,override=False,_file=REGISTRY_FILE):
    """
    Initializing the registry; will raise an exception in the event of an issue
    """
    p = '@' in email
    #q = False if '.' not in email else email.split('.')[-1] in ['edu','com','io','ai','org']
    q = len(email.split('.')[-1]) in [2,3]

    if p and q :
        _config = {"email":email,'version':__version__}
        if not os.path.exists(path):
            os.makedirs(path)
        filename = os.sep.join([path,_file])
        if not os.path.exists(filename) or override == True :

            f = open(filename,'w')
            f.write( json.dumps(_config))
            f.close()
            # _msg = f"""{CHECK_MARK} Successfully wrote configuration to {path} from {email}"""

        else:
            raise Exception (f"""Unable to write configuration, please check parameters (or help) and try again""")
    else:
        raise Exception (f"""Invalid input, {email} is not well formatted, provide an email with an adequate format""")
def lookup (label):
    global DATA
    return label in DATA
has = lookup

def get (label='default') :
    global DATA
    return copy.copy(DATA[label]) if label in DATA else {}

def set (label, auth_file, default=False,path=REGISTRY_PATH) :
    """
    This function adds a label (auth-file data) to the registry and can set it as the default
    """
    if label == 'default' :
        raise Exception ("""Invalid label name provided, please change the label name and use the switch""")
    reg_file = os.sep.join([path,REGISTRY_FILE])
    if os.path.exists(path) and os.path.exists(reg_file):
        if type(auth_file) == str and os.path.exists (auth_file) :
            f = open(auth_file)
        elif type(auth_file) == StringIO:
            f = auth_file
        _info = json.loads(f.read())
        f.close()
        f = open(reg_file)
        _config = json.loads(f.read())
        f.close()

        #
        # set the proposed label
        _object = transport.factory.instance(**_info)
        if _object :
            _config[label] = _info
            if default :
                _config['default'] = _info
            #
            # now we need to write this to the location
            f = open(reg_file,'w')
            f.write(json.dumps(_config))
            f.close()
        else:
            raise Exception( f"""Unable to load file located at {path},\nLearn how to generate an auth-file with the wizard found at https://healthcareio.the-phi.com/data-transport""")
        pass
    else:
        pass
    pass
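Putting the registry functions above together, a minimal sketch, assuming the module is importable as `transport.registry` (the diff does not show its path) and that an auth-file such as `aws-auth.json` already exists on disk:

from transport import registry

registry.init('demo@the-phi.com')        # writes ~/.data-transport/transport-registry.json
registry.set('aws', 'aws-auth.json')     # validates the auth-file by instantiating it, then stores it
registry.load()                          # populate the module-level DATA cache
print(registry.has('aws'))               # True once the label is stored
_info = registry.get('aws')              # a copy of the parameters kept under the label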
@@ -1,27 +0,0 @@
"""
This module implements the handler for duckdb (in-memory or not)
"""
from transport.sql.common import Base, BaseReader, BaseWriter

def template ():
    return {'database':'path-to-database','table':'table'}

class Duck :
    def __init__(self,**_args):
        #
        # duckdb with None as database will operate as an in-memory database
        #
        self.database = _args['database'] if 'database' in _args else ''
    def get_provider(self):
        return "duckdb"

    def _get_uri(self,**_args):
        return f"""duckdb:///{self.database}"""
class Reader(Duck,BaseReader) :
    def __init__(self,**_args):
        Duck.__init__(self,**_args)
        BaseReader.__init__(self,**_args)
    def _get_uri(self,**_args):
        return super()._get_uri(**_args),{'connect_args':{'read_only':True}}
class Writer(Duck,BaseWriter):
    def __init__(self,**_args):
        Duck.__init__(self,**_args)
        BaseWriter.__init__(self,**_args)
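The classes above only assemble a SQLAlchemy URI, with read-only connect args on the reader side. A minimal sketch of what that resolves to, assuming the `duckdb-engine` dialect listed in the project dependencies and a hypothetical database path:

import sqlalchemy as sqa
import pandas as pd

# what Reader._get_uri() amounts to: a duckdb:/// URI plus read-only connect args
_engine = sqa.create_engine("duckdb:///sample.db3",            # an empty path would mean in-memory
                            connect_args={'read_only': True})
_df = pd.read_sql("SELECT * FROM addresses LIMIT 5", _engine)  # 'addresses' is a hypothetical table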
@@ -1,7 +0,0 @@
"""
This namespace/package is intended to handle read/writes against data warehouse solutions like :
    - apache iceberg
    - clickhouse (...)
"""

from . import iceberg, drill
@@ -1,59 +0,0 @@
import sqlalchemy
import pandas as pd
from .. sql.common import BaseReader , BaseWriter
import sqlalchemy as sqa


def template():
    return {'host':'localhost','port':8047,'ssl':False,'table':None,'database':None}

class Drill :
    __template = {'host':None,'port':None,'ssl':None,'table':None,'database':None}
    def __init__(self,**_args):

        self._host = _args['host'] if 'host' in _args else 'localhost'
        self._port = _args['port'] if 'port' in _args else self.get_default_port()
        self._ssl = False if 'ssl' not in _args else _args['ssl']

        self._table = _args['table'] if 'table' in _args else None
        if self._table and '.' in self._table :
            _seg = self._table.split('.')
            if len(_seg) > 2 :
                self._schema,self._database = _seg[:2]
        else:

            self._database=_args['database']
            self._schema = self._database.split('.')[0]

    def _get_uri(self,**_args):
        return f'drill+sadrill://{self._host}:{self._port}/{self._database}?use_ssl={self._ssl}'
    def get_provider(self):
        return "drill+sadrill"
    def get_default_port(self):
        return "8047"
    def meta(self,**_args):
        _table = _args['table'] if 'table' in _args else self._table
        if '.' in _table :
            _schema = _table.split('.')[:2]
            _schema = '.'.join(_schema)
            _table = _table.split('.')[-1]
        else:
            _schema = self._schema

        # _sql = f"select COLUMN_NAME AS name, CASE WHEN DATA_TYPE ='CHARACTER VARYING' THEN 'CHAR ( 125 )' ELSE DATA_TYPE END AS type from INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA='{_schema}' and TABLE_NAME='{_table}'"
        _sql = f"select COLUMN_NAME AS name, CASE WHEN DATA_TYPE ='CHARACTER VARYING' THEN 'CHAR ( '||COLUMN_SIZE||' )' ELSE DATA_TYPE END AS type from INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA='{_schema}' and TABLE_NAME='{_table}'"
        try:
            _df = pd.read_sql(_sql,self._engine)
            return _df.to_dict(orient='records')
        except Exception as e:
            print (e)
            pass
        return []
class Reader (Drill,BaseReader) :
    def __init__(self,**_args):
        super().__init__(**_args)
        self._chunksize = 0 if 'chunksize' not in _args else _args['chunksize']
        self._engine= sqa.create_engine(self._get_uri(),future=True)
class Writer(Drill,BaseWriter):
    def __init__(self,**_args):
        super().__init__(**_args)
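The Reader above hands `_get_uri()` straight to `create_engine`. A minimal sketch of the resulting connection, assuming the `sqlalchemy_drill` dialect from the warehouse extras and a hypothetical `dfs.tmp` workspace:

import sqlalchemy as sqa
import pandas as pd

# mirrors Drill._get_uri(): drill+sadrill://<host>:<port>/<database>?use_ssl=<bool>
_engine = sqa.create_engine("drill+sadrill://localhost:8047/dfs.tmp?use_ssl=False", future=True)
_df = pd.read_sql("SELECT * FROM `friends`", _engine)  # 'friends' is a hypothetical table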
@@ -1,153 +0,0 @@
"""
dependency:
    - spark and SPARK_HOME environment variable must be set
NOTE:
    When using the streaming option, ensure that it is in line with the default (1000 rows) or increase it in spark-defaults.conf

"""
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import col, to_date, to_timestamp
import copy

def template():
    return {'catalog':None,'database':None,'table':None}
class Iceberg :
    def __init__(self,**_args):
        """
        providing catalog meta information (you must get this from apache iceberg)
        """
        #
        # Turning off logging (it's annoying & un-professional)
        #
        # _spconf = SparkContext()
        # _spconf.setLogLevel("ERROR")
        #
        # @TODO:
        # Make arrangements for additional configuration elements
        #
        self._session = SparkSession.builder.appName("data-transport").getOrCreate()
        self._session.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
        # self._session.sparkContext.setLogLevel("ERROR")
        self._catalog = self._session.catalog
        self._table = _args['table'] if 'table' in _args else None

        if 'catalog' in _args :
            #
            # Let us set the default catalog
            self._catalog.setCurrentCatalog(_args['catalog'])

        else:
            # No current catalog has been set ...
            pass
        if 'database' in _args :
            self._database = _args['database']
            self._catalog.setCurrentDatabase(self._database)
        else:
            #
            # Should we set the default as the first one if available ?
            #
            pass
        self._catalogName = self._catalog.currentCatalog()
        self._databaseName = self._catalog.currentDatabase()
    def meta (self,**_args) :
        """
        This function should return the schema of a table (only)
        """
        _schema = []
        try:
            _table = _args['table'] if 'table' in _args else self._table
            _tableName = self._getPrefix(**_args) + f".{_table}"
            _tmp = self._session.table(_tableName).schema
            _schema = _tmp.jsonValue()['fields']
            for _item in _schema :
                del _item['nullable'],_item['metadata']
        except Exception as e:

            pass
        return _schema
    def _getPrefix (self,**_args):
        _catName = self._catalogName if 'catalog' not in _args else _args['catalog']
        _datName = self._databaseName if 'database' not in _args else _args['database']

        return '.'.join([_catName,_datName])
    def apply(self,_query):
        """
        sql query/command to run against apache iceberg
        """
        return self._session.sql(_query).toPandas()
    def has (self,**_args):
        try:
            _prefix = self._getPrefix(**_args)
            if _prefix.endswith('.') :
                return False
            return _args['table'] in [_item.name for _item in self._catalog.listTables(_prefix)]
        except Exception as e:
            print (e)
            return False

    def close(self):
        self._session.stop()
class Reader(Iceberg) :
    def __init__(self,**_args):
        super().__init__(**_args)
    def read(self,**_args):
        _table = self._table
        _prefix = self._getPrefix(**_args)
        if 'table' in _args or _table:
            _table = _args['table'] if 'table' in _args else _table
            _table = _prefix + f'.{_table}'
            return self._session.table(_table).toPandas()
        else:
            sql = _args['sql']
            return self._session.sql(sql).toPandas()
        pass
class Writer (Iceberg):
    """
    Writing data to an Apache Iceberg data warehouse (using pyspark)
    """
    def __init__(self,**_args):
        super().__init__(**_args)
        self._mode = 'append' if 'mode' not in _args else _args['mode']
        self._table = None if 'table' not in _args else _args['table']
    def format (self,_schema) :
        _iceSchema = StructType([])
        _map = {'integer':IntegerType(),'float':DoubleType(),'double':DoubleType(),'date':DateType(),
                'timestamp':TimestampType(),'datetime':TimestampType(),'string':StringType(),'varchar':StringType()}
        for _item in _schema :
            _name = _item['name']
            _type = _item['type'].lower()
            if _type not in _map :
                _iceType = StringType()
            else:
                _iceType = _map[_type]

            _iceSchema.add (StructField(_name,_iceType,True))
        return _iceSchema if len(_iceSchema) else []
    def write(self,_data,**_args):
        _prefix = self._getPrefix(**_args)
        if 'table' not in _args and not self._table :
            raise Exception (f"Table name should be specified for catalog/database {_prefix}")
        _schema = self.format(_args['schema']) if 'schema' in _args else []
        if not _schema :
            rdd = self._session.createDataFrame(_data,verifySchema=False)
        else :
            rdd = self._session.createDataFrame(_data,schema=_schema,verifySchema=True)
        _mode = self._mode if 'mode' not in _args else _args['mode']
        _table = self._table if 'table' not in _args else _args['table']

        # print (_data.shape,_mode,_table)

        if not self._session.catalog.tableExists(_table):
            # @TODO:
            # add partitioning information here
            rdd.writeTo(_table).using('iceberg').create()

            # _mode = 'overwrite'
            # rdd.write.format('iceberg').mode(_mode).saveAsTable(_table)
        else:
            # rdd.writeTo(_table).append()
            # _table = f'{_prefix}.{_table}'

            rdd.coalesce(10).write.format('iceberg').mode('append').save(_table)
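End to end, the Writer above is what the Iceberg notebook earlier in this diff exercises. A minimal sketch of the same flow; note that `write()` checks a `mode` keyword, while the notebook passes `if_exists`, so `mode` is used here:

import pandas as pd
import transport
from transport import providers

_data = pd.DataFrame({"name": ["James Bond", "Steve Rogers"], "age": [55, 150]})
# 'edw.mz.friends' follows the notebook's catalog.database.table convention;
# write() creates the table via writeTo(...).using('iceberg') when it does not exist
bqw = transport.get.writer(provider=providers.ICEBERG, table='edw.mz.friends')
bqw.write(_data, mode='append')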