fix: handling outliers and missing values

Branch: dev
Author: Steve L. Nyemba (5 years ago)
Parent: f920ba0eda
Commit: 71097103da
@@ -197,12 +197,21 @@ class Binary :
         """
         This function will return the columns that are available for processing ...
         """
-        values = column.dropna().value_counts().index
+        values = column.dropna().value_counts().index.values
         if size > 0 and column.size > size:
             values = values[:size]
-        values.sort_values()
+        values.sort()
         return values
+    def get_missing(self,column,size=-1):
+        values = column.dropna().value_counts().index.values
+        if size > 0 and column.size > size :
+            values = values[size:]
+        else:
+            values = np.array([])
+        values.sort()
+        return values.tolist();
     def _get_column_values(self,column,size=-1):
         values = column.dropna().unique()
         values.sort()
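The two methods above split a column's value space by frequency: get_column keeps the most frequent values (up to the matrix-size cap) for encoding, while the new get_missing returns the long tail that is sampled back in later. A minimal standalone sketch of that split, with made-up sample data; the function names mirror the methods but are plain functions here:

import numpy as np
import pandas as pd

def get_column(column, size=-1):
    # most frequent values first; keep at most `size` of them for encoding
    values = column.dropna().value_counts().index.values
    if size > 0 and column.size > size:
        values = values[:size]
    values.sort()
    return values

def get_missing(column, size=-1):
    # everything beyond the cap is treated as the "missing/outlier" space
    values = column.dropna().value_counts().index.values
    if size > 0 and column.size > size:
        values = values[size:]
    else:
        values = np.array([])
    values.sort()
    return values.tolist()

col = pd.Series(['a'] * 5 + ['b'] * 3 + ['c', 'd'])
print(get_column(col, 2))   # ['a' 'b']  -- frequent values kept for training
print(get_missing(col, 2))  # ['c', 'd'] -- long tail sampled back in later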

@@ -536,9 +536,10 @@ class Predict(GNet):
         self.values = args['values']
         self.ROW_COUNT = args['row_count']
         self.oROW_COUNT = self.ROW_COUNT
-        self.MISSING_VALUES = np.nan_to_num(np.nan)
-        if 'no_value' in args and args['no_value'] not in ['na','','NA'] :
-            self.MISSING_VALUES = args['no_value']
+        # self.MISSING_VALUES = np.nan_to_num(np.nan)
+        # if 'no_value' in args and args['no_value'] not in ['na','','NA'] :
+        #     self.MISSING_VALUES = args['no_value']
+        self.MISSING_VALUES = args['missing']
         # self.MISSING_VALUES = args['no_value']
         # self.MISSING_VALUES = int(args['no_value']) if args['no_value'].isnumeric() else np.na if args['no_value'] in ['na','NA','N/A'] else args['no_value']
@@ -650,15 +651,18 @@ class Predict(GNet):
             # df.columns = self.values
             if len(found) or df.columns.size <= len(self.values):
                 ii = df.apply(lambda row: np.sum(row) == 0 ,axis=1)
-                # print ([' **** ',ii.sum()])
-                if ii.shape[0] > 0 :
+                missing = []
+                if ii.sum() > 0 :
                     #
-                    #@TODO Have this be a configurable variable
-                    missing = np.repeat(self.MISSING_VALUES, np.where(ii==1)[0].size)
-                else:
-                    missing = []
+                    # If the generator had a reductive effect we should be able to get random values from either :
+                    # - The space of outliers
+                    # - existing values for smaller spaces that have suffered over training
+                    #
+                    N = ii.sum()
+                    missing_values = self.MISSING_VALUES if self.MISSING_VALUES else self.values
+                    missing = np.random.choice(missing_values,N)
+                # missing = []
                 #
                 # @TODO:
                 # Log the findings here in terms of ratio, missing, candidate count
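The rewritten block replaces the old fixed placeholder (np.repeat of a single MISSING value) with random draws: rows whose one-hot encoding came back all zeros are filled by sampling from the outlier pool built by get_missing, falling back to the observed value space when that pool is empty. A small sketch of the same idea outside the class, assuming missing_pool plays the role of self.MISSING_VALUES and known_values the role of self.values:

import numpy as np
import pandas as pd

def fill_unmatched(df, known_values, missing_pool):
    # rows whose one-hot encoding collapsed to all zeros carry no signal
    ii = df.apply(lambda row: np.sum(row) == 0, axis=1)
    N = int(ii.sum())
    if N == 0:
        return np.array([])
    pool = missing_pool if len(missing_pool) else known_values
    # draw replacements at random from the outlier space (or the value space)
    return np.random.choice(pool, N)

onehot = pd.DataFrame([[1, 0], [0, 0], [0, 1], [0, 0]])
print(fill_unmatched(onehot, ['a', 'b'], ['x', 'y']))  # two random draws from ['x', 'y']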
@@ -669,6 +673,8 @@
                 df = pd.DataFrame( df.iloc[i].apply(lambda row: self.values[np.random.choice(np.where(row != 0)[0],1)[0]] ,axis=1))
                 df.columns = columns
             df = df[columns[0]].append(pd.Series(missing))
             if self.logger :
                 info= {"missing": i.size,"rows":df.shape[0],"cols":1,'partition':self.PARTITION}
@@ -680,40 +686,9 @@
         tf.compat.v1.reset_default_graph()
         df = pd.DataFrame(df)
         df.columns = columns
+        np.random.shuffle(df[columns[0]].values)
         return df.to_dict(orient='list')
-        # return df.to_dict(orient='list')
-        # count = str(len(os.listdir(self.out_dir)))
-        # _name = os.sep.join([self.out_dir,self.CONTEXT+'-'+count+'.csv'])
-        # df.to_csv(_name,index=False)
-        # output.extend(np.round(f))
-        # for m in range(2):
-        #     for n in range(2, self.NUM_LABELS):
-        #         idx1 = (demo[:, m] == 1)
-        #         idx2 = (demo[:, n] == 1)
-        #         idx = [idx1[j] and idx2[j] for j in range(len(idx1))]
-        #         num = np.sum(idx)
-        #         print ("___________________list__")
-        #         print (idx1)
-        #         print (idx2)
-        #         print (idx)
-        #         print (num)
-        #         print ("_____________________")
-        #         nbatch = int(np.ceil(num / self.BATCHSIZE_PER_GPU))
-        #         label_input = np.zeros((nbatch*self.BATCHSIZE_PER_GPU, self.NUM_LABELS))
-        #         label_input[:, n] = 1
-        #         label_input[:, m] = 1
-        #         output = []
-        #         for i in range(nbatch):
-        #             f = sess.run(fake,feed_dict={y: label_input[i* self.BATCHSIZE_PER_GPU:(i+1)* self.BATCHSIZE_PER_GPU]})
-        #             output.extend(np.round(f))
-        #         output = np.array(output)[:num]
-        #         print ([m,n,output])
-        #         np.save(self.out_dir + str(m) + str(n), output)
 if __name__ == '__main__' :
     #
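The one added line shuffles the generated column in place before it is returned. For a single-dtype column, df[col].values exposes the Series' backing array, so numpy's in-place shuffle reorders the frame without an explicit assignment. A quick check of that behaviour (it relies on .values returning a view, which holds for the pandas versions this project pins but is not guaranteed under newer copy-on-write semantics):

import numpy as np
import pandas as pd

df = pd.DataFrame({'x': [1, 2, 3, 4, 5]})
np.random.shuffle(df['x'].values)   # shuffles the underlying array in place
print(df['x'].tolist())             # same five values, new order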

@@ -21,29 +21,8 @@ class ContinuousToDiscrete :
         """
         This function will convert a continous stream of information into a variety a bit stream of bins
         """
-        # BOUNDS = np.repeat(np.divide(X.max(),n),n).cumsum().tolist()
-        # print ( X.values.astype(np.float32))
-        # print ("___________________________")
         values = np.array(X).astype(np.float32)
         BOUNDS = ContinuousToDiscrete.bounds(values,n)
-        # _map = [{"index":BOUNDS.index(i),"ubound":i} for i in BOUNDS]
-        # _matrix = []
-        # m = []
-        # for value in X :
-        #     x_ = np.zeros(n)
-        #     for row in BOUNDS :
-        #         if value >= row.left and value <= row.right :
-        #             index = BOUNDS.index(row)
-        #             x_[index] = 1
-        #             break
-        #     _matrix += x_.tolist()
-        # #
-        # # for items in BOUNDS :
-        # #     index = BOUNDS.index(items)
-        # return np.array(_matrix).reshape(len(X),n)
         matrix = np.repeat(np.zeros(n),len(X)).reshape(len(X),n)
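ContinuousToDiscrete.binary turns a continuous column into a one-hot matrix over n bins computed by the class's bounds() helper. The sketch below is illustrative only: it produces the same kind of (len(X), n) bin-membership matrix, but uses evenly spaced edges and numpy.digitize in place of bounds(), so it may bin values differently than the library does.

import numpy as np

def to_bins(X, n):
    values = np.array(X, dtype=np.float32)
    edges = np.linspace(values.min(), values.max(), n + 1)
    # index of the bin each value falls into, clipped into [0, n-1]
    idx = np.clip(np.digitize(values, edges[1:-1]), 0, n - 1)
    matrix = np.zeros((len(values), n), dtype=np.float32)
    matrix[np.arange(len(values)), idx] = 1
    return matrix

print(to_bins([1.0, 2.5, 7.0, 9.9], 4))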
@@ -123,25 +102,9 @@ def train (**args) :
     # @TODO : Consider performing this task on several threads/GPUs simulataneously
     #
     for col in column :
-        # args['real'] = pd.get_dummies(df[col].fillna('')).astype(np.float32).values
-        # if 'float' not in df[col].dtypes.name :
-        #     args['real'] = pd.get_dummies(df[col].fillna('')).astype(np.float32).values
-        # if col in CONTINUOUS:
-        #     BIN_SIZE = 4 if 'bin_size' not in args else int(args['bin_size'])
-        #     args['real'] = ContinuousToDiscrete.binary(df[col],BIN_SIZE).astype(np.float32)
-        #     # args['real'] = args['real'].reshape(df.shape[0],BIN_SIZE)
-        # else:
-        #     df.to_csv('tmp-'+args['logs'].replace('/','_')+'-'+col+'.csv',index=False)
-        #     print (df[col].dtypes)
-        #     print (df[col].dropna/(axis=1).unique())
-        #     args['real'] = pd.get_dummies(df[col].dropna()).astype(np.float32).values
         msize = args['matrix_size'] if 'matrix_size' in args else -1
         args['real'] = (Binary()).apply(df[col],msize)
         context = args['context']
         if 'store' in args :
             args['store']['args']['doc'] = context
@@ -191,61 +154,49 @@ def generate(**args):
     # If the identifier is not present, we should fine a way to determine or make one
     #
     BIN_SIZE = 4 if 'bin_size' not in args else int(args['bin_size'])
-    NO_VALUE = dict(args['no_value']) if type(args['no_value']) == dict else args['no_value']
+    # NO_VALUE = dict(args['no_value']) if type(args['no_value']) == dict else args['no_value']
     bhandler = Binary()
     _df = df.copy()
     for col in column :
         args['context'] = col
         args['column'] = col
-        # if 'float' in df[col].dtypes.name or col in CONTINUOUS :
-        #     #
-        #     # We should create the bins for the values we are observing here
-        #     BIN_SIZE = 4 if 'bin_size' not in args else int(args['bin_size'])
-        #     values = ContinuousToDiscrete.continuous(df[col].values,BIN_SIZE)
-        #     # values = np.unique(values).tolist()
-        # else:
-        # if col in CONTINUOUS :
-        #     values = ContinuousToDiscrete.binary(df[col],BIN_SIZE).astype(np.float32).T
-        # else:
-        #     values = df[col].dropna().unique().tolist()
         msize = args['matrix_size'] if 'matrix_size' in args else -1
         values = bhandler.get_column(df[col],msize)
+        MISSING = bhandler.get_missing(df[col],msize)
         args['values'] = values
         args['row_count'] = df.shape[0]
-        if col in NO_VALUE :
-            args['no_value'] = NO_VALUE[col]
-        else:
-            args['no_value'] = NO_VALUE
+        # if col in NO_VALUE :
+        #     args['no_value'] = NO_VALUE[col]
+        # else:
+        #     args['no_value'] = NO_VALUE
+        # novalue = NO_VALUE[col] if NO_VALUE[col] in ['na',''] else NO_VALUE[col]
+        # MISSING += [NO_VALUE[col]]
+        args['missing'] = MISSING
         #
         # we can determine the cardinalities here so we know what to allow or disallow
         handler = gan.Predict (**args)
         handler.load_meta(col)
         r = handler.apply()
         if col in CONTINUOUS :
             r[col] = np.array(r[col])
-            MISSING = np.nan if args['no_value'] in ['na','','NA'] else args['no_value']
-            if np.isnan(MISSING):
-                i = np.isnan(r[col])
-                i = np.where (i == False)[0]
-            else:
-                i = np.where( r[col] != None)[0]
-            _approx = ContinuousToDiscrete.continuous(r[col][i],BIN_SIZE) #-- approximating based on arbitrary bins
-            r[col][i] = _approx
+            _approx = ContinuousToDiscrete.continuous(r[col],BIN_SIZE) #-- approximating based on arbitrary bins
+            r[col] = _approx
         _df[col] = r[col]
         #
         # Let's cast the type to the original type (it makes the data more usable)
         #
-        # print (values)
-        # print ([col,df[col].dtype,_df[col].tolist()])
         otype = df[col].dtype
         _df[col] = _df[col].astype(otype)
         #
         # @TODO: log basic stats about the synthetic attribute
         #

@@ -9,8 +9,10 @@ if len(sys.argv) > 1:
         if sys.argv[i].startswith('--'):
             key = sys.argv[i][2:] #.replace('-','')
             SYS_ARGS[key] = 1
-            if i + 1 < N:
+            if i + 1 < N and not sys.argv[i + 1].startswith('--'):
                 value = sys.argv[i + 1] = sys.argv[i+1].strip()
+            else:
+                value = None
             if key and value:
                 SYS_ARGS[key] = value
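The parsing fix stops a bare flag from swallowing the next token when that token is itself a flag: --move --from people --init now yields move=1, from='people', init=1 instead of move='--from'. A standalone sketch of the corrected loop, with a stand-in list instead of sys.argv:

SYS_ARGS = {}
argv = ['prog', '--move', '--from', 'people', '--init']   # stand-in for sys.argv
N = len(argv)
for i in range(1, N):
    if argv[i].startswith('--'):
        key = argv[i][2:]
        SYS_ARGS[key] = 1
        # only consume the next token as a value if it is not another flag
        if i + 1 < N and not argv[i + 1].startswith('--'):
            value = argv[i + 1].strip()
        else:
            value = None
        if key and value:
            SYS_ARGS[key] = value

print(SYS_ARGS)   # {'move': 1, 'from': 'people', 'init': 1}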

@@ -6,10 +6,13 @@ This file will perform basic tasks to finalize the GAN process by performing the
 """
 import pandas as pd
 import numpy as np
+from multiprocessing import Process, Lock
 from google.oauth2 import service_account
 from google.cloud import bigquery as bq
+import transport
 from data.params import SYS_ARGS
 import json
 class Analytics :
     """
     This class will compile basic analytics about a given dataset i.e compare original/synthetic
@@ -33,15 +36,23 @@ class Analytics :
         """
         This function will measure the distance between
         """
-        df = args['data']
-        names = [name for name in df_counts.columns.tolist() if name.endswith('_io') == False]
+        pass
 class Utils :
+    @staticmethod
+    def log(**args):
+        logger = transport.factory.instance(type="mongo.MongoWriter",args={"dbname":"aou","doc":"logs"})
+        logger.write(args)
+        logger.close()
     class get :
         @staticmethod
-        def config(**args) :
-            contexts = args['contexts'].split(',') if type(args['contexts']) == str else args['contexts']
-            pipeline = args['pipeline']
-            return [ item for item in pipeline if item['context'] in contexts]
+        def pipeline(table,path) :
+            # contexts = args['contexts'].split(',') if type(args['contexts']) == str else args['contexts']
+            config = json.loads((open(path)).read())
+            pipeline = config['pipeline']
+            # return [ item for item in pipeline if item['context'] in contexts]
+            pipeline = [item for item in pipeline if 'from' in item and item['from'].strip() == table]
+            Utils.log(module=table,action='init',input={"pipeline":pipeline})
+            return pipeline
         @staticmethod
         def sql(**args) :
             """
@@ -54,7 +65,8 @@ class Utils :
         SQL = ["SELECT * FROM :from "]
         SQL_FILTER = []
         NO_FILTERS_FOUND = True
-        pipeline = Utils.get.config(**args)
+        # pipeline = Utils.get.config(**args)
+        pipeline = args['pipeline']
         REVERSE_QUALIFIER = {'IN':'NOT IN','NOT IN':'IN','=':'<>','<>':'='}
         for item in pipeline :
@@ -73,7 +85,7 @@ class Utils :
     #
     # let's pull the field schemas out of the table definition
     #
+    Utils.log(module=args['from'],action='sql',input={"sql":" ".join(SQL) })
     return " ".join(SQL).replace(":from",src)
@@ -91,26 +103,36 @@ def mk(**args) :
         return client.create_dataset(dataset)
     return found[0]
-def move (**args):
+def move (args):
     """
     This function will move a table from the synthetic dataset into a designated location
     This is the simplest case for finalizing a synthetic data set
     :private_key
     """
-    private_key = args['private_key']
-    client = bq.Client.from_service_account_json(private_key)
-    config = Utils.get.config(**args)
+    pipeline = Utils.get.pipeline(args['from'],args['config'])
+    _args = json.loads((open(args['config'])).read())
+    _args['pipeline'] = pipeline
+    # del _args['pipeline']
+    args = dict(args,**_args)
+    # del args['pipeline']
+    # private_key = args['private_key']
+    client = bq.Client.from_service_account_json(args['private_key'])
     dataset = args['dataset']
-    if 'contexts' in args :
-        SQL = [ ''.join(["SELECT * FROM io.",item['context'],'_full_io']) for item in config]
+    if pipeline :
+        SQL = [ ''.join(["SELECT * FROM io.",item['context'],'_full_io']) for item in pipeline]
         SQL += [Utils.get.sql(**args)]
         SQL = ('\n UNION ALL \n'.join(SQL).replace(':dataset','io'))
     else:
         #
         # moving a table to a designated location
         tablename = args['from']
-        SQL = "SELECT * FROM :dataset.:table".replace(":dataset",dataset).replace(":table",tablename)
+        if 'sql' not in args :
+            SQL = "SELECT * FROM :dataset.:table"
+        else:
+            SQL = args['sql']
+        SQL = SQL.replace(":dataset",dataset).replace(":table",tablename)
+    Utils.log(module=args['from'],action='sql',input={'sql':SQL})
     #
     # At this point we have gathered all the tables in the io folder and we should now see if we need to merge with the remainder from the original table
     #
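When pipeline entries exist, move() builds one SELECT per synthetic context table in the io dataset and UNION ALLs them with the reverse-filtered remainder produced by Utils.get.sql. A sketch of just that string assembly, with made-up context names and a placeholder remainder query, no BigQuery client involved:

def build_union_sql(pipeline, remainder_sql):
    # one SELECT per synthetic context table, then the remainder of the original
    SQL = ["".join(["SELECT * FROM io.", item['context'], '_full_io']) for item in pipeline]
    SQL += [remainder_sql]
    return '\n UNION ALL \n'.join(SQL).replace(':dataset', 'io')

pipeline = [{'context': 'icd'}, {'context': 'rx'}]
print(build_union_sql(pipeline,
                      "SELECT * FROM :dataset.people WHERE person_id NOT IN (/* synthesized ids */)"))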
@@ -132,7 +154,7 @@ def move (**args):
     SQL = SQL.replace("*"," , ".join(fields))
     # print (SQL)
     out = client.query(SQL,location='US',job_config=config)
-    print ()
+    Utils.log(module=args['from'],action='move',input={'job':out.job_id})
     return (out.job_id)
@@ -158,23 +180,59 @@ if __name__ == '__main__' :
     Usage :
     finalize --<move|stats> --contexts <c1,c2,...c3> --from <table>
     """
     if 'move' in SYS_ARGS :
-        # table = SYS_ARGS['from']
-        # args = dict(config,**{"private_key":"../curation-prod.json"})
-        args = dict(args,**SYS_ARGS)
-        contexts = [item['context'] for item in config['pipeline'] if item['from'] == SYS_ARGS['from']]
-        log = []
-        if contexts :
-            args['contexts'] = contexts
-            log = move(**args)
-        else:
-            tables = args['from'].split(',')
-            for name in tables :
-                name = name.strip()
-                args['from'] = name
-                log += [move(**args)]
-        print ("\n".join(log))
+        if 'init' in SYS_ARGS :
+            dep = config['dep'] if 'dep' in config else {}
+            info = []
+            if 'queries' in dep :
+                info += dep['queries']
+            print ('________')
+            if 'tables' in dep :
+                info += dep['tables']
+            args = {}
+            jobs = []
+            for item in info :
+                args = {}
+                if type(item) == str :
+                    args['from'] = item
+                    name = item
+                else:
+                    args = item
+                    name = item['from']
+                args['config'] = SYS_ARGS['config']
+                # args['pipeline'] = []
+                job = Process(target=move,args=(args,))
+                job.name = name
+                jobs.append(job)
+                job.start()
+            # while len(jobs) > 0 :
+            #     jobs = [job for job in jobs if job.is_alive()]
+            #     time.sleep(1)
+        else:
+            move(SYS_ARGS)
+        # # table = SYS_ARGS['from']
+        # # args = dict(config,**{"private_key":"../curation-prod.json"})
+        # args = dict(args,**SYS_ARGS)
+        # contexts = [item['context'] for item in config['pipeline'] if item['from'] == SYS_ARGS['from']]
+        # log = []
+        # if contexts :
+        #     args['contexts'] = contexts
+        #     log = move(**args)
+        # else:
+        #     tables = args['from'].split(',')
+        #     for name in tables :
+        #         name = name.strip()
+        #         args['from'] = name
+        #         log += [move(**args)]
+        # print ("\n".join(log))
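With --init, the main block now fans out one move() job per dependency listed under config['dep'] (queries and tables), each in its own multiprocessing.Process. A reduced sketch of that pattern with a dummy move() and a join at the end (the real script currently leaves the join loop commented out):

from multiprocessing import Process

def move(args):
    # stand-in for finalize.move: just report which table would be moved
    print("moving", args['from'])

if __name__ == '__main__':
    # dep entries can be plain table names or dicts carrying their own sql
    info = ['people', {'from': 'measurement', 'sql': 'SELECT ...'}]
    jobs = []
    for item in info:
        args = item if isinstance(item, dict) else {'from': item}
        job = Process(target=move, args=(args,))
        job.name = args['from']
        jobs.append(job)
        job.start()
    for job in jobs:
        job.join()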

@@ -14,7 +14,6 @@ from data.params import SYS_ARGS
 #
 # The configuration array is now loaded and we will execute the pipe line as follows
-DATASET='combined20191004v2_deid'
 class Components :
     lock = Lock()
@@ -120,37 +119,7 @@ class Components :
         self.generate(args)
         pass
-    def shuffle(self,args):
-        """
-        """
-        df = args['reader']() if 'reader' in args else args['data']
-        col = args['columns'][0]
-        distrib = df[col].value_counts()
-        values = np.array(distrib.index)
-        counts = np.array(distrib.values)
-        np.random.shuffle(values)
-        np.random.shuffle(counts)
-        N = len (values)
-        theta = np.random.sample()
-        pad = 0
-        # print (values)
-        iovalues = np.zeros(df.shape[0],dtype=df[col].dtype)
-        for i in range(N) :
-            # n = int(counts[i] - counts[i]*theta)
-            n = counts[i]
-            print ([counts[i],theta,n])
-            index = np.where(iovalues == 0)[0]
-            if index.size > 0 and index.size > n:
-                index = index[:n]
-                iovalues[index] = values[i]
-        np.random.shuffle(iovalues)
-        df[col] = iovalues
-        return df
     def post(self,args):
         pass
@@ -177,7 +146,7 @@ class Components :
             _args['gpu'] = 0
             _args['num_gpu'] = 1
         os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu'])
-        _args['no_value']= args['no_value']
+        # _args['no_value']= args['no_value']
         _args['matrix_size'] = args['matrix_size'] if 'matrix_size' in args else 128
@@ -207,7 +176,7 @@ class Components :
             # df = pd.DataFrame(df[ int (partition) ],columns = columns)
             # max_rows = int(args['partition_max_rows']) if 'partition_max_rows' in args else 1000000
             # N = np.divide(df.shape[0],max_rows).astype(int) + 1
-            info = {"parition":int(partition),"gpu":_args["gpu"],"rows":int(df.shape[0]),"cols":int(df.shape[1]),"space":df[args['columns'][0]].unique().size, "part_size":int(PART_SIZE)}
+            info = {"name":args['columns'],"parition":int(partition),"gpu":_args["gpu"],"rows":int(df.shape[0]),"cols":int(df.shape[1]),"space":df[args['columns'][0]].unique().size, "part_size":int(PART_SIZE)}
             logger.write({"module":"generate","action":"partition","input":info})
             _args['partition'] = int(partition)
             _args['continuous']= args['continuous'] if 'continuous' in args else []
@@ -400,11 +369,11 @@ if __name__ == '__main__' :
             generator.generate(args)
             # Components.generate(args)
     elif 'shuffle' in SYS_ARGS:
-        args['data'] = DATA[0]
-        _df = (Components()).shuffle(args)
-        print (DATA[0][args['columns']])
-        print ()
-        print (_df[args['columns']])
+        for data in DATA :
+            args['data'] = data
+            _df = (Components()).shuffle(args)
     else:
         # DATA = np.array_split(DATA,PART_SIZE)

@@ -4,7 +4,7 @@ import sys
 def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read()
-args = {"name":"data-maker","version":"1.3.1","author":"Vanderbilt University Medical Center","author_email":"steve.l.nyemba@vanderbilt.edu","license":"MIT",
+args = {"name":"data-maker","version":"1.3.2","author":"Vanderbilt University Medical Center","author_email":"steve.l.nyemba@vanderbilt.edu","license":"MIT",
 "packages":find_packages(),"keywords":["healthcare","data","transport","protocol"]}
 args["install_requires"] = ['data-transport@git+https://dev.the-phi.com/git/steve/data-transport.git','tensorflow==1.15','pandas','pandas-gbq','pymongo']
 args['url'] = 'https://hiplab.mc.vanderbilt.edu/git/aou/data-maker.git'
@@ -14,3 +14,5 @@ if sys.version_info[0] == 2 :
     args['use_2to3_exclude_fixers'] = ['lib2to3.fixes.fix_import']
+args['scripts']=['pipeline.py','finalize.py']
 setup(**args)
