diff --git a/bin/transport b/bin/transport new file mode 100644 index 0000000..9f016e4 --- /dev/null +++ b/bin/transport @@ -0,0 +1,91 @@ +#!/usr/bin/env python +__doc__ = """ +(c) 2018 - 2021 data-transport +steve@the-phi.com, The Phi Technology LLC +https://dev.the-phi.com/git/steve/data-transport.git + +This program performs ETL between 9 supported data sources : Couchdb, Mongodb, Mysql, Mariadb, PostgreSQL, Netezza,Redshift, Sqlite, File +Usage : + transport --config --procs +@TODO: Create tables if they don't exist for relational databases +""" +import pandas as pd +import numpy as np +import json +import sys +import transport +import time +from multiprocessing import Process +SYS_ARGS = {} +if len(sys.argv) > 1: + + N = len(sys.argv) + for i in range(1,N): + value = None + if sys.argv[i].startswith('--'): + key = sys.argv[i][2:] #.replace('-','') + SYS_ARGS[key] = 1 + if i + 1 < N: + value = sys.argv[i + 1] = sys.argv[i+1].strip() + if key and value and not value.startswith('--'): + SYS_ARGS[key] = value + + + i += 2 + +class Post(Process): + def __init__(self,**args): + super().__init__() + self.PROVIDER = args['target']['type'] + self.writer = transport.factory.instance(**args['target']) + self.rows = args['rows'] + def run(self): + _info = {"values":self.rows} if 'couch' in self.PROVIDER else self.rows + self.writer.write(_info) + self.writer.close() + + +class ETL (Process): + def __init__(self,**_args): + super().__init__() + self.name = _args['id'] + self.reader = transport.factory.instance(**_args['source']) + self._oargs = _args['target'] #transport.factory.instance(**_args['target']) + self.JOB_COUNT = _args['jobs'] + # self.logger = transport.factory.instance(**_args['logger']) + def log(self,**_args) : + _args['name'] = self.name + print (_args) + def run(self): + idf = self.reader.read() + idf = pd.DataFrame(idf) + idf.columns = [str(name).replace("b'",'').replace("'","").strip() for name in idf.columns.tolist()] + self.log(rows=idf.shape[0],cols=idf.shape[1]) + + # + # writing the data to a designated data source + # + try: + self.log(module='write',action='partitioning') + rows = np.array_split(np.arange(idf.shape[0]),self.JOB_COUNT) + jobs = [] + for i in rows : + segment = idf.loc[i,:].to_dict(orient='records') + proc = Post(target = self._oargs,rows = segment) + jobs.append(proc) + proc.start() + + self.log(module='write',action='working ...') + while jobs : + jobs = [proc for proc in jobs if proc.is_alive()] + time.sleep(2) + self.log(module='write',action='completed') + except Exception as e: + print (e) +if __name__ == '__main__' : + _config = json.loads(open (SYS_ARGS['config']).read()) + _config['jobs'] = 10 if 'jobs' not in SYS_ARGS else SYS_ARGS['jobs'] + + for _config in _info : + etl = ETL (**_config) + etl.start() \ No newline at end of file diff --git a/setup.py b/setup.py index a599ca0..a974ef7 100644 --- a/setup.py +++ b/setup.py @@ -8,14 +8,14 @@ def read(fname): return open(os.path.join(os.path.dirname(__file__), fname)).read() args = { "name":"data-transport", - "version":"1.3.8.6.1", + "version":"1.3.8.8", "author":"The Phi Technology LLC","author_email":"info@the-phi.com", "license":"MIT", "packages":["transport"]} args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite'] args["install_requires"] = ['pymongo','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python'] args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git" - +args['scripts'] = ['bin/transport'] if sys.version_info[0] == 2 : args['use_2to3'] = True args['use_2to3_exclude_fixers']=['lib2to3.fixes.fix_import'] diff --git a/transport/__init__.py b/transport/__init__.py index 14df482..289678e 100644 --- a/transport/__init__.py +++ b/transport/__init__.py @@ -42,7 +42,8 @@ The configuration for the data-store is as follows : } """ __author__ = 'The Phi Technology' -import numpy as np +import pandas as pd +import numpy as np import json import importlib import sys @@ -97,6 +98,9 @@ class factory : print(['Error ',e]) return anObject +import time + + # class Reader: # def __init__(self): # self.nrows = 0 diff --git a/transport/sql.py b/transport/sql.py index 72e12c6..96f3489 100644 --- a/transport/sql.py +++ b/transport/sql.py @@ -87,13 +87,14 @@ class SQLRW : # Executing a command i.e no expected return values ... cursor.execute(_sql) self.conn.commit() - + except Exception as e : + print (e) finally: self.conn.commit() cursor.close() def close(self): try: - self.connect.close() + self.conn.close() except Exception as error : print (error) pass @@ -112,6 +113,12 @@ class SQLReader(SQLRW,Reader) : if 'limit' in _args : _sql = _sql + " LIMIT "+str(_args['limit']) return self.apply(_sql) + def close(self) : + try: + self.conn.close() + except Exception as error : + print (error) + pass class SQLWriter(SQLRW,Writer): def __init__(self,**_args) : @@ -122,7 +129,7 @@ class SQLWriter(SQLRW,Writer): # NOTE: Proper data type should be set on the target system if their source is unclear. self._inspect = False if 'inspect' not in _args else _args['inspect'] self._cast = False if 'cast' not in _args else _args['cast'] - def init(self,fields): + def init(self,fields=None): if not fields : try: self.fields = pd.read_sql("SELECT * FROM :table LIMIT 1".replace(":table",self.table),self.conn).columns.tolist() @@ -192,6 +199,7 @@ class SQLWriter(SQLRW,Writer): # self.conn.commit() except Exception as e: + print(e) pass finally: self.conn.commit()