diff --git a/setup.py b/setup.py index 9d9fdcf..483ea87 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ def read(fname): return open(os.path.join(os.path.dirname(__file__), fname)).read() args = { "name":"data-transport", - "version":"1.6.6", + "version":"1.6.8", "author":"The Phi Technology LLC","author_email":"info@the-phi.com", "license":"MIT", "packages":["transport"]} diff --git a/transport/__init__.py b/transport/__init__.py index baa960e..f43a184 100644 --- a/transport/__init__.py +++ b/transport/__init__.py @@ -55,6 +55,7 @@ import os class providers : POSTGRESQL = 'postgresql' MONGODB = 'mongodb' + BIGQUERY ='bigquery' FILE = 'file' ETL = 'etl' @@ -72,8 +73,10 @@ class providers : # synonyms of the above BQ = BIGQUERY MONGO = MONGODB + FERRETDB= MONGODB PG = POSTGRESQL PSQL = POSTGRESQL + PGSQL = POSTGRESQL class IEncoder (json.JSONEncoder): def default (self,object): diff --git a/transport/common.py b/transport/common.py index 73f42ac..39df6a3 100644 --- a/transport/common.py +++ b/transport/common.py @@ -22,6 +22,7 @@ import numpy as np import json import importlib from multiprocessing import RLock +import queue # import couch # import mongo @@ -115,6 +116,8 @@ class Console(Writer): finally: if self.lock : Console.lock.release() + + """ @NOTE : Experimental !! """ diff --git a/transport/mongo.py b/transport/mongo.py index 0ceb1e0..8ab4418 100644 --- a/transport/mongo.py +++ b/transport/mongo.py @@ -124,7 +124,15 @@ class MongoReader(Mongo,Reader): return pd.DataFrame(r) else: - collection = self.db[self.uid] + if 'table' in args or 'collection' in args : + if 'table' in args: + _uid = args['table'] + elif 'collection' in args : + _uid = args['collection'] + else: + _uid = self.uid + + collection = self.db[_uid] _filter = args['filter'] if 'filter' in args else {} _df = pd.DataFrame(collection.find(_filter)) columns = _df.columns.tolist()[1:] @@ -185,7 +193,10 @@ class MongoWriter(Mongo,Writer): # self.db[self.uid].insert_many(info) # else: try: - _uid = self.uid if 'doc' not in _args else _args['doc'] + if 'table' in _args or 'collection' in _args : + _uid = _args['table'] if 'table' in _args else _args['collection'] + else: + _uid = self.uid if 'doc' not in _args else _args['doc'] if self._lock : Mongo.lock.acquire() if type(info) == list or type(info) == pd.DataFrame : diff --git a/transport/sql.py b/transport/sql.py index f6e196c..05d254d 100644 --- a/transport/sql.py +++ b/transport/sql.py @@ -278,7 +278,9 @@ class SQLWriter(SQLRW,Writer): try: table = _args['table'] if 'table' in _args else self.table + self.schema = _args['schema'] if 'schema' in _args else self.schema table = self._tablename(table) + _sql = "INSERT INTO :table (:fields) VALUES (:values)".replace(":table",table) #.replace(":table",self.table).replace(":fields",_fields) if type(info) == list :