diff --git a/pipeline.py b/pipeline.py index 5442935..917026d 100644 --- a/pipeline.py +++ b/pipeline.py @@ -61,27 +61,23 @@ class Components : logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']}) log_folder = args['logs'] if 'logs' in args else 'logs' _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger} + _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs']) _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1 _args['gpu'] = args['gpu'] if 'gpu' in args else 0 - MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0 - PART_SIZE = args['part_size'] if 'part_size' in args else 0 + # MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0 + PART_SIZE = int(args['part_size']) if 'part_size' in args else 8 - if df.shape[0] > MAX_ROWS and 'partition' not in args: + if 'partition' not in args: lbound = 0 - bounds = list(pd.cut( np.arange(df.shape[0]+1),PART_SIZE).categories) + # bounds = list(pd.cut( np.arange(df.shape[0]+1),PART_SIZE).categories) # bounds = Components.split(df,MAX_ROWS,PART_SIZE) - + columns = args['columns'] + df = np.array_split(df[columns].values,PART_SIZE) qwriter = factory.instance(type='queue.QueueWriter',args={'queue':'aou.io'}) - - for b in bounds : - part_index = bounds.index(b) - ubound = int(b.right) - - - _data = df.iloc[lbound:ubound][args['columns']] - lbound = ubound + part_index = 0 + for _df in df: # _args['logs'] = os.sep.join([log_folder,str(part_index)]) _args['partition'] = str(part_index) @@ -92,14 +88,20 @@ class Components : # - where to get the data # - and athe arguments to use (partition #,columns,gpu,epochs) # - info = {"rows":_data.shape[0],"cols":_data.shape[1], "partition":part_index,"logs":_args['logs']} - p = {"args":_args,"data":_data.to_dict(orient="records"),"info":info} + + _df = pd.DataFrame(_df,columns=columns) + # print (columns) + + info = {"rows":_df.shape[0],"cols":_df.shape[1], "partition":part_index,"logs":_args['logs'],"num_gpu":2,"part_size":PART_SIZE} + p = {"args":_args,"data":_df.to_dict(orient="records"),"info":info} + part_index += 1 qwriter.write(p) # # @TODO: # - Notify that information was just posted to the queue - info['max_rows'] = MAX_ROWS - info['part_size'] = PART_SIZE + # In case we want slow-mode, we can store the partitions in mongodb and process (Yes|No)? + # + logger.write({"module":"train","action":"setup-partition","input":info}) pass @@ -137,37 +139,18 @@ class Components : _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1 os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) if 'gpu' in args else '0' _args['no_value']= args['no_value'] - MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0 - PART_SIZE = args['part_size'] if 'part_size' in args else 0 + # MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0 + PART_SIZE = int(args['part_size']) if 'part_size' in args else 8 # credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json') # _args['data'] = pd.read_gbq(SQL,credentials=credentials,dialect='standard').dropna() reader = args['reader'] df = reader() - bounds = Components.split(df,MAX_ROWS,PART_SIZE) + # bounds = Components.split(df,MAX_ROWS,PART_SIZE) if partition != '' and os.path.exists(log_folder): - bounds = Components.split(df,MAX_ROWS,PART_SIZE) - # bounds = list(pd.cut( np.arange(df.shape[0]+1),PART_SIZE).categories) - lbound = int(bounds[int(partition)].left) - ubound = int(bounds[int(partition)].right) - df = df.iloc[lbound:ubound] - else: - # - # We have an implicit partition here - # bounds = Components.split(df,MAX_ROWS,PART_SIZE) - logger.write({"module":"generate","action":"virtual-parititions","input":{"rows":df.shape[0],"max_rows":MAX_ROWS,"part_size":PART_SIZE}}) - for item in bounds : - - lbound = int(item.left) - ubound = int(item.right) - args['reader'] = lambda: df[lbound:ubound] - args['partition'] = bounds.index(item) - - self.generate(args) - return ; - if not os.path.exists(log_folder) : - log_folder = log_folder.replace(partition,'') - _args['logs'] = log_folder + columns = args['columns'] + df = np.array_split(df[columns].values,PART_SIZE) + df = pd.DataFrame(df[ int (partition) ],columns = columns) _args['data'] = df # _args['data'] = reader() @@ -189,7 +172,7 @@ class Components : _args['data'][name] = _dc[name] info = {"module":"generate","action":"io","input":{"rows":_dc[name].shape[0],"name":name}} if partition != '' : - info['partition'] = partition + info['partition'] = int(partition) logger.write(info) # filename = os.sep.join([log_folder,'output',name+'.csv']) # data_comp[[name]].to_csv(filename,index=False) @@ -218,7 +201,7 @@ class Components : info = {"full":{"path":_fname,"rows":_args['data'].shape[0]},"compare":{"name":_pname,"rows":data_comp.shape[0]} } if partition : - info ['partition'] = partition + info ['partition'] = int(partition) logger.write({"module":"generate","action":"write","info":info} ) @staticmethod def callback(channel,method,header,stream): @@ -229,8 +212,12 @@ class Components : logger.write({'module':'process','action':'read-partition','input':info['info']}) df = pd.DataFrame(info['data']) args = info['args'] - MAX_GPUS = 8 - args['gpu'] = int(info['info']['partition']) if info['info']['partition'] < MAX_GPUS else np.random.choice(np.arange(MAX_GPUS),1).astype(int).tolist()[0] + if args['num_gpu'] > 1 : + args['gpu'] = int(info['info']['partition']) if info['info']['partition'] == 0 else info['info']['partition'] + 2 + args['num_gpu'] = 2 + else: + args['gpu'] = 0 + args['num_gpu'] = 1 # if int(args['num_gpu']) > 1 and args['gpu'] > 0: # args['gpu'] = args['gpu'] + args['num_gpu'] if args['gpu'] + args['num_gpu'] < 8 else args['gpu'] #-- 8 max gpus args['reader'] = lambda: df @@ -296,7 +283,7 @@ if __name__ == '__main__' : SYS_ARGS = dict(args) #-- things get lost in context if 'read' in SYS_ARGS : QUEUE_TYPE = 'queue.QueueReader' - pointer = lambda qreader: qreader.read(1) + pointer = lambda qreader: qreader.read() else: QUEUE_TYPE = 'queue.QueueListener' pointer = lambda qlistener: qlistener.listen() diff --git a/setup.py b/setup.py index 02f49a2..bcacb62 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ import sys def read(fname): return open(os.path.join(os.path.dirname(__file__), fname)).read() -args = {"name":"data-maker","version":"1.1.8","author":"Vanderbilt University Medical Center","author_email":"steve.l.nyemba@vanderbilt.edu","license":"MIT", +args = {"name":"data-maker","version":"1.1.9","author":"Vanderbilt University Medical Center","author_email":"steve.l.nyemba@vanderbilt.edu","license":"MIT", "packages":find_packages(),"keywords":["healthcare","data","transport","protocol"]} args["install_requires"] = ['data-transport@git+https://dev.the-phi.com/git/steve/data-transport.git','tensorflow==1.15','pandas','pandas-gbq','pymongo'] args['url'] = 'https://hiplab.mc.vanderbilt.edu/git/aou/data-maker.git'