From 883a6ef22f8fa5b08b84fb4f0511eb982c29b96f Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Mon, 19 Sep 2022 10:01:34 -0500 Subject: [PATCH] bug fix & new feature --- transport/__init__.py | 1 + transport/common.py | 19 ++++++++++++++++++- transport/mongo.py | 32 ++++++++++---------------------- 3 files changed, 29 insertions(+), 23 deletions(-) diff --git a/transport/__init__.py b/transport/__init__.py index b4e80fb..baa960e 100644 --- a/transport/__init__.py +++ b/transport/__init__.py @@ -59,6 +59,7 @@ class providers : FILE = 'file' ETL = 'etl' SQLITE = 'sqlite' + SQLITE3= 'sqlite' REDSHIFT = 'redshift' NETEZZA = 'netezza' MYSQL = 'mysql' diff --git a/transport/common.py b/transport/common.py index 2ed7cd2..73f42ac 100644 --- a/transport/common.py +++ b/transport/common.py @@ -115,4 +115,21 @@ class Console(Writer): finally: if self.lock : Console.lock.release() - +""" +@NOTE : Experimental !! +""" +class Proxy : + """ + This class will forward a call to a function that is provided by the user code + """ + def __init__(self,**_args): + self.callback = _args['callback'] + def read(self,**_args) : + try: + return self.callback(**_args) + except Exception as e: + return self.callback() + + pass + def write(self,data,**_args): + self.callback(data,**_args) diff --git a/transport/mongo.py b/transport/mongo.py index bf20482..eb5f2da 100644 --- a/transport/mongo.py +++ b/transport/mongo.py @@ -10,7 +10,7 @@ from bson.binary import Binary import nujson as json from datetime import datetime import pandas as pd - +import numpy as np import gridfs # from transport import Reader,Writer import sys @@ -33,33 +33,15 @@ class Mongo : :username username for authentication :password password for current user """ - # port = str(args['port']) if 'port' in args else '27017' - # host = args['host'] if 'host' in args else 'localhost' - # host = ":".join([host,port]) #-- Formatting host information here - # self.uid = args['doc'] if 'doc' in args else None #-- document identifier - # self.dbname = args['dbname'] if 'dbname' in args else args['db'] + self.authMechanism= 'SCRAM-SHA-256' if 'mechanism' not in args else args['mechanism'] # authSource=(args['authSource'] if 'authSource' in args else self.dbname) self._lock = False if 'lock' not in args else args['lock'] username = password = None - # if 'username' in args and 'password' in args: - # username = args['username'] - # password=args['password'] if 'auth_file' in args : _info = json.loads((open(args['auth_file'])).read()) - # username = _info['username'] - # password = _info['password'] - # if 'mechanism' in _info: - # authMechanism = _info['mechanism'] - # if 'authSource' in _info: - # authSource = _info['authSource'] - # # - # # We are allowing the authentication file to set collection and databases too - # if 'db' in _info : - # self.dbname = _info['db'] - # if 'doc' in _info : - # self.uid = _info['doc'] + else: _info = {} @@ -100,7 +82,8 @@ class Mongo : pass def close(self): self.client.close() - + def meta(self,**_args): + return [] class MongoReader(Mongo,Reader): """ This class will read from a mongodb data store and return the content of a document (not a collection) @@ -113,6 +96,11 @@ class MongoReader(Mongo,Reader): # # @TODO: cmd = args['mongo'] + if "aggregate" in cmd : + if "allowDiskUse" not in cmd : + cmd["allowDiskUse"] = True + if "cursor" not in cmd : + cmd["cursor"] = {} r = [] out = self.db.command(cmd) #@TODO: consider using a yield (generator) works wonders