From ccc05acc01f436770a6e2cd0260d9b96ee5d6dbc Mon Sep 17 00:00:00 2001
From: Steve Nyemba
Date: Thu, 9 Dec 2021 15:25:58 -0600
Subject: [PATCH] bug fix: etl engine, sqlite inserts

---
 bin/transport         | 52 +++++++++++++++++++++++++++++++++++++------
 setup.py              |  2 +-
 transport/__init__.py | 27 +++++++++++-----------
 transport/disk.py     | 22 ++++++++++++------
 4 files changed, 74 insertions(+), 29 deletions(-)
 mode change 100644 => 100755 bin/transport

diff --git a/bin/transport b/bin/transport
old mode 100644
new mode 100755
index b2c2503..01c5f71
--- a/bin/transport
+++ b/bin/transport
@@ -16,7 +16,17 @@ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLI
 Usage :
     transport --config <path-to-config> --procs <number-of-processes>
 @TODO: Create tables if they don't exist for relational databases
+example of configuration :
+1. Move data from a folder to a data-store
+    transport [--folder <path>] --config <path-to-config> #-- assuming the configuration doesn't have folder
+    transport --folder <path> --provider <provider-name> --<database|db> <name> --table|doc <name>
+In this case the configuration should look like :
+    {folder:..., target:{}}
+2. Move data from one source to another
+    transport --config <path-to-config>
+    {source:{..},target:{..}} or [{source:{..},target:{..}},{source:{..},target:{..}}]
+
 """
 import pandas as pd
@@ -46,11 +56,23 @@ if len(sys.argv) > 1:
 class Post(Process):
     def __init__(self,**args):
         super().__init__()
-        self.PROVIDER = args['target']['type']
-        self.writer = transport.factory.instance(**args['target'])
+
+        if 'provider' not in args['target'] :
+            self.PROVIDER = args['target']['type']
+            self.writer = transport.factory.instance(**args['target'])
+        else:
+            self.PROVIDER = args['target']['provider']
+            args['target']['context'] = 'write'
+
+            self.writer = transport.instance(**args['target'])
+        #
+        # If the table doesn't exist, maybe create it ?
+        #
         self.rows = args['rows']
+
     def run(self):
-        _info = {"values":self.rows} if 'couch' in self.PROVIDER else self.rows
+        _info = {"values":self.rows} if 'couch' in self.PROVIDER else self.rows
+
         self.writer.write(_info)
         self.writer.close()
 
@@ -59,7 +81,19 @@ class ETL (Process):
     def __init__(self,**_args):
         super().__init__()
         self.name = _args['id']
-        self.reader = transport.factory.instance(**_args['source'])
+        if 'provider' not in _args['source'] :
+            #@deprecate
+            self.reader = transport.factory.instance(**_args['source'])
+        else:
+            #
+            # This is the new interface
+            _args['source']['context'] = 'read'
+
+            self.reader = transport.instance(**_args['source'])
+        #
+        # do we have an SQL query provided or not ....
+        # self.sql = _args['source']['sql'] if 'sql' in _args['source'] else None
+        self.cmd = _args['source']['cmd'] if 'cmd' in _args['source'] else None
         self._oargs = _args['target'] #transport.factory.instance(**_args['target'])
         self.JOB_COUNT = _args['jobs']
         self.jobs = []
@@ -68,8 +102,11 @@ class ETL (Process):
         _args['name'] = self.name
         print (_args)
     def run(self):
-        idf = self.reader.read()
-        idf = pd.DataFrame(idf)
+        if self.cmd :
+            idf = self.reader.read(**self.cmd)
+        else:
+            idf = self.reader.read()
+        idf = pd.DataFrame(idf)
         idf.columns = [str(name).replace("b'",'').replace("'","").strip() for name in idf.columns.tolist()]
         self.log(rows=idf.shape[0],cols=idf.shape[1],jobs=self.JOB_COUNT)
@@ -79,7 +116,8 @@
         try:
             self.log(module='write',action='partitioning')
             rows = np.array_split(np.arange(idf.shape[0]),self.JOB_COUNT)
-
+            #
+            # @TODO: locks
             for i in rows :
                 _id = 'segment #'.join([str(rows.index(i)),self.name])
                 segment = idf.loc[i,:] #.to_dict(orient='records')
diff --git a/setup.py b/setup.py
index fdb98a6..9d4ff7e 100644
--- a/setup.py
+++ b/setup.py
@@ -8,7 +8,7 @@ def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read()
 args = {
     "name":"data-transport",
-    "version":"1.4.0",
+    "version":"1.4.1",
     "author":"The Phi Technology LLC","author_email":"info@the-phi.com",
     "license":"MIT",
     "packages":["transport"]}
diff --git a/transport/__init__.py b/transport/__init__.py
index 55ab7b0..5283f11 100644
--- a/transport/__init__.py
+++ b/transport/__init__.py
@@ -71,16 +71,7 @@ from google.cloud import bigquery as bq
 import nzpy as nz #--- netezza drivers
 import os
 
-RDBMS = {
-
-    "postgresql":{"port":"5432","driver":pg},
-    "redshift":{"port":"5432","driver":pg},
-    "netezza":{"port":"5480","driver":nz},
-    "mysql":{"port":"3306","driver":my},
-    "mariadb":{"port":"3306","driver":my},
-    "mongodb":{"port":"27017","class":{"read"}},
-    "couchdb":{"port":"5984"}
-}
+
 class factory :
     TYPE = {"sql":{"providers":["postgresql","mysql","neteeza","bigquery","mariadb","redshift"]}}
     PROVIDERS = {
@@ -91,9 +82,14 @@ class factory :
         "bigquery":{"class":{"read":sql.BQReader,"write":sql.BQWriter}},
         "mysql":{"port":3306,"host":"localhost","default":{"type":"VARCHAR(256)"}},
         "mariadb":{"port":3306,"host":"localhost","default":{"type":"VARCHAR(256)"}},
-        "mongo":{"port":27017,"host":"localhost","class":{"read":mongo.MongoReader,"write":mongo.MongoWriter}},
-        "couch":{"port":5984,"host":"localhost","class":{"read":couch.CouchReader,"write":couch.CouchWriter}},
+        "mongo":{"port":27017,"host":"localhost","class":{"read":mongo.MongoReader,"write":mongo.MongoWriter}},
+        "couch":{"port":5984,"host":"localhost","class":{"read":couch.CouchReader,"write":couch.CouchWriter}},
         "netezza":{"port":5480,"driver":nz,"default":{"type":"VARCHAR(256)"}}}
+    #
+    # creating synonyms
+    PROVIDERS['mongodb'] = PROVIDERS['mongo']
+    PROVIDERS['couchdb'] = PROVIDERS['couch']
+    PROVIDERS['sqlite3'] = PROVIDERS['sqlite']
 
     @staticmethod
     def instance(**args):
@@ -126,14 +122,17 @@ class factory :
         return anObject
 
 import time
-def instance(provider,context,**_args):
+def instance(**_args):
     """
     @param provider {file,sqlite,postgresql,redshift,bigquery,netezza,mongo,couch ...}
     @param context read|write|rw
     @param _args arguments to go with the datastore (username,password,host,port ...)
""" - _id = context if context in ['read','write'] else None + + provider = _args['provider'] + context = _args['context'] + _id = context if context in ['read','write'] else 'read' if _id : args = {'provider':_id} for key in factory.PROVIDERS[provider] : diff --git a/transport/disk.py b/transport/disk.py index 0f1f6fb..89ab75b 100644 --- a/transport/disk.py +++ b/transport/disk.py @@ -114,6 +114,7 @@ class DiskWriter(Writer): class SQLiteReader (DiskReader): def __init__(self,**args): DiskReader.__init__(self,**args) + self.path = args['database'] if 'database' in args else args['path'] self.conn = sqlite3.connect(self.path,isolation_level=None) self.conn.row_factory = sqlite3.Row self.table = args['table'] @@ -145,7 +146,7 @@ class SQLiteWriter(DiskWriter) : DiskWriter.__init__(self,**args) self.table = args['table'] - self.conn = sqlite3.connect(self.path,isolation_level=None) + self.conn = sqlite3.connect(self.path,isolation_level="IMMEDIATE") self.conn.row_factory = sqlite3.Row self.fields = args['fields'] if 'fields' in args else [] @@ -184,20 +185,27 @@ class SQLiteWriter(DiskWriter) : if not self.fields : self.init(list(info.keys())) - if type(info) != list : + if type(info) == object : info = [info] + elif type(info) == pd.DataFrame : + info = info.to_dict(orient='records') SQLiteWriter.LOCK.acquire() try: + cursor = self.conn.cursor() - sql = " " .join(["INSERT INTO ",self.table,"(", ",".join(self.fields) ,")", "values(':values')"]) + sql = " " .join(["INSERT INTO ",self.table,"(", ",".join(self.fields) ,")", "values(:values)"]) for row in info : - stream = json.dumps(row) - stream = stream.replace("'","''") - cursor.execute(sql.replace(":values",stream) ) + stream =["".join(["",value,""]) if type(value) == str else value for value in row.values()] + stream = json.dumps(stream).replace("[","").replace("]","") + + + self.conn.execute(sql.replace(":values",stream) ) + # cursor.commit() - # self.conn.commit() + self.conn.commit() # print (sql) except Exception as e : + print (e) pass SQLiteWriter.LOCK.release() \ No newline at end of file