From e5fadc64a06200274b0e47bcb67ce94f4ec4854a Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Sat, 19 Mar 2022 00:02:53 -0500 Subject: [PATCH] optimizations mongodb --- bin/transport | 14 +++++++------- transport/mongo.py | 18 ++++++++++++++---- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/bin/transport b/bin/transport index 47979db..6680f64 100755 --- a/bin/transport +++ b/bin/transport @@ -63,8 +63,8 @@ class Post(Process): else: self.PROVIDER = args['target']['provider'] args['target']['context'] = 'write' - - self.writer = transport.instance(**args['target']) + self.store = args['target'] + # self.writer = transport.instance(**args['target']) # # If the table doesn't exists maybe create it ? # @@ -86,9 +86,9 @@ class Post(Process): else: value = '' _info[name] = _info[name].fillna(value) - - self.writer.write(_info) - self.writer.close() + writer = transport.factory.instance(**self.store) + writer.write(_info) + writer.close() class ETL (Process): @@ -139,11 +139,11 @@ class ETL (Process): # # @TODO: locks for i in np.arange(self.JOB_COUNT) : - print () - print (i) _id = 'segment # '.join([str(i),' ',self.name]) indexes = rows[i] segment = idf.loc[indexes,:].copy() #.to_dict(orient='records') + if segment.shape[0] == 0 : + continue proc = Post(target = self._oargs,rows = segment,name=_id) self.jobs.append(proc) proc.start() diff --git a/transport/mongo.py b/transport/mongo.py index d1ee9ef..9f4ff11 100644 --- a/transport/mongo.py +++ b/transport/mongo.py @@ -20,7 +20,9 @@ else: from common import Reader, Writer import json import re +from multiprocessing import Lock, RLock class Mongo : + lock = RLock() """ Basic mongodb functions are captured here """ @@ -44,6 +46,7 @@ class Mongo : self.uid = args['doc'] #-- document identifier self.dbname = args['dbname'] if 'dbname' in args else args['db'] self.db = self.client[self.dbname] + self._lock = False if 'lock' not in args else args['lock'] def isready(self): p = self.dbname in self.client.list_database_names() @@ -144,10 +147,17 @@ class MongoWriter(Mongo,Writer): # if type(info) == list : # self.db[self.uid].insert_many(info) # else: - if type(info) == list or type(info) == pd.DataFrame : - self.db[self.uid].insert_many(info if type(info) == list else info.to_dict(orient='records')) - else: - self.db[self.uid].insert_one(info) + try: + + if self._lock : + Mongo.lock.acquire() + if type(info) == list or type(info) == pd.DataFrame : + self.db[self.uid].insert_many(info if type(info) == list else info.to_dict(orient='records')) + else: + self.db[self.uid].insert_one(info) + finally: + if self._lock : + Mongo.lock.release() def set(self,document): """ if no identifier is provided the function will delete the entire collection and set the new document.