From 97bae5ef92a9dbf9c53ec2dbfe854e099612d67e Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Mon, 29 Mar 2021 11:10:57 -0500 Subject: [PATCH] bug fixes: design improvements --- data/__init__.py | 15 ++ data/gan.py | 225 ++++++++++++-------- data/maker/__init__.py | 110 +++++++++- data/maker/__main__.py | 32 --- pipeline.py | 471 +++++++++++++++++++++-------------------- 5 files changed, 500 insertions(+), 353 deletions(-) delete mode 100644 data/maker/__main__.py diff --git a/data/__init__.py b/data/__init__.py index 98124f1..0ca216d 100644 --- a/data/__init__.py +++ b/data/__init__.py @@ -1,2 +1,17 @@ import data.params as params +from data.params import SYS_ARGS +import transport +from multiprocessing import Process, Queue +from data.maker import prepare +class Trainer (Process) : + pass +class Maker(Process): + pass + +if __name__ == '__main__' : + + logger = transport.factory.instance(SYS_ARGS['store']['logger']) + + + \ No newline at end of file diff --git a/data/gan.py b/data/gan.py index 1418a04..e7ab6cf 100644 --- a/data/gan.py +++ b/data/gan.py @@ -111,15 +111,15 @@ class GNet : self.train_dir = os.sep.join([self.log_dir,'train',self.CONTEXT]) self.out_dir = os.sep.join([self.log_dir,'output',self.CONTEXT]) - if self.logger : - # - # We will clear the logs from the data-store - # - column = self.ATTRIBUTES['synthetic'] - db = self.logger.db - if db[column].count() > 0 : - db.backup.insert({'name':column,'logs':list(db[column].find()) }) - db[column].drop() + # if self.logger : + + # We will clear the logs from the data-store + + # column = self.ATTRIBUTES['synthetic'] + # db = self.logger.db + # if db[column].count() > 0 : + # db.backup.insert({'name':column,'logs':list(db[column].find()) }) + # db[column].drop() def load_meta(self,column): """ @@ -127,7 +127,7 @@ class GNet : Because prediction and training can happen independently """ # suffix = "-".join(column) if isinstance(column,list)else column - suffix = self.get.suffix() + suffix = self.CONTEXT #self.get.suffix() _name = os.sep.join([self.out_dir,'meta-'+suffix+'.json']) if os.path.exists(_name) : attr = json.loads((open(_name)).read()) @@ -159,7 +159,7 @@ class GNet : value= args['value'] object[key] = value # suffix = "-".join(self.column) if isinstance(self.column,list) else self.column - suffix = self.get.suffix() + suffix = self.CONTEXT #self.get.suffix() _name = os.sep.join([self.out_dir,'meta-'+suffix]) f = open(_name+'.json','w') @@ -351,7 +351,7 @@ class Train (GNet): self.discriminator = Discriminator(**args) self._REAL = args['real'] self._LABEL= args['label'] if 'label' in args else None - self.column = args['column'] + # self.column = args['column'] # print ([" *** ",self.BATCHSIZE_PER_GPU]) self.meta = self.log_meta() @@ -438,6 +438,11 @@ class Train (GNet): per_gpu_w = [] iterator, features_placeholder, labels_placeholder = self.input_fn() with tf.compat.v1.variable_scope(tf.compat.v1.get_variable_scope()): + # + # @TODO: Find a way to handle this across multiple CPU in case the GPU are not available + # - abstract hardware specification + # - determine if the GPU/CPU are busy + # for i in range(self.NUM_GPUS): with tf.device('/gpu:%d' % i): with tf.name_scope('%s_%d' % ('TOWER', i)) as scope: @@ -510,7 +515,7 @@ class Train (GNet): # if epoch % self.MAX_EPOCHS == 0: if epoch in [5,10,20,50,75, self.MAX_EPOCHS] : # suffix = "-".join(self.ATTRIBUTES['synthetic']) if isinstance(self.ATTRIBUTES['synthetic'],list) else self.ATTRIBUTES['synthetic'] - suffix = self.get.suffix() + suffix = self.CONTEXT 
#self.get.suffix() _name = os.sep.join([self.train_dir,suffix]) # saver.save(sess, self.train_dir, write_meta_graph=False, global_step=epoch) saver.save(sess, _name, write_meta_graph=False, global_step=epoch) @@ -539,7 +544,8 @@ class Predict(GNet): # self.MISSING_VALUES = np.nan_to_num(np.nan) # if 'no_value' in args and args['no_value'] not in ['na','','NA'] : # self.MISSING_VALUES = args['no_value'] - self.MISSING_VALUES = args['missing'] + self.MISSING_VALUES = args['missing'] if 'missing' in args else [] + # self.MISSING_VALUES = args['no_value'] # self.MISSING_VALUES = int(args['no_value']) if args['no_value'].isnumeric() else np.na if args['no_value'] in ['na','NA','N/A'] else args['no_value'] @@ -548,9 +554,56 @@ class Predict(GNet): self.generator.load_meta(column) self.ROW_COUNT = self.oROW_COUNT def apply(self,**args): + suffix = self.CONTEXT #self.get.suffix() + model_dir = os.sep.join([self.train_dir,suffix+'-'+str(self.MAX_EPOCHS)]) + demo = self._LABEL #np.zeros([self.ROW_COUNT,self.NUM_LABELS]) #args['de"shape":{"LABEL":list(self._LABEL.shape)} mo'] + # + # setup computational graph + tf.compat.v1.reset_default_graph() + z = tf.random.normal(shape=[self.ROW_COUNT, self.Z_DIM]) + + y = tf.compat.v1.placeholder(shape=[self.ROW_COUNT, self.NUM_LABELS], dtype=tf.int32) + if self._LABEL is not None : + ma = [[i] for i in np.arange(self.NUM_LABELS - 2)] + label = y[:, 1] * len(ma) + tf.squeeze(tf.matmul(y[:, 2:], tf.constant(ma, dtype=tf.int32))) + else: + label = None + + fake = self.generator.network(inputs=z, label=label) + init = tf.compat.v1.global_variables_initializer() + saver = tf.compat.v1.train.Saver() + df = pd.DataFrame() + CANDIDATE_COUNT = args['candidates'] if 'candidates' in args else 1 #0 if self.ROW_COUNT < 1000 else 100 + candidates = [] + + with tf.compat.v1.Session() as sess: + saver.restore(sess, model_dir) + if self._LABEL is not None : + # labels = np.zeros((self.ROW_COUNT,self.NUM_LABELS) ) + labels= demo + else: + labels = None + + for i in np.arange(CANDIDATE_COUNT) : + if labels : + _matrix = sess.run(fake,feed_dict={y:labels}) + else: + _matrix = sess.run(fake) + # + # if we are dealing with numeric values only we can perform a simple marginal sum against the indexes + # The code below will insure we have some acceptable cardinal relationships between id and synthetic values + # + + # df = pd.DataFrame(np.round(f)).astype(np.int32) + candidates.append (np.round(_matrix).astype(np.int64)) + # return candidates[0] if len(candidates) == 1 else candidates + + return candidates + + def _apply(self,**args): # print (self.train_dir) # suffix = "-".join(self.ATTRIBUTES['synthetic']) if isinstance(self.ATTRIBUTES['synthetic'],list) else self.ATTRIBUTES['synthetic'] - suffix = self.get.suffix() + suffix = self.CONTEXT #self.get.suffix() model_dir = os.sep.join([self.train_dir,suffix+'-'+str(self.MAX_EPOCHS)]) demo = self._LABEL #np.zeros([self.ROW_COUNT,self.NUM_LABELS]) #args['de"shape":{"LABEL":list(self._LABEL.shape)} mo'] tf.compat.v1.reset_default_graph() @@ -567,11 +620,12 @@ class Predict(GNet): init = tf.compat.v1.global_variables_initializer() saver = tf.compat.v1.train.Saver() df = pd.DataFrame() - CANDIDATE_COUNT = 10 #0 if self.ROW_COUNT < 1000 else 100 + CANDIDATE_COUNT = 5 #0 if self.ROW_COUNT < 1000 else 100 NTH_VALID_CANDIDATE = count = np.random.choice(np.arange(2,60),2)[0] with tf.compat.v1.Session() as sess: # sess.run(init) + saver.restore(sess, model_dir) if self._LABEL is not None : labels = np.zeros((self.ROW_COUNT,self.NUM_LABELS) ) @@ 
-585,109 +639,110 @@ class Predict(GNet): __ratio=0 for i in np.arange(CANDIDATE_COUNT) : if labels : - f = sess.run(fake,feed_dict={y:labels}) + _matrix = sess.run(fake,feed_dict={y:labels}) else: - f = sess.run(fake) + _matrix = sess.run(fake) # # if we are dealing with numeric values only we can perform a simple marginal sum against the indexes # The code below will insure we have some acceptable cardinal relationships between id and synthetic values # # df = pd.DataFrame(np.round(f)).astype(np.int32) - df = pd.DataFrame(np.round(f),dtype=int) - + found.append (np.round(_matrix).astype(np.int64)) + # df = pd.DataFrame(np.round(_matrix),dtype=int) p = 0 not in df.sum(axis=1).values - x = df.sum(axis=1).values + # x = df.sum(axis=1).values - if np.divide( np.sum(x), x.size) > .9 or p and np.sum(x) == x.size : - ratio.append(np.divide( np.sum(x), x.size)) - found.append(df) + # if np.divide( np.sum(x), x.size) > .9 or p and np.sum(x) == x.size : + # ratio.append(np.divide( np.sum(x), x.size)) + # found.append(df) - # break - if len(found) == CANDIDATE_COUNT: + # # break + # if len(found) == CANDIDATE_COUNT: - break - else: - __x__ = df if __x__ is None or np.where(x > 0)[0].size > np.where(__x__ > 0)[0].size else __x__ - __ratio = np.divide( np.sum(x), x.size) if __x__ is None or np.where(x > 0)[0].size > np.where(__x__ > 0)[0].size else __ratio - continue + # break + # else: + # __x__ = df if __x__ is None or np.where(x > 0)[0].size > np.where(__x__ > 0)[0].size else __x__ + # __ratio = np.divide( np.sum(x), x.size) if __x__ is None or np.where(x > 0)[0].size > np.where(__x__ > 0)[0].size else __ratio + # continue # i = df.T.index.astype(np.int32) #-- These are numeric pseudonyms # df = (i * df).sum(axis=1) # # In case we are dealing with actual values like diagnosis codes we can perform # - N = len(found) - _index = [i for i in range(0,N) if found[i].shape[1] == len(self.values)] - if not _index and not found : - df = __x__ - INDEX = -1 - else : - if not _index : - INDEX = np.random.choice(np.arange(len(found)),1)[0] - INDEX = ratio.index(np.max(ratio)) - else: - INDEX = _index[0] + # N = len(found) + # _index = [i for i in range(0,N) if found[i].shape[1] == len(self.values)] + # if not _index and not found : + # df = __x__ + # INDEX = -1 + # else : + # if not _index : + # INDEX = np.random.choice(np.arange(len(found)),1)[0] + # INDEX = ratio.index(np.max(ratio)) + # else: + # INDEX = _index[0] - df = found[INDEX] - columns = self.ATTRIBUTES['synthetic'] if isinstance(self.ATTRIBUTES['synthetic'],list)else [self.ATTRIBUTES['synthetic']] + # df = found[INDEX] + # columns = self.ATTRIBUTES['synthetic'] if isinstance(self.ATTRIBUTES['synthetic'],list)else [self.ATTRIBUTES['synthetic']] # r = np.zeros((self.ROW_COUNT,len(columns))) # r = np.zeros(self.ROW_COUNT) - if self.logger : - info = {"found":len(found),"rows":df.shape[0],"cols":df.shape[1],"expected":len(self.values)} - if df.shape[1] > len(self.values) : - df = df.iloc[:len(self.values)] - if INDEX > 0 : - info =dict(info ,**{"selected":INDEX, "ratio": ratio[INDEX] }) - else : + # if self.logger : + # info = {"found":len(found),"rows":df.shape[0],"cols":df.shape[1],"expected":len(self.values)} + # if df.shape[1] > len(self.values) : + # df = df.iloc[:len(self.values)] + # if INDEX > 0 : + # info =dict(info ,**{"selected":INDEX, "ratio": ratio[INDEX] }) + # else : - info['selected'] = -1 - info['ratio'] = __ratio - info['partition'] = self.PARTITION - self.logger.write({"module":"gan-generate","action":"generate","input":info}) - # 
df.columns = self.values - if len(found) or df.columns.size <= len(self.values): - ii = df.apply(lambda row: np.sum(row) == 0 ,axis=1) - missing = [] - if ii.sum() > 0 : - # - # If the generator had a reductive effect we should be able to get random values from either : - # - The space of outliers - # - existing values for smaller spaces that have suffered over training - # - - N = ii.sum() - missing_values = self.MISSING_VALUES if self.MISSING_VALUES else self.values - missing = np.random.choice(missing_values,N) - # missing = [] - # - # @TODO: - # Log the findings here in terms of ratio, missing, candidate count - # print ([np.max(ratio),len(missing),len(found),i]) - i = np.where(ii == 0)[0] + # info['selected'] = -1 + # info['ratio'] = __ratio + # info['partition'] = self.PARTITION + # self.logger.write({"module":"gan-generate","action":"generate","input":info}) + # # df.columns = self.values + # if len(found) or df.columns.size <= len(self.values): + # ii = df.apply(lambda row: np.sum(row) == 0 ,axis=1) + # missing = [] + # if ii.sum() > 0 : + # # + # # If the generator had a reductive effect we should be able to get random values from either : + # # - The space of outliers + # # - existing values for smaller spaces that have suffered over training + # # + + # N = ii.sum() + # missing_values = self.MISSING_VALUES if self.MISSING_VALUES else self.values + # missing = np.random.choice(missing_values,N) + # # missing = [] + # # + # # @TODO: + # # Log the findings here in terms of ratio, missing, candidate count + # # print ([np.max(ratio),len(missing),len(found),i]) + # i = np.where(ii == 0)[0] - df = pd.DataFrame( df.iloc[i].apply(lambda row: self.values[np.random.choice(np.where(row != 0)[0],1)[0]] ,axis=1)) - df.columns = columns - df = df[columns[0]].append(pd.Series(missing)) + # df = pd.DataFrame( df.iloc[i].apply(lambda row: self.values[np.random.choice(np.where(row != 0)[0],1)[0]] ,axis=1)) + # df.columns = columns + # df = df[columns[0]].append(pd.Series(missing)) - if self.logger : + # if self.logger : - info= {"missing": i.size,"rows":df.shape[0],"cols":1,'partition':self.PARTITION} - self.logger.write({"module":"gan-generate","action":"compile.io","input":info}) + # info= {"missing": i.size,"rows":df.shape[0],"cols":1,'partition':self.PARTITION} + # self.logger.write({"module":"gan-generate","action":"compile.io","input":info}) # print(df.head()) tf.compat.v1.reset_default_graph() - df = pd.DataFrame(df) - df.columns = columns - np.random.shuffle(df[columns[0]].values) - return df.to_dict(orient='list') + # df = pd.DataFrame(df) + # df.columns = columns + # np.random.shuffle(df[columns[0]].values) + # return df.to_dict(orient='list') + return _matrix if __name__ == '__main__' : diff --git a/data/maker/__init__.py b/data/maker/__init__.py index 3e2c9aa..086df3f 100644 --- a/data/maker/__init__.py +++ b/data/maker/__init__.py @@ -14,6 +14,11 @@ import data.gan as gan from transport import factory from data.bridge import Binary import threading as thread +from data.maker import prepare +import copy +import os +import json + class ContinuousToDiscrete : ROUND_UP = 2 @staticmethod @@ -77,8 +82,62 @@ class ContinuousToDiscrete : +def train (**_args): + """ + :params sql + :params store + """ + # + # Let us prepare the data by calling the utility function + # + if 'file' in _args : + # + # We are reading data from a file + _args['data'] = pd.read_csv(_args['file']) + else: + # + # data will be read from elsewhere (a data-store)... 
+ pass + # if 'ignore' in _args and 'columns' in _args['ignore']: + + _inputhandler = prepare.Input(**_args) + values,_matrix = _inputhandler.convert() + args = {"real":_matrix,"context":_args['context']} + _map = {} + if 'store' in _args : + # + # This + args['store'] = copy.deepcopy(_args['store']['logs']) + args['store']['args']['doc'] = _args['context'] + logger = factory.instance(**args['store']) + args['logger'] = logger + + for key in _inputhandler._map : + beg = _inputhandler._map[key]['beg'] + end = _inputhandler._map[key]['end'] + values = _inputhandler._map[key]['values'].tolist() + _map[key] = {"beg":beg,"end":end,"values":np.array(values).astype(str).tolist()} + info = {"rows":_matrix.shape[0],"cols":_matrix.shape[1],"map":_map} + logger.write({"module":"gan-train","action":"data-prep","context":_args['context'],"input":info}) + + args['logs'] = _args['logs'] if 'logs' in _args else 'logs' + args ['max_epochs'] = _args['max_epochs'] + args['matrix_size'] = _matrix.shape[0] + args['batch_size'] = 2000 + args['partition'] = 0 if 'partition' not in _args else _args['partition'] + os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) if 'gpu' in args else '0' -def train (**args) : + trainer = gan.Train(**args) + # + # @TODO: Write the map.json in the output directory for the logs + # + f = open(os.sep.join([_args['logs'],'output',_args['context'],'map.json']),'w') + f.write(json.dumps(_map)) + f.close() + + trainer.apply() + pass +def _train (**args) : """ This function is intended to train the GAN in order to learn about the distribution of the features :column columns that need to be synthesized (discrete) @@ -122,18 +181,53 @@ def train (**args) : # If the s trainer = gan.Train(**args) trainer.apply() -def post(**args): - """ - This uploads the tensorflow checkpoint to a data-store (mongodb, biguqery, s3) - - """ - pass def get(**args): """ This function will restore a checkpoint from a persistant storage on to disk """ pass -def generate(**args): +def generate(**_args): + """ + This function will generate a set of records, before we must load the parameters needed + :param data + :param context + :param logs + """ + f = open(os.sep.join([_args['logs'],'output',_args['context'],'map.json'])) + _map = json.loads(f.read()) + f.close() + if 'file' in _args : + df = pd.read_csv(_args['file']) + else: + df = _args['data'] if not isinstance(_args['data'],str) else pd.read_csv(_args['data']) + args = {"context":_args['context'],"max_epochs":_args['max_epochs'],"candidates":_args['candidates']} + args['logs'] = _args['logs'] if 'logs' in _args else 'logs' + args ['max_epochs'] = _args['max_epochs'] + # args['matrix_size'] = _matrix.shape[0] + args['batch_size'] = 2000 + args['partition'] = 0 if 'partition' not in _args else _args['partition'] + args['row_count'] = df.shape[0] + # + # @TODO: perhaps get the space of values here ... 
(not sure it's a good idea) + # + _args['map'] = _map + _inputhandler = prepare.Input(**_args) + values,_matrix = _inputhandler.convert() + args['values'] = np.array(values) + if 'gpu' in _args : + os.environ['CUDA_VISIBLE_DEVICES'] = str(_args['gpu']) + handler = gan.Predict (**args) + handler.load_meta(None) + # + # Let us now format the matrices as we expect them to be + # + + candidates = handler.apply(candidates=args['candidates']) + return [_inputhandler.revert(matrix=_matrix) for _matrix in candidates] + + + +def _generate(**args): """ This function will generate a synthetic dataset on the basis of a model that has been learnt for the dataset @return pandas.DataFrame diff --git a/data/maker/__main__.py b/data/maker/__main__.py deleted file mode 100644 index d71d400..0000000 --- a/data/maker/__main__.py +++ /dev/null @@ -1,32 +0,0 @@ -import pandas as pd -import data.maker -from data.params import SYS_ARGS -import json -from scipy.stats import wasserstein_distance as wd -import risk -import numpy as np -if 'config' in SYS_ARGS : - ARGS = json.loads(open(SYS_ARGS['config']).read()) - if 'generate' not in SYS_ARGS : - data.maker.train(**ARGS) - else: - # - # - ARGS['no_value'] = '' - _df = data.maker.generate(**ARGS) - odf = pd.read_csv (ARGS['data']) - odf.columns = [name.lower() for name in odf.columns] - column = ARGS['column'] if isinstance(ARGS['column'],list) else [ARGS['column']] - # print (odf.head()) - # print (_df.head()) - print(odf.join(_df[column],rsuffix='_io')) - # print (_df[column].risk.evaluate(flag='synth')) - # print (odf[column].risk.evaluate(flag='original')) - # _x = pd.get_dummies(_df[column]).values - # y = pd.get_dummies(odf[column]).values - # N = _df.shape[0] - # print (np.mean([ wd(_x[i],y[i])for i in range(0,N)])) - # print (wd(_x[0],y[0]) ) - - # column = SYS_ARGS['column'] - # odf = open(SYS_ARGS['data']) \ No newline at end of file diff --git a/pipeline.py b/pipeline.py index 00f558d..4a86d94 100644 --- a/pipeline.py +++ b/pipeline.py @@ -9,7 +9,7 @@ import pandas as pd from google.oauth2 import service_account from google.cloud import bigquery as bq import data.maker - +import copy from data.params import SYS_ARGS # @@ -69,53 +69,45 @@ class Components : This function will perform training on the basis of a given pointer that reads data """ - # - # @TODO: we need to log something here about the parameters being passed - # pointer = args['reader'] if 'reader' in args else lambda: Components.get(**args) - df = args['data'] - - # - # Now we can parse the arguments and submit the entire thing to training - # - - logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']}) - log_folder = args['logs'] if 'logs' in args else 'logs' - PART_SIZE = int(args['part_size']) - - partition = args['partition'] - log_folder = os.sep.join([log_folder,args['context'],str(partition)]) - _args = {"batch_size":2000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger} - _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs']) - if 'batch_size' in args : - _args['batch_size'] = int(args['batch_size']) - - _args['matrix_size'] = args['matrix_size'] if 'matrix_size' in args else 128 # - # We ask the process to assume 1 gpu given the system number of GPU and that these tasks can run in parallel - # - if int(args['num_gpu']) > 1 : - _args['gpu'] = int(args['gpu']) if int(args['gpu']) < 8 else np.random.choice(np.arange(8)).astype(int) + schema = None + 
if 'file' in args : + + df = pd.read_csv(args['file']) + del args['file'] + elif 'data' not in args : + reader = factory.instance(**args['store']['source']) + if 'row_limit' in args : + df = reader.read(sql=args['sql'],limit=args['row_limit']) + else: + df = reader.read(sql=args['sql']) + schema = reader.meta(table=args['from']) if hasattr(reader,'meta') and 'from' in args else None else: - _args['gpu'] = 0 - _args['num_gpu'] = 1 - os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) - _args['partition'] = int(partition) - _args['continuous']= args['continuous'] if 'continuous' in args else [] - _args['store'] = {'type':'mongo.MongoWriter','args':{'dbname':'aou','doc':args['context']}} - _args['data'] = args['data'] - - # print (['partition ',partition,df.value_source_concept_id.unique()]) - # - # @log : - # Logging information about the training process for this partition (or not) - # - - info = {"rows":df.shape[0],"cols":df.shape[1], "partition":int(partition),"logs":_args['logs']} + df = args['data'] + + + # df = df.fillna('') + if schema : + _schema = {} + for _item in schema : + _type = int + _value = 0 + if _item.field_type == 'FLOAT' : + _type =float + elif _item.field_type != 'INTEGER' : + _type = str + _value = '' + _schema[_item.name] = _type + df[_item.name] = df[_item.name].fillna(_value).astype(_type) + args['schema'] = _schema + # df[_item.name] = df[_item.name].astype(_type) + _args = copy.deepcopy(args) + # _args['store'] = args['store']['source'] + _args['data'] = df - logger.write({"module":"train","action":"train","input":info}) data.maker.train(**_args) if 'autopilot' in ( list(args.keys())) : - print (['autopilot mode enabled ....']) + print (['autopilot mode enabled ....',args['context']]) self.generate(args) pass @@ -129,141 +121,167 @@ class Components : """ This function will generate data and store it to a given, """ - logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']}) - log_folder = args['logs'] if 'logs' in args else 'logs' - partition = args['partition'] if 'partition' in args else '' - log_folder = os.sep.join([log_folder,args['context'],str(partition)]) - - _args = {"batch_size":2000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger} - _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs']) + store = args['store']['logs'] + store['doc'] = args['context'] + logger = factory.instance(**store) #type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']}) + + ostore = args['store']['target'] + writer = factory.instance(**ostore) + # log_folder = args['logs'] if 'logs' in args else 'logs' + # partition = args['partition'] if 'partition' in args else '' + # log_folder = os.sep.join([log_folder,args['context'],str(partition)]) + + # _args = {"batch_size":2000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger} + # _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs']) # _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1 - if 'batch_size' in args : - _args['batch_size'] = int(args['batch_size']) - - if int(args['num_gpu']) > 1 : - _args['gpu'] = int(args['gpu']) if int(args['gpu']) < 8 else np.random.choice(np.arange(8)).astype(int) - else: - _args['gpu'] = 0 - _args['num_gpu'] = 1 - os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) - # _args['no_value']= args['no_value'] - _args['matrix_size'] = 
args['matrix_size'] if 'matrix_size' in args else 128 + # if 'batch_size' in args : + # _args['batch_size'] = int(args['batch_size']) + + # if int(args['num_gpu']) > 1 : + # _args['gpu'] = int(args['gpu']) if int(args['gpu']) < 8 else np.random.choice(np.arange(8)).astype(int) + # else: + # _args['gpu'] = 0 + # _args['num_gpu'] = 1 + # os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) + # # _args['no_value']= args['no_value'] + # _args['matrix_size'] = args['matrix_size'] if 'matrix_size' in args else 128 - # MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0 - PART_SIZE = int(args['part_size']) if 'part_size' in args else 8 + # # MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0 + # PART_SIZE = int(args['part_size']) if 'part_size' in args else 8 # credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json') # _args['data'] = pd.read_gbq(SQL,credentials=credentials,dialect='standard').dropna() # reader = args['reader'] # df = reader() - df = args['reader']() if 'reader' in args else args['data'] - - # if 'slice' in args and 'max_rows' in args['slice']: - - # max_rows = args['slice']['max_rows'] - # if df.shape[0] > max_rows : - # print (".. slicing ") - # i = np.random.choice(df.shape[0],max_rows,replace=False) - # df = df.iloc[i] + schema = args['schema'] if 'schema' in args else None + if 'file' in args : + df = pd.read_csv(args['file']) + else: + if 'data' not in args : + reader = factory.instance(**args['store']['source']) + if 'row_limit' in args : + df = reader.read(sql=args['sql'],limit=args['row_limit']) + else: + df = reader.read(sql=args['sql']) + if 'schema' not in args and hasattr(reader,'meta'): + schema = reader.meta(table=args['from']) - - # bounds = Components.split(df,MAX_ROWS,PART_SIZE) - # if partition != '' : - # columns = args['columns'] - # df = np.array_split(df[columns].values,PART_SIZE) - # df = pd.DataFrame(df[ int (partition) ],columns = columns) - # max_rows = int(args['partition_max_rows']) if 'partition_max_rows' in args else 1000000 - # N = np.divide(df.shape[0],max_rows).astype(int) + 1 - info = {"name":args['columns'],"parition":int(partition),"gpu":_args["gpu"],"rows":int(df.shape[0]),"cols":int(df.shape[1]),"space":df[args['columns'][0]].unique().size, "part_size":int(PART_SIZE)} - logger.write({"module":"generate","action":"partition","input":info}) - _args['partition'] = int(partition) - _args['continuous']= args['continuous'] if 'continuous' in args else [] - # - # How many rows sub-partition must we divide this into ? - # let us fix the data types here every _id field will be an np.int64... - # - - schema = args['schema'] - for item in schema : - if item.field_type == 'INTEGER' and df[item.name].dtype != np.int64: - df[item.name] = np.array(df[item.name].values,dtype=np.int64) - elif item.field_type == 'STRING' and df[item.name].dtype != object : - df[item.name] = np.array(df[item.name],dtype=object) - - - - # for name in df.columns.tolist(): - - # if name.endswith('_id') : - # if df[name].isnull().sum() > 0 and name not in ['unique_device_id']: - # df[name].fillna(np.nan_to_num(np.nan),inplace=True) - # df[name] = df[name].astype(int) - + + else: + # + # This will account for autopilot mode ... 
+ df = args['data'] + + _info = {"module":"gan-prep","action":"read","shape":{"rows":df.shape[0],"columns":df.shape[0]}} + _dc = pd.DataFrame() # for mdf in df : - _args['data'] = df - - _dc = _dc.append(data.maker.generate(**_args)) + args['data'] = df + args['candidates'] = 1 if 'candidates' not in args else int(args['candidates']) + + candidates = (data.maker.generate(**args)) + if 'sql.BQWriter' in ostore['type'] : + #table = ".".join([ostore['['dataset'],args['context']]) + # writer = factory.instance(**ostore) + _columns = None + skip_columns = [] + _schema = [{"name":field.name,"type":field.field_type,"description":field.description} for field in schema] if schema else [] + for _df in candidates : + # + # we need to format the fields here to make sure we have something cohesive + # + + if not skip_columns : + # _columns = set(df.columns) - set(_df.columns) + if 'ignore' in args and 'columns' in args['ignore'] : + + for name in args['ignore']['columns'] : + for _name in _df.columns: + if _name in name: + skip_columns.append(_name) + # + # We perform a series of set operations to insure that the following conditions are met: + # - the synthetic dataset only has fields that need to be synthesized + # - The original dataset has all the fields except those that need to be synthesized + # + + _df = _df[list(set(_df.columns) - set(skip_columns))] + + if set(df.columns) & set(_df.columns) : + _columns = set(df.columns) - set(_df.columns) + df = df[_columns] + + # + # Let us merge the dataset here and and have a comprehensive dataset + + _df = pd.DataFrame.join(df,_df) + + writer.write(_df,schema=_schema,table=args['from']) + # writer.write(df,table=table) + pass + else: + pass + - # - # We need to post the generate the data in order to : - # 1. compare immediately - # 2. synthetic copy - # + # # + # # We need to post the generate the data in order to : + # # 1. compare immediately + # # 2. 
synthetic copy + # # - cols = _dc.columns.tolist() + # cols = _dc.columns.tolist() - data_comp = _args['data'][args['columns']].join(_dc[args['columns']],rsuffix='_io') #-- will be used for comparison (store this in big query) - # - # performing basic analytics on the synthetic data generated (easy to quickly asses) - # - info = {"module":"generate","action":"io.metrics","input":{"rows":data_comp.shape[0],"partition":partition,"logs":[]}} + # data_comp = _args['data'][args['columns']].join(_dc[args['columns']],rsuffix='_io') #-- will be used for comparison (store this in big query) + # # + # # performing basic analytics on the synthetic data generated (easy to quickly asses) + # # + # info = {"module":"generate","action":"io.metrics","input":{"rows":data_comp.shape[0],"partition":partition,"logs":[]}} - # - # @TODO: Send data over to a process for analytics + # # + # # @TODO: Send data over to a process for analytics - base_cols = list(set(_args['data'].columns) - set(args['columns'])) #-- rebuilt the dataset (and store it) - cols = _dc.columns.tolist() - for name in cols : - _args['data'][name] = _dc[name] + # base_cols = list(set(_args['data'].columns) - set(args['columns'])) #-- rebuilt the dataset (and store it) + # cols = _dc.columns.tolist() + # for name in cols : + # _args['data'][name] = _dc[name] - # - #-- Let us store all of this into bigquery - prefix = args['notify']+'.'+_args['context'] - partition = str(partition) - table = '_'.join([prefix,partition,'io']).replace('__','_') - folder = os.sep.join([args['logs'],args['context'],partition,'output']) - if 'file' in args : + # # + # #-- Let us store all of this into bigquery + # prefix = args['notify']+'.'+_args['context'] + # partition = str(partition) + # table = '_'.join([prefix,partition,'io']).replace('__','_') + # folder = os.sep.join([args['logs'],args['context'],partition,'output']) + # if 'file' in args : - _fname = os.sep.join([folder,table.replace('_io','_full_io.csv')]) - _pname = os.sep.join([folder,table])+'.csv' - data_comp.to_csv( _pname,index=False) - _args['data'].to_csv(_fname,index=False) + # _fname = os.sep.join([folder,table.replace('_io','_full_io.csv')]) + # _pname = os.sep.join([folder,table])+'.csv' + # data_comp.to_csv( _pname,index=False) + # _args['data'].to_csv(_fname,index=False) - _id = 'path' - else: + # _id = 'path' + # else: - credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json') - _pname = os.sep.join([folder,table+'.csv']) - _fname = table.replace('_io','_full_io') - partial = '.'.join(['io',args['context']+'_partial_io']) - complete= '.'.join(['io',args['context']+'_full_io']) - data_comp.to_csv(_pname,index=False) - if 'dump' in args : - print (_args['data'].head()) - else: - Components.lock.acquire() - data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000) - _args['data'].to_gbq(if_exists='append',destination_table=complete,credentials=credentials,chunksize=90000) - Components.lock.release() - _id = 'dataset' - info = {"full":{_id:_fname,"rows":_args['data'].shape[0]},"partial":{"path":_pname,"rows":data_comp.shape[0]} } - if partition : - info ['partition'] = int(partition) - logger.write({"module":"generate","action":"write","input":info} ) + # credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json') + # _pname = os.sep.join([folder,table+'.csv']) + # _fname = table.replace('_io','_full_io') + # partial = 
'.'.join(['io',args['context']+'_partial_io']) + # complete= '.'.join(['io',args['context']+'_full_io']) + # data_comp.to_csv(_pname,index=False) + # if 'dump' in args : + # print (_args['data'].head()) + # else: + # Components.lock.acquire() + # data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000) + # _args['data'].to_gbq(if_exists='append',destination_table=complete,credentials=credentials,chunksize=90000) + # Components.lock.release() + # _id = 'dataset' + # info = {"full":{_id:_fname,"rows":_args['data'].shape[0]},"partial":{"path":_pname,"rows":data_comp.shape[0]} } + # if partition : + # info ['partition'] = int(partition) + # logger.write({"module":"generate","action":"write","input":info} ) @@ -308,98 +326,95 @@ if __name__ == '__main__' : # Log what was initiated so we have context of this processing ... # # if 'listen' not in SYS_ARGS : - if 'file' in args : - DATA = pd.read_csv(args['file']) ; - schema = [] - else: - DATA = Components().get(args) - client = bq.Client.from_service_account_json(args["private_key"]) - schema = client.get_table(client.dataset(args['dataset']).table(args['from'])).schema + # if 'file' in args : + # DATA = pd.read_csv(args['file']) ; + # schema = [] + # else: + # DATA = Components().get(args) + # client = bq.Client.from_service_account_json(args["private_key"]) + # schema = client.get_table(client.dataset(args['dataset']).table(args['from'])).schema - COLUMNS = DATA.columns - DATA = np.array_split(DATA,PART_SIZE) - args['schema'] = schema + # COLUMNS = DATA.columns + # DATA = np.array_split(DATA,PART_SIZE) + # args['schema'] = schema if 'generate' in SYS_ARGS : # # Let us see if we have partitions given the log folder - content = os.listdir( os.sep.join([args['logs'],args['context']])) + content = os.listdir( os.sep.join([args['logs'],'train',args['context']])) generator = Components() - if ''.join(content).isnumeric() : - # - # we have partitions we are working with + # if ''.join(content).isnumeric() : + # # + # # we have partitions we are working with - jobs = [] + # jobs = [] - # columns = DATA.columns.tolist() + # # columns = DATA.columns.tolist() - # DATA = np.array_split(DATA,PART_SIZE) + # # DATA = np.array_split(DATA,PART_SIZE) - for index in range(0,PART_SIZE) : - if 'focus' in args and int(args['focus']) != index : - # - # This handles failures/recoveries for whatever reason - # If we are only interested in generating data for a given partition - continue - # index = id.index(id) + # for index in range(0,PART_SIZE) : + # if 'focus' in args and int(args['focus']) != index : + # # + # # This handles failures/recoveries for whatever reason + # # If we are only interested in generating data for a given partition + # continue + # # index = id.index(id) - args['partition'] = index - args['data'] = DATA[index] - if int(args['num_gpu']) > 1 : - args['gpu'] = index - else: - args['gpu']=0 + # args['partition'] = index + # args['data'] = DATA[index] + # if int(args['num_gpu']) > 1 : + # args['gpu'] = index + # else: + # args['gpu']=0 - make = lambda _args: (Components()).generate(_args) - job = Process(target=make,args=(args,)) - job.name = 'generator # '+str(index) - job.start() - jobs.append(job) - # if len(jobs) == 1 : - # job.join() + # make = lambda _args: (Components()).generate(_args) + # job = Process(target=make,args=(args,)) + # job.name = 'generator # '+str(index) + # job.start() + # jobs.append(job) + # # if len(jobs) == 1 : + # # job.join() - print (["Started ",len(jobs),"generators" if 
len(jobs)>1 else "generator" ]) - while len(jobs)> 0 : - jobs = [job for job in jobs if job.is_alive()] - time.sleep(2) + # print (["Started ",len(jobs),"generators" if len(jobs)>1 else "generator" ]) + # while len(jobs)> 0 : + # jobs = [job for job in jobs if job.is_alive()] + # time.sleep(2) - # generator.generate(args) - else: - generator.generate(args) + # # generator.generate(args) + # else: + # generator.generate(args) # Components.generate(args) - elif 'shuffle' in SYS_ARGS: - - - for data in DATA : - args['data'] = data - _df = (Components()).shuffle(args) + generator.generate(args) + else: # DATA = np.array_split(DATA,PART_SIZE) - - jobs = [] - for index in range(0,PART_SIZE) : - if 'focus' in args and int(args['focus']) != index : - continue - args['part_size'] = PART_SIZE - args['partition'] = index - args['data'] = DATA[index] - if int(args['num_gpu']) > 1 : - args['gpu'] = index - else: - args['gpu']=0 + agent = Components() + agent.train(**args) + # jobs = [] + # for index in range(0,PART_SIZE) : + # if 'focus' in args and int(args['focus']) != index : + # continue + # args['part_size'] = PART_SIZE + # args['partition'] = index + # args['data'] = DATA[index] + # if int(args['num_gpu']) > 1 : + # args['gpu'] = index + # else: + # args['gpu']=0 - make = lambda _args: (Components()).train(**_args) - job = Process(target=make,args=( dict(args),)) - job.name = 'Trainer # ' + str(index) - job.start() - jobs.append(job) - # args['gpu'] - print (["Started ",len(jobs),"trainers" if len(jobs)>1 else "trainer" ]) - while len(jobs)> 0 : - jobs = [job for job in jobs if job.is_alive()] - time.sleep(2) + # make = lambda _args: (Components()).train(**_args) + # job = Process(target=make,args=( dict(args),)) + # job.name = 'Trainer # ' + str(index) + # job.start() + # jobs.append(job) + # # args['gpu'] + # print (["Started ",len(jobs),"trainers" if len(jobs)>1 else "trainer" ]) + # while len(jobs)> 0 : + # jobs = [job for job in jobs if job.is_alive()] + # time.sleep(2) # trainer = Components() # trainer.train(**args)
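
For reference, a minimal sketch of how the reworked data.maker entry points introduced by this patch are intended to be called. The CSV name, context, epoch count and the mongo logging settings below are illustrative placeholders (the store layout simply mirrors what pipeline.py passes in), not values taken from the patch:

    import data.maker

    # Training: reads the CSV (or a 'data' DataFrame), encodes it with
    # prepare.Input, trains the GAN and writes logs/output/<context>/map.json
    # describing the encoded columns. "sample.csv", "demo" and the mongo
    # settings are hypothetical placeholders.
    _train_args = {
        "file": "sample.csv",
        "context": "demo",
        "logs": "logs",
        "max_epochs": 10,
        # optional logging data-store, shaped like the one pipeline.py builds
        "store": {"logs": {"type": "mongo.MongoWriter",
                           "args": {"dbname": "aou", "doc": "demo"}}},
    }
    data.maker.train(**_train_args)

    # Generation: reloads map.json for the same context, rebuilds the input
    # handler, runs gan.Predict.apply() and returns one re-decoded dataset per
    # candidate. max_epochs must match training so the right checkpoint
    # (<context>-<max_epochs>) is restored.
    _gen_args = {
        "file": "sample.csv",
        "context": "demo",
        "logs": "logs",
        "max_epochs": 10,
        "candidates": 2,
    }
    candidates = data.maker.generate(**_gen_args)
    synthetic = candidates[0]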