You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
data-maker/data/maker/__init__.py

255 lines
9.3 KiB
Python

"""
(c) 2019 Data Maker, hiplab.mc.vanderbilt.edu
version 1.0.0
This package serves as a proxy to the overall usage of the framework.
This package is designed to generate synthetic data from a dataset from an original dataset using deep learning techniques
@TODO:
- Make configurable GPU, EPOCHS
"""
import pandas as pd
import numpy as np
import data.gan as gan
from transport import factory
from data.bridge import Binary
import threading as thread
class ContinuousToDiscrete :
ROUND_UP = 2
@staticmethod
def binary(X,n=4) :
"""
This function will convert a continous stream of information into a variety a bit stream of bins
"""
# BOUNDS = np.repeat(np.divide(X.max(),n),n).cumsum().tolist()
# print ( X.values.astype(np.float32))
# print ("___________________________")
values = np.array(X).astype(np.float32)
BOUNDS = ContinuousToDiscrete.bounds(values,n)
# _map = [{"index":BOUNDS.index(i),"ubound":i} for i in BOUNDS]
# _matrix = []
# m = []
# for value in X :
# x_ = np.zeros(n)
# for row in BOUNDS :
# if value>= row.left and value <= row.right :
# index = BOUNDS.index(row)
# x_[index] = 1
# break
# _matrix += x_.tolist()
# #
# # for items in BOUNDS :
# # index = BOUNDS.index(items)
# return np.array(_matrix).reshape(len(X),n)
matrix = np.repeat(np.zeros(n),len(X)).reshape(len(X),n)
@staticmethod
def bounds(x,n):
# return np.array_split(x,n)
values = np.round(x,ContinuousToDiscrete.ROUND_UP)
return list(pd.cut(values,n).categories)
@staticmethod
def continuous(X,BIN_SIZE=4) :
"""
This function will approximate a binary vector given boundary information
:X binary matrix
:BIN_SIZE
"""
BOUNDS = ContinuousToDiscrete.bounds(X,BIN_SIZE)
values = []
# _BINARY= ContinuousToDiscrete.binary(X,BIN_SIZE)
# # # print (BOUNDS)
l = {}
for i in np.arange(len(X)): #value in X :
value = X[i]
for item in BOUNDS :
if value >= item.left and value <= item.right :
values += [np.round(np.random.uniform(item.left,item.right),ContinuousToDiscrete.ROUND_UP)]
break
# values += [ np.round(np.random.uniform(item.left,item.right),ContinuousToDiscrete.ROUND_UP) for item in BOUNDS if value >= item.left and value <= item.right ]
# # values = []
# for row in _BINARY :
# # ubound = BOUNDS[row.index(1)]
# index = np.where(row == 1)[0][0]
# ubound = BOUNDS[ index ].right
# lbound = BOUNDS[ index ].left
# x_ = np.round(np.random.uniform(lbound,ubound),ContinuousToDiscrete.ROUND_UP).astype(float)
# values.append(x_)
# lbound = ubound
# values = [np.random.uniform() for item in BOUNDS]
return values
def train (**args) :
"""
This function is intended to train the GAN in order to learn about the distribution of the features
:column columns that need to be synthesized (discrete)
:logs where the output of the (location on disk)
:id identifier of the dataset
:data data-frame to be synthesized
:context label of what we are synthesizing
"""
column = args['column'] if (isinstance(args['column'],list)) else [args['column']]
# CONTINUOUS = args['continuous'] if 'continuous' in args else []
# column_id = args['id']
df = args['data'] if not isinstance(args['data'],str) else pd.read_csv(args['data'])
df.columns = [name.lower() for name in df.columns]
#
# @TODO:
# Consider sequential training of sub population for extremely large datasets
#
#
# If we have several columns we will proceed one at a time (it could be done in separate threads)
# @TODO : Consider performing this task on several threads/GPUs simulataneously
#
for col in column :
# args['real'] = pd.get_dummies(df[col].fillna('')).astype(np.float32).values
# if 'float' not in df[col].dtypes.name :
# args['real'] = pd.get_dummies(df[col].fillna('')).astype(np.float32).values
# if col in CONTINUOUS:
# BIN_SIZE = 4 if 'bin_size' not in args else int(args['bin_size'])
# args['real'] = ContinuousToDiscrete.binary(df[col],BIN_SIZE).astype(np.float32)
# # args['real'] = args['real'].reshape(df.shape[0],BIN_SIZE)
# else:
# df.to_csv('tmp-'+args['logs'].replace('/','_')+'-'+col+'.csv',index=False)
# print (df[col].dtypes)
# print (df[col].dropna/(axis=1).unique())
# args['real'] = pd.get_dummies(df[col].dropna()).astype(np.float32).values
msize = args['matrix_size'] if 'matrix_size' in args else -1
args['real'] = (Binary()).apply(df[col],msize)
context = args['context']
if 'store' in args :
args['store']['args']['doc'] = context
logger = factory.instance(**args['store'])
args['logger'] = logger
info = {"rows":args['real'].shape[0],"cols":args['real'].shape[1],"name":col,"partition":args['partition']}
logger.write({"module":"gan-train","action":"data-prep","input":info})
else:
logger = None
args['column'] = col
args['context'] = col
#
# If the s
trainer = gan.Train(**args)
trainer.apply()
def post(**args):
"""
This uploads the tensorflow checkpoint to a data-store (mongodb, biguqery, s3)
"""
pass
def get(**args):
"""
This function will restore a checkpoint from a persistant storage on to disk
"""
pass
def generate(**args):
"""
This function will generate a synthetic dataset on the basis of a model that has been learnt for the dataset
@return pandas.DataFrame
:data data-frame to be synthesized
:column columns that need to be synthesized (discrete)
:id column identifying an entity
:logs location on disk where the learnt knowledge of the dataset is
"""
# df = args['data']
df = args['data'] if not isinstance(args['data'],str) else pd.read_csv(args['data'])
CONTINUOUS = args['continuous'] if 'continuous' in args else []
column = args['column'] if (isinstance(args['column'],list)) else [args['column']]
# column_id = args['id']
#
#@TODO:
# If the identifier is not present, we should fine a way to determine or make one
#
BIN_SIZE = 4 if 'bin_size' not in args else int(args['bin_size'])
NO_VALUE = dict(args['no_value']) if type(args['no_value']) == dict else args['no_value']
bhandler = Binary()
_df = df.copy()
for col in column :
args['context'] = col
args['column'] = col
# if 'float' in df[col].dtypes.name or col in CONTINUOUS :
# #
# # We should create the bins for the values we are observing here
# BIN_SIZE = 4 if 'bin_size' not in args else int(args['bin_size'])
# values = ContinuousToDiscrete.continuous(df[col].values,BIN_SIZE)
# # values = np.unique(values).tolist()
# else:
# if col in CONTINUOUS :
# values = ContinuousToDiscrete.binary(df[col],BIN_SIZE).astype(np.float32).T
# else:
# values = df[col].dropna().unique().tolist()
msize = args['matrix_size'] if 'matrix_size' in args else -1
values = bhandler.get_column_values(df[col])
args['values'] = values
args['row_count'] = df.shape[0]
if col in NO_VALUE :
args['no_value'] = NO_VALUE[col]
else:
args['no_value'] = NO_VALUE
#
# we can determine the cardinalities here so we know what to allow or disallow
handler = gan.Predict (**args)
handler.load_meta(col)
r = handler.apply()
if col in CONTINUOUS :
r[col] = np.array(r[col])
MISSING= np.nan if args['no_value'] in ['na','','NA'] else args['no_value']
if np.isnan(MISSING):
i = np.isnan(r[col])
i = np.where (i == False)[0]
else:
i = np.where( r[col] != None)[0]
_approx = ContinuousToDiscrete.continuous(r[col][i],BIN_SIZE) #-- approximating based on arbitrary bins
r[col][i] = _approx
_df[col] = r[col]
#
# Let's cast the type to the original type (it makes the data more usable)
#
otype = df[col].dtype
_df[col] = _df[col].astype(otype)
#
# @TODO: log basic stats about the synthetic attribute
#
# print (r)s
# break
return _df