diff --git a/pipeline.py b/pipeline.py index 15d562a..59745a9 100644 --- a/pipeline.py +++ b/pipeline.py @@ -143,29 +143,36 @@ class Components : _args = {"batch_size":2000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger} _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs']) - _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1 - os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) if 'gpu' in args else '0' + # _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1 + + if args['num_gpu'] > 1 : + _args['gpu'] = int(args['gpu']) if int(args['gpu']) < 8 else np.random.choice(np.arange(8)).astype(int)[0] + else: + _args['gpu'] = 0 + _args['num_gpu'] = 1 + _args['no_value']= args['no_value'] # MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0 PART_SIZE = int(args['part_size']) if 'part_size' in args else 8 # credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json') # _args['data'] = pd.read_gbq(SQL,credentials=credentials,dialect='standard').dropna() - reader = args['reader'] - df = reader() + # reader = args['reader'] + # df = reader() + df = args['reader']() if 'reader' in args else args['data'] # bounds = Components.split(df,MAX_ROWS,PART_SIZE) - if partition != '' : - columns = args['columns'] - df = np.array_split(df[columns].values,PART_SIZE) - df = pd.DataFrame(df[ int (partition) ],columns = columns) - info = {"parition":int(partition),"rows":df.shape[0],"cols":df.shape[0],"part_size":PART_SIZE} - logger.write({"module":"generate","action":"partition","input":info}) - + # if partition != '' : + # columns = args['columns'] + # df = np.array_split(df[columns].values,PART_SIZE) + # df = pd.DataFrame(df[ int (partition) ],columns = columns) + info = {"parition":int(partition),"rows":df.shape[0],"cols":df.shape[0],"part_size":PART_SIZE} + logger.write({"module":"generate","action":"partition","input":info}) + _args['data'] = df # _args['data'] = reader() #_args['data'] = _args['data'].astype(object) - _args['num_gpu'] = 1 - _args['gpu'] = partition + # _args['num_gpu'] = 1 + _dc = data.maker.generate(**_args) # # We need to post the generate the data in order to : @@ -226,7 +233,7 @@ class Components : df = pd.DataFrame(info['data']) args = info['args'] if args['num_gpu'] > 1 : - args['gpu'] = int(info['info']['partition']) if info['input']['partition'] == 0 else info['input']['partition'] + 2 + args['gpu'] = int(info['input']['partition']) if info['input']['partition'] < 8 else np.random.choice(np.arange(8),1).astype(int)[0] else: args['gpu'] = 0 @@ -269,8 +276,8 @@ if __name__ == '__main__' : if 'file' in args : reader = lambda: pd.read_csv(args['file']) ; else: - _df = Components().get(args) - reader = lambda: _df + DATA = Components().get(args) + reader = lambda: DATA args['reader'] = reader if 'generate' in SYS_ARGS : @@ -279,15 +286,23 @@ if __name__ == '__main__' : content = os.listdir( os.sep.join([args['logs'],args['context']])) generator = Components() - + DATA = reader() if ''.join(content).isnumeric() : # # we have partitions we are working with make = lambda _args: (Components()).generate(_args) jobs = [] - + del args['reader'] + columns = DATA.columns.tolist() + DATA = np.array_split(DATA[args['columns']],len(content)) for id in ''.join(content) : args['partition'] = id + args['data'] = pd.DataFrame(DATA[(int(id))],columns=args['columns']) + if args['num_gpu'] > 0 : + args['gpu'] = id + else: + args['gpu']=0 + args['num_gpu']=1 job = Process(target=make,args=(args,)) job.name = 'generator # '+str(id) job.start()