bug fixes: design improvements

dev
Steve L. Nyemba 4 years ago
parent 71097103da
commit 97bae5ef92

@ -1,2 +1,17 @@
import data.params as params import data.params as params
from data.params import SYS_ARGS
import transport
from multiprocessing import Process, Queue
from data.maker import prepare
class Trainer (Process) :
pass
class Maker(Process):
pass
if __name__ == '__main__' :
logger = transport.factory.instance(SYS_ARGS['store']['logger'])

@ -111,15 +111,15 @@ class GNet :
self.train_dir = os.sep.join([self.log_dir,'train',self.CONTEXT]) self.train_dir = os.sep.join([self.log_dir,'train',self.CONTEXT])
self.out_dir = os.sep.join([self.log_dir,'output',self.CONTEXT]) self.out_dir = os.sep.join([self.log_dir,'output',self.CONTEXT])
if self.logger : # if self.logger :
#
# We will clear the logs from the data-store # We will clear the logs from the data-store
#
column = self.ATTRIBUTES['synthetic'] # column = self.ATTRIBUTES['synthetic']
db = self.logger.db # db = self.logger.db
if db[column].count() > 0 : # if db[column].count() > 0 :
db.backup.insert({'name':column,'logs':list(db[column].find()) }) # db.backup.insert({'name':column,'logs':list(db[column].find()) })
db[column].drop() # db[column].drop()
def load_meta(self,column): def load_meta(self,column):
""" """
@ -127,7 +127,7 @@ class GNet :
Because prediction and training can happen independently Because prediction and training can happen independently
""" """
# suffix = "-".join(column) if isinstance(column,list)else column # suffix = "-".join(column) if isinstance(column,list)else column
suffix = self.get.suffix() suffix = self.CONTEXT #self.get.suffix()
_name = os.sep.join([self.out_dir,'meta-'+suffix+'.json']) _name = os.sep.join([self.out_dir,'meta-'+suffix+'.json'])
if os.path.exists(_name) : if os.path.exists(_name) :
attr = json.loads((open(_name)).read()) attr = json.loads((open(_name)).read())
@ -159,7 +159,7 @@ class GNet :
value= args['value'] value= args['value']
object[key] = value object[key] = value
# suffix = "-".join(self.column) if isinstance(self.column,list) else self.column # suffix = "-".join(self.column) if isinstance(self.column,list) else self.column
suffix = self.get.suffix() suffix = self.CONTEXT #self.get.suffix()
_name = os.sep.join([self.out_dir,'meta-'+suffix]) _name = os.sep.join([self.out_dir,'meta-'+suffix])
f = open(_name+'.json','w') f = open(_name+'.json','w')
@ -351,7 +351,7 @@ class Train (GNet):
self.discriminator = Discriminator(**args) self.discriminator = Discriminator(**args)
self._REAL = args['real'] self._REAL = args['real']
self._LABEL= args['label'] if 'label' in args else None self._LABEL= args['label'] if 'label' in args else None
self.column = args['column'] # self.column = args['column']
# print ([" *** ",self.BATCHSIZE_PER_GPU]) # print ([" *** ",self.BATCHSIZE_PER_GPU])
self.meta = self.log_meta() self.meta = self.log_meta()
@ -438,6 +438,11 @@ class Train (GNet):
per_gpu_w = [] per_gpu_w = []
iterator, features_placeholder, labels_placeholder = self.input_fn() iterator, features_placeholder, labels_placeholder = self.input_fn()
with tf.compat.v1.variable_scope(tf.compat.v1.get_variable_scope()): with tf.compat.v1.variable_scope(tf.compat.v1.get_variable_scope()):
#
# @TODO: Find a way to handle this across multiple CPU in case the GPU are not available
# - abstract hardware specification
# - determine if the GPU/CPU are busy
#
for i in range(self.NUM_GPUS): for i in range(self.NUM_GPUS):
with tf.device('/gpu:%d' % i): with tf.device('/gpu:%d' % i):
with tf.name_scope('%s_%d' % ('TOWER', i)) as scope: with tf.name_scope('%s_%d' % ('TOWER', i)) as scope:
@ -510,7 +515,7 @@ class Train (GNet):
# if epoch % self.MAX_EPOCHS == 0: # if epoch % self.MAX_EPOCHS == 0:
if epoch in [5,10,20,50,75, self.MAX_EPOCHS] : if epoch in [5,10,20,50,75, self.MAX_EPOCHS] :
# suffix = "-".join(self.ATTRIBUTES['synthetic']) if isinstance(self.ATTRIBUTES['synthetic'],list) else self.ATTRIBUTES['synthetic'] # suffix = "-".join(self.ATTRIBUTES['synthetic']) if isinstance(self.ATTRIBUTES['synthetic'],list) else self.ATTRIBUTES['synthetic']
suffix = self.get.suffix() suffix = self.CONTEXT #self.get.suffix()
_name = os.sep.join([self.train_dir,suffix]) _name = os.sep.join([self.train_dir,suffix])
# saver.save(sess, self.train_dir, write_meta_graph=False, global_step=epoch) # saver.save(sess, self.train_dir, write_meta_graph=False, global_step=epoch)
saver.save(sess, _name, write_meta_graph=False, global_step=epoch) saver.save(sess, _name, write_meta_graph=False, global_step=epoch)
@ -539,7 +544,8 @@ class Predict(GNet):
# self.MISSING_VALUES = np.nan_to_num(np.nan) # self.MISSING_VALUES = np.nan_to_num(np.nan)
# if 'no_value' in args and args['no_value'] not in ['na','','NA'] : # if 'no_value' in args and args['no_value'] not in ['na','','NA'] :
# self.MISSING_VALUES = args['no_value'] # self.MISSING_VALUES = args['no_value']
self.MISSING_VALUES = args['missing'] self.MISSING_VALUES = args['missing'] if 'missing' in args else []
# self.MISSING_VALUES = args['no_value'] # self.MISSING_VALUES = args['no_value']
# self.MISSING_VALUES = int(args['no_value']) if args['no_value'].isnumeric() else np.na if args['no_value'] in ['na','NA','N/A'] else args['no_value'] # self.MISSING_VALUES = int(args['no_value']) if args['no_value'].isnumeric() else np.na if args['no_value'] in ['na','NA','N/A'] else args['no_value']
@ -548,9 +554,56 @@ class Predict(GNet):
self.generator.load_meta(column) self.generator.load_meta(column)
self.ROW_COUNT = self.oROW_COUNT self.ROW_COUNT = self.oROW_COUNT
def apply(self,**args): def apply(self,**args):
suffix = self.CONTEXT #self.get.suffix()
model_dir = os.sep.join([self.train_dir,suffix+'-'+str(self.MAX_EPOCHS)])
demo = self._LABEL #np.zeros([self.ROW_COUNT,self.NUM_LABELS]) #args['de"shape":{"LABEL":list(self._LABEL.shape)} mo']
#
# setup computational graph
tf.compat.v1.reset_default_graph()
z = tf.random.normal(shape=[self.ROW_COUNT, self.Z_DIM])
y = tf.compat.v1.placeholder(shape=[self.ROW_COUNT, self.NUM_LABELS], dtype=tf.int32)
if self._LABEL is not None :
ma = [[i] for i in np.arange(self.NUM_LABELS - 2)]
label = y[:, 1] * len(ma) + tf.squeeze(tf.matmul(y[:, 2:], tf.constant(ma, dtype=tf.int32)))
else:
label = None
fake = self.generator.network(inputs=z, label=label)
init = tf.compat.v1.global_variables_initializer()
saver = tf.compat.v1.train.Saver()
df = pd.DataFrame()
CANDIDATE_COUNT = args['candidates'] if 'candidates' in args else 1 #0 if self.ROW_COUNT < 1000 else 100
candidates = []
with tf.compat.v1.Session() as sess:
saver.restore(sess, model_dir)
if self._LABEL is not None :
# labels = np.zeros((self.ROW_COUNT,self.NUM_LABELS) )
labels= demo
else:
labels = None
for i in np.arange(CANDIDATE_COUNT) :
if labels :
_matrix = sess.run(fake,feed_dict={y:labels})
else:
_matrix = sess.run(fake)
#
# if we are dealing with numeric values only we can perform a simple marginal sum against the indexes
# The code below will insure we have some acceptable cardinal relationships between id and synthetic values
#
# df = pd.DataFrame(np.round(f)).astype(np.int32)
candidates.append (np.round(_matrix).astype(np.int64))
# return candidates[0] if len(candidates) == 1 else candidates
return candidates
def _apply(self,**args):
# print (self.train_dir) # print (self.train_dir)
# suffix = "-".join(self.ATTRIBUTES['synthetic']) if isinstance(self.ATTRIBUTES['synthetic'],list) else self.ATTRIBUTES['synthetic'] # suffix = "-".join(self.ATTRIBUTES['synthetic']) if isinstance(self.ATTRIBUTES['synthetic'],list) else self.ATTRIBUTES['synthetic']
suffix = self.get.suffix() suffix = self.CONTEXT #self.get.suffix()
model_dir = os.sep.join([self.train_dir,suffix+'-'+str(self.MAX_EPOCHS)]) model_dir = os.sep.join([self.train_dir,suffix+'-'+str(self.MAX_EPOCHS)])
demo = self._LABEL #np.zeros([self.ROW_COUNT,self.NUM_LABELS]) #args['de"shape":{"LABEL":list(self._LABEL.shape)} mo'] demo = self._LABEL #np.zeros([self.ROW_COUNT,self.NUM_LABELS]) #args['de"shape":{"LABEL":list(self._LABEL.shape)} mo']
tf.compat.v1.reset_default_graph() tf.compat.v1.reset_default_graph()
@ -567,11 +620,12 @@ class Predict(GNet):
init = tf.compat.v1.global_variables_initializer() init = tf.compat.v1.global_variables_initializer()
saver = tf.compat.v1.train.Saver() saver = tf.compat.v1.train.Saver()
df = pd.DataFrame() df = pd.DataFrame()
CANDIDATE_COUNT = 10 #0 if self.ROW_COUNT < 1000 else 100 CANDIDATE_COUNT = 5 #0 if self.ROW_COUNT < 1000 else 100
NTH_VALID_CANDIDATE = count = np.random.choice(np.arange(2,60),2)[0] NTH_VALID_CANDIDATE = count = np.random.choice(np.arange(2,60),2)[0]
with tf.compat.v1.Session() as sess: with tf.compat.v1.Session() as sess:
# sess.run(init) # sess.run(init)
saver.restore(sess, model_dir) saver.restore(sess, model_dir)
if self._LABEL is not None : if self._LABEL is not None :
labels = np.zeros((self.ROW_COUNT,self.NUM_LABELS) ) labels = np.zeros((self.ROW_COUNT,self.NUM_LABELS) )
@ -585,109 +639,110 @@ class Predict(GNet):
__ratio=0 __ratio=0
for i in np.arange(CANDIDATE_COUNT) : for i in np.arange(CANDIDATE_COUNT) :
if labels : if labels :
f = sess.run(fake,feed_dict={y:labels}) _matrix = sess.run(fake,feed_dict={y:labels})
else: else:
f = sess.run(fake) _matrix = sess.run(fake)
# #
# if we are dealing with numeric values only we can perform a simple marginal sum against the indexes # if we are dealing with numeric values only we can perform a simple marginal sum against the indexes
# The code below will insure we have some acceptable cardinal relationships between id and synthetic values # The code below will insure we have some acceptable cardinal relationships between id and synthetic values
# #
# df = pd.DataFrame(np.round(f)).astype(np.int32) # df = pd.DataFrame(np.round(f)).astype(np.int32)
df = pd.DataFrame(np.round(f),dtype=int) found.append (np.round(_matrix).astype(np.int64))
# df = pd.DataFrame(np.round(_matrix),dtype=int)
p = 0 not in df.sum(axis=1).values p = 0 not in df.sum(axis=1).values
x = df.sum(axis=1).values # x = df.sum(axis=1).values
if np.divide( np.sum(x), x.size) > .9 or p and np.sum(x) == x.size : # if np.divide( np.sum(x), x.size) > .9 or p and np.sum(x) == x.size :
ratio.append(np.divide( np.sum(x), x.size)) # ratio.append(np.divide( np.sum(x), x.size))
found.append(df) # found.append(df)
# break # # break
if len(found) == CANDIDATE_COUNT: # if len(found) == CANDIDATE_COUNT:
break # break
else: # else:
__x__ = df if __x__ is None or np.where(x > 0)[0].size > np.where(__x__ > 0)[0].size else __x__ # __x__ = df if __x__ is None or np.where(x > 0)[0].size > np.where(__x__ > 0)[0].size else __x__
__ratio = np.divide( np.sum(x), x.size) if __x__ is None or np.where(x > 0)[0].size > np.where(__x__ > 0)[0].size else __ratio # __ratio = np.divide( np.sum(x), x.size) if __x__ is None or np.where(x > 0)[0].size > np.where(__x__ > 0)[0].size else __ratio
continue # continue
# i = df.T.index.astype(np.int32) #-- These are numeric pseudonyms # i = df.T.index.astype(np.int32) #-- These are numeric pseudonyms
# df = (i * df).sum(axis=1) # df = (i * df).sum(axis=1)
# #
# In case we are dealing with actual values like diagnosis codes we can perform # In case we are dealing with actual values like diagnosis codes we can perform
# #
N = len(found) # N = len(found)
_index = [i for i in range(0,N) if found[i].shape[1] == len(self.values)] # _index = [i for i in range(0,N) if found[i].shape[1] == len(self.values)]
if not _index and not found : # if not _index and not found :
df = __x__ # df = __x__
INDEX = -1 # INDEX = -1
else : # else :
if not _index : # if not _index :
INDEX = np.random.choice(np.arange(len(found)),1)[0] # INDEX = np.random.choice(np.arange(len(found)),1)[0]
INDEX = ratio.index(np.max(ratio)) # INDEX = ratio.index(np.max(ratio))
else: # else:
INDEX = _index[0] # INDEX = _index[0]
df = found[INDEX] # df = found[INDEX]
columns = self.ATTRIBUTES['synthetic'] if isinstance(self.ATTRIBUTES['synthetic'],list)else [self.ATTRIBUTES['synthetic']] # columns = self.ATTRIBUTES['synthetic'] if isinstance(self.ATTRIBUTES['synthetic'],list)else [self.ATTRIBUTES['synthetic']]
# r = np.zeros((self.ROW_COUNT,len(columns))) # r = np.zeros((self.ROW_COUNT,len(columns)))
# r = np.zeros(self.ROW_COUNT) # r = np.zeros(self.ROW_COUNT)
if self.logger : # if self.logger :
info = {"found":len(found),"rows":df.shape[0],"cols":df.shape[1],"expected":len(self.values)} # info = {"found":len(found),"rows":df.shape[0],"cols":df.shape[1],"expected":len(self.values)}
if df.shape[1] > len(self.values) : # if df.shape[1] > len(self.values) :
df = df.iloc[:len(self.values)] # df = df.iloc[:len(self.values)]
if INDEX > 0 : # if INDEX > 0 :
info =dict(info ,**{"selected":INDEX, "ratio": ratio[INDEX] }) # info =dict(info ,**{"selected":INDEX, "ratio": ratio[INDEX] })
else : # else :
info['selected'] = -1 # info['selected'] = -1
info['ratio'] = __ratio # info['ratio'] = __ratio
info['partition'] = self.PARTITION # info['partition'] = self.PARTITION
self.logger.write({"module":"gan-generate","action":"generate","input":info}) # self.logger.write({"module":"gan-generate","action":"generate","input":info})
# df.columns = self.values # # df.columns = self.values
if len(found) or df.columns.size <= len(self.values): # if len(found) or df.columns.size <= len(self.values):
ii = df.apply(lambda row: np.sum(row) == 0 ,axis=1) # ii = df.apply(lambda row: np.sum(row) == 0 ,axis=1)
missing = []
if ii.sum() > 0 :
#
# If the generator had a reductive effect we should be able to get random values from either :
# - The space of outliers
# - existing values for smaller spaces that have suffered over training
#
N = ii.sum()
missing_values = self.MISSING_VALUES if self.MISSING_VALUES else self.values
missing = np.random.choice(missing_values,N)
# missing = [] # missing = []
# # if ii.sum() > 0 :
# @TODO: # #
# Log the findings here in terms of ratio, missing, candidate count # # If the generator had a reductive effect we should be able to get random values from either :
# print ([np.max(ratio),len(missing),len(found),i]) # # - The space of outliers
i = np.where(ii == 0)[0] # # - existing values for smaller spaces that have suffered over training
# #
# N = ii.sum()
# missing_values = self.MISSING_VALUES if self.MISSING_VALUES else self.values
# missing = np.random.choice(missing_values,N)
# # missing = []
# #
# # @TODO:
# # Log the findings here in terms of ratio, missing, candidate count
# # print ([np.max(ratio),len(missing),len(found),i])
# i = np.where(ii == 0)[0]
df = pd.DataFrame( df.iloc[i].apply(lambda row: self.values[np.random.choice(np.where(row != 0)[0],1)[0]] ,axis=1))
df.columns = columns
df = df[columns[0]].append(pd.Series(missing))
# df = pd.DataFrame( df.iloc[i].apply(lambda row: self.values[np.random.choice(np.where(row != 0)[0],1)[0]] ,axis=1))
# df.columns = columns
# df = df[columns[0]].append(pd.Series(missing))
if self.logger :
info= {"missing": i.size,"rows":df.shape[0],"cols":1,'partition':self.PARTITION} # if self.logger :
self.logger.write({"module":"gan-generate","action":"compile.io","input":info})
# info= {"missing": i.size,"rows":df.shape[0],"cols":1,'partition':self.PARTITION}
# self.logger.write({"module":"gan-generate","action":"compile.io","input":info})
# print(df.head()) # print(df.head())
tf.compat.v1.reset_default_graph() tf.compat.v1.reset_default_graph()
df = pd.DataFrame(df) # df = pd.DataFrame(df)
df.columns = columns # df.columns = columns
np.random.shuffle(df[columns[0]].values) # np.random.shuffle(df[columns[0]].values)
return df.to_dict(orient='list') # return df.to_dict(orient='list')
return _matrix
if __name__ == '__main__' : if __name__ == '__main__' :

@ -14,6 +14,11 @@ import data.gan as gan
from transport import factory from transport import factory
from data.bridge import Binary from data.bridge import Binary
import threading as thread import threading as thread
from data.maker import prepare
import copy
import os
import json
class ContinuousToDiscrete : class ContinuousToDiscrete :
ROUND_UP = 2 ROUND_UP = 2
@staticmethod @staticmethod
@ -77,8 +82,62 @@ class ContinuousToDiscrete :
def train (**_args):
"""
:params sql
:params store
"""
#
# Let us prepare the data by calling the utility function
#
if 'file' in _args :
#
# We are reading data from a file
_args['data'] = pd.read_csv(_args['file'])
else:
#
# data will be read from elsewhere (a data-store)...
pass
# if 'ignore' in _args and 'columns' in _args['ignore']:
def train (**args) : _inputhandler = prepare.Input(**_args)
values,_matrix = _inputhandler.convert()
args = {"real":_matrix,"context":_args['context']}
_map = {}
if 'store' in _args :
#
# This
args['store'] = copy.deepcopy(_args['store']['logs'])
args['store']['args']['doc'] = _args['context']
logger = factory.instance(**args['store'])
args['logger'] = logger
for key in _inputhandler._map :
beg = _inputhandler._map[key]['beg']
end = _inputhandler._map[key]['end']
values = _inputhandler._map[key]['values'].tolist()
_map[key] = {"beg":beg,"end":end,"values":np.array(values).astype(str).tolist()}
info = {"rows":_matrix.shape[0],"cols":_matrix.shape[1],"map":_map}
logger.write({"module":"gan-train","action":"data-prep","context":_args['context'],"input":info})
args['logs'] = _args['logs'] if 'logs' in _args else 'logs'
args ['max_epochs'] = _args['max_epochs']
args['matrix_size'] = _matrix.shape[0]
args['batch_size'] = 2000
args['partition'] = 0 if 'partition' not in _args else _args['partition']
os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) if 'gpu' in args else '0'
trainer = gan.Train(**args)
#
# @TODO: Write the map.json in the output directory for the logs
#
f = open(os.sep.join([_args['logs'],'output',_args['context'],'map.json']),'w')
f.write(json.dumps(_map))
f.close()
trainer.apply()
pass
def _train (**args) :
""" """
This function is intended to train the GAN in order to learn about the distribution of the features This function is intended to train the GAN in order to learn about the distribution of the features
:column columns that need to be synthesized (discrete) :column columns that need to be synthesized (discrete)
@ -122,18 +181,53 @@ def train (**args) :
# If the s # If the s
trainer = gan.Train(**args) trainer = gan.Train(**args)
trainer.apply() trainer.apply()
def post(**args):
"""
This uploads the tensorflow checkpoint to a data-store (mongodb, biguqery, s3)
"""
pass
def get(**args): def get(**args):
""" """
This function will restore a checkpoint from a persistant storage on to disk This function will restore a checkpoint from a persistant storage on to disk
""" """
pass pass
def generate(**args): def generate(**_args):
"""
This function will generate a set of records, before we must load the parameters needed
:param data
:param context
:param logs
"""
f = open(os.sep.join([_args['logs'],'output',_args['context'],'map.json']))
_map = json.loads(f.read())
f.close()
if 'file' in _args :
df = pd.read_csv(_args['file'])
else:
df = _args['data'] if not isinstance(_args['data'],str) else pd.read_csv(_args['data'])
args = {"context":_args['context'],"max_epochs":_args['max_epochs'],"candidates":_args['candidates']}
args['logs'] = _args['logs'] if 'logs' in _args else 'logs'
args ['max_epochs'] = _args['max_epochs']
# args['matrix_size'] = _matrix.shape[0]
args['batch_size'] = 2000
args['partition'] = 0 if 'partition' not in _args else _args['partition']
args['row_count'] = df.shape[0]
#
# @TODO: perhaps get the space of values here ... (not sure it's a good idea)
#
_args['map'] = _map
_inputhandler = prepare.Input(**_args)
values,_matrix = _inputhandler.convert()
args['values'] = np.array(values)
if 'gpu' in _args :
os.environ['CUDA_VISIBLE_DEVICES'] = str(_args['gpu'])
handler = gan.Predict (**args)
handler.load_meta(None)
#
# Let us now format the matrices as we expect them to be
#
candidates = handler.apply(candidates=args['candidates'])
return [_inputhandler.revert(matrix=_matrix) for _matrix in candidates]
def _generate(**args):
""" """
This function will generate a synthetic dataset on the basis of a model that has been learnt for the dataset This function will generate a synthetic dataset on the basis of a model that has been learnt for the dataset
@return pandas.DataFrame @return pandas.DataFrame

@ -1,32 +0,0 @@
import pandas as pd
import data.maker
from data.params import SYS_ARGS
import json
from scipy.stats import wasserstein_distance as wd
import risk
import numpy as np
if 'config' in SYS_ARGS :
ARGS = json.loads(open(SYS_ARGS['config']).read())
if 'generate' not in SYS_ARGS :
data.maker.train(**ARGS)
else:
#
#
ARGS['no_value'] = ''
_df = data.maker.generate(**ARGS)
odf = pd.read_csv (ARGS['data'])
odf.columns = [name.lower() for name in odf.columns]
column = ARGS['column'] if isinstance(ARGS['column'],list) else [ARGS['column']]
# print (odf.head())
# print (_df.head())
print(odf.join(_df[column],rsuffix='_io'))
# print (_df[column].risk.evaluate(flag='synth'))
# print (odf[column].risk.evaluate(flag='original'))
# _x = pd.get_dummies(_df[column]).values
# y = pd.get_dummies(odf[column]).values
# N = _df.shape[0]
# print (np.mean([ wd(_x[i],y[i])for i in range(0,N)]))
# print (wd(_x[0],y[0]) )
# column = SYS_ARGS['column']
# odf = open(SYS_ARGS['data'])

@ -9,7 +9,7 @@ import pandas as pd
from google.oauth2 import service_account from google.oauth2 import service_account
from google.cloud import bigquery as bq from google.cloud import bigquery as bq
import data.maker import data.maker
import copy
from data.params import SYS_ARGS from data.params import SYS_ARGS
# #
@ -69,53 +69,45 @@ class Components :
This function will perform training on the basis of a given pointer that reads data This function will perform training on the basis of a given pointer that reads data
""" """
# schema = None
# @TODO: we need to log something here about the parameters being passed if 'file' in args :
# pointer = args['reader'] if 'reader' in args else lambda: Components.get(**args)
df = args['data']
#
# Now we can parse the arguments and submit the entire thing to training
#
logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']}) df = pd.read_csv(args['file'])
log_folder = args['logs'] if 'logs' in args else 'logs' del args['file']
PART_SIZE = int(args['part_size']) elif 'data' not in args :
reader = factory.instance(**args['store']['source'])
partition = args['partition'] if 'row_limit' in args :
log_folder = os.sep.join([log_folder,args['context'],str(partition)]) df = reader.read(sql=args['sql'],limit=args['row_limit'])
_args = {"batch_size":2000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
_args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
if 'batch_size' in args :
_args['batch_size'] = int(args['batch_size'])
_args['matrix_size'] = args['matrix_size'] if 'matrix_size' in args else 128 #
# We ask the process to assume 1 gpu given the system number of GPU and that these tasks can run in parallel
#
if int(args['num_gpu']) > 1 :
_args['gpu'] = int(args['gpu']) if int(args['gpu']) < 8 else np.random.choice(np.arange(8)).astype(int)
else: else:
_args['gpu'] = 0 df = reader.read(sql=args['sql'])
_args['num_gpu'] = 1 schema = reader.meta(table=args['from']) if hasattr(reader,'meta') and 'from' in args else None
os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) else:
_args['partition'] = int(partition) df = args['data']
_args['continuous']= args['continuous'] if 'continuous' in args else []
_args['store'] = {'type':'mongo.MongoWriter','args':{'dbname':'aou','doc':args['context']}}
_args['data'] = args['data']
# print (['partition ',partition,df.value_source_concept_id.unique()])
#
# @log :
# Logging information about the training process for this partition (or not)
#
info = {"rows":df.shape[0],"cols":df.shape[1], "partition":int(partition),"logs":_args['logs']}
logger.write({"module":"train","action":"train","input":info}) # df = df.fillna('')
if schema :
_schema = {}
for _item in schema :
_type = int
_value = 0
if _item.field_type == 'FLOAT' :
_type =float
elif _item.field_type != 'INTEGER' :
_type = str
_value = ''
_schema[_item.name] = _type
df[_item.name] = df[_item.name].fillna(_value).astype(_type)
args['schema'] = _schema
# df[_item.name] = df[_item.name].astype(_type)
_args = copy.deepcopy(args)
# _args['store'] = args['store']['source']
_args['data'] = df
data.maker.train(**_args) data.maker.train(**_args)
if 'autopilot' in ( list(args.keys())) : if 'autopilot' in ( list(args.keys())) :
print (['autopilot mode enabled ....']) print (['autopilot mode enabled ....',args['context']])
self.generate(args) self.generate(args)
pass pass
@ -129,141 +121,167 @@ class Components :
""" """
This function will generate data and store it to a given, This function will generate data and store it to a given,
""" """
logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']}) store = args['store']['logs']
log_folder = args['logs'] if 'logs' in args else 'logs' store['doc'] = args['context']
partition = args['partition'] if 'partition' in args else '' logger = factory.instance(**store) #type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
log_folder = os.sep.join([log_folder,args['context'],str(partition)])
ostore = args['store']['target']
_args = {"batch_size":2000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger} writer = factory.instance(**ostore)
_args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs']) # log_folder = args['logs'] if 'logs' in args else 'logs'
# partition = args['partition'] if 'partition' in args else ''
# log_folder = os.sep.join([log_folder,args['context'],str(partition)])
# _args = {"batch_size":2000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
# _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
# _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1 # _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
if 'batch_size' in args : # if 'batch_size' in args :
_args['batch_size'] = int(args['batch_size']) # _args['batch_size'] = int(args['batch_size'])
if int(args['num_gpu']) > 1 : # if int(args['num_gpu']) > 1 :
_args['gpu'] = int(args['gpu']) if int(args['gpu']) < 8 else np.random.choice(np.arange(8)).astype(int) # _args['gpu'] = int(args['gpu']) if int(args['gpu']) < 8 else np.random.choice(np.arange(8)).astype(int)
else: # else:
_args['gpu'] = 0 # _args['gpu'] = 0
_args['num_gpu'] = 1 # _args['num_gpu'] = 1
os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) # os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu'])
# _args['no_value']= args['no_value'] # # _args['no_value']= args['no_value']
_args['matrix_size'] = args['matrix_size'] if 'matrix_size' in args else 128 # _args['matrix_size'] = args['matrix_size'] if 'matrix_size' in args else 128
# MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0 # # MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0
PART_SIZE = int(args['part_size']) if 'part_size' in args else 8 # PART_SIZE = int(args['part_size']) if 'part_size' in args else 8
# credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json') # credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
# _args['data'] = pd.read_gbq(SQL,credentials=credentials,dialect='standard').dropna() # _args['data'] = pd.read_gbq(SQL,credentials=credentials,dialect='standard').dropna()
# reader = args['reader'] # reader = args['reader']
# df = reader() # df = reader()
df = args['reader']() if 'reader' in args else args['data'] schema = args['schema'] if 'schema' in args else None
if 'file' in args :
# if 'slice' in args and 'max_rows' in args['slice']: df = pd.read_csv(args['file'])
else:
if 'data' not in args :
reader = factory.instance(**args['store']['source'])
if 'row_limit' in args :
df = reader.read(sql=args['sql'],limit=args['row_limit'])
else:
df = reader.read(sql=args['sql'])
if 'schema' not in args and hasattr(reader,'meta'):
schema = reader.meta(table=args['from'])
# max_rows = args['slice']['max_rows']
# if df.shape[0] > max_rows :
# print (".. slicing ")
# i = np.random.choice(df.shape[0],max_rows,replace=False)
# df = df.iloc[i]
else:
#
# This will account for autopilot mode ...
df = args['data']
_info = {"module":"gan-prep","action":"read","shape":{"rows":df.shape[0],"columns":df.shape[0]}}
# bounds = Components.split(df,MAX_ROWS,PART_SIZE)
# if partition != '' : _dc = pd.DataFrame()
# columns = args['columns'] # for mdf in df :
# df = np.array_split(df[columns].values,PART_SIZE) args['data'] = df
# df = pd.DataFrame(df[ int (partition) ],columns = columns) args['candidates'] = 1 if 'candidates' not in args else int(args['candidates'])
# max_rows = int(args['partition_max_rows']) if 'partition_max_rows' in args else 1000000
# N = np.divide(df.shape[0],max_rows).astype(int) + 1 candidates = (data.maker.generate(**args))
info = {"name":args['columns'],"parition":int(partition),"gpu":_args["gpu"],"rows":int(df.shape[0]),"cols":int(df.shape[1]),"space":df[args['columns'][0]].unique().size, "part_size":int(PART_SIZE)} if 'sql.BQWriter' in ostore['type'] :
logger.write({"module":"generate","action":"partition","input":info}) #table = ".".join([ostore['['dataset'],args['context']])
_args['partition'] = int(partition) # writer = factory.instance(**ostore)
_args['continuous']= args['continuous'] if 'continuous' in args else [] _columns = None
skip_columns = []
_schema = [{"name":field.name,"type":field.field_type,"description":field.description} for field in schema] if schema else []
for _df in candidates :
# #
# How many rows sub-partition must we divide this into ? # we need to format the fields here to make sure we have something cohesive
# let us fix the data types here every _id field will be an np.int64...
# #
schema = args['schema'] if not skip_columns :
for item in schema : # _columns = set(df.columns) - set(_df.columns)
if item.field_type == 'INTEGER' and df[item.name].dtype != np.int64: if 'ignore' in args and 'columns' in args['ignore'] :
df[item.name] = np.array(df[item.name].values,dtype=np.int64)
elif item.field_type == 'STRING' and df[item.name].dtype != object :
df[item.name] = np.array(df[item.name],dtype=object)
for name in args['ignore']['columns'] :
for _name in _df.columns:
if _name in name:
skip_columns.append(_name)
#
# We perform a series of set operations to insure that the following conditions are met:
# - the synthetic dataset only has fields that need to be synthesized
# - The original dataset has all the fields except those that need to be synthesized
#
_df = _df[list(set(_df.columns) - set(skip_columns))]
# for name in df.columns.tolist(): if set(df.columns) & set(_df.columns) :
_columns = set(df.columns) - set(_df.columns)
df = df[_columns]
# if name.endswith('_id') : #
# if df[name].isnull().sum() > 0 and name not in ['unique_device_id']: # Let us merge the dataset here and and have a comprehensive dataset
# df[name].fillna(np.nan_to_num(np.nan),inplace=True)
# df[name] = df[name].astype(int)
_df = pd.DataFrame.join(df,_df)
_dc = pd.DataFrame() writer.write(_df,schema=_schema,table=args['from'])
# for mdf in df : # writer.write(df,table=table)
_args['data'] = df pass
else:
pass
_dc = _dc.append(data.maker.generate(**_args))
# # #
# We need to post the generate the data in order to : # # We need to post the generate the data in order to :
# 1. compare immediately # # 1. compare immediately
# 2. synthetic copy # # 2. synthetic copy
# # #
cols = _dc.columns.tolist() # cols = _dc.columns.tolist()
data_comp = _args['data'][args['columns']].join(_dc[args['columns']],rsuffix='_io') #-- will be used for comparison (store this in big query) # data_comp = _args['data'][args['columns']].join(_dc[args['columns']],rsuffix='_io') #-- will be used for comparison (store this in big query)
# # #
# performing basic analytics on the synthetic data generated (easy to quickly asses) # # performing basic analytics on the synthetic data generated (easy to quickly asses)
# # #
info = {"module":"generate","action":"io.metrics","input":{"rows":data_comp.shape[0],"partition":partition,"logs":[]}} # info = {"module":"generate","action":"io.metrics","input":{"rows":data_comp.shape[0],"partition":partition,"logs":[]}}
# # #
# @TODO: Send data over to a process for analytics # # @TODO: Send data over to a process for analytics
base_cols = list(set(_args['data'].columns) - set(args['columns'])) #-- rebuilt the dataset (and store it) # base_cols = list(set(_args['data'].columns) - set(args['columns'])) #-- rebuilt the dataset (and store it)
cols = _dc.columns.tolist() # cols = _dc.columns.tolist()
for name in cols : # for name in cols :
_args['data'][name] = _dc[name] # _args['data'][name] = _dc[name]
# # #
#-- Let us store all of this into bigquery # #-- Let us store all of this into bigquery
prefix = args['notify']+'.'+_args['context'] # prefix = args['notify']+'.'+_args['context']
partition = str(partition) # partition = str(partition)
table = '_'.join([prefix,partition,'io']).replace('__','_') # table = '_'.join([prefix,partition,'io']).replace('__','_')
folder = os.sep.join([args['logs'],args['context'],partition,'output']) # folder = os.sep.join([args['logs'],args['context'],partition,'output'])
if 'file' in args : # if 'file' in args :
_fname = os.sep.join([folder,table.replace('_io','_full_io.csv')]) # _fname = os.sep.join([folder,table.replace('_io','_full_io.csv')])
_pname = os.sep.join([folder,table])+'.csv' # _pname = os.sep.join([folder,table])+'.csv'
data_comp.to_csv( _pname,index=False) # data_comp.to_csv( _pname,index=False)
_args['data'].to_csv(_fname,index=False) # _args['data'].to_csv(_fname,index=False)
_id = 'path' # _id = 'path'
else: # else:
credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json') # credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
_pname = os.sep.join([folder,table+'.csv']) # _pname = os.sep.join([folder,table+'.csv'])
_fname = table.replace('_io','_full_io') # _fname = table.replace('_io','_full_io')
partial = '.'.join(['io',args['context']+'_partial_io']) # partial = '.'.join(['io',args['context']+'_partial_io'])
complete= '.'.join(['io',args['context']+'_full_io']) # complete= '.'.join(['io',args['context']+'_full_io'])
data_comp.to_csv(_pname,index=False) # data_comp.to_csv(_pname,index=False)
if 'dump' in args : # if 'dump' in args :
print (_args['data'].head()) # print (_args['data'].head())
else: # else:
Components.lock.acquire() # Components.lock.acquire()
data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000) # data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000)
_args['data'].to_gbq(if_exists='append',destination_table=complete,credentials=credentials,chunksize=90000) # _args['data'].to_gbq(if_exists='append',destination_table=complete,credentials=credentials,chunksize=90000)
Components.lock.release() # Components.lock.release()
_id = 'dataset' # _id = 'dataset'
info = {"full":{_id:_fname,"rows":_args['data'].shape[0]},"partial":{"path":_pname,"rows":data_comp.shape[0]} } # info = {"full":{_id:_fname,"rows":_args['data'].shape[0]},"partial":{"path":_pname,"rows":data_comp.shape[0]} }
if partition : # if partition :
info ['partition'] = int(partition) # info ['partition'] = int(partition)
logger.write({"module":"generate","action":"write","input":info} ) # logger.write({"module":"generate","action":"write","input":info} )
@ -308,98 +326,95 @@ if __name__ == '__main__' :
# Log what was initiated so we have context of this processing ... # Log what was initiated so we have context of this processing ...
# #
# if 'listen' not in SYS_ARGS : # if 'listen' not in SYS_ARGS :
if 'file' in args : # if 'file' in args :
DATA = pd.read_csv(args['file']) ; # DATA = pd.read_csv(args['file']) ;
schema = [] # schema = []
else: # else:
DATA = Components().get(args) # DATA = Components().get(args)
client = bq.Client.from_service_account_json(args["private_key"]) # client = bq.Client.from_service_account_json(args["private_key"])
schema = client.get_table(client.dataset(args['dataset']).table(args['from'])).schema # schema = client.get_table(client.dataset(args['dataset']).table(args['from'])).schema
COLUMNS = DATA.columns # COLUMNS = DATA.columns
DATA = np.array_split(DATA,PART_SIZE) # DATA = np.array_split(DATA,PART_SIZE)
args['schema'] = schema # args['schema'] = schema
if 'generate' in SYS_ARGS : if 'generate' in SYS_ARGS :
# #
# Let us see if we have partitions given the log folder # Let us see if we have partitions given the log folder
content = os.listdir( os.sep.join([args['logs'],args['context']])) content = os.listdir( os.sep.join([args['logs'],'train',args['context']]))
generator = Components() generator = Components()
if ''.join(content).isnumeric() : # if ''.join(content).isnumeric() :
# # #
# we have partitions we are working with # # we have partitions we are working with
jobs = [] # jobs = []
# columns = DATA.columns.tolist() # # columns = DATA.columns.tolist()
# DATA = np.array_split(DATA,PART_SIZE) # # DATA = np.array_split(DATA,PART_SIZE)
for index in range(0,PART_SIZE) : # for index in range(0,PART_SIZE) :
if 'focus' in args and int(args['focus']) != index : # if 'focus' in args and int(args['focus']) != index :
# # #
# This handles failures/recoveries for whatever reason # # This handles failures/recoveries for whatever reason
# If we are only interested in generating data for a given partition # # If we are only interested in generating data for a given partition
continue # continue
# index = id.index(id) # # index = id.index(id)
args['partition'] = index # args['partition'] = index
args['data'] = DATA[index] # args['data'] = DATA[index]
if int(args['num_gpu']) > 1 : # if int(args['num_gpu']) > 1 :
args['gpu'] = index # args['gpu'] = index
else: # else:
args['gpu']=0 # args['gpu']=0
make = lambda _args: (Components()).generate(_args) # make = lambda _args: (Components()).generate(_args)
job = Process(target=make,args=(args,)) # job = Process(target=make,args=(args,))
job.name = 'generator # '+str(index) # job.name = 'generator # '+str(index)
job.start() # job.start()
jobs.append(job) # jobs.append(job)
# if len(jobs) == 1 : # # if len(jobs) == 1 :
# job.join() # # job.join()
print (["Started ",len(jobs),"generators" if len(jobs)>1 else "generator" ]) # print (["Started ",len(jobs),"generators" if len(jobs)>1 else "generator" ])
while len(jobs)> 0 : # while len(jobs)> 0 :
jobs = [job for job in jobs if job.is_alive()] # jobs = [job for job in jobs if job.is_alive()]
time.sleep(2) # time.sleep(2)
# # generator.generate(args)
# else:
# generator.generate(args) # generator.generate(args)
else:
generator.generate(args)
# Components.generate(args) # Components.generate(args)
elif 'shuffle' in SYS_ARGS: generator.generate(args)
for data in DATA :
args['data'] = data
_df = (Components()).shuffle(args)
else: else:
# DATA = np.array_split(DATA,PART_SIZE) # DATA = np.array_split(DATA,PART_SIZE)
agent = Components()
jobs = [] agent.train(**args)
for index in range(0,PART_SIZE) : # jobs = []
if 'focus' in args and int(args['focus']) != index : # for index in range(0,PART_SIZE) :
continue # if 'focus' in args and int(args['focus']) != index :
args['part_size'] = PART_SIZE # continue
args['partition'] = index # args['part_size'] = PART_SIZE
args['data'] = DATA[index] # args['partition'] = index
if int(args['num_gpu']) > 1 : # args['data'] = DATA[index]
args['gpu'] = index # if int(args['num_gpu']) > 1 :
else: # args['gpu'] = index
args['gpu']=0 # else:
# args['gpu']=0
make = lambda _args: (Components()).train(**_args)
job = Process(target=make,args=( dict(args),)) # make = lambda _args: (Components()).train(**_args)
job.name = 'Trainer # ' + str(index) # job = Process(target=make,args=( dict(args),))
job.start() # job.name = 'Trainer # ' + str(index)
jobs.append(job) # job.start()
# args['gpu'] # jobs.append(job)
print (["Started ",len(jobs),"trainers" if len(jobs)>1 else "trainer" ]) # # args['gpu']
while len(jobs)> 0 : # print (["Started ",len(jobs),"trainers" if len(jobs)>1 else "trainer" ])
jobs = [job for job in jobs if job.is_alive()] # while len(jobs)> 0 :
time.sleep(2) # jobs = [job for job in jobs if job.is_alive()]
# time.sleep(2)
# trainer = Components() # trainer = Components()
# trainer.train(**args) # trainer.train(**args)

Loading…
Cancel
Save