parent
f6919ccd93
commit
1eda49b63a
@ -1,204 +1,23 @@
|
|||||||
# Introduction
|
# Introduction
|
||||||
|
|
||||||
This project implements an abstraction of objects that can have access to a variety of data stores, implementing read/write with a simple and expressive interface. This abstraction works with **NoSQL** and **SQL** data stores and leverages **pandas**.
|
This project implements an abstraction of objects that can have access to a variety of data stores, implementing read/write with a simple and expressive interface. This abstraction works with **NoSQL**, **SQL** and **Cloud** data stores and leverages **pandas**.
|
||||||
|
|
||||||
The supported data store providers :
|
|
||||||
|
|
||||||
| Provider | Underlying Drivers | Description |
|
|
||||||
| :---- | :----: | ----: |
|
|
||||||
| sqlite| Native SQLite|SQLite3|
|
|
||||||
| postgresql| psycopg2 | PostgreSQL
|
|
||||||
| redshift| psycopg2 | Amazon Redshift
|
|
||||||
| s3| boto3 | Amazon Simple Storage Service
|
|
||||||
| netezza| nzpsql | IBM Neteeza
|
|
||||||
| Files: CSV, TSV| pandas| pandas data-frame
|
|
||||||
| Couchdb| cloudant | Couchbase/Couchdb
|
|
||||||
| mongodb| pymongo | Mongodb
|
|
||||||
| mysql| mysql| Mysql
|
|
||||||
| bigquery| google-bigquery| Google BigQuery
|
|
||||||
| mariadb| mysql| Mariadb
|
|
||||||
| rabbitmq|pika| RabbitMQ Publish/Subscribe
|
|
||||||
|
|
||||||
# Why Use Data-Transport ?
|
# Why Use Data-Transport ?
|
||||||
|
|
||||||
Mostly data scientists that don't really care about the underlying database and would like to manipulate data transparently.
|
Mostly data scientists that don't really care about the underlying database and would like a simple and consistent way to read/write data and have will be well served. Additionally we implemented lightweight Extract Transform Loading API and command line (CLI) tool.
|
||||||
|
|
||||||
1. Familiarity with **pandas data-frames**
|
1. Familiarity with **pandas data-frames**
|
||||||
2. Connectivity **drivers** are included
|
2. Connectivity **drivers** are included
|
||||||
3. Mining data from various sources
|
3. Mining data from various sources
|
||||||
4. Useful for data migrations or ETL
|
4. Useful for data migrations or ETL
|
||||||
|
|
||||||
# Usage
|
|
||||||
|
|
||||||
## Installation
|
## Installation
|
||||||
|
|
||||||
Within the virtual environment perform the following :
|
Within the virtual environment perform the following :
|
||||||
|
|
||||||
pip install git+https://github.com/lnyemba/data-transport.git
|
pip install git+https://github.com/lnyemba/data-transport.git
|
||||||
|
|
||||||
Once installed **data-transport** can be used as a library in code or a command line interface (CLI), as a CLI it is used for ETL and requires a configuration file.
|
|
||||||
|
|
||||||
|
|
||||||
## Data Transport as a Library (in code)
|
|
||||||
---
|
|
||||||
|
|
||||||
The data-transport can be used within code as a library, and offers the following capabilities:
|
|
||||||
|
|
||||||
* Read/Write against [mongodb](https://github.com/lnyemba/data-transport/wiki/mongodb)
|
|
||||||
* Read/Write against tranditional [RDBMS](https://github.com/lnyemba/data-transport/wiki/rdbms)
|
|
||||||
* Read/Write against [bigquery](https://github.com/lnyemba/data-transport/wiki/bigquery)
|
|
||||||
* ETL CLI/Code [ETL](https://github.com/lnyemba/data-transport/wiki/etl)
|
|
||||||
* Support for pre/post conditions i.e it is possible to specify queries to run before or after a read or write
|
|
||||||
|
|
||||||
The read/write functions make data-transport a great candidate for **data-science**; **data-engineering** or all things pertaining to data. It enables operations across multiple data-stores(relational or not)
|
|
||||||
|
|
||||||
## ETL
|
|
||||||
|
|
||||||
**Embedded in Code**
|
|
||||||
|
|
||||||
It is possible to perform ETL within custom code as follows :
|
|
||||||
|
|
||||||
```
|
|
||||||
import transport
|
|
||||||
import time
|
|
||||||
|
|
||||||
_info = [{source:{'provider':'sqlite','path':'/home/me/foo.csv','table':'me',"pipeline":{"pre":[],"post":[]}},target:{provider:'bigquery',private_key='/home/me/key.json','table':'me','dataset':'mydataset'}}, ...]
|
|
||||||
procs = transport.factory.instance(provider='etl',info=_info)
|
|
||||||
#
|
|
||||||
#
|
|
||||||
while procs:
|
|
||||||
procs = [pthread for pthread in procs if pthread.is_alive()]
|
|
||||||
time.sleep(1)
|
|
||||||
```
|
|
||||||
|
|
||||||
**Command Line Interface (CLI):**
|
|
||||||
---
|
|
||||||
The CLI program is called **transport** and it requires a configuration file. The program is intended to move data from one location to another. Supported data stores are in the above paragraphs.
|
|
||||||
|
|
||||||
```
|
|
||||||
[
|
|
||||||
{
|
|
||||||
"id":"logs",
|
|
||||||
"source":{
|
|
||||||
"provider":"postgresql","context":"read","database":"mydb",
|
|
||||||
"cmd":{"sql":"SELECT * FROM logs limit 10"}
|
|
||||||
},
|
|
||||||
"target":{
|
|
||||||
"provider":"bigquery","private_key":"/bgqdrive/account/bq-service-account-key.json",
|
|
||||||
"dataset":"mydataset"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
|
|
||||||
]
|
|
||||||
```
|
|
||||||
|
|
||||||
Assuming the above content is stored in a file called **etl-config.json**, we would perform the following in a terminal window:
|
|
||||||
|
|
||||||
```
|
|
||||||
[steve@data-transport]$ transport --config ./etl-config.json [--index <value>]
|
|
||||||
```
|
|
||||||
|
|
||||||
**Reading/Writing Mongodb**
|
|
||||||
|
|
||||||
For this example we assume here we are tunneling through port 27018 and there is not access control:
|
|
||||||
|
|
||||||
```
|
|
||||||
import transport
|
|
||||||
reader = factory.instance(provider='mongodb',context='read',host='localhost',port='27018',db='example',doc='logs')
|
|
||||||
|
|
||||||
df = reader.read() #-- reads the entire collection
|
|
||||||
print (df.head())
|
|
||||||
#
|
|
||||||
#-- Applying mongodb command
|
|
||||||
PIPELINE = [{"$group":{"_id":None,"count":{"$sum":1}}}]
|
|
||||||
_command_={"cursor":{},"allowDiskUse":True,"aggregate":"logs","pipeline":PIPLINE}
|
|
||||||
df = reader.read(mongo=_command)
|
|
||||||
print (df.head())
|
|
||||||
reader.close()
|
|
||||||
```
|
|
||||||
**Read/Writing to Mongodb**
|
|
||||||
---
|
|
||||||
|
|
||||||
Scenario 1: Mongodb with security in place
|
|
||||||
|
|
||||||
1. Define an authentication file on disk
|
|
||||||
|
|
||||||
The semantics of the attributes are provided by mongodb, please visit [mongodb documentation](https://mongodb.org/docs). In this example the file is located on _/transport/mongo.json_
|
|
||||||
<div style="display:grid; grid-template-columns:60% auto; gap:4px">
|
|
||||||
<div>
|
|
||||||
<b>configuration file</b>
|
|
||||||
|
|
||||||
```
|
|
||||||
{
|
|
||||||
"username":"me","password":"changeme",
|
|
||||||
"mechanism":"SCRAM-SHA-1",
|
|
||||||
"authSource":"admin"
|
|
||||||
}
|
|
||||||
```
|
|
||||||
<b>Connecting to Mongodb </b>
|
|
||||||
|
|
||||||
```
|
|
||||||
import transport
|
|
||||||
PIPELINE = ... #-- do this yourself
|
|
||||||
MONGO_KEY = '/transport/mongo.json'
|
|
||||||
mreader = transport.factory.instance(provider=transport.providers.MONGODB,auth_file=MONGO_KEY,context='read',db='mydb',doc='logs')
|
|
||||||
_aggregateDF = mreader.read(mongo=PIPELINE) #--results of a aggregate pipeline
|
|
||||||
_collectionDF= mreader.read()
|
|
||||||
|
|
||||||
|
|
||||||
```
|
|
||||||
|
|
||||||
In order to enable write, change **context** attribute to **'read'**.
|
|
||||||
</div>
|
|
||||||
<div>
|
|
||||||
- The configuration file is in JSON format
|
|
||||||
- The commands passed to mongodb are the same as you would if you applied runCommand in mongodb
|
|
||||||
- The output is a pandas data-frame
|
|
||||||
- By default the transport reads, to enable write operations use **context='write'**
|
|
||||||
|
|
||||||
|parameters|description |
|
|
||||||
| --- | --- |
|
|
||||||
|db| Name of the database|
|
|
||||||
|port| Port number to connect to
|
|
||||||
|doc| Name of the collection of documents|
|
|
||||||
|username|Username |
|
|
||||||
|password|password|
|
|
||||||
|authSource|user database that has authentication info|
|
|
||||||
|mechanism|Mechnism used for authentication|
|
|
||||||
|
|
||||||
**NOTE**
|
|
||||||
|
|
||||||
Arguments like **db** or **doc** can be placed in the authentication file
|
|
||||||
</div>
|
|
||||||
</div>
|
|
||||||
|
|
||||||
**Limitations**
|
|
||||||
|
|
||||||
Reads and writes aren't encapsulated in the same object, this is to allow the calling code to deliberately perform actions and hopefully minimize accidents associated with data wrangling.
|
|
||||||
|
|
||||||
|
|
||||||
```
|
|
||||||
import transport
|
|
||||||
improt pandas as pd
|
|
||||||
writer = factory.instance(provider=transport.providers.MONGODB,context='write',host='localhost',port='27018',db='example',doc='logs')
|
|
||||||
|
|
||||||
df = pd.DataFrame({"names":["steve","nico"],"age":[40,30]})
|
|
||||||
writer.write(df)
|
|
||||||
writer.close()
|
|
||||||
```
|
|
||||||
|
|
||||||
|
|
||||||
|
## Learn More
|
||||||
|
|
||||||
#
|
We have available notebooks with sample code to read/write against mongodb, couchdb, Netezza, PostgreSQL, Google Bigquery, Databricks, Microsoft SQL Server, MySQL ... Visit [data-transport homepage](https://healthcareio.the-phi.com/data-transport)
|
||||||
# reading from postgresql
|
|
||||||
|
|
||||||
pgreader = factory.instance(type='postgresql',database=<database>,table=<table_name>)
|
|
||||||
pg.read() #-- will read the table by executing a SELECT
|
|
||||||
pg.read(sql=<sql query>)
|
|
||||||
|
|
||||||
#
|
|
||||||
# Reading a document and executing a view
|
|
||||||
#
|
|
||||||
document = dreader.read()
|
|
||||||
result = couchdb.view(id='<design_doc_id>',view_name=<view_name',<key=value|keys=values>)
|
|
||||||
|
|
@ -0,0 +1,160 @@
|
|||||||
|
{
|
||||||
|
"cells": [
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"#### Writing to Microsoft SQLServer\n",
|
||||||
|
"\n",
|
||||||
|
"1. Insure the Microsoft SQL Server is installed and you have access i.e account information\n",
|
||||||
|
"2. The target database must be created before hand.\n",
|
||||||
|
"3. We created an authentication file that will contain user account and location of the database\n",
|
||||||
|
"\n",
|
||||||
|
"The cell below creates a dataframe that will be stored in a Microsoft SQL Server database.\n",
|
||||||
|
"\n",
|
||||||
|
"**NOTE** This was not tested with a cloud instance"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": 1,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [
|
||||||
|
{
|
||||||
|
"name": "stdout",
|
||||||
|
"output_type": "stream",
|
||||||
|
"text": [
|
||||||
|
"['data transport version ', '2.0.0']\n"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"source": [
|
||||||
|
"#\n",
|
||||||
|
"# Writing to Google Bigquery database\n",
|
||||||
|
"#\n",
|
||||||
|
"import transport\n",
|
||||||
|
"from transport import providers\n",
|
||||||
|
"import pandas as pd\n",
|
||||||
|
"import os\n",
|
||||||
|
"\n",
|
||||||
|
"AUTH_FOLDER = os.environ['DT_AUTH_FOLDER'] #-- location of the service key\n",
|
||||||
|
"MSSQL_AUTH_FILE= os.sep.join([AUTH_FOLDER,'mssql.json'])\n",
|
||||||
|
"\n",
|
||||||
|
"_data = pd.DataFrame({\"name\":['James Bond','Steve Rogers','Steve Nyemba'],'age':[55,150,44]})\n",
|
||||||
|
"msw = transport.factory.instance(provider=providers.MSSQL,table='friends',context='write',auth_file=MSSQL_AUTH_FILE)\n",
|
||||||
|
"msw.write(_data,if_exists='replace') #-- default is append\n",
|
||||||
|
"print (['data transport version ', transport.__version__])\n"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"#### Reading from Microsoft SQL Server database\n",
|
||||||
|
"\n",
|
||||||
|
"The cell below reads the data that has been written by the cell above and computes the average age within an MS SQL Server (simple query). \n",
|
||||||
|
"\n",
|
||||||
|
"- Basic read of the designated table (friends) created above\n",
|
||||||
|
"- Execute an aggregate SQL against the table\n",
|
||||||
|
"\n",
|
||||||
|
"**NOTE**\n",
|
||||||
|
"\n",
|
||||||
|
"It is possible to use **transport.factory.instance** or **transport.instance** they are the same. It allows the maintainers to know that we used a factory design pattern."
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": 5,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [
|
||||||
|
{
|
||||||
|
"name": "stdout",
|
||||||
|
"output_type": "stream",
|
||||||
|
"text": [
|
||||||
|
" name age\n",
|
||||||
|
"0 James Bond 55\n",
|
||||||
|
"1 Steve Rogers 150\n",
|
||||||
|
"2 Steve Nyemba 44\n",
|
||||||
|
"\n",
|
||||||
|
"--------- STATISTICS ------------\n",
|
||||||
|
"\n",
|
||||||
|
" _counts \n",
|
||||||
|
"0 3 83\n"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"source": [
|
||||||
|
"\n",
|
||||||
|
"import transport\n",
|
||||||
|
"from transport import providers\n",
|
||||||
|
"import os\n",
|
||||||
|
"AUTH_FOLDER = os.environ['DT_AUTH_FOLDER'] #-- location of the service key\n",
|
||||||
|
"MSSQL_AUTH_FILE= os.sep.join([AUTH_FOLDER,'mssql.json'])\n",
|
||||||
|
"\n",
|
||||||
|
"msr = transport.instance(provider=providers.MSSQL,table='friends',auth_file=MSSQL_AUTH_FILE)\n",
|
||||||
|
"_df = msr.read()\n",
|
||||||
|
"_query = 'SELECT COUNT(*) _counts, AVG(age) from friends'\n",
|
||||||
|
"_sdf = msr.read(sql=_query)\n",
|
||||||
|
"print (_df)\n",
|
||||||
|
"print ('\\n--------- STATISTICS ------------\\n')\n",
|
||||||
|
"print (_sdf)"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"The cell bellow show the content of an auth_file, in this case if the dataset/table in question is not to be shared then you can use auth_file with information associated with the parameters.\n",
|
||||||
|
"\n",
|
||||||
|
"**NOTE**:\n",
|
||||||
|
"\n",
|
||||||
|
"The auth_file is intended to be **JSON** formatted"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": 3,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [
|
||||||
|
{
|
||||||
|
"data": {
|
||||||
|
"text/plain": [
|
||||||
|
"{'dataset': 'demo', 'table': 'friends'}"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"execution_count": 3,
|
||||||
|
"metadata": {},
|
||||||
|
"output_type": "execute_result"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"source": [
|
||||||
|
"\n",
|
||||||
|
"{\n",
|
||||||
|
" \n",
|
||||||
|
" \"dataset\":\"demo\",\"table\":\"friends\",\"username\":\"<username>\",\"password\":\"<password>\"\n",
|
||||||
|
"}"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"metadata": {
|
||||||
|
"kernelspec": {
|
||||||
|
"display_name": "Python 3",
|
||||||
|
"language": "python",
|
||||||
|
"name": "python3"
|
||||||
|
},
|
||||||
|
"language_info": {
|
||||||
|
"codemirror_mode": {
|
||||||
|
"name": "ipython",
|
||||||
|
"version": 3
|
||||||
|
},
|
||||||
|
"file_extension": ".py",
|
||||||
|
"mimetype": "text/x-python",
|
||||||
|
"name": "python",
|
||||||
|
"nbconvert_exporter": "python",
|
||||||
|
"pygments_lexer": "ipython3",
|
||||||
|
"version": "3.9.7"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"nbformat": 4,
|
||||||
|
"nbformat_minor": 2
|
||||||
|
}
|
Loading…
Reference in new issue