From e1763b1b192bc34359a7691b6f695c0a6b319977 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Tue, 2 Apr 2024 20:59:01 -0500 Subject: [PATCH] bug fix: ETL, Mongodb --- bin/transport | 8 +++++++- transport/etl.py | 7 ++++++- transport/nosql/mongodb.py | 2 +- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/bin/transport b/bin/transport index 6d5710d..f483d94 100755 --- a/bin/transport +++ b/bin/transport @@ -62,8 +62,14 @@ def wait(jobs): time.sleep(1) @app.command(name="apply") -def move (path,index=None): +def apply (path,index=None): + """ + This function applies data transport from one source to one or several others + :path path of the configuration file + + :index index of the _item of interest (otherwise everything will be processed) + """ _proxy = lambda _object: _object.write(_object.read()) if os.path.exists(path): file = open(path) diff --git a/transport/etl.py b/transport/etl.py index 162e185..25750de 100644 --- a/transport/etl.py +++ b/transport/etl.py @@ -83,7 +83,12 @@ class Transporter(Process): _reader = transport.factory.instance(**self._source) # # If arguments are provided then a query is to be executed (not just a table dump) - return _reader.read() if 'args' not in self._source else _reader.read(**self._source['args']) + if 'cmd' in self._source or 'query' in self._source : + _query = self._source['cmd'] if 'cmd' in self._source else self._source['query'] + return _reader.read(**_query) + else: + return _reader.read() + # return _reader.read() if 'query' not in self._source else _reader.read(**self._source['query']) def _delegate_write(self,_data,**_args): """ diff --git a/transport/nosql/mongodb.py b/transport/nosql/mongodb.py index 2b94311..c498704 100644 --- a/transport/nosql/mongodb.py +++ b/transport/nosql/mongodb.py @@ -218,7 +218,7 @@ class Writer(Mongo): if type(info) == pd.DataFrame : info = info.to_dict(orient='records') # info if type(info) == list else info.to_dict(orient='records') - info = json.loads(json.dumps(info)) + info = json.loads(json.dumps(info,cls=IEncoder)) self.db[_uid].insert_many(info) else: #