From 330d6b6ae681dcc50f647d17a777354980fa3f58 Mon Sep 17 00:00:00 2001
From: Steve Nyemba
Date: Sun, 8 Mar 2020 08:48:38 -0500
Subject: [PATCH] bug fix with partition & data -access

---
 data/gan.py            |  43 +++++++---
 data/maker/__init__.py |  27 ++++--
 pipeline.py            | 184 ++++++++++++++++++-----------------------
 3 files changed, 131 insertions(+), 123 deletions(-)

diff --git a/data/gan.py b/data/gan.py
index 898d4ea..a6d35e1 100644
--- a/data/gan.py
+++ b/data/gan.py
@@ -532,10 +532,13 @@ class Predict(GNet):
         self.generator = Generator(**args)
         self.values = args['values']
         self.ROW_COUNT = args['row_count']
+        self.oROW_COUNT = self.ROW_COUNT
+        self.MISSING_VALUES = args['no_value']

     def load_meta(self, column):
         super().load_meta(column)
         self.generator.load_meta(column)
+        self.ROW_COUNT = self.oROW_COUNT
     def apply(self,**args):
         # print (self.train_dir)
         # suffix = "-".join(self.ATTRIBUTES['synthetic']) if isinstance(self.ATTRIBUTES['synthetic'],list) else self.ATTRIBUTES['synthetic']
@@ -544,6 +547,7 @@ class Predict(GNet):
         demo = self._LABEL #np.zeros([self.ROW_COUNT,self.NUM_LABELS]) #args['de"shape":{"LABEL":list(self._LABEL.shape)} mo']
         tf.compat.v1.reset_default_graph()
         z = tf.random.normal(shape=[self.ROW_COUNT, self.Z_DIM])
+        y = tf.compat.v1.placeholder(shape=[self.ROW_COUNT, self.NUM_LABELS], dtype=tf.int32)

         if self._LABEL is not None :
             ma = [[i] for i in np.arange(self.NUM_LABELS - 2)]
@@ -569,6 +573,8 @@ class Predict(GNet):

             found = []
             ratio = []
+            __x__ = None
+            __ratio=0
             for i in np.arange(CANDIDATE_COUNT) :
                 if labels :
                     f = sess.run(fake,feed_dict={y:labels})
@@ -590,7 +596,8 @@ class Predict(GNet):
                     if i == CANDIDATE_COUNT:
                         break
                 else:
-
+                    __x__ = df if __x__ is None or np.where(x > 0)[0].size > np.where(__x__ > 0)[0].size else __x__
+                    __ratio = np.divide( np.sum(x), x.size) if __x__ is None or np.where(x > 0)[0].size > np.where(__x__ > 0)[0].size else __ratio
                     continue

             # i = df.T.index.astype(np.int32) #-- These are numeric pseudonyms
@@ -600,23 +607,33 @@ class Predict(GNet):
             #
             N = len(found)
             _index = [i for i in range(0,N) if found[i].shape[1] == len(self.values)]
-            if not _index :
-                INDEX = np.random.choice(np.arange(len(found)),1)[0]
-                INDEX = ratio.index(np.max(ratio))
-            else:
-                INDEX = _index[0]
+            if not _index and not found :
+                df = __x__
+                INDEX = -1
+            else :
+                if not _index :
+                    INDEX = np.random.choice(np.arange(len(found)),1)[0]
+                    INDEX = ratio.index(np.max(ratio))
+                else:
+                    INDEX = _index[0]

-            df = found[INDEX]
+                df = found[INDEX]
             columns = self.ATTRIBUTES['synthetic'] if isinstance(self.ATTRIBUTES['synthetic'],list)else [self.ATTRIBUTES['synthetic']]

             # r = np.zeros((self.ROW_COUNT,len(columns)))
             # r = np.zeros(self.ROW_COUNT)
             if self.logger :
-                info = {"found":len(found),"selected":INDEX, "ratio": ratio[INDEX],"rows":df.shape[0],"cols":df.shape[1],"expected":len(self.values)}
+                info = {"found":len(found),"rows":df.shape[0],"cols":df.shape[1],"expected":len(self.values)}
+                if INDEX > 0 :
+                    info =dict(info ,**{"selected":INDEX, "ratio": ratio[INDEX] })
+                else :
+
+                    info['selected'] = -1
+                    info['ratio'] = __ratio
                 self.logger.write({"module":"gan-generate","action":"generate","input":info})

             df.columns = self.values
-            if len(found):
+            if len(found) or df.columns.size == len(self.values):
                 # print (len(found),NTH_VALID_CANDIDATE)
                 # x = df * self.values
                 #
@@ -639,10 +656,14 @@ class Predict(GNet):
                     df = pd.DataFrame( df.iloc[i].apply(lambda row: self.values[np.random.choice(np.where(row != 0)[0],1)[0]] ,axis=1))
                     df.columns = columns
                     df = df[columns[0]].append(pd.Series(missing))
-
+                if self.logger :
+
+                    info= {"missing": i.size,"rows":df.shape[0],"cols":1}
+                    self.logger.write({"module":"gan-generate","action":"compile.io","input":info})
+
-
+            # print(df.head())
             tf.compat.v1.reset_default_graph()
             df = pd.DataFrame(df)
             df.columns = columns
diff --git a/data/maker/__init__.py b/data/maker/__init__.py
index f4bce16..4be97b8 100644
--- a/data/maker/__init__.py
+++ b/data/maker/__init__.py
@@ -107,23 +107,33 @@ def train (**args) :
         # args['real'] = pd.get_dummies(df[col].fillna('')).astype(np.float32).values
         # if 'float' not in df[col].dtypes.name :
         #     args['real'] = pd.get_dummies(df[col].fillna('')).astype(np.float32).values
-        if 'float' in df[col].dtypes.name and col in CONTINUOUS:
+        if col in CONTINUOUS:
            BIN_SIZE = 4 if 'bin_size' not in args else int(args['bin_size'])
            args['real'] = ContinuousToDiscrete.binary(df[col],BIN_SIZE).astype(np.float32)
         else:
-            args['real'] = pd.get_dummies(df[col].dropna()).astype(np.float32).values
+            df.to_csv('tmp-'+args['logs'].replace('/','_')+'-'+col+'.csv',index=False)
+            # print (df[col].dtypes)
+            # print (df[col].dropna/(axis=1).unique())
+            args['real'] = pd.get_dummies(df[col].dropna()).astype(np.float32).values
+
+
-        args['column'] = col
-        args['context'] = col
         context = args['context']
         if 'store' in args :
             args['store']['args']['doc'] = context
             logger = factory.instance(**args['store'])
             args['logger'] = logger
+            info = {"rows":args['real'].shape[0],"cols":args['real'].shape[1],"name":col}
+            logger.write({"module":"gan-train","action":"data-prep","input":info})
         else:
             logger = None
+        args['column'] = col
+        args['context'] = col
+
+        #
+        # If the s
         trainer = gan.Train(**args)
         trainer.apply()
 def post(**args):
@@ -149,6 +159,7 @@ def generate(**args):
     """
     # df = args['data']
     df = args['data'] if not isinstance(args['data'],str) else pd.read_csv(args['data'])
+    CONTINUOUS = args['continous'] if 'continuous' in args else []
     column = args['column'] if (isinstance(args['column'],list)) else [args['column']]
     # column_id = args['id']
@@ -168,7 +179,8 @@ def generate(**args):
        # values = ContinuousToDiscrete.continuous(df[col].values,BIN_SIZE)
        # # values = np.unique(values).tolist()
        # else:
-        values = df[col].unique().tolist()
+        values = df[col].dropna().unique().tolist()
+
        args['values'] = values
        args['row_count'] = df.shape[0]
@@ -178,8 +190,9 @@ def generate(**args):
        handler.load_meta(col)
        r = handler.apply()
        BIN_SIZE = 4 if 'bin_size' not in args else int(args['bin_size'])
-        _df[col] = ContinuousToDiscrete.continuous(r[col],BIN_SIZE) if col in CONTINUOUS else r[col]
-        # _df[col] = r[col]
+
+        # _df[col] = ContinuousToDiscrete.continuous(r[col],BIN_SIZE) if col in CONTINUOUS else r[col]
+        _df[col] = r[col]
        #
        # @TODO: log basic stats about the synthetic attribute
        #
diff --git a/pipeline.py b/pipeline.py
index 6234c26..0f2c258 100644
--- a/pipeline.py
+++ b/pipeline.py
@@ -30,11 +30,13 @@ class Components :
             condition = ' '.join([args['condition']['field'],args['condition']['qualifier'],'(',args['condition']['value'],')'])
             SQL = " ".join([SQL,'WHERE',condition])

-        SQL = SQL.replace(':dataset',args['dataset']) #+ " LIMIT 1000 "
+        SQL = SQL.replace(':dataset',args['dataset']) #+ " LI "
+
         if 'limit' in args :
-            SQL = SQL + 'LIMIT ' + args['limit']
+            SQL = SQL + ' LIMIT ' + args['limit']
+
         credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
-        df = pd.read_gbq(SQL,credentials=credentials,dialect='standard')
+        df = pd.read_gbq(SQL,credentials=credentials,dialect='standard').astype(object)
         return df
         # return lambda: pd.read_gbq(SQL,credentials=credentials,dialect='standard')[args['columns']].dropna()
@@ -51,7 +53,8 @@ class Components :
         #
         # @TODO: we need to log something here about the parameters being passed
         # pointer = args['reader'] if 'reader' in args else lambda: Components.get(**args)
-        df = args['reader']()
+        df = args['data']
+
         # if df.shape[0] == 0 :
         #     print ("CAN NOT TRAIN EMPTY DATASET ")
@@ -62,85 +65,43 @@ class Components :
             logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
         log_folder = args['logs'] if 'logs' in args else 'logs'
-        _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
+        # _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
-        _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
-        _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
-        _args['gpu'] = args['gpu'] if 'gpu' in args else 0
+        # _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
+        # _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
+        # _args['gpu'] = args['gpu'] if 'gpu' in args else 0

-        # MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0
-        PART_SIZE = int(args['part_size']) if 'part_size' in args else 8
-
-        if 'partition' not in args:
-            lbound = 0
-            # bounds = list(pd.cut( np.arange(df.shape[0]+1),PART_SIZE).categories)
-            # bounds = Components.split(df,MAX_ROWS,PART_SIZE)
-            columns = args['columns']
-            df = np.array_split(df[columns].values,PART_SIZE)
-            qwriter = factory.instance(type='queue.QueueWriter',args={'queue':'aou.io'})
-            part_index = 0
-
-            #
-            # let's start n processes to listen & train this mother ...
-            #
-            #-- hopefully they learn as daemons
+        # # MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0
+        PART_SIZE = int(args['part_size'])

-            for _df in df:
-
-                # _args['logs'] = os.sep.join([log_folder,str(part_index)])
-                _args['partition'] = str(part_index)
-                _args['logger'] = {'args':{'dbname':'aou','doc':args['context']},'type':'mongo.MongoWriter'}
-
-                #
-                # We should post the the partitions to a queue server (at least the instructions on ):
-                #   - where to get the data
-                #   - and athe arguments to use (partition #,columns,gpu,epochs)
-                #
+        partition = args['partition']
+        log_folder = os.sep.join([log_folder,args['context'],str(partition)])
+        _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
+        _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])

-                _df = pd.DataFrame(_df,columns=columns)
-                # print (columns)
-
-                info = {"rows":_df.shape[0],"cols":_df.shape[1], "partition":part_index,"logs":_args['logs'],"num_gpu":1,"part_size":PART_SIZE}
-                p = {"args":_args,"data":_df.to_dict(orient="records"),"input":info}
-                part_index += 1
-                qwriter.write(p)
-                #
-                # @TODO:
-                #   - Notify that information was just posted to the queue
-                #   In case we want slow-mode, we can store the partitions in mongodb and process (Yes|No)?
-                #
-
-                logger.write({"module":"train","action":"setup-partition","input":info})
-
-            pass
+        #
+        # We ask the process to assume 1 gpu given the system number of GPU and that these tasks can run in parallel
+        #
+        if int(args['num_gpu']) > 1 :
+            _args['gpu'] = int(args['gpu']) if int(args['gpu']) < 8 else np.random.choice(np.arange(8)).astype(int)[0]
         else:
-            print ('.....')
-            partition = args['partition'] if 'partition' in args else ''
-            log_folder = os.sep.join([log_folder,args['context'],str(partition)])
-            _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
-            _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
-
-            #
-            # We ask the process to assume 1 gpu given the system number of GPU and that these tasks can run in parallel
-            #
-            if int(args['num_gpu']) > 1 :
-                _args['gpu'] = int(args['gpu']) if int(args['gpu']) < 8 else np.random.choice(np.arange(8)).astype(int)[0]
-            else:
-                _args['gpu'] = 0
-                _args['num_gpu'] = 1
-            os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu'])
-
-
-            _args['data'] = df
-            #
-            # @log :
-            # Logging information about the training process for this partition (or not)
-            #
-
-            info = {"rows":df.shape[0],"cols":df.shape[1], "partition":int(partition),"logs":_args['logs']}
-
-            logger.write({"module":"train","action":"train","input":info})
-            data.maker.train(**_args)
+            _args['gpu'] = 0
+            _args['num_gpu'] = 1
+        os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu'])
+
+        _args['store'] = {'type':'mongo.MongoWriter','args':{'dbname':'aou','doc':args['context']}}
+        _args['data'] = args['data']
+
+        # print (['partition ',partition,df.value_source_concept_id.unique()])
+        #
+        # @log :
+        # Logging information about the training process for this partition (or not)
+        #
+
+        info = {"rows":df.shape[0],"cols":df.shape[1], "partition":int(partition),"logs":_args['logs']}
+
+        logger.write({"module":"train","action":"train","input":info})
+        data.maker.train(**_args)

         pass
@@ -210,6 +171,7 @@ class Components :
         #
         #-- Let us store all of this into bigquery
         prefix = args['notify']+'.'+_args['context']
+        partition = str(partition)
         table = '_'.join([prefix,partition,'io']).replace('__','_')
         folder = os.sep.join([args['logs'],args['context'],partition,'output'])
         if 'file' in args :
@@ -219,17 +181,19 @@ class Components :
             data_comp.to_csv( _pname,index=False)
             _args['data'].to_csv(_fname,index=False)
-
+            _id = 'path'
         else:
             credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
             _pname = os.sep.join([folder,table+'.csv'])
             _fname = table.replace('_io','_full_io')
-            data_comp.to_gbq(if_exists='replace',destination_table=_pname,credentials='credentials',chunk_size=50000)
+            partial = '.'.join(['io',args['context']+'_partial_io'])
+            complete= '.'.join(['io',args['context']+'_full_io'])
+            data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=50000)
             data_comp.to_csv(_pname,index=False)
             INSERT_FLAG = 'replace' if 'partition' not in args or 'segment' not in args else 'append'
-            _args['data'].to_gbq(if_exists=INSERT_FLAG,destination_table=_fname,credentials='credentials',chunk_size=50000)
-
-        info = {"full":{"path":_fname,"rows":_args['data'].shape[0]},"compare":{"name":_pname,"rows":data_comp.shape[0]} }
+            _args['data'].to_gbq(if_exists=INSERT_FLAG,destination_table=complete,credentials=credentials,chunksize=50000)
+            _id = 'dataset'
+        info = {"full":{_id:_fname,"rows":_args['data'].shape[0]},"partial":{"path":_pname,"rows":data_comp.shape[0]} }
         if partition :
             info ['partition'] = int(partition)
         logger.write({"module":"generate","action":"write","input":info} )
@@ -280,18 +244,18 @@ if __name__ == '__main__' :
     args['logs'] = args['logs'] if 'logs' in args else 'logs'
     if 'dataset' not in args :
         args['dataset'] = 'combined20191004v2_deid'
-
+    PART_SIZE = int(args['part_size']) if 'part_size' in args else 8
     #
     # @TODO:
     #   Log what was initiated so we have context of this processing ...
     #
     if 'listen' not in SYS_ARGS :
         if 'file' in args :
-            reader = lambda: pd.read_csv(args['file']) ;
+            DATA = pd.read_csv(args['file']) ;
         else:
             DATA = Components().get(args)
-            reader = lambda: DATA
-        args['reader'] = reader
+        COLUMNS = DATA.columns
+        DATA = np.array_split(DATA,PART_SIZE)

     if 'generate' in SYS_ARGS :
         #
@@ -299,32 +263,34 @@
         content = os.listdir( os.sep.join([args['logs'],args['context']]))
         generator = Components()
-        DATA = reader()
+
         if ''.join(content).isnumeric() :
             #
             # we have partitions we are working with

             jobs = []
-            del args['reader']
-            columns = DATA.columns.tolist()
-            DATA = np.array_split(DATA[args['columns']],len(content))
+
+            # columns = DATA.columns.tolist()
+
+            # DATA = np.array_split(DATA,PART_SIZE)

-            for id in ''.join(content) :
-                if 'focus' in args and int(args['focus']) != int(id) :
+            for index in range(0,PART_SIZE) :
+                if 'focus' in args and int(args['focus']) != index :
                     #
                     # This handles failures/recoveries for whatever reason
                     # If we are only interested in generating data for a given partition
                     continue
-
-                args['partition'] = id
-                args['data'] = pd.DataFrame(DATA[(int(id))],columns=args['columns'])
+                # index = id.index(id)
+
+                args['partition'] = index
+                args['data'] = DATA[index]
                 if int(args['num_gpu']) > 1 :
-                    args['gpu'] = id
+                    args['gpu'] = index
                 else:
                     args['gpu']=0
                 make = lambda _args: (Components()).generate(_args)
                 job = Process(target=make,args=(args,))
-                job.name = 'generator # '+str(id)
+                job.name = 'generator # '+str(index)
                 job.start()
                 jobs.append(job)
@@ -370,18 +336,26 @@ if __name__ == '__main__' :
         #    qreader.read(1)
         pass
     else:
-        PART_SIZE = int(args['jobs']) if 'jobs' in args else 8
-        DATA = reader()
-        DATA = np.array_split(DATA[args['columns']],PART_SIZE)
+
+        # DATA = np.array_split(DATA,PART_SIZE)
+
        jobs = []
-        for index in range(0,int(args['jobs'])) :
+        for index in range(0,PART_SIZE) :
            if 'focus' in args and int(args['focus']) != index :
                continue
+            args['part_size'] = PART_SIZE
            args['partition'] = index
-            _df = pd.DataFrame(DATA[index],columns=args['columns'])
-            args['reader'] = lambda: _df
+            # _df = pd.DataFrame(DATA[index],columns=args['columns'])
+            args['data'] = DATA[index]
+            args['data'].to_csv('aou-'+str(index)+'csv',index=False)
+            # args['reader'] = lambda: _df
+            if int(args['num_gpu']) > 1 :
+                args['gpu'] = index
+            else:
+                args['gpu']=0
+
            make = lambda _args: (Components()).train(**_args)
-            job = Process(target=make,args=(args,))
+            job = Process(target=make,args=( dict(args),))
            job.name = 'Trainer # ' + str(index)
            job.start()
            jobs.append(job)
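
The data/gan.py hunks above add a fallback to candidate selection: while sampling CANDIDATE_COUNT batches, Predict.apply now remembers the densest candidate seen so far (__x__) and its fill ratio (__ratio), and falls back to it with INDEX = -1 when no candidate carries every expected value. Below is a minimal sketch of that selection idea, not the repository code; pick_candidate, candidate_matrices and expected_width are illustrative stand-ins.

import numpy as np

def pick_candidate(candidate_matrices, expected_width):
    # Keep full-width candidates in `found`; otherwise track the densest
    # candidate seen so far (the role of __x__ / __ratio in the patch).
    found, ratio = [], []
    best, best_ratio = None, 0.0
    for x in candidate_matrices:
        if x.shape[1] == expected_width:
            found.append(x)
            ratio.append(np.divide(np.sum(x), x.size))
        elif best is None or np.where(x > 0)[0].size > np.where(best > 0)[0].size:
            best, best_ratio = x, np.divide(np.sum(x), x.size)
    if not found:
        return best, -1, best_ratio      # INDEX = -1 signals the fallback path
    index = int(np.argmax(ratio))        # otherwise prefer the best fill ratio
    return found[index], index, ratio[index]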
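
On the pipeline.py side, the change drops the queue-based fan-out and trains each partition directly: the driver splits the frame with np.array_split into part_size partitions, hands each partition to its own process through args['data'], and pins a GPU index per partition when num_gpu > 1. The sketch below only illustrates that dispatch pattern under those assumptions; the worker function and the PART_SIZE / NUM_GPU constants are placeholders standing in for Components().train and the part_size / num_gpu entries of args.

import os
from multiprocessing import Process

import numpy as np
import pandas as pd

def worker(args):
    # Stand-in for Components().train(**args): each process sees only its partition.
    os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu'])
    df = args['data']
    print(args['partition'], df.shape)

if __name__ == '__main__':
    PART_SIZE = 4                      # mirrors args['part_size']
    NUM_GPU = 1                        # mirrors args['num_gpu']
    DATA = pd.DataFrame({'person_id': range(100), 'value': np.random.rand(100)})
    partitions = np.array_split(DATA, PART_SIZE)

    jobs = []
    for index in range(PART_SIZE):
        args = {
            'partition': index,
            'data': partitions[index],
            'gpu': index if NUM_GPU > 1 else 0,   # one GPU per partition when several exist
        }
        job = Process(target=worker, args=(dict(args),))  # pass a copy, as the patch does
        job.name = 'Trainer # ' + str(index)
        job.start()
        jobs.append(job)
    for job in jobs:
        job.join()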