From e4a1ef8dd7644dceb6da3478898f58212774607f Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Sat, 26 Sep 2020 16:53:33 -0500 Subject: [PATCH] bug fix --- setup.py | 2 +- transport/disk.py | 6 +++--- transport/mongo.py | 32 +++++++++++++++++++++++++++++--- 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/setup.py b/setup.py index 0745b4e..3676055 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ def read(fname): return open(os.path.join(os.path.dirname(__file__), fname)).read() args = { "name":"data-transport", - "version":"1.2.0", + "version":"1.2.2", "author":"The Phi Technology LLC","author_email":"info@the-phi.com", "license":"MIT", "packages":["transport"]} diff --git a/transport/disk.py b/transport/disk.py index c36489d..25ff336 100644 --- a/transport/disk.py +++ b/transport/disk.py @@ -106,7 +106,7 @@ class DiskWriter(Writer): finally: DiskWriter.THREAD_LOCK.release() class SQLiteReader (DiskReader): - def __init__(self,**args) + def __init__(self,**args): DiskReader.__init__(self,**args) self.conn = sqlite3.connect(self.path,isolation_level=None) self.conn.row_factory = sqlite3.Row @@ -114,10 +114,10 @@ class SQLiteReader (DiskReader): def read(self,**args): if 'sql' in args : sql = args['sql'] - else if 'filter' in args : + elif 'filter' in args : sql = "SELECT :fields FROM ",self.table, "WHERE (:filter)".replace(":filter",args['filter']) sql = sql.replace(":fields",args['fields']) if 'fields' in args else sql.replace(":fields","*") - return = pd.read_sql(sql,self.conn) + return pd.read_sql(sql,self.conn) def close(self): try: self.conn.close() diff --git a/transport/mongo.py b/transport/mongo.py index 72a439c..2fa0f60 100644 --- a/transport/mongo.py +++ b/transport/mongo.py @@ -17,6 +17,7 @@ if sys.version_info[0] > 2 : else: from common import Reader, Writer import json +import re class Mongo : """ Basic mongodb functions are captured here @@ -54,9 +55,34 @@ class MongoReader(Mongo,Reader): def __init__(self,**args): Mongo.__init__(self,**args) def read(self,**args): - collection = self.db[self.uid] - _filter = args['filter'] if 'filter' in args else {} - return collection.find(_filter) + if 'mongo' in args : + # + # @TODO: + cmd = args['mongo'] + r = [] + out = self.db.command(cmd) + #@TODO: consider using a yield (generator) works wonders + while True : + if 'cursor' in out : + key = 'firstBatch' if 'firstBatch' in out['cursor'] else 'nextBatch' + else: + key = 'n' + if 'cursor' in out and out['cursor'][key] : + r += list(out['cursor'][key]) + elif out[key]: + r.append (out[key]) + # yield out['cursor'][key] + if key not in ['firstBatch','nextBatch'] or ('cursor' in out and out['cursor']['id'] == 0) : + break + else: + out = self.db.command({"getMore":out['cursor']['id'],"collection":out['cursor']['ns'].split(".")[-1]}) + + + return r + else: + collection = self.db[self.uid] + _filter = args['filter'] if 'filter' in args else {} + return collection.find(_filter) def view(self,**args): """ This function is designed to execute a view (map/reduce) operation