#!/usr/bin/env python
__doc__ = """
(c) 2018 - 2021 data-transport
steve@the-phi.com, The Phi Technology LLC
https://dev.the-phi.com/git/steve/data-transport.git

This program performs ETL between 9 supported data sources : Couchdb, Mongodb, Mysql, Mariadb, PostgreSQL, Netezza, Redshift, Sqlite, File

Usage :
    transport --config <path-to-config.json> [--jobs <number-of-writers>] [--source <path-to-csv>]

    --config    JSON file holding a list of job definitions (or a single definition),
                each with the keys 'id', 'source' and 'target'
    --jobs      number of parallel writer processes per job (default 10); --procs is accepted as an alias
    --source    replaces the source of every job with a comma-delimited file on disk

@TODO: Create tables if they don't exist for relational databases
"""
import pandas as pd
import numpy as np
import json
import sys
import transport
import time
from multiprocessing import Process

# Number of writer processes used per job when neither --jobs nor --procs is given.
DEFAULT_JOB_COUNT = 10


def _parse_args(argv):
    """
    Turn a command line of the form ``--key value --flag`` into a dictionary.

    A ``--key`` followed by a token that does not itself start with ``--`` maps to
    that token (stripped); a ``--key`` with no value maps to the integer 1.
    Tokens that do not start with ``--`` are only ever consumed as values.

    :param argv: list of command-line tokens, the program name being at index 0
    :return: dictionary of parsed arguments
    """
    parsed = {}
    count = len(argv)
    for pos in range(1, count):
        token = argv[pos]
        if not token.startswith('--'):
            continue
        key = token[2:]
        parsed[key] = 1
        if pos + 1 < count:
            # The following token is stripped in place, as the script has always
            # done, so later readers of the list see the cleaned value.
            value = argv[pos + 1] = argv[pos + 1].strip()
            if key and value and not value.startswith('--'):
                parsed[key] = value
    return parsed


SYS_ARGS = _parse_args(sys.argv)


class Post(Process):
    """
    Writer process: pushes one segment of rows to the target data store.
    """
    def __init__(self, **args):
        """
        :param target:  writer configuration; 'type' names the provider and the
                        whole dictionary is handed to transport.factory.instance
        :param rows:    the records this process has to write
        """
        super().__init__()
        self.PROVIDER = args['target']['type']
        self.writer = transport.factory.instance(**args['target'])
        self.rows = args['rows']

    def run(self):
        # Providers whose type contains 'couch' receive the rows wrapped under 'values'.
        _info = {"values": self.rows} if 'couch' in self.PROVIDER else self.rows
        try:
            self.writer.write(_info)
        finally:
            # Release the writer even when the write fails.
            self.writer.close()


class ETL(Process):
    """
    One ETL job: reads the whole source, splits the rows into segments and
    writes every segment to the target with its own Post process.
    """
    def __init__(self, **_args):
        """
        :param id:      name given to this process and attached to every log entry
        :param source:  reader configuration handed to transport.factory.instance
        :param target:  writer configuration forwarded to every Post process
        :param jobs:    number of writer processes (int or numeric string)
        """
        super().__init__()
        self.name = _args['id']
        self.reader = transport.factory.instance(**_args['source'])
        self._oargs = _args['target']
        # The value comes from the command line as a string; np.array_split
        # needs a positive integer.
        self.JOB_COUNT = max(1, int(_args['jobs']))

    def log(self, **_args):
        """
        Print the given key/value pairs together with the name of this job.
        """
        _args['name'] = self.name
        print (_args)

    def run(self):
        idf = pd.DataFrame(self.reader.read())
        # Remove the b'...' decoration left on column names that were rendered from bytes.
        idf.columns = [str(name).replace("b'",'').replace("'","").strip() for name in idf.columns.tolist()]
        self.log(rows=idf.shape[0], cols=idf.shape[1])

        #
        # writing the data to a designated data source
        #
        try:
            self.log(module='write', action='partitioning')
            partitions = np.array_split(np.arange(idf.shape[0]), self.JOB_COUNT)
            jobs = []
            for positions in partitions:
                if positions.size == 0:
                    # More writers requested than rows available: nothing to write here.
                    continue
                # The partitions hold row positions, hence iloc rather than loc:
                # the frame is not guaranteed to carry a default integer index.
                segment = idf.iloc[positions, :].to_dict(orient='records')
                proc = Post(target=self._oargs, rows=segment)
                jobs.append(proc)
                proc.start()

            self.log(module='write', action='working ...')
            for proc in jobs:
                proc.join()
            self.log(module='write', action='completed')
        except Exception as e:
            # Best effort: report the failure instead of taking the job down with a traceback.
            print (e)


if __name__ == '__main__':
    if 'config' not in SYS_ARGS:
        # Without a configuration there is nothing to run; show the usage and stop.
        sys.exit(__doc__)

    with open(SYS_ARGS['config']) as _file:
        _info = json.load(_file)
    if isinstance(_info, dict):
        # A single job definition is treated as a list of one.
        _info = [_info]

    # --jobs wins; --procs is the spelling that the usage text historically advertised.
    _jobs = SYS_ARGS.get('jobs', SYS_ARGS.get('procs', DEFAULT_JOB_COUNT))

    _procs = []
    for _config in _info:
        if 'source' in SYS_ARGS:
            # The override applies to each job, and every job gets its own dictionary.
            _config['source'] = {"type":"disk.DiskReader","args":{"path":SYS_ARGS['source'],"delimiter":","}}
        _config['jobs'] = _jobs
        etl = ETL(**_config)
        etl.start()
        _procs.append(etl)

    for etl in _procs:
        etl.join()