From 872744c682d26c751d2dfb377e05dc2afb64f95b Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Fri, 6 Mar 2020 13:00:32 -0600 Subject: [PATCH] bug fix with queue connection dropping out --- pipeline.py | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/pipeline.py b/pipeline.py index c042588..65eda3e 100644 --- a/pipeline.py +++ b/pipeline.py @@ -99,7 +99,7 @@ class Components : # print (columns) info = {"rows":_df.shape[0],"cols":_df.shape[1], "partition":part_index,"logs":_args['logs'],"num_gpu":1,"part_size":PART_SIZE} - p = {"args":_args,"data":_df.to_dict(orient="records"),"info":info} + p = {"args":_args,"data":_df.to_dict(orient="records"),"input":info} part_index += 1 qwriter.write(p) # @@ -124,7 +124,8 @@ class Components : # @log : # Logging information about the training process for this partition (or not) # - info = {"rows":df.shape[0],"cols":df.shape[1], "partition":partition,"logs":_args['logs']} + info = {"rows":df.shape[0],"cols":df.shape[1], "partition":int(partition),"logs":_args['logs']} + logger.write({"module":"train","action":"train","input":info}) data.maker.train(**_args) @@ -211,7 +212,7 @@ class Components : info = {"full":{"path":_fname,"rows":_args['data'].shape[0]},"compare":{"name":_pname,"rows":data_comp.shape[0]} } if partition : info ['partition'] = int(partition) - logger.write({"module":"generate","action":"write","info":info} ) + logger.write({"module":"generate","action":"write","input":info} ) @staticmethod def callback(channel,method,header,stream): @@ -221,11 +222,11 @@ class Components : info = json.loads(stream) logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':SYS_ARGS['context']}) - logger.write({'module':'process','action':'read-partition','input':info['info']}) + logger.write({'module':'process','action':'read-partition','input':info['input']}) df = pd.DataFrame(info['data']) args = info['args'] if args['num_gpu'] > 1 : - args['gpu'] = int(info['info']['partition']) if info['info']['partition'] == 0 else info['info']['partition'] + 2 + args['gpu'] = int(info['info']['partition']) if info['input']['partition'] == 0 else info['input']['partition'] + 2 else: args['gpu'] = 0 @@ -237,11 +238,12 @@ class Components : # @TODO: Fix # There is an inconsistency in column/columns ... fix this shit! # - args['columns'] = args['column'] - (Components()).train(**args) - logger.write({"module":"process","action":"exit","info":info["info"]}) channel.close() channel.connection.close() + args['columns'] = args['column'] + (Components()).train(**args) + logger.write({"module":"process","action":"exit","input":info["input"]}) + pass if __name__ == '__main__' : @@ -280,18 +282,19 @@ if __name__ == '__main__' : if ''.join(content).isnumeric() : # # we have partitions we are working with - make = lambda args: (Components()).generate(args) + make = lambda _args: (Components()).generate(_args) jobs = [] - print (["Started ",len(jobs),"generators"]) + for id in ''.join(content) : args['partition'] = id - job = Process(target=make,args=(args,args)) - + job = Process(target=make,args=(args,)) + job.name = 'generator # '+str(id) job.start() jobs.append(job) - - while (len(jobs)> 0) : - jobs = [jobs for job in jobs if job.is_alive()] + + print (["Started ",len(jobs),"generator"+"s" if len(jobs)>1 else "" ]) + while len(jobs)> 0 : + jobs = [job for job in jobs if job.is_alive()] time.sleep(2) # generator.generate(args)