From 66f43a98c1230013ac39a9cc9f2ea75421be7ee2 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Tue, 30 May 2023 15:47:10 -0500 Subject: [PATCH] bug fixes: etl, mongodb lexicon --- transport/etl.py | 10 +++++++--- transport/mongo.py | 7 +++++-- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/transport/etl.py b/transport/etl.py index d509005..9d520d4 100644 --- a/transport/etl.py +++ b/transport/etl.py @@ -106,9 +106,11 @@ class ETL (Process): # self.sql = _args['source']['sql'] if 'sql' in _args['source'] else None # self.cmd = _args['source']['cmd'] if 'cmd' in _args['source'] else None # self._oargs = _args['target'] #transport.factory.instance(**_args['target']) - self._source = _args['source'] #dict(_args ['source'],**{'context':'write'}) - self._target = _args['target'] #dict(_args['target'],**{'context':'read','lock':True}) - + self._source = _args ['source'] + self._target = _args['target'] + self._source['context'] = 'read' + self._target['context'] = 'write' + self.JOB_COUNT = _args['jobs'] self.jobs = [] # self.logger = transport.factory.instance(**_args['logger']) @@ -131,6 +133,8 @@ class ETL (Process): # writing the data to a designated data source # try: + + _log = {"name":self.name,"rows":{"input":0,"output":0}} _reader = transport.factory.instance(**self._source) if 'table' in self._source : diff --git a/transport/mongo.py b/transport/mongo.py index 8ab4418..ae07bce 100644 --- a/transport/mongo.py +++ b/transport/mongo.py @@ -37,7 +37,7 @@ class Mongo : self.mechanism= 'SCRAM-SHA-256' if 'mechanism' not in args else args['mechanism'] # authSource=(args['authSource'] if 'authSource' in args else self.dbname) self._lock = False if 'lock' not in args else args['lock'] - + self.dbname = None username = password = None if 'auth_file' in args : _info = json.loads((open(args['auth_file'])).read()) @@ -46,17 +46,20 @@ class Mongo : else: _info = {} _args = dict(args,**_info) + _map = {'dbname':'db','database':'db','table':'uid','collection':'uid','col':'uid','doc':'uid'} for key in _args : if key in ['username','password'] : username = _args['username'] if key=='username' else username password = _args['password'] if key == 'password' else password continue value = _args[key] + if key in _map : + key = _map[key] self.setattr(key,value) # # Let us perform aliasing in order to remain backwards compatible - + self.dbname = self.db if hasattr(self,'db')else self.dbname self.uid = _args['table'] if 'table' in _args else (_args['doc'] if 'doc' in _args else (_args['collection'] if 'collection' in _args else None)) if username and password :