From e81e50c94f8fdf051cbf76d9479cc68a40b1ef5d Mon Sep 17 00:00:00 2001
From: Steve Nyemba
Date: Sat, 14 Mar 2020 11:12:13 -0500
Subject: [PATCH] Bug fix with the number of candidates generated

---
 data/gan.py            |  10 +-
 data/maker/__init__.py |   1 +
 drive/pipeline.py      | 303 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 311 insertions(+), 3 deletions(-)
 create mode 100644 drive/pipeline.py

diff --git a/data/gan.py b/data/gan.py
index 4f34634..28d5ea3 100644
--- a/data/gan.py
+++ b/data/gan.py
@@ -424,6 +424,7 @@ class Train (GNet):
         dataset = tf.data.Dataset.from_tensor_slices(features_placeholder)
         # labels_placeholder = None
         dataset = dataset.repeat(10000)
+        print ([' ******* ',self.BATCHSIZE_PER_GPU])
         dataset = dataset.batch(batch_size=self.BATCHSIZE_PER_GPU)
         dataset = dataset.prefetch(1)
         # iterator = dataset.make_initializable_iterator()
@@ -560,7 +561,7 @@ class Predict(GNet):
         init = tf.compat.v1.global_variables_initializer()
         saver = tf.compat.v1.train.Saver()
         df = pd.DataFrame()
-        CANDIDATE_COUNT = 1000
+        CANDIDATE_COUNT = 10 #0 if self.ROW_COUNT < 1000 else 100
         NTH_VALID_CANDIDATE = count = np.random.choice(np.arange(2,60),2)[0]

         with tf.compat.v1.Session() as sess:
@@ -594,13 +595,16 @@ class Predict(GNet):
                 if np.divide( np.sum(x), x.size) > .9 or p and np.sum(x) == x.size :
                     ratio.append(np.divide( np.sum(x), x.size))
                     found.append(df)
-                    if i == CANDIDATE_COUNT:
+
+                    # break
+                    if len(found) == CANDIDATE_COUNT:
+
                         break
                 else:
                     __x__ = df if __x__ is None or np.where(x > 0)[0].size > np.where(__x__ > 0)[0].size else __x__
                     __ratio = np.divide( np.sum(x), x.size) if __x__ is None or np.where(x > 0)[0].size > np.where(__x__ > 0)[0].size else __ratio
                     continue
-
+
             # i = df.T.index.astype(np.int32) #-- These are numeric pseudonyms
             # df = (i * df).sum(axis=1)
             #
diff --git a/data/maker/__init__.py b/data/maker/__init__.py
index 5b4cb7e..3a016cf 100644
--- a/data/maker/__init__.py
+++ b/data/maker/__init__.py
@@ -208,4 +208,5 @@ def generate(**args):
         #
         # print (r)s
         # break
+
     return _df
\ No newline at end of file
diff --git a/drive/pipeline.py b/drive/pipeline.py
new file mode 100644
index 0000000..04658da
--- /dev/null
+++ b/drive/pipeline.py
@@ -0,0 +1,303 @@
+#!/usr/bin/env python3
+import json
+from transport import factory
+import numpy as np
+import os
+from multiprocessing import Process
+import pandas as pd
+from google.oauth2 import service_account
+import data.maker
+
+from data.params import SYS_ARGS
+
+#
+# The configuration array is now loaded and we will execute the pipeline as follows
+DATASET='combined20190510'
+
+class Components :
+
+    @staticmethod
+    def get(args):
+        """
+        This function returns a data-frame, given a BigQuery SQL statement with conditions (and limits for testing purposes).
+        The function must be wrapped in a lambda; this makes testing easier and keeps a change of data store transparent to the rest of the code (vital when testing).
+        :sql        basic sql statement
+        :condition  optional condition and filters
+        """
+        SQL = args['sql']
+        if 'condition' in args :
+            condition = ' '.join([args['condition']['field'],args['condition']['qualifier'],'(',args['condition']['value'],')'])
+            SQL = " ".join([SQL,'WHERE',condition])
+
+        SQL = SQL.replace(':dataset',args['dataset']) #+ " LIMIT 1000 "
+        if 'limit' in args :
+            SQL = SQL + ' LIMIT ' + args['limit']
+        credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
+        df = pd.read_gbq(SQL,credentials=credentials,dialect='standard').dropna()
+        return df
+
+        # return lambda: pd.read_gbq(SQL,credentials=credentials,dialect='standard')[args['columns']].dropna()
+    @staticmethod
+    def split(X,MAX_ROWS=3,PART_SIZE=3):
+
+        return list(pd.cut( np.arange(X.shape[0]+1),PART_SIZE).categories)
+
+    def train(self,**args):
+        """
+        This function will perform training on the basis of a given pointer that reads the data.
+        """
+        #
+        # @TODO: we need to log something here about the parameters being passed
+        pointer = args['reader'] if 'reader' in args else lambda: Components.get(args)
+        df = pointer()
+
+        #
+        # Now we can parse the arguments and submit the entire thing to training
+        #
+
+        logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
+        log_folder = args['logs'] if 'logs' in args else 'logs'
+        _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
+        _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
+        _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
+
+        MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0
+        PART_SIZE = args['part_size'] if 'part_size' in args else 0
+
+        if df.shape[0] > MAX_ROWS and 'partition' not in args:
+            lbound = 0
+            bounds = list(pd.cut( np.arange(df.shape[0]+1),PART_SIZE).categories)
+            # bounds = Components.split(df,MAX_ROWS,PART_SIZE)
+
+            qwriter = factory.instance(type='queue.QueueWriter',args={'queue':'aou.io'})
+
+            for b in bounds :
+                part_index = bounds.index(b)
+                ubound = int(b.right)
+
+                _data = df.iloc[lbound:ubound][args['columns']]
+                lbound = ubound
+
+                # _args['logs'] = os.sep.join([log_folder,str(part_index)])
+                _args['partition'] = str(part_index)
+                _args['logger'] = {'args':{'dbname':'aou','doc':args['context']},'type':'mongo.MongoWriter'}
+                #
+                # We should post the partitions to a queue server (at least the instructions on):
+                # - where to get the data
+                # - the arguments to use (partition #, columns, gpu, epochs)
+                #
+                info = {"rows":_data.shape[0],"cols":_data.shape[1], "partition":part_index,"logs":_args['logs']}
+                p = {"args":_args,"data":_data.to_dict(orient="records"),"info":info}
+                qwriter.write(p)
+                #
+                # @TODO:
+                # - Notify that information was just posted to the queue
+                info['max_rows'] = MAX_ROWS
+                info['part_size'] = PART_SIZE
+                logger.write({"module":"train","action":"setup-partition","input":info})
+
+            pass
+        else:
+            partition = args['partition'] if 'partition' in args else ''
+            log_folder = os.sep.join([log_folder,args['context'],partition])
+            _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
+            _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
+            _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
+            os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) if 'gpu' in args else '0'
+
+            _args['data'] = df
+            #
+            # @log :
+            # Logging information about the training process for this partition (or not)
+            #
+            info = {"rows":df.shape[0],"cols":df.shape[1], "partition":partition,"logs":_args['logs']}
+            logger.write({"module":"train","action":"train","input":info})
+            data.maker.train(**_args)
+
+        pass
+
+    # @staticmethod
+    def generate(self,args):
+        """
+        This function will generate data and store it to a given output (a CSV file or a BigQuery table).
+        """
+        logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
+        log_folder = args['logs'] if 'logs' in args else 'logs'
+        partition = args['partition'] if 'partition' in args else ''
+        log_folder = os.sep.join([log_folder,args['context'],partition])
+        _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
+        _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
+        _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
+        os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) if 'gpu' in args else '0'
+        _args['no_value'] = args['no_value']
+        MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0
+        PART_SIZE = args['part_size'] if 'part_size' in args else 0
+
+        # credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
+        # _args['data'] = pd.read_gbq(SQL,credentials=credentials,dialect='standard').dropna()
+        reader = args['reader']
+        df = reader()
+        if 'partition' in args :
+            bounds = Components.split(df,MAX_ROWS,PART_SIZE)
+            # bounds = list(pd.cut( np.arange(df.shape[0]+1),PART_SIZE).categories)
+            lbound = int(bounds[int(partition)].left)
+            ubound = int(bounds[int(partition)].right)
+            df = df.iloc[lbound:ubound]
+        _args['data'] = df
+        # _args['data'] = reader()
+        # _args['data'] = _args['data'].astype(object)
+        _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
+        _dc = data.maker.generate(**_args)
+        #
+        # We need to post the generated data in order to :
+        # 1. compare it immediately
+        # 2. store a synthetic copy
+        #
+
+        cols = _dc.columns.tolist()
+
+        data_comp = _args['data'][args['columns']].join(_dc[args['columns']],rsuffix='_io') #-- will be used for comparison (store this in big query)
+        base_cols = list(set(_args['data'].columns) - set(args['columns'])) #-- rebuild the dataset (and store it)
+
+        for name in cols :
+            _args['data'][name] = _dc[name]
+            info = {"module":"generate","action":"io","input":{"rows":_dc[name].shape[0],"name":name}}
+            if partition != '' :
+                info['partition'] = partition
+            logger.write(info)
+            # filename = os.sep.join([log_folder,'output',name+'.csv'])
+            # data_comp[[name]].to_csv(filename,index=False)
+
+        #
+        #-- Let us store all of this into bigquery
+        prefix = args['notify']+'.'+_args['context']
+        table = '_'.join([prefix,partition,'io']).replace('__','_')
+        folder = os.sep.join([args['logs'],args['context'],partition,'output'])
+        if 'file' in args :
+
+            _fname = os.sep.join([folder,table.replace('_io','_full_io.csv')])
+            _pname = os.sep.join([folder,table])+'.csv'
+            data_comp.to_csv(_pname,index=False)
+            _args['data'].to_csv(_fname,index=False)
+
+        else:
+            credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
+            _pname = os.sep.join([folder,table+'.csv'])
+            _fname = table.replace('_io','_full_io')
+            data_comp.to_gbq(if_exists='replace',destination_table=table,credentials=credentials,chunksize=50000)
+            data_comp.to_csv(_pname,index=False)
+            INSERT_FLAG = 'replace' if 'partition' not in args else 'append'
+            _args['data'].to_gbq(if_exists=INSERT_FLAG,destination_table=_fname,credentials=credentials,chunksize=50000)
+
+        info = {"full":{"path":_fname,"rows":_args['data'].shape[0]},"compare":{"name":_pname,"rows":data_comp.shape[0]}}
+        if partition :
+            info['partition'] = partition
+        logger.write({"module":"generate","action":"write","info":info})
+
+    @staticmethod
+    def callback(channel,method,header,stream):
+
+        info = json.loads(stream)
+        logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':SYS_ARGS['context']})
+
+        logger.write({'module':'process','action':'read-partition','input':info['info']})
+        df = pd.DataFrame(info['data'])
+        args = info['args']
+        if int(args['num_gpu']) > 1 and args['gpu'] > 0:
+            args['gpu'] = args['gpu'] + args['num_gpu']
+        args['reader'] = lambda: df
+        #
+        # @TODO: Fix
+        # There is an inconsistency in column/columns ... fix this!
+        #
+        args['columns'] = args['column']
+        (Components()).train(**args)
+        logger.write({"module":"process","action":"exit","info":info["info"]})
+        channel.close()
+        channel.connection.close()
+        pass
+
+if __name__ == '__main__' :
+    filename = SYS_ARGS['config'] if 'config' in SYS_ARGS else 'config.json'
+    f = open (filename)
+    PIPELINE = json.loads(f.read())
+    f.close()
+    index = int(SYS_ARGS['index']) if 'index' in SYS_ARGS else 0
+
+    args = (PIPELINE[index])
+    args['dataset'] = 'combined20190510'
+    args = dict(args,**SYS_ARGS)
+    args['max_rows'] = int(args['max_rows']) if 'max_rows' in args else 3
+    args['part_size'] = int(args['part_size']) if 'part_size' in args else 3
+
+    #
+    # @TODO:
+    # Log what was initiated so we have context of this processing ...
+    #
+    if 'listen' not in SYS_ARGS :
+        if 'file' in args :
+            reader = lambda: pd.read_csv(args['file'])
+        else:
+            reader = lambda: Components().get(args)
+        args['reader'] = reader
+
+    if 'generate' in SYS_ARGS :
+        #
+        # Let us see if we have partitions given the log folder
+
+        content = os.listdir( os.sep.join([args['logs'],args['context']]))
+        generator = Components()
+        if ''.join(content).isnumeric() :
+            #
+            # we have partitions we are working with
+
+            for id in ''.join(content) :
+                args['partition'] = id
+
+                generator.generate(args)
+        else:
+            generator.generate(args)
+        # Components.generate(args)
+    elif 'listen' in args :
+        #
+        # This will start a worker just in case to listen to a queue
+        if 'read' in SYS_ARGS :
+            QUEUE_TYPE = 'queue.QueueReader'
+            pointer = lambda qreader: qreader.read(1)
+        else:
+            QUEUE_TYPE = 'queue.QueueListener'
+            pointer = lambda qlistener: qlistener.listen()
+        N = int(SYS_ARGS['jobs']) if 'jobs' in SYS_ARGS else 1
+
+        qhandlers = [factory.instance(type=QUEUE_TYPE,args={'queue':'aou.io'}) for i in np.arange(N)]
+        jobs = []
+        for qhandler in qhandlers :
+            qhandler.callback = Components.callback
+            job = Process(target=pointer,args=(qhandler,))
+            job.start()
+            jobs.append(job)
+        #
+        # let us wait for the jobs
+        print (["Started ",len(jobs)," trainers"])
+        while len(jobs) > 0 :
+
+            jobs = [job for job in jobs if job.is_alive()]
+
+        # pointer(qhandler)
+        # qreader.read(1)
+        pass
+    else:
+
+        trainer = Components()
+        trainer.train(**args)
+        # Components.train(**args)
+#for args in PIPELINE :
+    #args['dataset'] = 'combined20190510'
+    #process = Process(target=Components.train,args=(args,))
+    #process.name = args['context']
+    #process.start()
+# Components.train(args)
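For reference, a minimal usage sketch (not part of the patch) of the new drive/pipeline.py entry point, driving Components.train() directly from a local CSV so no BigQuery credentials are needed. The file name sample.csv, the context value demo and the column list are hypothetical; the sketch also assumes drive/ is importable as a package and that the transport factory can reach the MongoDB instance used for logging.

# usage_sketch.py -- hypothetical example; file name, context and columns are assumptions
import pandas as pd
from drive.pipeline import Components   # assumes drive/ is importable as a package

args = {
    "context"  : "demo",                            # hypothetical context name (normally one PIPELINE entry)
    "columns"  : ["value_as_number"],               # hypothetical list of columns to synthesize
    "logs"     : "logs",                            # folder where training logs are written
    "max_rows" : 3,
    "part_size": 3,
    "partition": "0",                               # forces the direct-training branch instead of posting to the queue
    "reader"   : lambda: pd.read_csv("sample.csv")  # hypothetical local input instead of BigQuery
}
Components().train(**args)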