From 71097103da4d3b4618ee83e3ec50d5a96ccbc8ef Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Wed, 29 Apr 2020 01:27:25 -0500 Subject: [PATCH] fix: handling outliers and missing values --- data/bridge.py | 15 ++++-- data/gan.py | 61 +++++++-------------- data/maker/__init__.py | 81 ++++++---------------------- data/params.py | 4 +- finalize.py | 120 ++++++++++++++++++++++++++++++----------- pipeline.py | 47 +++------------- setup.py | 4 +- 7 files changed, 149 insertions(+), 183 deletions(-) diff --git a/data/bridge.py b/data/bridge.py index 902c6d3..3116a4b 100644 --- a/data/bridge.py +++ b/data/bridge.py @@ -197,12 +197,21 @@ class Binary : """ This function will return the columns that are available for processing ... """ - values = column.dropna().value_counts().index + values = column.dropna().value_counts().index.values + if size > 0 and column.size > size: values = values[:size] - values.sort_values() + values.sort() return values - + def get_missing(self,column,size=-1): + values = column.dropna().value_counts().index.values + if size > 0 and column.size > size : + values = values[size:] + else: + values = np.array([]) + values.sort() + return values.tolist(); + def _get_column_values(self,column,size=-1): values = column.dropna().unique() values.sort() diff --git a/data/gan.py b/data/gan.py index 8a0c7a7..1418a04 100644 --- a/data/gan.py +++ b/data/gan.py @@ -536,9 +536,10 @@ class Predict(GNet): self.values = args['values'] self.ROW_COUNT = args['row_count'] self.oROW_COUNT = self.ROW_COUNT - self.MISSING_VALUES = np.nan_to_num(np.nan) - if 'no_value' in args and args['no_value'] not in ['na','','NA'] : - self.MISSING_VALUES = args['no_value'] + # self.MISSING_VALUES = np.nan_to_num(np.nan) + # if 'no_value' in args and args['no_value'] not in ['na','','NA'] : + # self.MISSING_VALUES = args['no_value'] + self.MISSING_VALUES = args['missing'] # self.MISSING_VALUES = args['no_value'] # self.MISSING_VALUES = int(args['no_value']) if args['no_value'].isnumeric() else np.na if args['no_value'] in ['na','NA','N/A'] else args['no_value'] @@ -650,15 +651,18 @@ class Predict(GNet): # df.columns = self.values if len(found) or df.columns.size <= len(self.values): ii = df.apply(lambda row: np.sum(row) == 0 ,axis=1) - # print ([' **** ',ii.sum()]) - - if ii.shape[0] > 0 : + missing = [] + if ii.sum() > 0 : # - #@TODO Have this be a configurable variable - - missing = np.repeat(self.MISSING_VALUES, np.where(ii==1)[0].size) - else: - missing = [] + # If the generator had a reductive effect we should be able to get random values from either : + # - The space of outliers + # - existing values for smaller spaces that have suffered over training + # + + N = ii.sum() + missing_values = self.MISSING_VALUES if self.MISSING_VALUES else self.values + missing = np.random.choice(missing_values,N) + # missing = [] # # @TODO: # Log the findings here in terms of ratio, missing, candidate count @@ -669,6 +673,8 @@ class Predict(GNet): df = pd.DataFrame( df.iloc[i].apply(lambda row: self.values[np.random.choice(np.where(row != 0)[0],1)[0]] ,axis=1)) df.columns = columns df = df[columns[0]].append(pd.Series(missing)) + + if self.logger : info= {"missing": i.size,"rows":df.shape[0],"cols":1,'partition':self.PARTITION} @@ -680,40 +686,9 @@ class Predict(GNet): tf.compat.v1.reset_default_graph() df = pd.DataFrame(df) df.columns = columns + np.random.shuffle(df[columns[0]].values) return df.to_dict(orient='list') - # return df.to_dict(orient='list') - # count = str(len(os.listdir(self.out_dir))) - # _name = 
os.sep.join([self.out_dir,self.CONTEXT+'-'+count+'.csv']) - # df.to_csv(_name,index=False) - - # output.extend(np.round(f)) - - # for m in range(2): - # for n in range(2, self.NUM_LABELS): - # idx1 = (demo[:, m] == 1) - # idx2 = (demo[:, n] == 1) - # idx = [idx1[j] and idx2[j] for j in range(len(idx1))] - # num = np.sum(idx) - # print ("___________________list__") - # print (idx1) - # print (idx2) - # print (idx) - # print (num) - # print ("_____________________") - # nbatch = int(np.ceil(num / self.BATCHSIZE_PER_GPU)) - # label_input = np.zeros((nbatch*self.BATCHSIZE_PER_GPU, self.NUM_LABELS)) - # label_input[:, n] = 1 - # label_input[:, m] = 1 - # output = [] - # for i in range(nbatch): - # f = sess.run(fake,feed_dict={y: label_input[i* self.BATCHSIZE_PER_GPU:(i+1)* self.BATCHSIZE_PER_GPU]}) - # output.extend(np.round(f)) - # output = np.array(output)[:num] - # print ([m,n,output]) - - # np.save(self.out_dir + str(m) + str(n), output) - if __name__ == '__main__' : # diff --git a/data/maker/__init__.py b/data/maker/__init__.py index 26cc4de..3e2c9aa 100644 --- a/data/maker/__init__.py +++ b/data/maker/__init__.py @@ -21,29 +21,8 @@ class ContinuousToDiscrete : """ This function will convert a continous stream of information into a variety a bit stream of bins """ - # BOUNDS = np.repeat(np.divide(X.max(),n),n).cumsum().tolist() - # print ( X.values.astype(np.float32)) - # print ("___________________________") values = np.array(X).astype(np.float32) BOUNDS = ContinuousToDiscrete.bounds(values,n) - # _map = [{"index":BOUNDS.index(i),"ubound":i} for i in BOUNDS] - # _matrix = [] - # m = [] - # for value in X : - # x_ = np.zeros(n) - - # for row in BOUNDS : - - # if value>= row.left and value <= row.right : - # index = BOUNDS.index(row) - # x_[index] = 1 - # break - # _matrix += x_.tolist() - # # - # # for items in BOUNDS : - # # index = BOUNDS.index(items) - - # return np.array(_matrix).reshape(len(X),n) matrix = np.repeat(np.zeros(n),len(X)).reshape(len(X),n) @@ -123,25 +102,9 @@ def train (**args) : # @TODO : Consider performing this task on several threads/GPUs simulataneously # for col in column : - # args['real'] = pd.get_dummies(df[col].fillna('')).astype(np.float32).values - # if 'float' not in df[col].dtypes.name : - # args['real'] = pd.get_dummies(df[col].fillna('')).astype(np.float32).values - # if col in CONTINUOUS: - # BIN_SIZE = 4 if 'bin_size' not in args else int(args['bin_size']) - # args['real'] = ContinuousToDiscrete.binary(df[col],BIN_SIZE).astype(np.float32) - # # args['real'] = args['real'].reshape(df.shape[0],BIN_SIZE) - - # else: - # df.to_csv('tmp-'+args['logs'].replace('/','_')+'-'+col+'.csv',index=False) - # print (df[col].dtypes) - # print (df[col].dropna/(axis=1).unique()) - # args['real'] = pd.get_dummies(df[col].dropna()).astype(np.float32).values msize = args['matrix_size'] if 'matrix_size' in args else -1 args['real'] = (Binary()).apply(df[col],msize) - - - context = args['context'] if 'store' in args : args['store']['args']['doc'] = context @@ -191,61 +154,49 @@ def generate(**args): # If the identifier is not present, we should fine a way to determine or make one # BIN_SIZE = 4 if 'bin_size' not in args else int(args['bin_size']) - NO_VALUE = dict(args['no_value']) if type(args['no_value']) == dict else args['no_value'] + # NO_VALUE = dict(args['no_value']) if type(args['no_value']) == dict else args['no_value'] bhandler = Binary() _df = df.copy() for col in column : args['context'] = col args['column'] = col - # if 'float' in df[col].dtypes.name or col in 
CONTINUOUS : - # # - # # We should create the bins for the values we are observing here - # BIN_SIZE = 4 if 'bin_size' not in args else int(args['bin_size']) - # values = ContinuousToDiscrete.continuous(df[col].values,BIN_SIZE) - # # values = np.unique(values).tolist() - # else: - # if col in CONTINUOUS : - # values = ContinuousToDiscrete.binary(df[col],BIN_SIZE).astype(np.float32).T - - # else: - # values = df[col].dropna().unique().tolist() msize = args['matrix_size'] if 'matrix_size' in args else -1 values = bhandler.get_column(df[col],msize) - + MISSING= bhandler.get_missing(df[col],msize) args['values'] = values args['row_count'] = df.shape[0] - if col in NO_VALUE : - args['no_value'] = NO_VALUE[col] - else: - args['no_value'] = NO_VALUE - + # if col in NO_VALUE : + # args['no_value'] = NO_VALUE[col] + # else: + # args['no_value'] = NO_VALUE + # novalue = NO_VALUE[col] if NO_VALUE[col] in ['na',''] else NO_VALUE[col] + # MISSING += [NO_VALUE[col]] + args['missing'] = MISSING # # we can determine the cardinalities here so we know what to allow or disallow handler = gan.Predict (**args) handler.load_meta(col) r = handler.apply() if col in CONTINUOUS : - r[col] = np.array(r[col]) - MISSING= np.nan if args['no_value'] in ['na','','NA'] else args['no_value'] + r[col] = np.array(r[col]) + _approx = ContinuousToDiscrete.continuous(r[col],BIN_SIZE) #-- approximating based on arbitrary bins + r[col] = _approx + - if np.isnan(MISSING): - i = np.isnan(r[col]) - i = np.where (i == False)[0] - else: - i = np.where( r[col] != None)[0] - _approx = ContinuousToDiscrete.continuous(r[col][i],BIN_SIZE) #-- approximating based on arbitrary bins - r[col][i] = _approx _df[col] = r[col] # # Let's cast the type to the original type (it makes the data more usable) # + # print (values) + # print ([col,df[col].dtype,_df[col].tolist()]) otype = df[col].dtype _df[col] = _df[col].astype(otype) + # # @TODO: log basic stats about the synthetic attribute # diff --git a/data/params.py b/data/params.py index c667063..f2c3536 100644 --- a/data/params.py +++ b/data/params.py @@ -9,8 +9,10 @@ if len(sys.argv) > 1: if sys.argv[i].startswith('--'): key = sys.argv[i][2:] #.replace('-','') SYS_ARGS[key] = 1 - if i + 1 < N: + if i + 1 < N and not sys.argv[i + 1].startswith('--'): value = sys.argv[i + 1] = sys.argv[i+1].strip() + else: + value = None if key and value: SYS_ARGS[key] = value diff --git a/finalize.py b/finalize.py index 079830d..d420d7d 100644 --- a/finalize.py +++ b/finalize.py @@ -6,10 +6,13 @@ This file will perform basic tasks to finalize the GAN process by performing the """ import pandas as pd import numpy as np +from multiprocessing import Process, Lock from google.oauth2 import service_account from google.cloud import bigquery as bq +import transport from data.params import SYS_ARGS import json + class Analytics : """ This class will compile basic analytics about a given dataset i.e compare original/synthetic @@ -33,15 +36,23 @@ class Analytics : """ This function will measure the distance between """ - df = args['data'] - names = [name for name in df_counts.columns.tolist() if name.endswith('_io') == False] + pass class Utils : + @staticmethod + def log(**args): + logger = transport.factory.instance(type="mongo.MongoWriter",args={"dbname":"aou","doc":"logs"}) + logger.write(args) + logger.close() class get : @staticmethod - def config(**args) : - contexts = args['contexts'].split(',') if type(args['contexts']) == str else args['contexts'] - pipeline = args['pipeline'] - return [ item for item in pipeline 
if item['context'] in contexts] + def pipeline(table,path) : + # contexts = args['contexts'].split(',') if type(args['contexts']) == str else args['contexts'] + config = json.loads((open(path)).read()) + pipeline = config['pipeline'] + # return [ item for item in pipeline if item['context'] in contexts] + pipeline = [item for item in pipeline if 'from' in item and item['from'].strip() == table] + Utils.log(module=table,action='init',input={"pipeline":pipeline}) + return pipeline @staticmethod def sql(**args) : """ @@ -54,7 +65,8 @@ class Utils : SQL = ["SELECT * FROM :from "] SQL_FILTER = [] NO_FILTERS_FOUND = True - pipeline = Utils.get.config(**args) + # pipeline = Utils.get.config(**args) + pipeline = args['pipeline'] REVERSE_QUALIFIER = {'IN':'NOT IN','NOT IN':'IN','=':'<>','<>':'='} for item in pipeline : @@ -73,7 +85,7 @@ class Utils : # # let's pull the field schemas out of the table definition # - + Utils.log(module=args['from'],action='sql',input={"sql":" ".join(SQL) }) return " ".join(SQL).replace(":from",src) @@ -91,26 +103,36 @@ def mk(**args) : return client.create_dataset(dataset) return found[0] -def move (**args): +def move (args): """ This function will move a table from the synthetic dataset into a designated location This is the simplest case for finalizing a synthetic data set :private_key """ - private_key = args['private_key'] - client = bq.Client.from_service_account_json(private_key) - config = Utils.get.config(**args) + pipeline = Utils.get.pipeline(args['from'],args['config']) + _args = json.loads((open(args['config'])).read()) + _args['pipeline'] = pipeline + # del _args['pipeline'] + args = dict(args,**_args) + # del args['pipeline'] + # private_key = args['private_key'] + client = bq.Client.from_service_account_json(args['private_key']) + dataset = args['dataset'] - if 'contexts' in args : - SQL = [ ''.join(["SELECT * FROM io.",item['context'],'_full_io']) for item in config] + if pipeline : + SQL = [ ''.join(["SELECT * FROM io.",item['context'],'_full_io']) for item in pipeline] SQL += [Utils.get.sql(**args)] SQL = ('\n UNION ALL \n'.join(SQL).replace(':dataset','io')) else: # # moving a table to a designated location tablename = args['from'] - SQL = "SELECT * FROM :dataset.:table".replace(":dataset",dataset).replace(":table",tablename) - + if 'sql' not in args : + SQL = "SELECT * FROM :dataset.:table" + else: + SQL = args['sql'] + SQL = SQL.replace(":dataset",dataset).replace(":table",tablename) + Utils.log(module=args['from'],action='sql',input={'sql':SQL}) # # At this point we have gathered all the tables in the io folder and we should now see if we need to merge with the remainder from the original table # @@ -132,7 +154,7 @@ def move (**args): SQL = SQL.replace("*"," , ".join(fields)) # print (SQL) out = client.query(SQL,location='US',job_config=config) - print () + Utils.log(module=args['from'],action='move',input={'job':out.job_id}) return (out.job_id) @@ -158,23 +180,59 @@ if __name__ == '__main__' : Usage : finalize -- --contexts --from """ + if 'move' in SYS_ARGS : - # table = SYS_ARGS['from'] - # args = dict(config,**{"private_key":"../curation-prod.json"}) - args = dict(args,**SYS_ARGS) - contexts = [item['context'] for item in config['pipeline'] if item['from'] == SYS_ARGS['from']] - log = [] - if contexts : - args['contexts'] = contexts - log = move(**args) + + if 'init' in SYS_ARGS : + dep = config['dep'] if 'dep' in config else {} + info = [] + if 'queries' in dep : + info += dep['queries'] + print ('________') + if 'tables' in dep : + info += 
dep['tables'] + args = {} + jobs = [] + for item in info : + args = {} + if type(item) == str : + args['from'] = item + name = item + else: + args = item + name = item['from'] + args['config'] = SYS_ARGS['config'] + # args['pipeline'] = [] + job = Process(target=move,args=(args,)) + job.name = name + jobs.append(job) + job.start() + + + # while len(jobs) > 0 : + # jobs = [job for job in jobs if job.is_alive()] + # time.sleep(1) + + else: - tables = args['from'].split(',') - for name in tables : - name = name.strip() - args['from'] = name - log += [move(**args)] - print ("\n".join(log)) + move(SYS_ARGS) + # # table = SYS_ARGS['from'] + # # args = dict(config,**{"private_key":"../curation-prod.json"}) + # args = dict(args,**SYS_ARGS) + # contexts = [item['context'] for item in config['pipeline'] if item['from'] == SYS_ARGS['from']] + # log = [] + # if contexts : + # args['contexts'] = contexts + # log = move(**args) + + # else: + # tables = args['from'].split(',') + # for name in tables : + # name = name.strip() + # args['from'] = name + # log += [move(**args)] + # print ("\n".join(log)) diff --git a/pipeline.py b/pipeline.py index 5ef3013..00f558d 100644 --- a/pipeline.py +++ b/pipeline.py @@ -14,7 +14,6 @@ from data.params import SYS_ARGS # # The configuration array is now loaded and we will execute the pipe line as follows -DATASET='combined20191004v2_deid' class Components : lock = Lock() @@ -120,37 +119,7 @@ class Components : self.generate(args) pass - def shuffle(self,args): - """ - """ - df = args['reader']() if 'reader' in args else args['data'] - - - col = args['columns'][0] - distrib = df[col].value_counts() - values = np.array(distrib.index) - counts = np.array(distrib.values) - np.random.shuffle(values) - np.random.shuffle(counts) - N = len (values) - theta = np.random.sample() - pad = 0 - # print (values) - iovalues = np.zeros(df.shape[0],dtype=df[col].dtype) - for i in range(N) : - # n = int(counts[i] - counts[i]*theta) - n = counts[i] - print ([counts[i],theta,n]) - index = np.where(iovalues == 0)[0] - if index.size > 0 and index.size > n: - index = index[:n] - iovalues[index] = values[i] - - - np.random.shuffle(iovalues) - df[col] = iovalues - - return df + def post(self,args): pass @@ -177,7 +146,7 @@ class Components : _args['gpu'] = 0 _args['num_gpu'] = 1 os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) - _args['no_value']= args['no_value'] + # _args['no_value']= args['no_value'] _args['matrix_size'] = args['matrix_size'] if 'matrix_size' in args else 128 @@ -207,7 +176,7 @@ class Components : # df = pd.DataFrame(df[ int (partition) ],columns = columns) # max_rows = int(args['partition_max_rows']) if 'partition_max_rows' in args else 1000000 # N = np.divide(df.shape[0],max_rows).astype(int) + 1 - info = {"parition":int(partition),"gpu":_args["gpu"],"rows":int(df.shape[0]),"cols":int(df.shape[1]),"space":df[args['columns'][0]].unique().size, "part_size":int(PART_SIZE)} + info = {"name":args['columns'],"parition":int(partition),"gpu":_args["gpu"],"rows":int(df.shape[0]),"cols":int(df.shape[1]),"space":df[args['columns'][0]].unique().size, "part_size":int(PART_SIZE)} logger.write({"module":"generate","action":"partition","input":info}) _args['partition'] = int(partition) _args['continuous']= args['continuous'] if 'continuous' in args else [] @@ -400,11 +369,11 @@ if __name__ == '__main__' : generator.generate(args) # Components.generate(args) elif 'shuffle' in SYS_ARGS: - args['data'] = DATA[0] - _df = (Components()).shuffle(args) - print (DATA[0][args['columns']]) - 
print () - print (_df[args['columns']]) + + + for data in DATA : + args['data'] = data + _df = (Components()).shuffle(args) else: # DATA = np.array_split(DATA,PART_SIZE) diff --git a/setup.py b/setup.py index 0370cdc..40e8d11 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ import sys def read(fname): return open(os.path.join(os.path.dirname(__file__), fname)).read() -args = {"name":"data-maker","version":"1.3.1","author":"Vanderbilt University Medical Center","author_email":"steve.l.nyemba@vanderbilt.edu","license":"MIT", +args = {"name":"data-maker","version":"1.3.2","author":"Vanderbilt University Medical Center","author_email":"steve.l.nyemba@vanderbilt.edu","license":"MIT", "packages":find_packages(),"keywords":["healthcare","data","transport","protocol"]} args["install_requires"] = ['data-transport@git+https://dev.the-phi.com/git/steve/data-transport.git','tensorflow==1.15','pandas','pandas-gbq','pymongo'] args['url'] = 'https://hiplab.mc.vanderbilt.edu/git/aou/data-maker.git' @@ -14,3 +14,5 @@ if sys.version_info[0] == 2 : args['use_2to3_exclude_fixers'] = ['lib2to3.fixes.fix_import'] args['scripts']=['pipeline.py','finalize.py'] setup(**args) + +
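
Note (reviewer sketch, not part of the patch): the commit replaces the single `no_value` placeholder with a pool of "missing" values. `Binary.get_column` keeps only the most frequent values for the training matrix, the new `Binary.get_missing` returns the truncated remainder (the outlier space), and `Predict.apply` now draws from that pool for any generated row whose one-hot output is all zeros, falling back to the known values when the pool is empty. The snippet below is a minimal standalone illustration of that mechanism; the function names and toy data are simplified stand-ins, not the project's actual API.

    # Illustrative sketch only -- mirrors the frequency-based split in bridge.py
    # (Binary.get_column / Binary.get_missing) and the fallback sampling added
    # to gan.py (Predict.apply). Names here are hypothetical stand-ins.
    import numpy as np
    import pandas as pd

    def split_values(column: pd.Series, size: int = -1):
        """Return (kept, missing): the `size` most frequent non-null values,
        and the truncated remainder used as the outlier/missing pool."""
        values = column.dropna().value_counts().index.values
        if size > 0 and column.size > size:
            return values[:size], values[size:]
        return values, np.array([])

    def fill_empty_rows(onehot: np.ndarray, kept, missing):
        """Decode each generated one-hot row; rows the generator left all-zero
        are drawn at random from the missing pool (or from the kept values
        when there is no pool), instead of a single fixed placeholder."""
        pool = missing if len(missing) else kept
        decoded = []
        for row in onehot:
            if row.sum() == 0:
                decoded.append(np.random.choice(pool))
            else:
                decoded.append(kept[np.argmax(row)])
        return decoded

    # Toy usage: 'a' and 'b' are frequent enough to be trained on,
    # 'c' and 'd' land in the missing/outlier pool.
    col = pd.Series(['a', 'a', 'a', 'b', 'b', 'c', 'd'])
    kept, missing = split_values(col, size=2)
    fake = np.array([[1, 0], [0, 1], [0, 0]])   # third row: generator produced nothing
    print(fill_empty_rows(fake, kept, missing))

The design choice this sketch is meant to highlight: rather than stamping a sentinel such as NaN into rows the generator could not resolve, the patch re-injects plausible low-frequency values, which keeps the synthetic column's value space closer to the original distribution.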