diff --git a/pipeline.py b/pipeline.py index 917026d..c5a16d8 100644 --- a/pipeline.py +++ b/pipeline.py @@ -92,7 +92,7 @@ class Components : _df = pd.DataFrame(_df,columns=columns) # print (columns) - info = {"rows":_df.shape[0],"cols":_df.shape[1], "partition":part_index,"logs":_args['logs'],"num_gpu":2,"part_size":PART_SIZE} + info = {"rows":_df.shape[0],"cols":_df.shape[1], "partition":part_index,"logs":_args['logs'],"num_gpu":1,"part_size":PART_SIZE} p = {"args":_args,"data":_df.to_dict(orient="records"),"info":info} part_index += 1 qwriter.write(p) @@ -134,7 +134,7 @@ class Components : partition = args['partition'] if 'partition' in args else '' log_folder = os.sep.join([log_folder,args['context'],str(partition)]) - _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger} + _args = {"batch_size":2000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger} _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs']) _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1 os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) if 'gpu' in args else '0' @@ -147,15 +147,18 @@ class Components : reader = args['reader'] df = reader() # bounds = Components.split(df,MAX_ROWS,PART_SIZE) - if partition != '' and os.path.exists(log_folder): + if partition != '' : columns = args['columns'] df = np.array_split(df[columns].values,PART_SIZE) df = pd.DataFrame(df[ int (partition) ],columns = columns) + info = {"parition":int(partition),"rows":df.shape[0],"cols":df.shape[0],"part_size":PART_SIZE} + logger.write({"module":"generate","action":"partition","input":info}) _args['data'] = df # _args['data'] = reader() #_args['data'] = _args['data'].astype(object) - _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1 + _args['num_gpu'] = 1 + _args['gpu'] = partition _dc = data.maker.generate(**_args) # # We need to post the generate the data in order to : @@ -205,7 +208,9 @@ class Components : logger.write({"module":"generate","action":"write","info":info} ) @staticmethod def callback(channel,method,header,stream): - + if stream.decode('utf8') in ['QUIT','EXIT','END'] : + channel.close() + channel.connection.close() info = json.loads(stream) logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':SYS_ARGS['context']}) @@ -214,10 +219,10 @@ class Components : args = info['args'] if args['num_gpu'] > 1 : args['gpu'] = int(info['info']['partition']) if info['info']['partition'] == 0 else info['info']['partition'] + 2 - args['num_gpu'] = 2 + else: args['gpu'] = 0 - args['num_gpu'] = 1 + args['num_gpu'] = 1 # if int(args['num_gpu']) > 1 and args['gpu'] > 0: # args['gpu'] = args['gpu'] + args['num_gpu'] if args['gpu'] + args['num_gpu'] < 8 else args['gpu'] #-- 8 max gpus args['reader'] = lambda: df @@ -242,8 +247,7 @@ if __name__ == '__main__' : args = (PIPELINE[index]) args = dict(args,**SYS_ARGS) - args['max_rows'] = int(args['max_rows']) if 'max_rows' in args else 3 - args['part_size'] = int(args['part_size']) if 'part_size' in args else 4 + args['logs'] = args['logs'] if 'logs' in args else 'logs' if 'dataset' not in args : args['dataset'] = 'combined20191004v2_deid'