From b239a5149ff1f7497f069a2bfc84b4a7ce0ad4ac Mon Sep 17 00:00:00 2001
From: Steve Nyemba <steve@the-phi.com>
Date: Sat, 29 Jan 2022 11:15:45 -0600
Subject: [PATCH] bug fixes and simplifying interfaces

---
 setup.py              |  2 +-
 transport/__init__.py | 53 +++++++++++++++----------------------
 transport/disk.py     | 63 ++++++++++++++++++++++++++++++++++---------
 transport/sql.py      | 48 ++++++++++++++++++++++++++-------
 4 files changed, 111 insertions(+), 55 deletions(-)

diff --git a/setup.py b/setup.py
index 9d4ff7e..ed82db4 100644
--- a/setup.py
+++ b/setup.py
@@ -8,7 +8,7 @@ def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read()
 args = {
     "name":"data-transport",
-    "version":"1.4.1",
+    "version":"1.4.4",
     "author":"The Phi Technology LLC","author_email":"info@the-phi.com",
     "license":"MIT",
     "packages":["transport"]}
diff --git a/transport/__init__.py b/transport/__init__.py
index 5283f11..94b01eb 100644
--- a/transport/__init__.py
+++ b/transport/__init__.py
@@ -1,45 +1,24 @@
 """
-Data Transport - 1.0
-Steve L. Nyemba, The Phi Technology LLC
+Data Transport, The Phi Technology LLC
+Steve L. Nyemba, steve@the-phi.com
 
-This module is designed to serve as a wrapper to a set of supported data stores :
+This library is designed to serve as a wrapper to a set of supported data stores :
     - couchdb
     - mongodb
    - Files (character delimited)
    - Queues (RabbitMQ)
    - Session (Flask)
    - s3
+   - sqlite
 The supported operations are read/write and providing metadata to the calling code
 Requirements :
    pymongo
    boto
    cloudant
 The configuration for the data-store is as follows :
-   couchdb:
-   {
-       args:{
-           url:<url>,
-           username:<username>,
-           password:<password>,
-           dbname:<database>,
-           doc:<document>
-       }
-   }
-   RabbitMQ:
-   {
-
-   }
-   Mongodb:
-   {
-       args:{
-           host:<host>, #localhost:27017
-           username:<username>,
-           password:<password>,
-           dbname:<database>,
-           doc:<document>
-
-       }
-   }
+   e.g:
+       mongodb
+       provider:'mongodb',[port:27017],[host:localhost],db:<database>,doc:<_name>,context:<read|write>
 """
 __author__ = 'The Phi Technology'
 import pandas as pd
@@ -90,9 +69,17 @@ class factory :
     PROVIDERS['mongodb'] = PROVIDERS['mongo']
     PROVIDERS['couchdb'] = PROVIDERS['couch']
     PROVIDERS['sqlite3'] = PROVIDERS['sqlite']
-    
+
     @staticmethod
-    def instance(**args):
+    def instance(**_args):
+        if 'type' in _args :
+            #
+            # Legacy-style call; delegate to the original factory implementation
+            return factory._instance(**_args)
+        else:
+            return instance(**_args)
+    @staticmethod
+    def _instance(**args):
         """
         This class will create an instance of a transport when providing
         :type   name of the type we are trying to create
@@ -131,7 +118,7 @@ def instance(**_args):
     """
 
     provider = _args['provider']
-    context = _args['context']
+    context = _args['context'] if 'context' in _args else None
     _id = context if context in ['read','write'] else 'read'
     if _id :
         args = {'provider':_id}
@@ -142,6 +129,7 @@ def instance(**_args):
             args[key] = value
         #
         #
+        args = dict(args,**_args)
 
     # print (provider in factory.PROVIDERS)
@@ -149,6 +137,7 @@ def instance(**_args):
         pointer = factory.PROVIDERS[provider]['class'][_id]
     else:
         pointer = sql.SQLReader if _id == 'read' else sql.SQLWriter
+
     return pointer(**args)
-    return None
\ No newline at end of file
+    return None
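Note: a minimal sketch of how the simplified interface above is intended to be
called; the host, database and document names are illustrative assumptions,
not part of this patch:

    import transport

    # New-style call from the docstring: a provider plus an optional context.
    # When 'context' is omitted, the factory defaults to 'read'.
    reader = transport.instance(provider='mongodb', host='localhost', port=27017,
                                db='analytics', doc='logs', context='read')

    # factory.instance remains available; unless the legacy 'type' argument is
    # present, it simply delegates to the module-level instance() above.
    writer = transport.factory.instance(provider='mongodb', host='localhost',
                                        port=27017, db='analytics', doc='logs',
                                        context='write')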
diff --git a/transport/disk.py b/transport/disk.py
index 89ab75b..14bb8a0 100644
--- a/transport/disk.py
+++ b/transport/disk.py
@@ -111,13 +111,46 @@ class DiskWriter(Writer):
                 pass
         finally:
             DiskWriter.THREAD_LOCK.release()
-class SQLiteReader (DiskReader):
-    def __init__(self,**args):
-        DiskReader.__init__(self,**args)
-        self.path = args['database'] if 'database' in args else args['path']
-        self.conn = sqlite3.connect(self.path,isolation_level=None)
+class SQLite :
+    def __init__(self,**_args) :
+        self.path = _args['database'] if 'database' in _args else _args['path']
+        self.conn = sqlite3.connect(self.path,isolation_level="IMMEDIATE")
         self.conn.row_factory = sqlite3.Row
-        self.table = args['table']
+        self.fields = _args['fields'] if 'fields' in _args else []
+    def has (self,**_args):
+        found = False
+        try:
+            if 'table' in _args :
+                table = _args['table']
+                sql = "SELECT * FROM :table limit 1".replace(":table",table)
+                _df = pd.read_sql(sql,self.conn)
+                found = _df.columns.size > 0
+        except Exception as e:
+            pass
+        return found
+    def close(self):
+        try:
+            self.conn.close()
+        except Exception as e :
+            print(e)
+    def apply(self,sql):
+        try:
+            if not sql.lower().startswith('select'):
+                cursor = self.conn.cursor()
+                cursor.execute(sql)
+                cursor.close()
+                self.conn.commit()
+            else:
+                return pd.read_sql(sql,self.conn)
+        except Exception as e:
+            print (e)
+class SQLiteReader (SQLite,DiskReader):
+    def __init__(self,**args):
+        super().__init__(**args)
+        # DiskReader.__init__(self,**args)
+        # self.path = args['database'] if 'database' in args else args['path']
+        # self.conn = sqlite3.connect(self.path,isolation_level=None)
+        # self.conn.row_factory = sqlite3.Row
+        self.table = args['table'] if 'table' in args else None
     def read(self,**args):
         if 'sql' in args :
             sql = args['sql']
@@ -135,7 +169,7 @@ class SQLiteReader (DiskReader):
         except Exception as e :
             pass
 
-class SQLiteWriter(DiskWriter) :
+class SQLiteWriter(SQLite,DiskWriter) :
     connection = None
     LOCK = Lock()
     def __init__(self,**args):
         """
         :path
         :fields json|csv
         """
-        DiskWriter.__init__(self,**args)
-        self.table = args['table']
+        # DiskWriter.__init__(self,**args)
+        super().__init__(**args)
+        self.table = args['table'] if 'table' in args else None
 
-        self.conn = sqlite3.connect(self.path,isolation_level="IMMEDIATE")
-        self.conn.row_factory = sqlite3.Row
-        self.fields = args['fields'] if 'fields' in args else []
+        # self.conn = sqlite3.connect(self.path,isolation_level="IMMEDIATE")
+        # self.conn.row_factory = sqlite3.Row
+        # self.fields = args['fields'] if 'fields' in args else []
 
         if self.fields and not self.isready():
             self.init(self.fields)
@@ -185,7 +220,7 @@ class SQLiteWriter(DiskWriter) :
 
         if not self.fields :
             self.init(list(info.keys()))
-        if type(info) == object :
+        if type(info) == dict :
             info = [info]
         elif type(info) == pd.DataFrame :
             info = info.to_dict(orient='records')
@@ -196,6 +231,6 @@ class SQLiteWriter(DiskWriter) :
             cursor = self.conn.cursor()
             sql = " " .join(["INSERT INTO ",self.table,"(", ",".join(self.fields) ,")", "values(:values)"])
             for row in info :
                 stream =["".join(["",value,""]) if type(value) == str else value for value in row.values()]
                 stream = json.dumps(stream).replace("[","").replace("]","")
diff --git a/transport/sql.py b/transport/sql.py
index 6d693d8..c5c8f89 100644
--- a/transport/sql.py
+++ b/transport/sql.py
@@ -75,7 +75,15 @@ class SQLRW :
             _info['securityLevel'] = 0
             del _info['dbname']
         self.conn = _handler.connect(**_info)
-
+    def has(self,**_args):
+        found = False
+        try:
+            table = _args['table']
+            sql = "SELECT * FROM :table LIMIT 1".replace(":table",table)
+            found = pd.read_sql(sql,self.conn).columns.size > 0
+        except Exception as e:
+            pass
+        return found
     def isready(self):
         _sql = "SELECT * FROM :table LIMIT 1".replace(":table",self.table)
         try:
@@ -201,8 +209,13 @@ class SQLWriter(SQLRW,Writer):
             #   values = [ "".join(["'",str(_row[key]),"'"]) if np.nan(_row[key]).isnumeric() else str(_row[key]) for key in _row]
             # print (values)
             query = _sql.replace(":fields",",".join(fields)).replace(":values",values)
-
-            cursor.execute(query,_row.values())
+            if type(info) == pd.DataFrame :
+                _values = info.values.tolist()
+            elif type(info) == list and type(info[0]) == dict:
+                _values = [tuple(item.values()) for item in info]
+            else:
+                _values = info
+            cursor.execute(query,_values)
 
 
             pass
@@ -210,14 +224,21 @@ class SQLWriter(SQLRW,Writer):
             _fields = ",".join(self.fields)
             # _sql = _sql.replace(":fields",_fields)
             # _sql = _sql.replace(":values",",".join(["%("+name+")s" for name in self.fields]))
-            _sql = _sql.replace("(:fields)","")
+            # _sql = _sql.replace("(:fields)","")
+            _sql = _sql.replace(":fields",_fields)
             values = ", ".join("?"*len(self.fields)) if self._provider == 'netezza' else ",".join(["%s" for name in self.fields])
             _sql = _sql.replace(":values",values)
-
-            # for row in info :
-            #   values = ["'".join(["",value,""]) if not str(value).isnumeric() else value for value in row.values()]
-
-            cursor.executemany(_sql,info)
+            if type(info) == pd.DataFrame :
+                _info = info[self.fields].values.tolist()
+            elif type(info) == dict :
+                _info = [list(info.values())]
+            else:
+                # _info = []
+                _info = pd.DataFrame(info)[self.fields].values.tolist()
+                # for row in info :
+                #   if type(row) == dict :
+                #       _info.append( list(row.values()))
+            cursor.executemany(_sql,_info)
 
             # self.conn.commit()
         except Exception as e:
@@ -250,6 +273,13 @@ class BigQuery:
         client = bq.Client.from_service_account_json(self.path)
         ref = client.dataset(self.dataset).table(table)
         return client.get_table(ref).schema
+    def has(self,**_args):
+        found = False
+        try:
+            found = self.meta(**_args) is not None
+        except Exception as e:
+            pass
+        return found
 class BQReader(BigQuery,Reader) :
     def __init__(self,**_args):
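Note: a short sketch of the new SQLite base class and the has()/apply()/close()
helpers introduced above. The database file ('demo.db3') and table ('logs') are
illustrative assumptions, not part of this patch:

    import transport

    # Open a writer through the simplified factory; extra kwargs flow
    # through to SQLiteWriter via args = dict(args,**_args).
    writer = transport.instance(provider='sqlite', database='demo.db3',
                                table='logs', context='write')
    writer.write({'name':'john','age':30})    # a single dict is wrapped into [dict]
    print (writer.has(table='logs'))          # True once the table exists
    df = writer.apply("SELECT * FROM logs")   # SELECT statements return a DataFrame
    writer.apply("DELETE FROM logs")          # other statements are executed and committed
    writer.close()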