bug fix with partition & data-access

dev
Steve L. Nyemba 5 years ago
parent 718e578401
commit 330d6b6ae6

@@ -532,10 +532,13 @@ class Predict(GNet):
self.generator = Generator(**args)
self.values = args['values']
self.ROW_COUNT = args['row_count']
+ self.oROW_COUNT = self.ROW_COUNT
self.MISSING_VALUES = args['no_value']
def load_meta(self, column):
super().load_meta(column)
self.generator.load_meta(column)
+ self.ROW_COUNT = self.oROW_COUNT
def apply(self,**args):
# print (self.train_dir)
# suffix = "-".join(self.ATTRIBUTES['synthetic']) if isinstance(self.ATTRIBUTES['synthetic'],list) else self.ATTRIBUTES['synthetic']
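Note on the hunk above: reloading a column's metadata can overwrite ROW_COUNT, so the constructor now caches the requested value in oROW_COUNT and load_meta restores it. A minimal sketch of the same cache-and-restore pattern (class and attribute names simplified for illustration, not taken from the source):

    class Predictor:
        def __init__(self, row_count):
            self.ROW_COUNT = row_count
            self.oROW_COUNT = row_count       # remember the requested row count

        def load_meta(self, column):
            # ... loading metadata may clobber ROW_COUNT ...
            self.ROW_COUNT = self.oROW_COUNT  # restore the original value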
@@ -544,6 +547,7 @@ class Predict(GNet):
demo = self._LABEL #np.zeros([self.ROW_COUNT,self.NUM_LABELS]) #args['de"shape":{"LABEL":list(self._LABEL.shape)} mo']
tf.compat.v1.reset_default_graph()
z = tf.random.normal(shape=[self.ROW_COUNT, self.Z_DIM])
y = tf.compat.v1.placeholder(shape=[self.ROW_COUNT, self.NUM_LABELS], dtype=tf.int32)
if self._LABEL is not None :
ma = [[i] for i in np.arange(self.NUM_LABELS - 2)]
@@ -569,6 +573,8 @@ class Predict(GNet):
found = []
ratio = []
+ __x__ = None
+ __ratio=0
for i in np.arange(CANDIDATE_COUNT) :
if labels :
f = sess.run(fake,feed_dict={y:labels})
@@ -590,7 +596,8 @@ class Predict(GNet):
if i == CANDIDATE_COUNT:
break
else:
+ __x__ = df if __x__ is None or np.where(x > 0)[0].size > np.where(__x__ > 0)[0].size else __x__
+ __ratio = np.divide( np.sum(x), x.size) if __x__ is None or np.where(x > 0)[0].size > np.where(__x__ > 0)[0].size else __ratio
continue
# i = df.T.index.astype(np.int32) #-- These are numeric pseudonyms
@@ -600,23 +607,33 @@ class Predict(GNet):
#
N = len(found)
_index = [i for i in range(0,N) if found[i].shape[1] == len(self.values)]
- if not _index :
- INDEX = np.random.choice(np.arange(len(found)),1)[0]
- INDEX = ratio.index(np.max(ratio))
- else:
- INDEX = _index[0]
+ if not _index and not found :
+ df = __x__
+ INDEX = -1
+ else :
+ if not _index :
+ INDEX = np.random.choice(np.arange(len(found)),1)[0]
+ INDEX = ratio.index(np.max(ratio))
+ else:
+ INDEX = _index[0]
df = found[INDEX]
columns = self.ATTRIBUTES['synthetic'] if isinstance(self.ATTRIBUTES['synthetic'],list)else [self.ATTRIBUTES['synthetic']]
# r = np.zeros((self.ROW_COUNT,len(columns)))
# r = np.zeros(self.ROW_COUNT)
if self.logger :
- info = {"found":len(found),"selected":INDEX, "ratio": ratio[INDEX],"rows":df.shape[0],"cols":df.shape[1],"expected":len(self.values)}
+ info = {"found":len(found),"rows":df.shape[0],"cols":df.shape[1],"expected":len(self.values)}
+ if INDEX > 0 :
+ info =dict(info ,**{"selected":INDEX, "ratio": ratio[INDEX] })
+ else :
+ info['selected'] = -1
+ info['ratio'] = __ratio
self.logger.write({"module":"gan-generate","action":"generate","input":info})
df.columns = self.values
- if len(found):
+ if len(found) or df.columns.size == len(self.values):
# print (len(found),NTH_VALID_CANDIDATE)
# x = df * self.values
#
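The two hunks above change candidate selection: every generated candidate that fails the column-count check still updates __x__ (the densest candidate seen so far, judged by its count of non-zero entries) and __ratio, so when no candidate matches len(self.values) the code can fall back to that best candidate instead of failing on an empty list. A rough sketch of the same fallback pattern, with illustrative names (candidates, expected_cols) that do not come from the source:

    import numpy as np

    def pick_candidate(candidates, expected_cols):
        """Return the first candidate with the expected column count,
        otherwise the candidate with the most non-zero entries."""
        best, best_nonzero = None, -1
        for df in candidates:
            if df.shape[1] == expected_cols:
                return df                      # exact structural match wins
            nonzero = np.count_nonzero(df.values)
            if nonzero > best_nonzero:         # remember the densest fallback
                best, best_nonzero = df, nonzero
        return best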
@@ -639,10 +656,14 @@ class Predict(GNet):
df = pd.DataFrame( df.iloc[i].apply(lambda row: self.values[np.random.choice(np.where(row != 0)[0],1)[0]] ,axis=1))
df.columns = columns
df = df[columns[0]].append(pd.Series(missing))
+ if self.logger :
+ info= {"missing": i.size,"rows":df.shape[0],"cols":1}
+ self.logger.write({"module":"gan-generate","action":"compile.io","input":info})
+ # print(df.head())
tf.compat.v1.reset_default_graph()
df = pd.DataFrame(df)
df.columns = columns

@@ -107,23 +107,33 @@ def train (**args) :
# args['real'] = pd.get_dummies(df[col].fillna('')).astype(np.float32).values
# if 'float' not in df[col].dtypes.name :
# args['real'] = pd.get_dummies(df[col].fillna('')).astype(np.float32).values
- if 'float' in df[col].dtypes.name and col in CONTINUOUS:
+ if col in CONTINUOUS:
BIN_SIZE = 4 if 'bin_size' not in args else int(args['bin_size'])
args['real'] = ContinuousToDiscrete.binary(df[col],BIN_SIZE).astype(np.float32)
else:
- args['real'] = pd.get_dummies(df[col].dropna()).astype(np.float32).values
+ df.to_csv('tmp-'+args['logs'].replace('/','_')+'-'+col+'.csv',index=False)
+ # print (df[col].dtypes)
+ # print (df[col].dropna/(axis=1).unique())
+ args['real'] = pd.get_dummies(df[col].dropna()).astype(np.float32).values
+ args['column'] = col
+ args['context'] = col
context = args['context']
if 'store' in args :
args['store']['args']['doc'] = context
logger = factory.instance(**args['store'])
args['logger'] = logger
+ info = {"rows":args['real'].shape[0],"cols":args['real'].shape[1],"name":col}
+ logger.write({"module":"gan-train","action":"data-prep","input":info})
else:
logger = None
- args['column'] = col
- args['context'] = col
+ #
+ # If the s
trainer = gan.Train(**args)
trainer.apply()
def post(**args):
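The train hunk above now routes every column listed in CONTINUOUS through ContinuousToDiscrete.binary and one-hot encodes the rest with pd.get_dummies over the non-null values (it also dumps the frame to a temporary CSV for debugging). A hedged sketch of the two encodings, using pd.cut as a stand-in for ContinuousToDiscrete.binary, whose actual implementation is not shown in this diff:

    import numpy as np
    import pandas as pd

    def encode_column(series, continuous=False, bin_size=4):
        if continuous:
            # bin the values, then one-hot encode bin membership
            bins = pd.cut(series, bins=bin_size)
            return pd.get_dummies(bins).astype(np.float32).values
        # categorical: one-hot encode the observed (non-null) values
        return pd.get_dummies(series.dropna()).astype(np.float32).values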
@@ -149,6 +159,7 @@ def generate(**args):
"""
# df = args['data']
df = args['data'] if not isinstance(args['data'],str) else pd.read_csv(args['data'])
CONTINUOUS = args['continous'] if 'continuous' in args else []
column = args['column'] if (isinstance(args['column'],list)) else [args['column']]
# column_id = args['id']
@@ -168,7 +179,8 @@ def generate(**args):
# values = ContinuousToDiscrete.continuous(df[col].values,BIN_SIZE)
# # values = np.unique(values).tolist()
# else:
- values = df[col].unique().tolist()
+ values = df[col].dropna().unique().tolist()
args['values'] = values
args['row_count'] = df.shape[0]
@@ -178,8 +190,9 @@ def generate(**args):
handler.load_meta(col)
r = handler.apply()
BIN_SIZE = 4 if 'bin_size' not in args else int(args['bin_size'])
- _df[col] = ContinuousToDiscrete.continuous(r[col],BIN_SIZE) if col in CONTINUOUS else r[col]
- # _df[col] = r[col]
+ # _df[col] = ContinuousToDiscrete.continuous(r[col],BIN_SIZE) if col in CONTINUOUS else r[col]
+ _df[col] = r[col]
#
# @TODO: log basic stats about the synthetic attribute
#
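Switching to df[col].dropna().unique().tolist() two hunks up keeps NaN out of the list of admissible values, so the synthesizer is never asked to reproduce NaN as if it were a real category. For example:

    import numpy as np
    import pandas as pd

    s = pd.Series(['F', 'M', np.nan, 'F'])
    print(s.unique().tolist())           # ['F', 'M', nan]
    print(s.dropna().unique().tolist())  # ['F', 'M']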

@@ -30,11 +30,13 @@ class Components :
condition = ' '.join([args['condition']['field'],args['condition']['qualifier'],'(',args['condition']['value'],')'])
SQL = " ".join([SQL,'WHERE',condition])
- SQL = SQL.replace(':dataset',args['dataset']) #+ " LIMIT 1000 "
+ SQL = SQL.replace(':dataset',args['dataset']) #+ " LI "
if 'limit' in args :
- SQL = SQL + 'LIMIT ' + args['limit']
+ SQL = SQL + ' LIMIT ' + args['limit']
credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
- df = pd.read_gbq(SQL,credentials=credentials,dialect='standard')
+ df = pd.read_gbq(SQL,credentials=credentials,dialect='standard').astype(object)
return df
# return lambda: pd.read_gbq(SQL,credentials=credentials,dialect='standard')[args['columns']].dropna()
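Two small data-access fixes in the hunk above: the LIMIT clause now starts with a space so it is not glued to the previous token, and the frame read from BigQuery is cast to object so downstream encoding treats every column uniformly. A quick illustration of the query assembly, with placeholder table and dataset names that are not from the source:

    SQL = "SELECT * FROM :dataset.observation"
    SQL = SQL.replace(':dataset', 'combined20191004v2_deid')
    limit = '1000'
    # note the leading space: '... observation LIMIT 1000', not '... observationLIMIT 1000'
    SQL = SQL + ' LIMIT ' + limit
    print(SQL)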
@@ -51,7 +53,8 @@ class Components :
#
# @TODO: we need to log something here about the parameters being passed
# pointer = args['reader'] if 'reader' in args else lambda: Components.get(**args)
- df = args['reader']()
+ df = args['data']
# if df.shape[0] == 0 :
# print ("CAN NOT TRAIN EMPTY DATASET ")
@@ -62,85 +65,43 @@ class Components :
logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
log_folder = args['logs'] if 'logs' in args else 'logs'
- _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
- _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
- _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
- _args['gpu'] = args['gpu'] if 'gpu' in args else 0
- # MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0
- PART_SIZE = int(args['part_size']) if 'part_size' in args else 8
- if 'partition' not in args:
- lbound = 0
- # bounds = list(pd.cut( np.arange(df.shape[0]+1),PART_SIZE).categories)
- # bounds = Components.split(df,MAX_ROWS,PART_SIZE)
- columns = args['columns']
- df = np.array_split(df[columns].values,PART_SIZE)
- qwriter = factory.instance(type='queue.QueueWriter',args={'queue':'aou.io'})
- part_index = 0
- #
- # let's start n processes to listen & train this mother ...
- #
- #-- hopefully they learn as daemons
- for _df in df:
- # _args['logs'] = os.sep.join([log_folder,str(part_index)])
- _args['partition'] = str(part_index)
- _args['logger'] = {'args':{'dbname':'aou','doc':args['context']},'type':'mongo.MongoWriter'}
- #
- # We should post the the partitions to a queue server (at least the instructions on ):
- # - where to get the data
- # - and athe arguments to use (partition #,columns,gpu,epochs)
- #
- _df = pd.DataFrame(_df,columns=columns)
- # print (columns)
- info = {"rows":_df.shape[0],"cols":_df.shape[1], "partition":part_index,"logs":_args['logs'],"num_gpu":1,"part_size":PART_SIZE}
- p = {"args":_args,"data":_df.to_dict(orient="records"),"input":info}
- part_index += 1
- qwriter.write(p)
- #
- # @TODO:
- # - Notify that information was just posted to the queue
- # In case we want slow-mode, we can store the partitions in mongodb and process (Yes|No)?
- #
- logger.write({"module":"train","action":"setup-partition","input":info})
- pass
- else:
- print ('.....')
- partition = args['partition'] if 'partition' in args else ''
- log_folder = os.sep.join([log_folder,args['context'],str(partition)])
- _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
- _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
- #
- # We ask the process to assume 1 gpu given the system number of GPU and that these tasks can run in parallel
- #
- if int(args['num_gpu']) > 1 :
- _args['gpu'] = int(args['gpu']) if int(args['gpu']) < 8 else np.random.choice(np.arange(8)).astype(int)[0]
- else:
- _args['gpu'] = 0
- _args['num_gpu'] = 1
- os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu'])
- _args['data'] = df
+ # _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
+ # _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
+ # _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
+ # _args['gpu'] = args['gpu'] if 'gpu' in args else 0
+ # # MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0
+ PART_SIZE = int(args['part_size'])
+ partition = args['partition']
+ log_folder = os.sep.join([log_folder,args['context'],str(partition)])
+ _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
+ _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
+ #
+ # We ask the process to assume 1 gpu given the system number of GPU and that these tasks can run in parallel
+ #
+ if int(args['num_gpu']) > 1 :
+ _args['gpu'] = int(args['gpu']) if int(args['gpu']) < 8 else np.random.choice(np.arange(8)).astype(int)[0]
+ else:
+ _args['gpu'] = 0
+ _args['num_gpu'] = 1
+ os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu'])
+ _args['store'] = {'type':'mongo.MongoWriter','args':{'dbname':'aou','doc':args['context']}}
+ _args['data'] = args['data']
+ # print (['partition ',partition,df.value_source_concept_id.unique()])
#
# @log :
# Logging information about the training process for this partition (or not)
#
info = {"rows":df.shape[0],"cols":df.shape[1], "partition":int(partition),"logs":_args['logs']}
logger.write({"module":"train","action":"train","input":info})
data.maker.train(**_args)
pass
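The large hunk above removes the queue-based fan-out (np.array_split plus queue.QueueWriter) from train and instead assumes the caller has already split the data: each call now receives a single partition via args['data'] and args['partition'], and pins itself to one GPU through CUDA_VISIBLE_DEVICES. A simplified sketch of that per-partition setup; the function and return shape are illustrative, not the repository's API:

    import os
    import numpy as np

    def setup_partition(args):
        partition = args['partition']
        log_folder = os.sep.join([args.get('logs', 'logs'), args['context'], str(partition)])
        # pick a GPU: honour args['gpu'] when several are available, otherwise use GPU 0
        if int(args.get('num_gpu', 1)) > 1:
            gpu = int(args['gpu']) if int(args['gpu']) < 8 else int(np.random.choice(np.arange(8)))
        else:
            gpu = 0
        os.environ['CUDA_VISIBLE_DEVICES'] = str(gpu)
        return {'logs': log_folder, 'gpu': gpu, 'partition': partition, 'data': args['data']}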
@@ -210,6 +171,7 @@ class Components :
#
#-- Let us store all of this into bigquery
prefix = args['notify']+'.'+_args['context']
+ partition = str(partition)
table = '_'.join([prefix,partition,'io']).replace('__','_')
folder = os.sep.join([args['logs'],args['context'],partition,'output'])
if 'file' in args :
@@ -219,17 +181,19 @@ class Components :
data_comp.to_csv( _pname,index=False)
_args['data'].to_csv(_fname,index=False)
+ _id = 'path'
else:
credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
_pname = os.sep.join([folder,table+'.csv'])
_fname = table.replace('_io','_full_io')
- data_comp.to_gbq(if_exists='replace',destination_table=_pname,credentials='credentials',chunk_size=50000)
+ partial = '.'.join(['io',args['context']+'_partial_io'])
+ complete= '.'.join(['io',args['context']+'_full_io'])
+ data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=50000)
data_comp.to_csv(_pname,index=False)
INSERT_FLAG = 'replace' if 'partition' not in args or 'segment' not in args else 'append'
- _args['data'].to_gbq(if_exists=INSERT_FLAG,destination_table=_fname,credentials='credentials',chunk_size=50000)
+ _args['data'].to_gbq(if_exists=INSERT_FLAG,destination_table=complete,credentials=credentials,chunksize=50000)
+ _id = 'dataset'
- info = {"full":{"path":_fname,"rows":_args['data'].shape[0]},"compare":{"name":_pname,"rows":data_comp.shape[0]} }
+ info = {"full":{_id:_fname,"rows":_args['data'].shape[0]},"partial":{"path":_pname,"rows":data_comp.shape[0]} }
if partition :
info ['partition'] = int(partition)
logger.write({"module":"generate","action":"write","input":info} )
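The write path above now appends each partition's comparison sample to a shared <context>_partial_io table and the full synthetic frame to <context>_full_io, and it also fixes two call bugs from the old lines: credentials was being passed as the literal string 'credentials', and the pandas keyword is chunksize, not chunk_size. A sketch of the corrected calls, assuming data_comp and full_df are existing DataFrames, 'observation' is an illustrative context name, and key.json is a placeholder service-account key:

    import pandas as pd
    from google.oauth2 import service_account

    credentials = service_account.Credentials.from_service_account_file('key.json')
    context = 'observation'
    partial  = '.'.join(['io', context + '_partial_io'])   # io.observation_partial_io
    complete = '.'.join(['io', context + '_full_io'])      # io.observation_full_io
    data_comp.to_gbq(destination_table=partial, if_exists='append',
                     credentials=credentials, chunksize=50000)
    full_df.to_gbq(destination_table=complete, if_exists='append',
                   credentials=credentials, chunksize=50000)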
@@ -280,18 +244,18 @@ if __name__ == '__main__' :
args['logs'] = args['logs'] if 'logs' in args else 'logs'
if 'dataset' not in args :
args['dataset'] = 'combined20191004v2_deid'
+ PART_SIZE = int(args['part_size']) if 'part_size' in args else 8
#
# @TODO:
# Log what was initiated so we have context of this processing ...
#
if 'listen' not in SYS_ARGS :
if 'file' in args :
- reader = lambda: pd.read_csv(args['file']) ;
+ DATA = pd.read_csv(args['file']) ;
else:
DATA = Components().get(args)
- reader = lambda: DATA
- args['reader'] = reader
+ COLUMNS = DATA.columns
+ DATA = np.array_split(DATA,PART_SIZE)
if 'generate' in SYS_ARGS :
#
@@ -299,32 +263,34 @@ if __name__ == '__main__' :
content = os.listdir( os.sep.join([args['logs'],args['context']]))
generator = Components()
- DATA = reader()
if ''.join(content).isnumeric() :
#
# we have partitions we are working with
jobs = []
- del args['reader']
- columns = DATA.columns.tolist()
- DATA = np.array_split(DATA[args['columns']],len(content))
- for id in ''.join(content) :
- if 'focus' in args and int(args['focus']) != int(id) :
+ # columns = DATA.columns.tolist()
+ # DATA = np.array_split(DATA,PART_SIZE)
+ for index in range(0,PART_SIZE) :
+ if 'focus' in args and int(args['focus']) != index :
#
# This handles failures/recoveries for whatever reason
# If we are only interested in generating data for a given partition
continue
- # index = id.index(id)
- args['partition'] = id
- args['data'] = pd.DataFrame(DATA[(int(id))],columns=args['columns'])
+ args['partition'] = index
+ args['data'] = DATA[index]
if int(args['num_gpu']) > 1 :
- args['gpu'] = id
+ args['gpu'] = index
else:
args['gpu']=0
make = lambda _args: (Components()).generate(_args)
job = Process(target=make,args=(args,))
- job.name = 'generator # '+str(id)
+ job.name = 'generator # '+str(index)
job.start()
jobs.append(job)
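With the refactor above, the driver splits the dataset once (DATA = np.array_split(DATA,PART_SIZE) in the __main__ block) and spawns one Process per partition index, assigning a GPU index when several are available. A minimal, self-contained sketch of that fan-out; work is a placeholder for Components().generate, not the real method:

    from multiprocessing import Process
    import numpy as np
    import pandas as pd

    def work(args):
        part = args['data']
        print('partition', args['partition'], 'gpu', args['gpu'], part.shape)

    if __name__ == '__main__':
        PART_SIZE = 4
        DATA = pd.DataFrame({'x': range(100)})
        parts = np.array_split(DATA, PART_SIZE)       # list of PART_SIZE DataFrames
        jobs = []
        for index in range(PART_SIZE):
            args = {'partition': index, 'data': parts[index], 'gpu': index}
            job = Process(target=work, args=(dict(args),))  # copy so each job owns its args
            job.name = 'generator # ' + str(index)
            job.start()
            jobs.append(job)
        for job in jobs:
            job.join()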
@@ -370,18 +336,26 @@ if __name__ == '__main__' :
# qreader.read(1)
pass
else:
- PART_SIZE = int(args['jobs']) if 'jobs' in args else 8
- DATA = reader()
- DATA = np.array_split(DATA[args['columns']],PART_SIZE)
+ # DATA = np.array_split(DATA,PART_SIZE)
jobs = []
- for index in range(0,int(args['jobs'])) :
+ for index in range(0,PART_SIZE) :
if 'focus' in args and int(args['focus']) != index :
continue
+ args['part_size'] = PART_SIZE
args['partition'] = index
- _df = pd.DataFrame(DATA[index],columns=args['columns'])
- args['reader'] = lambda: _df
+ # _df = pd.DataFrame(DATA[index],columns=args['columns'])
+ args['data'] = DATA[index]
+ args['data'].to_csv('aou-'+str(index)+'csv',index=False)
+ # args['reader'] = lambda: _df
+ if int(args['num_gpu']) > 1 :
+ args['gpu'] = index
+ else:
+ args['gpu']=0
make = lambda _args: (Components()).train(**_args)
- job = Process(target=make,args=(args,))
+ job = Process(target=make,args=( dict(args),))
job.name = 'Trainer # ' + str(index)
job.start()
jobs.append(job)
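One detail worth noting in the training loop above: the job is now started with args=( dict(args),), a shallow copy of the argument dictionary, so each worker gets its own snapshot of partition, gpu, and data instead of a reference to the one dictionary the loop keeps mutating. A short illustration of the difference:

    args = {'partition': 0}
    snapshots, shared = [], []
    for index in range(3):
        args['partition'] = index
        snapshots.append(dict(args))   # copy: each entry keeps its own partition
        shared.append(args)            # reference: same dict appended three times
    print([j['partition'] for j in snapshots])  # [0, 1, 2]
    print([j['partition'] for j in shared])     # [2, 2, 2]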
