This framework allows read/write and ETL against many SQL, NoSQL, and cloud databases, as well as other persistent data stores. Additional features include support for user-defined plugin pre/post-processing functions.

Introduction

This project implements an abstraction layer over a variety of data stores, providing read/write operations through a simple and expressive interface. The abstraction works with both NoSQL and SQL data stores and leverages pandas.

The supported data store providers:

Provider          Underlying Drivers   Description
sqlite            Native SQLite        SQLite3
postgresql        psycopg2             PostgreSQL
redshift          psycopg2             Amazon Redshift
s3                boto3                Amazon Simple Storage Service
netezza           nzpsql               IBM Netezza
Files: CSV, TSV   pandas               pandas data-frame
couchdb           cloudant             Couchbase/CouchDB
mongodb           pymongo              MongoDB
mysql             mysql                MySQL
bigquery          google-bigquery      Google BigQuery
mariadb           mysql                MariaDB
rabbitmq          pika                 RabbitMQ Publish/Subscribe
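
Regardless of the provider, reads and writes go through the same factory interface; only the connection arguments change. A minimal sketch (the file path, database and table names below are placeholders):

import transport

# sqlite reader (path and table are placeholder values)
lite_reader = transport.factory.instance(provider='sqlite', path='/home/me/sample.db', table='logs', context='read')
print(lite_reader.read().head())
lite_reader.close()

# postgresql reader: same interface, different connection arguments
pg_reader = transport.factory.instance(provider='postgresql', database='mydb', table='logs', context='read')
print(pg_reader.read().head())
pg_reader.close()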

Why Use Data-Transport?

It is aimed mostly at data scientists who don't really care about the underlying database and would like to manipulate data transparently. Benefits include:

  1. Familiarity with pandas data-frames
  2. Connectivity drivers are included
  3. Mining data from various sources
  4. Useful for data migrations or ETL

Usage

Installation

Within a virtual environment, perform the following:

pip install git+https://github.com/lnyemba/data-transport.git

Once installed, data-transport can be used as a library in code or as a command-line interface (CLI). As a CLI it is used for ETL and requires a configuration file.
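
A quick way to confirm the installation and see the provider constants the library exposes (a minimal sketch, not part of the official documentation):

import transport

# provider constants such as transport.providers.MONGODB are used throughout the examples below
print([_name for _name in dir(transport.providers) if not _name.startswith('_')])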

Data Transport as a Library (in code)


data-transport can be used within code as a library and offers the following capabilities:

  • Read/Write against mongodb
  • Read/Write against traditional RDBMS
  • Read/Write against bigquery
  • ETL, from the CLI or from code
  • Support for pre/post conditions, i.e. it is possible to specify queries to run before or after a read or write

The read/write functions make data-transport a great candidate for data science, data engineering, and anything else pertaining to data: it enables operations across multiple data stores (relational or not).
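
For instance, moving a few records from a relational store into mongodb only requires a reader, a writer, and the pandas data-frame in between (a minimal sketch; the database, table, and collection names are placeholders):

import transport

pg_reader = transport.factory.instance(provider='postgresql', context='read', database='mydb', table='logs')
mg_writer = transport.factory.instance(provider=transport.providers.MONGODB, context='write', db='mydb', doc='logs')

df = pg_reader.read(sql='SELECT * FROM logs LIMIT 10')   #-- pandas data-frame
mg_writer.write(df)

pg_reader.close()
mg_writer.close()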

ETL

Embedded in Code

It is possible to perform ETL within custom code as follows:

    import transport
    import time
    
    _info = [{"source":{"provider":"sqlite","path":"/home/me/foo.csv","table":"me","pipeline":{"pre":[],"post":[]}},"target":{"provider":"bigquery","private_key":"/home/me/key.json","table":"me","dataset":"mydataset"}}, ...]
    procs = transport.factory.instance(provider='etl',info=_info)
    #
    # wait for all ETL worker threads to complete
    while procs:
        procs = [pthread for pthread in procs if pthread.is_alive()]
        time.sleep(1)

Command Line Interface (CLI):

The CLI program is called transport and requires a configuration file. It is intended to move data from one location to another; the supported data stores are listed in the table above.

[
    {
        "id":"logs",
        "source":{
            "provider":"postgresql","context":"read","database":"mydb",
            "cmd":{"sql":"SELECT * FROM logs limit 10"}
        },
        "target":{
            "provider":"bigquery","private_key":"/bgqdrive/account/bq-service-account-key.json",
            "dataset":"mydataset"
        }
    }
]

Assuming the above content is stored in a file called etl-config.json, we would perform the following in a terminal window:

[steve@data-transport]$ transport --config ./etl-config.json [--index <value>]

Reading/Writing Mongodb

For this example we assume we are tunneling through port 27018 and there is no access control:

import transport
reader = transport.factory.instance(provider='mongodb',context='read',host='localhost',port='27018',db='example',doc='logs')

df = reader.read() #-- reads the entire collection
print (df.head())
#
#-- Applying mongodb command
PIPELINE = [{"$group":{"_id":None,"count":{"$sum":1}}}]
_command = {"cursor":{},"allowDiskUse":True,"aggregate":"logs","pipeline":PIPELINE}
df = reader.read(mongo=_command)
print (df.head())
reader.close()

Reading/Writing to Mongodb with authentication

Scenario 1: Mongodb with security in place

  1. Define an authentication file on disk

    The semantics of the attributes are defined by MongoDB; please consult the MongoDB documentation. In this example the file is located at /transport/mongo.json

configuration file
{
    "username":"me","password":"changeme",
    "mechanism":"SCRAM-SHA-1",
    "authSource":"admin"
}

Connecting to Mongodb

import transport
PIPELINE = ... #-- do this yourself
MONGO_KEY = '/transport/mongo.json'
mreader = transport.factory.instance(provider=transport.providers.MONGODB,auth_file=MONGO_KEY,context='read',db='mydb',doc='logs')
_aggregateDF = mreader.read(mongo=PIPELINE) #-- results of an aggregate pipeline
_collectionDF= mreader.read()


In order to enable write operations, change the context attribute to 'write'.

  • The configuration file is in JSON format
  • The commands passed to mongodb are the same as those you would apply with runCommand in mongodb
  • The output is a pandas data-frame
  • By default the transport reads; to enable write operations use context='write'
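
Following the notes above, a write-enabled instance only differs by context='write'; a minimal sketch reusing the authentication file (the database and collection names are placeholders):

import transport
import pandas as pd

MONGO_KEY = '/transport/mongo.json'
mwriter = transport.factory.instance(provider=transport.providers.MONGODB, auth_file=MONGO_KEY, context='write', db='mydb', doc='logs')
mwriter.write(pd.DataFrame({"names":["steve","nico"],"age":[40,30]}))
mwriter.close()
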
Parameter    Description
db           Name of the database
port         Port number to connect to
doc          Name of the collection of documents
username     Username
password     Password
authSource   User database that holds the authentication information
mechanism    Mechanism used for authentication

NOTE

Arguments like db or doc can be placed in the authentication file
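
For example, if the authentication file above also contains "db" and "doc" entries, the factory call shortens accordingly (a minimal sketch based on the note above):

import transport

# db and doc are read from /transport/mongo.json in this sketch
mreader = transport.factory.instance(provider=transport.providers.MONGODB, auth_file='/transport/mongo.json', context='read')
df = mreader.read()
mreader.close()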

Limitations

Reads and writes aren't encapsulated in the same object; this forces the calling code to perform actions deliberately and hopefully minimizes accidents associated with data wrangling.

import transport
import pandas as pd
writer = transport.factory.instance(provider=transport.providers.MONGODB,context='write',host='localhost',port='27018',db='example',doc='logs')

df = pd.DataFrame({"names":["steve","nico"],"age":[40,30]})
writer.write(df)
writer.close()
#
# reading from postgresql

pgreader    = transport.factory.instance(provider='postgresql',database=<database>,table=<table_name>)
pgreader.read()   #-- will read the table by executing a SELECT
pgreader.read(sql=<sql query>)

#
# Reading a document and executing a view
# (assumes dreader is a couchdb reader instance, e.g. created via the factory with the couchdb provider)
#
document    = dreader.read()
result      = dreader.view(id='<design_doc_id>',view_name='<view_name>',<key=value|keys=values>)