Compare commits
No commits in common. 'master' and 'v2.0' have entirely different histories.
@ -1,188 +0,0 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Extract Transform Load (ETL) from Code\n",
|
||||
"\n",
|
||||
"The example below reads data from an http source (github) and will copy the data to a csv file and to a database. This example illustrates the one-to-many ETL features.\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 2,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"data": {
|
||||
"text/html": [
|
||||
"<div>\n",
|
||||
"<style scoped>\n",
|
||||
" .dataframe tbody tr th:only-of-type {\n",
|
||||
" vertical-align: middle;\n",
|
||||
" }\n",
|
||||
"\n",
|
||||
" .dataframe tbody tr th {\n",
|
||||
" vertical-align: top;\n",
|
||||
" }\n",
|
||||
"\n",
|
||||
" .dataframe thead th {\n",
|
||||
" text-align: right;\n",
|
||||
" }\n",
|
||||
"</style>\n",
|
||||
"<table border=\"1\" class=\"dataframe\">\n",
|
||||
" <thead>\n",
|
||||
" <tr style=\"text-align: right;\">\n",
|
||||
" <th></th>\n",
|
||||
" <th>id</th>\n",
|
||||
" <th>location_id</th>\n",
|
||||
" <th>address_1</th>\n",
|
||||
" <th>address_2</th>\n",
|
||||
" <th>city</th>\n",
|
||||
" <th>state_province</th>\n",
|
||||
" <th>postal_code</th>\n",
|
||||
" <th>country</th>\n",
|
||||
" </tr>\n",
|
||||
" </thead>\n",
|
||||
" <tbody>\n",
|
||||
" <tr>\n",
|
||||
" <th>0</th>\n",
|
||||
" <td>1</td>\n",
|
||||
" <td>1</td>\n",
|
||||
" <td>2600 Middlefield Road</td>\n",
|
||||
" <td>NaN</td>\n",
|
||||
" <td>Redwood City</td>\n",
|
||||
" <td>CA</td>\n",
|
||||
" <td>94063</td>\n",
|
||||
" <td>US</td>\n",
|
||||
" </tr>\n",
|
||||
" <tr>\n",
|
||||
" <th>1</th>\n",
|
||||
" <td>2</td>\n",
|
||||
" <td>2</td>\n",
|
||||
" <td>24 Second Avenue</td>\n",
|
||||
" <td>NaN</td>\n",
|
||||
" <td>San Mateo</td>\n",
|
||||
" <td>CA</td>\n",
|
||||
" <td>94401</td>\n",
|
||||
" <td>US</td>\n",
|
||||
" </tr>\n",
|
||||
" <tr>\n",
|
||||
" <th>2</th>\n",
|
||||
" <td>3</td>\n",
|
||||
" <td>3</td>\n",
|
||||
" <td>24 Second Avenue</td>\n",
|
||||
" <td>NaN</td>\n",
|
||||
" <td>San Mateo</td>\n",
|
||||
" <td>CA</td>\n",
|
||||
" <td>94403</td>\n",
|
||||
" <td>US</td>\n",
|
||||
" </tr>\n",
|
||||
" <tr>\n",
|
||||
" <th>3</th>\n",
|
||||
" <td>4</td>\n",
|
||||
" <td>4</td>\n",
|
||||
" <td>24 Second Avenue</td>\n",
|
||||
" <td>NaN</td>\n",
|
||||
" <td>San Mateo</td>\n",
|
||||
" <td>CA</td>\n",
|
||||
" <td>94401</td>\n",
|
||||
" <td>US</td>\n",
|
||||
" </tr>\n",
|
||||
" <tr>\n",
|
||||
" <th>4</th>\n",
|
||||
" <td>5</td>\n",
|
||||
" <td>5</td>\n",
|
||||
" <td>24 Second Avenue</td>\n",
|
||||
" <td>NaN</td>\n",
|
||||
" <td>San Mateo</td>\n",
|
||||
" <td>CA</td>\n",
|
||||
" <td>94401</td>\n",
|
||||
" <td>US</td>\n",
|
||||
" </tr>\n",
|
||||
" </tbody>\n",
|
||||
"</table>\n",
|
||||
"</div>"
|
||||
],
|
||||
"text/plain": [
|
||||
" id location_id address_1 address_2 city \\\n",
|
||||
"0 1 1 2600 Middlefield Road NaN Redwood City \n",
|
||||
"1 2 2 24 Second Avenue NaN San Mateo \n",
|
||||
"2 3 3 24 Second Avenue NaN San Mateo \n",
|
||||
"3 4 4 24 Second Avenue NaN San Mateo \n",
|
||||
"4 5 5 24 Second Avenue NaN San Mateo \n",
|
||||
"\n",
|
||||
" state_province postal_code country \n",
|
||||
"0 CA 94063 US \n",
|
||||
"1 CA 94401 US \n",
|
||||
"2 CA 94403 US \n",
|
||||
"3 CA 94401 US \n",
|
||||
"4 CA 94401 US "
|
||||
]
|
||||
},
|
||||
"execution_count": 2,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"#\n",
|
||||
"# Writing to Google Bigquery database\n",
|
||||
"#\n",
|
||||
"import transport\n",
|
||||
"from transport import providers\n",
|
||||
"import pandas as pd\n",
|
||||
"import os\n",
|
||||
"\n",
|
||||
"#\n",
|
||||
"#\n",
|
||||
"source = {\"provider\": \"http\", \"url\": \"https://raw.githubusercontent.com/codeforamerica/ohana-api/master/data/sample-csv/addresses.csv\"}\n",
|
||||
"target = [{\"provider\": \"files\", \"path\": \"addresses.csv\", \"delimiter\": \",\"}, {\"provider\": \"sqlite\", \"database\": \"sample.db3\", \"table\": \"addresses\"}]\n",
|
||||
"\n",
|
||||
"_handler = transport.get.etl (source=source,target=target)\n",
|
||||
"_data = _handler.read() #-- all etl begins with data being read\n",
|
||||
"_data.head()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Extract Transform Load (ETL) from CLI\n",
|
||||
"\n",
|
||||
"The documentation for this is available at https://healthcareio.the-phi.com/data-transport \"Docs\" -> \"Terminal CLI\"\n",
|
||||
"\n",
|
||||
"The entire process is documented including how to generate an ETL configuration file."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.9.7"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 2
|
||||
}
|
@ -1,138 +0,0 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Writing to Apache Iceberg\n",
|
||||
"\n",
|
||||
"1. Insure you have a Google Bigquery service account key on disk\n",
|
||||
"2. The service key location is set as an environment variable **BQ_KEY**\n",
|
||||
"3. The dataset will be automatically created within the project associated with the service key\n",
|
||||
"\n",
|
||||
"The cell below creates a dataframe that will be stored within Google Bigquery"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 15,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"['data transport version ', '2.4.0']\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"#\n",
|
||||
"# Writing to Google Bigquery database\n",
|
||||
"#\n",
|
||||
"import transport\n",
|
||||
"from transport import providers\n",
|
||||
"import pandas as pd\n",
|
||||
"import os\n",
|
||||
"\n",
|
||||
"PRIVATE_KEY = os.environ['BQ_KEY'] #-- location of the service key\n",
|
||||
"DATASET = 'demo'\n",
|
||||
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
|
||||
"# bqw = transport.get.writer(provider=providers.ICEBERG,catalog='mz',database='edw.mz',table='friends')\n",
|
||||
"bqw = transport.get.writer(provider=providers.ICEBERG,table='edw.mz.friends')\n",
|
||||
"bqw.write(_data,if_exists='replace') #-- default is append\n",
|
||||
"print (['data transport version ', transport.__version__])\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Reading from Google Bigquery\n",
|
||||
"\n",
|
||||
"The cell below reads the data that has been written by the cell above and computes the average age within a Google Bigquery (simple query). \n",
|
||||
"\n",
|
||||
"- Basic read of the designated table (friends) created above\n",
|
||||
"- Execute an aggregate SQL against the table\n",
|
||||
"\n",
|
||||
"**NOTE**\n",
|
||||
"\n",
|
||||
"By design **read** object are separated from **write** objects in order to avoid accidental writes to the database.\n",
|
||||
"Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 14,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
" name age\n",
|
||||
"0 James Bond 55\n",
|
||||
"1 Steve Rogers 150\n",
|
||||
"2 Steve Nyemba 44\n",
|
||||
"--------- STATISTICS ------------\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"\n",
|
||||
"import transport\n",
|
||||
"from transport import providers\n",
|
||||
"import os\n",
|
||||
"PRIVATE_KEY=os.environ['BQ_KEY']\n",
|
||||
"pgr = transport.get.reader(provider=providers.ICEBERG,database='edw.mz')\n",
|
||||
"_df = pgr.read(table='friends')\n",
|
||||
"_query = 'SELECT COUNT(*) _counts, AVG(age) from friends'\n",
|
||||
"_sdf = pgr.read(sql=_query)\n",
|
||||
"print (_df)\n",
|
||||
"print ('--------- STATISTICS ------------')\n",
|
||||
"# print (_sdf)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"An **auth-file** is a file that contains database parameters used to access the database. \n",
|
||||
"For code in shared environments, we recommend \n",
|
||||
"\n",
|
||||
"1. Having the **auth-file** stored on disk \n",
|
||||
"2. and the location of the file is set to an environment variable.\n",
|
||||
"\n",
|
||||
"To generate a template of the **auth-file** open the **file generator wizard** found at visit https://healthcareio.the-phi.com/data-transport"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.9.7"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 2
|
||||
}
|
@ -1,149 +0,0 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Writing data-transport plugins\n",
|
||||
"\n",
|
||||
"The data-transport plugins are designed to automate pre/post processing i.e\n",
|
||||
"\n",
|
||||
" - Read -> Post processing\n",
|
||||
" - Write-> Pre processing\n",
|
||||
" \n",
|
||||
"In this example we will assume, data and write both pre/post processing to any supported infrastructure. We will equally show how to specify the plugins within a configuration file"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 1,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"#\n",
|
||||
"# Writing to Google Bigquery database\n",
|
||||
"#\n",
|
||||
"import transport\n",
|
||||
"from transport import providers\n",
|
||||
"import pandas as pd\n",
|
||||
"import os\n",
|
||||
"import shutil\n",
|
||||
"#\n",
|
||||
"#\n",
|
||||
"\n",
|
||||
"DATABASE = '/home/steve/tmp/demo.db3'\n",
|
||||
"if os.path.exists(DATABASE) :\n",
|
||||
" os.remove(DATABASE)\n",
|
||||
"#\n",
|
||||
"# \n",
|
||||
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
|
||||
"litew = transport.get.writer(provider=providers.SQLITE,database=DATABASE)\n",
|
||||
"litew.write(_data,table='friends')"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Reading from SQLite\n",
|
||||
"\n",
|
||||
"The cell below reads the data that has been written by the cell above and computes the average age from a plugin function we will write. \n",
|
||||
"\n",
|
||||
"- Basic read of the designated table (friends) created above\n",
|
||||
"- Read with pipeline functions defined in code\n",
|
||||
"\n",
|
||||
"**NOTE**\n",
|
||||
"\n",
|
||||
"It is possible to use **transport.factory.instance** or **transport.instance** or **transport.get.<[reader|writer]>** they are the same. It allows the maintainers to know that we used a factory design pattern."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 4,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
" name age\n",
|
||||
"0 James Bond 55\n",
|
||||
"1 Steve Rogers 150\n",
|
||||
"2 Steve Nyemba 44\n",
|
||||
"\n",
|
||||
"\n",
|
||||
" name age autoinc\n",
|
||||
"0 James Bond 5.5 0\n",
|
||||
"1 Steve Rogers 15.0 1\n",
|
||||
"2 Steve Nyemba 4.4 2\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"\n",
|
||||
"import transport\n",
|
||||
"from transport import providers\n",
|
||||
"import os\n",
|
||||
"import numpy as np\n",
|
||||
"def _autoincrement (_data,**kwargs) :\n",
|
||||
" \"\"\"\n",
|
||||
" This function will add an autoincrement field to the table\n",
|
||||
" \"\"\"\n",
|
||||
" _data['autoinc'] = np.arange(_data.shape[0])\n",
|
||||
" \n",
|
||||
" return _data\n",
|
||||
"def reduce(_data,**_args) :\n",
|
||||
" \"\"\"\n",
|
||||
" This function will reduce the age of the data frame\n",
|
||||
" \"\"\"\n",
|
||||
" _data.age /= 10\n",
|
||||
" return _data\n",
|
||||
"reader = transport.get.reader(provider=providers.SQLITE,database=DATABASE,table='friends')\n",
|
||||
"#\n",
|
||||
"# basic read of the data created in the first cell\n",
|
||||
"_df = reader.read()\n",
|
||||
"print (_df)\n",
|
||||
"print ()\n",
|
||||
"print()\n",
|
||||
"#\n",
|
||||
"# read of the data with pipeline function provided to alter the database\n",
|
||||
"print (reader.read(pipeline=[_autoincrement,reduce]))"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"The parameters for instianciating a transport object (reader or writer) can be found at [data-transport home](https://healthcareio.the-phi.com/data-transport)\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.9.7"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 2
|
||||
}
|
@ -1,131 +0,0 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Writing to AWS S3\n",
|
||||
"\n",
|
||||
"We have setup our demo environment with the label **aws** passed to reference our s3 access_key and secret_key and file (called friends.csv). In the cell below we will write the data to our aws s3 bucket named **com.phi.demo**"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 1,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"2.2.1\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"#\n",
|
||||
"# Writing to mongodb database\n",
|
||||
"#\n",
|
||||
"import transport\n",
|
||||
"from transport import providers\n",
|
||||
"import pandas as pd\n",
|
||||
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
|
||||
"mgw = transport.get.writer(label='aws')\n",
|
||||
"mgw.write(_data)\n",
|
||||
"print (transport.__version__)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Reading from AWS S3\n",
|
||||
"\n",
|
||||
"The cell below reads the data that has been written by the cell above and computes the average age within a mongodb pipeline. The code in the background executes an aggregation using\n",
|
||||
"\n",
|
||||
"- Basic read of the designated file **friends.csv**\n",
|
||||
"- Compute average age using standard pandas functions\n",
|
||||
"\n",
|
||||
"**NOTE**\n",
|
||||
"\n",
|
||||
"By design **read** object are separated from **write** objects in order to avoid accidental writes to the database.\n",
|
||||
"Read objects are created with **transport.get.reader** whereas write objects are created with **transport.get.writer**"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 2,
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
" bname age\n",
|
||||
"0 James Bond 55\n",
|
||||
"1 Steve Rogers 150\n",
|
||||
"2 Steve Nyemba 44\n",
|
||||
"--------- STATISTICS ------------\n",
|
||||
"83.0\n"
|
||||
]
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"\n",
|
||||
"import transport\n",
|
||||
"from transport import providers\n",
|
||||
"import pandas as pd\n",
|
||||
"\n",
|
||||
"def cast(stream) :\n",
|
||||
" print (stream)\n",
|
||||
" return pd.DataFrame(str(stream))\n",
|
||||
"mgr = transport.get.reader(label='aws')\n",
|
||||
"_df = mgr.read()\n",
|
||||
"print (_df)\n",
|
||||
"print ('--------- STATISTICS ------------')\n",
|
||||
"print (_df.age.mean())"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"An **auth-file** is a file that contains database parameters used to access the database. \n",
|
||||
"For code in shared environments, we recommend \n",
|
||||
"\n",
|
||||
"1. Having the **auth-file** stored on disk \n",
|
||||
"2. and the location of the file is set to an environment variable.\n",
|
||||
"\n",
|
||||
"To generate a template of the **auth-file** open the **file generator wizard** found at visit https://healthcareio.the-phi.com/data-transport"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": []
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.9.7"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 2
|
||||
}
|
@ -1,62 +0,0 @@
|
||||
[build-system]
|
||||
requires = ["setuptools>=61.0", "wheel"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "data-transport"
|
||||
dynamic = ["version"]
|
||||
authors = [
|
||||
{name="Steve L. Nyemba" , email = "info@the-phi.com"},
|
||||
]
|
||||
description = ""
|
||||
readme = "README.md"
|
||||
license = {text = "LICENSE"}
|
||||
keywords = ["mongodb","duckdb","couchdb","rabbitmq","file","read","write","s3","sqlite"]
|
||||
classifiers = [
|
||||
"License :: OSI Approved :: MIT License",
|
||||
"Topic :: Utilities",
|
||||
]
|
||||
dependencies = [
|
||||
"termcolor","sqlalchemy", "aiosqlite","duckdb-engine",
|
||||
"typer","pandas","numpy","sqlalchemy","pyarrow",
|
||||
"plugin-ix@git+https://github.com/lnyemba/plugins-ix"
|
||||
]
|
||||
[project.optional-dependencies]
|
||||
sql = ["mysql-connector-python","psycopg2-binary","nzpy","pymssql","duckdb-engine","aiosqlite"]
|
||||
nosql = ["pymongo","cloudant"]
|
||||
cloud = ["pandas-gbq","google-cloud-bigquery","google-cloud-bigquery-storage", "databricks-sqlalchemy","pyncclient","boto3","boto","botocore"]
|
||||
warehouse = ["pydrill","pyspark","sqlalchemy_drill"]
|
||||
rabbitmq = ["pika"]
|
||||
sqlite = ["aiosqlite"]
|
||||
aws3 = ["boto3","boto","botocore"]
|
||||
nextcloud = ["pyncclient"]
|
||||
mongodb = ["pymongo"]
|
||||
netezza = ["nzpy"]
|
||||
mysql = ["mysql-connector-python"]
|
||||
postgresql = ["psycopg2-binary"]
|
||||
sqlserver = ["pymssql"]
|
||||
http = ["flask-session"]
|
||||
all = ["mysql-connector-python","psycopg2-binary","nzpy","pymssql","duckdb-engine","aiosqlite","pymongo","cloudant","pandas-gbq","google-cloud-bigquery","google-cloud-bigquery-storage", "databricks-sqlalchemy","pyncclient","boto3","boto","botocore","pydrill","pyspark","sqlalchemy_drill", "pika","aiosqlite","boto3","boto","botocore", "pyncclient"]
|
||||
|
||||
[project.urls]
|
||||
Homepage = "https://healthcareio.the-phi.com/git/code/transport.git"
|
||||
|
||||
#[project.scripts]
|
||||
#transport = "transport:main"
|
||||
|
||||
[tool.setuptools]
|
||||
include-package-data = true
|
||||
zip-safe = false
|
||||
script-files = ["bin/transport"]
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
include = ["info","info.*", "transport", "transport.*"]
|
||||
|
||||
[tool.setuptools.dynamic]
|
||||
version = {attr = "info.__version__"}
|
||||
#authors = {attr = "meta.__author__"}
|
||||
|
||||
# If you have a info.py file, you might also want to include the author dynamically:
|
||||
# [tool.setuptools.dynamic]
|
||||
# version = {attr = "info.__version__"}
|
||||
# authors = {attr = "info.__author__"}
|
@ -0,0 +1,31 @@
|
||||
"""
|
||||
This is a build file for the
|
||||
"""
|
||||
from setuptools import setup, find_packages
|
||||
import os
|
||||
import sys
|
||||
# from version import __version__,__author__
|
||||
from info import __version__, __author__
|
||||
|
||||
|
||||
# __author__ = 'The Phi Technology'
|
||||
# __version__= '1.8.0'
|
||||
|
||||
def read(fname):
|
||||
return open(os.path.join(os.path.dirname(__file__), fname)).read()
|
||||
args = {
|
||||
"name":"data-transport",
|
||||
"version":__version__,
|
||||
"author":__author__,"author_email":"info@the-phi.com",
|
||||
"license":"MIT",
|
||||
# "packages":["transport","info","transport/sql"]},
|
||||
|
||||
"packages": find_packages(include=['info','transport', 'transport.*'])}
|
||||
args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite']
|
||||
args["install_requires"] = ['pyncclient','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql']
|
||||
args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git"
|
||||
args['scripts'] = ['bin/transport']
|
||||
# if sys.version_info[0] == 2 :
|
||||
# args['use_2to3'] = True
|
||||
# args['use_2to3_exclude_fixers']=['lib2to3.fixes.fix_import']
|
||||
setup(**args)
|
@ -1,19 +0,0 @@
|
||||
"""
|
||||
This file will be intended to handle duckdb database
|
||||
"""
|
||||
|
||||
import duckdb
|
||||
from transport.common import Reader,Writer
|
||||
|
||||
class Duck(Reader):
|
||||
def __init__(self,**_args):
|
||||
super().__init__(**_args)
|
||||
self._path = None if 'path' not in _args else _args['path']
|
||||
self._handler = duckdb.connect() if not self._path else duckdb.connect(self._path)
|
||||
|
||||
|
||||
class DuckReader(Duck) :
|
||||
def __init__(self,**_args):
|
||||
super().__init__(**_args)
|
||||
def read(self,**_args) :
|
||||
pass
|
@ -1 +1 @@
|
||||
from . import files, http, rabbitmq, callback, files, console
|
||||
from . import files, http, rabbitmq, callback, files
|
@ -1,114 +0,0 @@
|
||||
import os
|
||||
import json
|
||||
from info import __version__
|
||||
import copy
|
||||
import transport
|
||||
import importlib
|
||||
import importlib.util
|
||||
import shutil
|
||||
from io import StringIO
|
||||
|
||||
"""
|
||||
This class manages data from the registry and allows (read only)
|
||||
@TODO: add property to the DATA attribute
|
||||
"""
|
||||
if 'HOME' in os.environ :
|
||||
REGISTRY_PATH=os.sep.join([os.environ['HOME'],'.data-transport'])
|
||||
else:
|
||||
REGISTRY_PATH=os.sep.join([os.environ['USERPROFILE'],'.data-transport'])
|
||||
|
||||
#
|
||||
# This path can be overriden by an environment variable ...
|
||||
#
|
||||
if 'DATA_TRANSPORT_REGISTRY_PATH' in os.environ :
|
||||
REGISTRY_PATH = os.environ['DATA_TRANSPORT_REGISTRY_PATH']
|
||||
REGISTRY_FILE= 'transport-registry.json'
|
||||
DATA = {}
|
||||
|
||||
|
||||
def isloaded ():
|
||||
return DATA not in [{},None]
|
||||
def exists (path=REGISTRY_PATH,_file=REGISTRY_FILE) :
|
||||
"""
|
||||
This function determines if there is a registry at all
|
||||
"""
|
||||
p = os.path.exists(path)
|
||||
q = os.path.exists( os.sep.join([path,_file]))
|
||||
|
||||
return p and q
|
||||
def load (_path=REGISTRY_PATH,_file=REGISTRY_FILE):
|
||||
global DATA
|
||||
|
||||
if exists(_path) :
|
||||
path = os.sep.join([_path,_file])
|
||||
f = open(path)
|
||||
DATA = json.loads(f.read())
|
||||
f.close()
|
||||
def init (email,path=REGISTRY_PATH,override=False,_file=REGISTRY_FILE):
|
||||
"""
|
||||
Initializing the registry and will raise an exception in the advent of an issue
|
||||
"""
|
||||
p = '@' in email
|
||||
q = False if '.' not in email else email.split('.')[-1] in ['edu','com','io','ai','org']
|
||||
if p and q :
|
||||
_config = {"email":email,'version':__version__}
|
||||
if not os.path.exists(path):
|
||||
os.makedirs(path)
|
||||
filename = os.sep.join([path,_file])
|
||||
if not os.path.exists(filename) or override == True :
|
||||
|
||||
f = open(filename,'w')
|
||||
f.write( json.dumps(_config))
|
||||
f.close()
|
||||
# _msg = f"""{CHECK_MARK} Successfully wrote configuration to {path} from {email}"""
|
||||
|
||||
else:
|
||||
raise Exception (f"""Unable to write configuration, Please check parameters (or help) and try again""")
|
||||
else:
|
||||
raise Exception (f"""Invalid Input, {email} is not well formatted, provide an email with adequate format""")
|
||||
def lookup (label):
|
||||
global DATA
|
||||
return label in DATA
|
||||
has = lookup
|
||||
|
||||
def get (label='default') :
|
||||
global DATA
|
||||
return copy.copy(DATA[label]) if label in DATA else {}
|
||||
|
||||
def set (label, auth_file, default=False,path=REGISTRY_PATH) :
|
||||
"""
|
||||
This function will add a label (auth-file data) into the registry and can set it as the default
|
||||
"""
|
||||
if label == 'default' :
|
||||
raise Exception ("""Invalid label name provided, please change the label name and use the switch""")
|
||||
reg_file = os.sep.join([path,REGISTRY_FILE])
|
||||
if os.path.exists(path) and os.path.exists(reg_file):
|
||||
if type(auth_file) == str and os.path.exists (auth_file) :
|
||||
f = open(auth_file)
|
||||
elif type(auth_file) == StringIO:
|
||||
f = auth_file
|
||||
_info = json.loads(f.read())
|
||||
f.close()
|
||||
f = open(reg_file)
|
||||
_config = json.loads(f.read())
|
||||
f.close()
|
||||
|
||||
#
|
||||
# set the proposed label
|
||||
_object = transport.factory.instance(**_info)
|
||||
if _object :
|
||||
_config[label] = _info
|
||||
if default :
|
||||
_config['default'] = _info
|
||||
#
|
||||
# now we need to write this to the location
|
||||
f = open(reg_file,'w')
|
||||
f.write(json.dumps(_config))
|
||||
f.close()
|
||||
else:
|
||||
raise Exception( f"""Unable to load file locate at {path},\nLearn how to generate auth-file with wizard found at https://healthcareio.the-phi.com/data-transport""")
|
||||
pass
|
||||
else:
|
||||
pass
|
||||
pass
|
||||
|
@ -1,26 +0,0 @@
|
||||
"""
|
||||
This module implements the handler for duckdb (in memory or not)
|
||||
"""
|
||||
from transport.sql.common import Base, BaseReader, BaseWriter
|
||||
|
||||
class Duck :
|
||||
def __init__(self,**_args):
|
||||
#
|
||||
# duckdb with none as database will operate as an in-memory database
|
||||
#
|
||||
self.database = _args['database'] if 'database' in _args else ''
|
||||
def get_provider(self):
|
||||
return "duckdb"
|
||||
|
||||
def _get_uri(self,**_args):
|
||||
return f"""duckdb:///{self.database}"""
|
||||
class Reader(Duck,BaseReader) :
|
||||
def __init__(self,**_args):
|
||||
Duck.__init__(self,**_args)
|
||||
BaseReader.__init__(self,**_args)
|
||||
def _get_uri(self,**_args):
|
||||
return super()._get_uri(**_args),{'connect_args':{'read_only':True}}
|
||||
class Writer(Duck,BaseWriter):
|
||||
def __init__(self,**_args):
|
||||
Duck.__init__(self,**_args)
|
||||
BaseWriter.__init__(self,**_args)
|
@ -1,7 +0,0 @@
|
||||
"""
|
||||
This namespace/package is intended to handle read/writes against data warehouse solutions like :
|
||||
- apache iceberg
|
||||
- clickhouse (...)
|
||||
"""
|
||||
|
||||
from . import iceberg, drill
|
@ -1,55 +0,0 @@
|
||||
import sqlalchemy
|
||||
import pandas as pd
|
||||
from .. sql.common import BaseReader , BaseWriter
|
||||
import sqlalchemy as sqa
|
||||
|
||||
class Drill :
|
||||
__template = {'host':None,'port':None,'ssl':None,'table':None,'database':None}
|
||||
def __init__(self,**_args):
|
||||
|
||||
self._host = _args['host'] if 'host' in _args else 'localhost'
|
||||
self._port = _args['port'] if 'port' in _args else self.get_default_port()
|
||||
self._ssl = False if 'ssl' not in _args else _args['ssl']
|
||||
|
||||
self._table = _args['table'] if 'table' in _args else None
|
||||
if self._table and '.' in self._table :
|
||||
_seg = self._table.split('.')
|
||||
if len(_seg) > 2 :
|
||||
self._schema,self._database = _seg[:2]
|
||||
else:
|
||||
|
||||
self._database=_args['database']
|
||||
self._schema = self._database.split('.')[0]
|
||||
|
||||
def _get_uri(self,**_args):
|
||||
return f'drill+sadrill://{self._host}:{self._port}/{self._database}?use_ssl={self._ssl}'
|
||||
def get_provider(self):
|
||||
return "drill+sadrill"
|
||||
def get_default_port(self):
|
||||
return "8047"
|
||||
def meta(self,**_args):
|
||||
_table = _args['table'] if 'table' in _args else self._table
|
||||
if '.' in _table :
|
||||
_schema = _table.split('.')[:2]
|
||||
_schema = '.'.join(_schema)
|
||||
_table = _table.split('.')[-1]
|
||||
else:
|
||||
_schema = self._schema
|
||||
|
||||
# _sql = f"select COLUMN_NAME AS name, CASE WHEN DATA_TYPE ='CHARACTER VARYING' THEN 'CHAR ( 125 )' ELSE DATA_TYPE END AS type from INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA='{_schema}' and TABLE_NAME='{_table}'"
|
||||
_sql = f"select COLUMN_NAME AS name, CASE WHEN DATA_TYPE ='CHARACTER VARYING' THEN 'CHAR ( '||COLUMN_SIZE||' )' ELSE DATA_TYPE END AS type from INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA='{_schema}' and TABLE_NAME='{_table}'"
|
||||
try:
|
||||
_df = pd.read_sql(_sql,self._engine)
|
||||
return _df.to_dict(orient='records')
|
||||
except Exception as e:
|
||||
print (e)
|
||||
pass
|
||||
return []
|
||||
class Reader (Drill,BaseReader) :
|
||||
def __init__(self,**_args):
|
||||
super().__init__(**_args)
|
||||
self._chunksize = 0 if 'chunksize' not in _args else _args['chunksize']
|
||||
self._engine= sqa.create_engine(self._get_uri(),future=True)
|
||||
class Writer(Drill,BaseWriter):
|
||||
def __init__(self,**_args):
|
||||
super().__init__(self,**_args)
|
@ -1,151 +0,0 @@
|
||||
"""
|
||||
dependency:
|
||||
- spark and SPARK_HOME environment variable must be set
|
||||
NOTE:
|
||||
When using streaming option, insure that it is inline with default (1000 rows) or increase it in spark-defaults.conf
|
||||
|
||||
"""
|
||||
from pyspark.sql import SparkSession
|
||||
from pyspark import SparkContext
|
||||
from pyspark.sql.types import *
|
||||
from pyspark.sql.functions import col, to_date, to_timestamp
|
||||
import copy
|
||||
|
||||
class Iceberg :
|
||||
def __init__(self,**_args):
|
||||
"""
|
||||
providing catalog meta information (you must get this from apache iceberg)
|
||||
"""
|
||||
#
|
||||
# Turning off logging (it's annoying & un-professional)
|
||||
#
|
||||
# _spconf = SparkContext()
|
||||
# _spconf.setLogLevel("ERROR")
|
||||
#
|
||||
# @TODO:
|
||||
# Make arrangements for additional configuration elements
|
||||
#
|
||||
self._session = SparkSession.builder.appName("data-transport").getOrCreate()
|
||||
self._session.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
|
||||
# self._session.sparkContext.setLogLevel("ERROR")
|
||||
self._catalog = self._session.catalog
|
||||
self._table = _args['table'] if 'table' in _args else None
|
||||
|
||||
if 'catalog' in _args :
|
||||
#
|
||||
# Let us set the default catalog
|
||||
self._catalog.setCurrentCatalog(_args['catalog'])
|
||||
|
||||
else:
|
||||
# No current catalog has been set ...
|
||||
pass
|
||||
if 'database' in _args :
|
||||
self._database = _args['database']
|
||||
self._catalog.setCurrentDatabase(self._database)
|
||||
else:
|
||||
#
|
||||
# Should we set the default as the first one if available ?
|
||||
#
|
||||
pass
|
||||
self._catalogName = self._catalog.currentCatalog()
|
||||
self._databaseName = self._catalog.currentDatabase()
|
||||
def meta (self,**_args) :
|
||||
"""
|
||||
This function should return the schema of a table (only)
|
||||
"""
|
||||
_schema = []
|
||||
try:
|
||||
_table = _args['table'] if 'table' in _args else self._table
|
||||
_tableName = self._getPrefix(**_args) + f".{_table}"
|
||||
_tmp = self._session.table(_tableName).schema
|
||||
_schema = _tmp.jsonValue()['fields']
|
||||
for _item in _schema :
|
||||
del _item['nullable'],_item['metadata']
|
||||
except Exception as e:
|
||||
|
||||
pass
|
||||
return _schema
|
||||
def _getPrefix (self,**_args):
|
||||
_catName = self._catalogName if 'catalog' not in _args else _args['catalog']
|
||||
_datName = self._databaseName if 'database' not in _args else _args['database']
|
||||
|
||||
return '.'.join([_catName,_datName])
|
||||
def apply(self,_query):
|
||||
"""
|
||||
sql query/command to run against apache iceberg
|
||||
"""
|
||||
return self._session.sql(_query).toPandas()
|
||||
def has (self,**_args):
|
||||
try:
|
||||
_prefix = self._getPrefix(**_args)
|
||||
if _prefix.endswith('.') :
|
||||
return False
|
||||
return _args['table'] in [_item.name for _item in self._catalog.listTables(_prefix)]
|
||||
except Exception as e:
|
||||
print (e)
|
||||
return False
|
||||
|
||||
def close(self):
|
||||
self._session.stop()
|
||||
class Reader(Iceberg) :
|
||||
def __init__(self,**_args):
|
||||
super().__init__(**_args)
|
||||
def read(self,**_args):
|
||||
_table = self._table
|
||||
_prefix = self._getPrefix(**_args)
|
||||
if 'table' in _args or _table:
|
||||
_table = _args['table'] if 'table' in _args else _table
|
||||
_table = _prefix + f'.{_table}'
|
||||
return self._session.table(_table).toPandas()
|
||||
else:
|
||||
sql = _args['sql']
|
||||
return self._session.sql(sql).toPandas()
|
||||
pass
|
||||
class Writer (Iceberg):
|
||||
"""
|
||||
Writing data to an Apache Iceberg data warehouse (using pyspark)
|
||||
"""
|
||||
def __init__(self,**_args):
|
||||
super().__init__(**_args)
|
||||
self._mode = 'append' if 'mode' not in _args else _args['mode']
|
||||
self._table = None if 'table' not in _args else _args['table']
|
||||
def format (self,_schema) :
|
||||
_iceSchema = StructType([])
|
||||
_map = {'integer':IntegerType(),'float':DoubleType(),'double':DoubleType(),'date':DateType(),
|
||||
'timestamp':TimestampType(),'datetime':TimestampType(),'string':StringType(),'varchar':StringType()}
|
||||
for _item in _schema :
|
||||
_name = _item['name']
|
||||
_type = _item['type'].lower()
|
||||
if _type not in _map :
|
||||
_iceType = StringType()
|
||||
else:
|
||||
_iceType = _map[_type]
|
||||
|
||||
_iceSchema.add (StructField(_name,_iceType,True))
|
||||
return _iceSchema if len(_iceSchema) else []
|
||||
def write(self,_data,**_args):
|
||||
_prefix = self._getPrefix(**_args)
|
||||
if 'table' not in _args and not self._table :
|
||||
raise Exception (f"Table Name should be specified for catalog/database {_prefix}")
|
||||
_schema = self.format(_args['schema']) if 'schema' in _args else []
|
||||
if not _schema :
|
||||
rdd = self._session.createDataFrame(_data,verifySchema=False)
|
||||
else :
|
||||
rdd = self._session.createDataFrame(_data,schema=_schema,verifySchema=True)
|
||||
_mode = self._mode if 'mode' not in _args else _args['mode']
|
||||
_table = self._table if 'table' not in _args else _args['table']
|
||||
|
||||
# print (_data.shape,_mode,_table)
|
||||
|
||||
if not self._session.catalog.tableExists(_table):
|
||||
# # @TODO:
|
||||
# # add partitioning information here
|
||||
rdd.writeTo(_table).using('iceberg').create()
|
||||
|
||||
# # _mode = 'overwrite'
|
||||
# # rdd.write.format('iceberg').mode(_mode).saveAsTable(_table)
|
||||
else:
|
||||
# rdd.writeTo(_table).append()
|
||||
# # _table = f'{_prefix}.{_table}'
|
||||
|
||||
rdd.coalesce(10).write.format('iceberg').mode('append').save(_table)
|
Loading…
Reference in new issue