diff --git a/data/gan.py b/data/gan.py index 1df26a3..898d4ea 100644 --- a/data/gan.py +++ b/data/gan.py @@ -581,7 +581,6 @@ class Predict(GNet): df = pd.DataFrame(np.round(f)).astype(np.int32) - p = 0 not in df.sum(axis=1).values x = df.sum(axis=1).values @@ -599,7 +598,8 @@ class Predict(GNet): # # In case we are dealing with actual values like diagnosis codes we can perform # - _index = [found.index(item) for item in found if item.shape[1] == len(self.values)] + N = len(found) + _index = [i for i in range(0,N) if found[i].shape[1] == len(self.values)] if not _index : INDEX = np.random.choice(np.arange(len(found)),1)[0] INDEX = ratio.index(np.max(ratio)) diff --git a/data/maker/__init__.py b/data/maker/__init__.py index 080939c..f4bce16 100644 --- a/data/maker/__init__.py +++ b/data/maker/__init__.py @@ -37,11 +37,14 @@ class ContinuousToDiscrete : index = BOUNDS.index(row) x_[index] = 1 break - + # + # for items in BOUNDS : + # index = BOUNDS.index(items) return _matrix @staticmethod def bounds(x,n): + # return np.array_split(x,n) return list(pd.cut(np.array( np.round(x,ContinuousToDiscrete.ROUND_UP) ),n).categories) @@ -175,7 +178,8 @@ def generate(**args): handler.load_meta(col) r = handler.apply() BIN_SIZE = 4 if 'bin_size' not in args else int(args['bin_size']) - _df[col] = ContinuousToDiscrete.continuous(r[col],BIN_SIZE) if 'float' in df[col].dtypes.name or col in CONTINUOUS else r[col] + _df[col] = ContinuousToDiscrete.continuous(r[col],BIN_SIZE) if col in CONTINUOUS else r[col] + # _df[col] = r[col] # # @TODO: log basic stats about the synthetic attribute # diff --git a/pipeline.py b/pipeline.py index 9d9c097..6234c26 100644 --- a/pipeline.py +++ b/pipeline.py @@ -50,11 +50,12 @@ class Components : """ # # @TODO: we need to log something here about the parameters being passed - pointer = args['reader'] if 'reader' in args else lambda: Components.get(**args) - df = pointer() - if df.shape[0] == 0 : - print ("CAN NOT TRAIN EMPTY DATASET ") - return + # pointer = args['reader'] if 'reader' in args else lambda: Components.get(**args) + df = args['reader']() + + # if df.shape[0] == 0 : + # print ("CAN NOT TRAIN EMPTY DATASET ") + # return # # Now we can parse the arguments and submit the entire thing to training # @@ -113,18 +114,29 @@ class Components : pass else: + print ('.....') partition = args['partition'] if 'partition' in args else '' - log_folder = os.sep.join([log_folder,args['context'],partition]) + log_folder = os.sep.join([log_folder,args['context'],str(partition)]) _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger} _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs']) - _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1 - os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) if 'gpu' in args else '0' + + # + # We ask the process to assume 1 gpu given the system number of GPU and that these tasks can run in parallel + # + if int(args['num_gpu']) > 1 : + _args['gpu'] = int(args['gpu']) if int(args['gpu']) < 8 else np.random.choice(np.arange(8)).astype(int)[0] + else: + _args['gpu'] = 0 + _args['num_gpu'] = 1 + os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) + _args['data'] = df # # @log : # Logging information about the training process for this partition (or not) # + info = {"rows":df.shape[0],"cols":df.shape[1], "partition":int(partition),"logs":_args['logs']} logger.write({"module":"train","action":"train","input":info}) @@ -291,7 +303,7 @@ if __name__ == '__main__' : if ''.join(content).isnumeric() : # # we have partitions we are working with - make = lambda _args: (Components()).generate(_args) + jobs = [] del args['reader'] columns = DATA.columns.tolist() @@ -310,13 +322,13 @@ if __name__ == '__main__' : args['gpu'] = id else: args['gpu']=0 - + make = lambda _args: (Components()).generate(_args) job = Process(target=make,args=(args,)) job.name = 'generator # '+str(id) job.start() jobs.append(job) - print (["Started ",len(jobs),"generator"+"s" if len(jobs)>1 else "" ]) + print (["Started ",len(jobs),"generators" if len(jobs)>1 else "generator" ]) while len(jobs)> 0 : jobs = [job for job in jobs if job.is_alive()] time.sleep(2) @@ -358,9 +370,31 @@ if __name__ == '__main__' : # qreader.read(1) pass else: + PART_SIZE = int(args['jobs']) if 'jobs' in args else 8 + DATA = reader() + DATA = np.array_split(DATA[args['columns']],PART_SIZE) + jobs = [] + for index in range(0,int(args['jobs'])) : + if 'focus' in args and int(args['focus']) != index : + continue + args['partition'] = index + _df = pd.DataFrame(DATA[index],columns=args['columns']) + args['reader'] = lambda: _df + make = lambda _args: (Components()).train(**_args) + job = Process(target=make,args=(args,)) + job.name = 'Trainer # ' + str(index) + job.start() + jobs.append(job) + # args['gpu'] + print (["Started ",len(jobs),"trainers" if len(jobs)>1 else "trainer" ]) + while len(jobs)> 0 : + jobs = [job for job in jobs if job.is_alive()] + time.sleep(2) + + # trainer = Components() + # trainer.train(**args) + - trainer = Components() - trainer.train(**args) # Components.train(**args) #for args in PIPELINE : #args['dataset'] = 'combined20190510'