From 38e1bce6c2f02a0af33801c338a6bf72741b22e1 Mon Sep 17 00:00:00 2001
From: Steve Nyemba
Date: Sat, 12 Mar 2022 12:25:29 -0600
Subject: [PATCH] bug fixes and optimizations

---
 README.md        | 37 ++++++++++++++++++++++++++++++++++++-
 setup.py         |  2 +-
 transport/sql.py | 15 +++++++++------
 3 files changed, 46 insertions(+), 8 deletions(-)

diff --git a/README.md b/README.md
index 805fb8f..5ec2113 100644
--- a/README.md
+++ b/README.md
@@ -35,9 +35,44 @@ Within the virtual environment perform the following :
 
     pip install git+https://dev.the-phi.com/git/steve/data-transport.git
 
+Once installed, **data-transport** can be used as a library in code or through a command line interface (CLI).
+## Data Transport as a Library (in code)
+---
+
+The data-transport library can be used within code to:
+* Read/Write against [mongodb](https://github.com/lnyemba/data-transport/wiki/mongodb)
+* Read/Write against traditional [RDBMS](https://github.com/lnyemba/data-transport/wiki/rdbms)
+* Read/Write against [bigquery](https://github.com/lnyemba/data-transport/wiki/bigquery)
+
+The read/write functions make data-transport a great candidate for **data-science**, **data-engineering**, or anything else pertaining to data. It enables operations across multiple data stores (relational or not).
+
+## Command Line Interface (CLI)
+---
+The CLI program is called **transport** and it requires a configuration file:
+
+```
+[
+    {
+        "id":"logs",
+        "source":{
+            "provider":"postgresql","context":"read","database":"mydb",
+            "cmd":{"sql":"SELECT * FROM logs limit 10"}
+        },
+        "target":{
+            "provider":"bigquery","private_key":"/bgqdrive/account/bq-service-account-key.json",
+            "dataset":"mydataset"
+        }
+    }
+
+]
+```
 
-## In code (Embedded)
+Assuming the above content is stored in a file called **etl-config.json**, we would run the following in a terminal window:
+
+```
+[steve@data-transport]$ transport --config ./etl-config.json [--index ]
+```
 
 **Reading/Writing Mongodb**
 
 
diff --git a/setup.py b/setup.py
index 066ebc7..c4ca8ea 100644
--- a/setup.py
+++ b/setup.py
@@ -13,7 +13,7 @@ args = {
     "license":"MIT",
     "packages":["transport"]}
 args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite']
-args["install_requires"] = ['pymongo','pandas','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python']
+args["install_requires"] = ['pymongo','sqlalchemy','pandas','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python']
 args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git"
 args['scripts'] = ['bin/transport']
 if sys.version_info[0] == 2 :
diff --git a/transport/sql.py b/transport/sql.py
index 6d44976..19d8ecc 100644
--- a/transport/sql.py
+++ b/transport/sql.py
@@ -125,9 +125,9 @@ class SQLRW :
         _out = None
         try:
             if "select" in _sql.lower() :
-                cursor.close()
-                _conn = self._engine.connect() if self._engine else self.conn
-                return pd.read_sql(_sql,_conn)
+
+                # _conn = self._engine if self._engine else self.conn
+                return pd.read_sql(_sql,self.conn)
             else:
                 # Executing a command i.e no expected return values ...
                 cursor.execute(_sql)
@@ -151,7 +151,8 @@ class SQLReader(SQLRW,Reader) :
         if 'sql' in _args :
             _sql = (_args['sql'])
         else:
-            _sql = "SELECT :fields FROM "+self.table
+            table = self.table if self.table is not None else _args['table']
+            _sql = "SELECT :fields FROM "+self._tablename(table)
             if 'filter' in _args :
                 _sql = _sql +" WHERE "+_args['filter']
             _fields = '*' if not self.fields else ",".join(self.fields)
@@ -220,7 +221,7 @@ class SQLWriter(SQLRW,Writer):
             # cursor.close()
         self.conn.commit()
         pass
-    def write(self,info):
+    def write(self,info,**_args):
         """
         :param info writes a list of data to a given set of fields
         """
@@ -324,7 +325,8 @@ class BQReader(BigQuery,Reader) :
     def __init__(self,**_args):
         super().__init__(**_args)
-        
+    def apply(self,sql):
+        self.read(sql=sql)
         pass
     def read(self,**_args):
         SQL = None
 
@@ -359,6 +361,7 @@ class BQWriter(BigQuery,Writer):
         try:
             if self.parallel or 'lock' in _args :
                 BQWriter.lock.acquire()
+                _args['table'] = self.table if 'table' not in _args else _args['table']
             self._write(_info,**_args)
         finally:
             if self.parallel:
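
For readers skimming the patch: the SQL read path above settles on building a plain `SELECT <fields> FROM <table> [WHERE ...]` string and handing it to `pandas.read_sql` over an open connection, which is also why `sqlalchemy` joins `install_requires`. The sketch below is a minimal standalone illustration of that pattern, not the library's own code; the `read_table` helper, the DSN string, and the `logs` table are hypothetical placeholders.

```
# Minimal sketch of the read pattern the patch converges on (illustrative only;
# read_table, the DSN, and the "logs" table are placeholders, not data-transport API).
import pandas as pd
import sqlalchemy

def read_table(dsn, table, fields=None, where=None):
    """Build SELECT <fields> FROM <table> [WHERE ...] and return a DataFrame."""
    columns = ",".join(fields) if fields else "*"
    sql = "SELECT {} FROM {}".format(columns, table)
    if where:
        sql = sql + " WHERE " + where
    engine = sqlalchemy.create_engine(dsn)  # e.g. "postgresql://user:pass@localhost/mydb"
    with engine.connect() as conn:
        # pandas.read_sql accepts a SQLAlchemy connectable, mirroring the
        # pd.read_sql(_sql, self.conn) call introduced in the hunk above.
        return pd.read_sql(sql, conn)

# Example (hypothetical database and table):
# df = read_table("postgresql://user:pass@localhost/mydb", "logs", where="status = 'error'")
```

The same shape carries over to the BigQuery writer, where the patch now defaults the target table to `self.table` when the caller does not pass one.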