"""
|
|
(c) 2019 Data Maker, hiplab.mc.vanderbilt.edu
|
|
version 1.0.0
|
|
|
|
This package serves as a proxy to the overall usage of the framework.
|
|
This package is designed to generate synthetic data from a dataset from an original dataset using deep learning techniques
|
|
|
|
@TODO:
|
|
- Make configurable GPU, EPOCHS
|
|
"""
|
|
import pandas as pd
import numpy as np
import data.gan as gan
import transport
# from data.bridge import Binary
import threading as thread
from data.maker import prepare
import copy
import os
import json
from multiprocessing import Process, RLock
from datetime import datetime, timedelta

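#
# A minimal usage sketch of this module (illustration only; the file name and
# the keyword arguments shown are assumptions, not guarantees of the API):
#
#   import data.maker as maker
#   df = pd.read_csv('sample.csv')
#   maker.train(data=df, context='sample', logs='logs', max_epochs=2)
#   candidates = maker.generate(data=df, context='sample', logs='logs',
#                               max_epochs=2, candidates=1)
#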
class ContinuousToDiscrete :
    ROUND_UP = 2
    @staticmethod
    def binary(X,n=4) :
        """
        This function will convert a continuous stream of values into a one-hot encoded bit stream over n bins
        """
        values = np.array(X).astype(np.float32)
        BOUNDS = ContinuousToDiscrete.bounds(values,n)
        matrix = np.repeat(np.zeros(n),len(X)).reshape(len(X),n)
        #
        # NOTE: assumed completion; the original body built the matrix but never
        # populated or returned it. Flag the bin each value falls into.
        for i,value in enumerate(values) :
            for j,item in enumerate(BOUNDS) :
                if value >= item.left and value <= item.right :
                    matrix[i][j] = 1
                    break
        return matrix

    @staticmethod
    def bounds(x,n):
        values = np.round(x,ContinuousToDiscrete.ROUND_UP)
        return list(pd.cut(values,n).categories)

    @staticmethod
    def continuous(X,BIN_SIZE=4) :
        """
        This function will approximate a continuous vector by resampling each value uniformly within its bin
        :X          vector of continuous values
        :BIN_SIZE   number of bins
        """
        BOUNDS = ContinuousToDiscrete.bounds(X,BIN_SIZE)
        values = []
        for i in np.arange(len(X)):
            value = X[i]
            for item in BOUNDS :
                if value >= item.left and value <= item.right :
                    values += [np.round(np.random.uniform(item.left,item.right),ContinuousToDiscrete.ROUND_UP)]
                    break
        return values

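#
# Example sketch of the round trip above (values are illustrative):
#
#   x = [0.5, 1.2, 3.4, 2.8]
#   bits  = ContinuousToDiscrete.binary(x, n=4)            # one-hot bin membership
#   x_hat = ContinuousToDiscrete.continuous(x, BIN_SIZE=4) # resampled within bins
#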
def train (**_args):
    """
    :params sql
    :params store
    """
    _inputhandler = prepare.Input(**_args)
    values,_matrix = _inputhandler.convert()
    args = {"real":_matrix,"context":_args['context']}
    _map = {}
    if 'store' in _args :
        #
        # Configure a logger from the 'logs' store entry and tag it with the context
        args['store'] = copy.deepcopy(_args['store']['logs'])
        if 'args' in args['store']:
            args['store']['args']['doc'] = _args['context']
        else:
            args['store']['doc'] = _args['context']
        logger = transport.factory.instance(**args['store'])
        args['logger'] = logger

    for key in _inputhandler._map :
        beg = _inputhandler._map[key]['beg']
        end = _inputhandler._map[key]['end']
        values = _inputhandler._map[key]['values'].tolist()
        _map[key] = {"beg":beg,"end":end,"values":np.array(values).astype(str).tolist()}
    info = {"rows":_matrix.shape[0],"cols":_matrix.shape[1],"map":_map}
    if 'logger' in args :
        # log only when a store/logger was configured above
        logger.write({"module":"gan-train","action":"data-prep","context":_args['context'],"input":_inputhandler._io})

    args['logs'] = _args['logs'] if 'logs' in _args else 'logs'
    args['max_epochs'] = _args['max_epochs']
    args['matrix_size'] = _matrix.shape[0]
    args['batch_size'] = 2000
    if 'partition' in _args :
        args['partition'] = _args['partition']
    if 'gpu' in _args :
        args['gpu'] = _args['gpu']

    trainer = gan.Train(**args)
    #
    # Write the map.json in the output directory of the logs
    #
    f = open(os.sep.join([trainer.out_dir,'map.json']),'w')
    f.write(json.dumps(_map))
    f.close()

    trainer.apply()

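#
# Sketch of the optional `store` argument train() inspects above; the provider
# and nested arguments shown are assumptions for illustration only:
#
#   store = {'logs': {'provider': 'mongodb', 'context': 'write',
#                     'args': {'db': 'logs'}}}
#   train(data=df, context='demo', store=store, max_epochs=2)
#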
def get(**args):
    """
    This function will restore a checkpoint from persistent storage onto disk
    """
    pass

def generate(**_args):
    """
    This function will generate a set of records; before that, the parameters needed must be loaded
    :param data
    :param context
    :param logs
    """
    _args['logs'] = _args['logs'] if 'logs' in _args else 'logs'
    partition = _args['partition'] if 'partition' in _args else None
    if not partition :
        MAP_FOLDER = os.sep.join([_args['logs'],'output',_args['context']])
    else:
        MAP_FOLDER = os.sep.join([_args['logs'],'output',_args['context'],str(partition)])
    f = open(os.sep.join([MAP_FOLDER,'map.json']))
    _map = json.loads(f.read())
    f.close()

    args = {"context":_args['context'],"max_epochs":_args['max_epochs'],"candidates":_args['candidates']}
    args['logs'] = _args['logs'] if 'logs' in _args else 'logs'
    args['max_epochs'] = _args['max_epochs']
    args['batch_size'] = 2000
    args['partition'] = 0 if 'partition' not in _args else _args['partition']
    args['row_count'] = _args['data'].shape[0]
    #
    # @TODO: perhaps get the space of values here ... (not sure it's a good idea)
    #
    _args['map'] = _map
    _inputhandler = prepare.Input(**_args)
    values,_matrix = _inputhandler.convert()
    args['values'] = np.array(values)
    if 'gpu' in _args :
        args['gpu'] = _args['gpu']

    handler = gan.Predict (**args)
    lparams = {'columns':None}
    if partition :
        lparams['partition'] = partition
    handler.load_meta(**lparams)
    #
    # Format the generated matrices by reverting them to data-frames with values
    #
    candidates = handler.apply(candidates=args['candidates'])
    return [_inputhandler.revert(matrix=_matrix) for _matrix in candidates]

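#
# Sketch: generating against a partitioned training run; map.json is then read
# from logs/output/<context>/<partition> as the branch above shows (arguments
# are illustrative assumptions):
#
#   _candidates = generate(data=df, context='demo', logs='logs',
#                          max_epochs=2, candidates=3, partition=1)
#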
class Learner(Process):
    def __init__(self,**_args):

        super(Learner, self).__init__()
        if 'gpu' in _args :
            print (_args['gpu'])
            os.environ['CUDA_VISIBLE_DEVICES'] = str(_args['gpu'])
            self.gpu = int(_args['gpu'])
        else:
            self.gpu = None

        self.info = _args['info']
        self.columns = self.info['columns'] if 'columns' in self.info else None
        self.store = _args['store']
        # NOTE: assumed fix; the original read _args['logger'] while testing self.store
        self.logger = transport.factory.instance(**self.store['logger']) if 'logger' in self.store else transport.factory.instance(provider='console',context='write',lock=True)
        if 'network_args' not in _args :
            self.network_args = {
                'context':self.info['context'] ,
                'logs':_args['logpath'] if 'logpath' in _args else 'logs',
                'max_epochs':int(_args['epochs']) if 'epochs' in _args else 2,
                'batch_size':int(_args['batch']) if 'batch' in _args else 2000
            }
        else:
            self.network_args = _args['network_args']
        self._encoder = None
        self._map = None
        self._df = _args['data'] if 'data' in _args else None
        #
        # @TODO: allow for a verbose mode so we have a sense of what is going on within the network
        #
    def get_schema(self):
        if self.store['source']['provider'] != 'bigquery' :
            return [{'name':self._df.dtypes.index.tolist()[i],'type':self._df.dtypes.astype(str).tolist()[i]} for i in range(self._df.dtypes.shape[0])]
        else:
            reader = transport.factory.instance(**self.store['source'])
            return reader.meta(table=self.info['from'])
    def initalize(self):
        reader = transport.factory.instance(**self.store['source'])
        _read_args = self.info
        if self._df is None :
            self._df = reader.read(**_read_args)
        columns = self.columns if self.columns else self._df.columns
        #
        # convert the data to binary here ...
        #
        _args = {"schema":self.get_schema(),"data":self._df,"columns":columns}
        if self._map :
            _args['map'] = self._map
        self._encoder = prepare.Input(**_args)

class Trainer(Learner):
    """
    This class will perform training using a GAN
    """
    def __init__(self,**_args):
        super().__init__(**_args)
        self.limit = int(_args['limit']) if 'limit' in _args else None
        self.name = _args['name']
        self.autopilot = _args['autopilot'] if 'autopilot' in _args else False
        # NOTE: assumed fix; this attribute was originally named `generate`,
        # which shadowed the generate() method below and made it uncallable
        self.generator = None
        self.candidates = int(_args['candidates']) if 'candidates' in _args else 1
    def run(self):
        self.initalize()
        _space,_matrix = self._encoder.convert()

        _args = self.network_args
        if self.gpu :
            _args['gpu'] = self.gpu
        _args['real'] = _matrix
        _args['candidates'] = self.candidates
        #
        # At this point we have the binary matrix and can initiate training
        #
        gTrain = gan.Train(**_args)
        gTrain.apply()

        writer = transport.factory.instance(provider='file',context='write',path=os.sep.join([gTrain.out_dir,'map.json']))
        writer.write(self._encoder._map,overwrite=True)
        writer.close()

        #
        # @TODO: At this point we need to generate some other objects
        #
        _args = {"network_args":self.network_args,"store":self.store,"info":self.info,"candidates":self.candidates,"data":self._df}
        if self.gpu :
            _args['gpu'] = self.gpu
        g = Generator(**_args)
        self.generator = g
        if self.autopilot :
            self.generator.run()
    def generate (self):
        if self.autopilot :
            print( "Autopilot is set ... No need to call this function")
        else:
            raise Exception( "Autopilot has not been set. Wait until training is finished; use is_alive() on the process object")

class Generator (Learner):
    def __init__(self,**_args):
        super().__init__(**_args)
        #
        # We need to load the mapping information for the space we are working with ...
        #
        self.network_args['candidates'] = int(_args['candidates']) if 'candidates' in _args else 1
        filename = os.sep.join([self.network_args['logs'],'output',self.network_args['context'],'map.json'])
        file = open(filename)
        self._map = json.loads(file.read())
        file.close()
    def run(self):
        self.initalize()
        #
        # The values will be returned because we have provided _map information from the constructor
        #
        values,_matrix = self._encoder.convert()
        _args = self.network_args
        _args['map'] = self._map
        _args['values'] = np.array(values)
        _args['row_count'] = self._df.shape[0]

        gHandler = gan.Predict(**_args)
        gHandler.load_meta(columns=None)
        _iomatrix = gHandler.apply()
        _candidates = [ self._encoder.revert(matrix=_item) for _item in _iomatrix]
        self.post(_candidates)
    def approximate(self,_df):
        _columns = self.info['approximate']

        for name in _columns :
            batches = np.array_split(_df[name].fillna(np.nan).values,2)
            _type = np.int64 if 'int' in self.info['approximate'][name] else np.float64
            x = []
            for values in batches :

                index = np.where(values != '')
                _values = np.random.dirichlet(values[index].astype(_type))
                values[index] = list(values[index] + _values) if np.random.randint(0,2) else list(values[index] - _values)
                values[index] = values[index].astype(_type)
                x += values.tolist()
            if x :
                _df[name] = x
        return _df
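    # e.g. (sketch): with self.info['approximate'] = {'age': 'int64'}, the 'age'
    # column above is perturbed with dirichlet-distributed noise and cast back
    # to np.int64 (the column name and type string are illustrative)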
    def make_date(self,**_args) :
        """
        :param year     initial value (year)
        :param offset   (optional) list of day offsets used to derive additional dates
        :param field    (optional) field name, used to look up a format in self.info['format']
        """
        if _args['year'] in ['',None,np.nan] :
            return None
        year = int(_args['year'])
        offset = _args['offset'] if 'offset' in _args else 0
        month = np.random.randint(1,13)
        if month == 2:
            # simplified leap-year rule (year % 4)
            _end = 28 if year % 4 != 0 else 29
        else:
            _end = 31 if month in [1,3,5,7,8,10,12] else 30
        # np.random.randint's upper bound is exclusive; use _end+1 so the
        # month's last day can be drawn (assumed fix, the original used _end)
        day = np.random.randint(1,_end+1)

        #-- synthetic date
        _date = datetime(year=year,month=month,day=day)
        FORMAT = '%Y-%m-%d'
        if 'format' in self.info and 'field' in _args and _args['field'] in self.info['format']:
            _name = _args['field']
            FORMAT = self.info['format'][_name]

        if offset :
            r = [_date.strftime(FORMAT)]
            for _delta in offset :
                _date = _date + timedelta(_delta)
                r.append(_date.strftime(FORMAT))
            return r
        else:
            return _date.strftime(FORMAT)
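    # e.g. (sketch): make_date(year=1981, field='birth_datetime') returns one
    # formatted date with a random month/day, while make_date(year=1981,
    # offset=[30, 60]) returns the base date followed by the offset dates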
    def format(self,_df):
        pass
    def post(self,_candidates):

        _store = self.store['target'] if 'target' in self.store else {'provider':'console'}
        _store['lock'] = True
        _store['context'] = 'write' #-- just in case
        if 'table' not in _store :
            _store['table'] = self.info['from']
        writer = transport.factory.instance(**_store)

        for _iodf in _candidates :
            _df = self._df.copy()
            _df[self.columns] = _iodf[self.columns]
            #
            # @TODO: Improve formatting with a better post-processing pipeline
            #
            if 'approximate' in self.info :
                _df = self.approximate(_df)
            if 'make_date' in self.info :
                for name in self.info['make_date'] :
                    iname = self.info['make_date'][name]

                    years = _df[iname]
                    _dates = [self.make_date(year=year,field=name) for year in years]
                    if _dates :
                        _df[name] = _dates
            _schema = self.get_schema()
            _schema = [{'name':_item.name,'type':_item.field_type} for _item in _schema]
            writer.write(_df[['birth_datetime']+self.columns],schema=_schema)

class factory :
    _infocache = {}
    @staticmethod
    def instance(**_args):
        """
        An instance of an object that trains and generates candidate datasets
        :param gpu          (optional) index of the gpu to be used, if using one
        :param store        {source,target}; if no target is provided, output goes to the console
        :param epochs       (default 2) number of epochs to train
        :param candidates   (default 1) number of candidates to generate
        :param info         {columns,sql,from}
        :param autopilot    will generate output automatically
        :param batch        (default 2000) size of the batch
        """
        return Trainer(**_args)
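#
# Example sketch (the store/info layouts below are assumptions based on how
# Learner reads them; table and column names are illustrative):
#
#   _learner = factory.instance(
#       store={'source': {'provider': 'bigquery', 'context': 'read'},
#              'target': {'provider': 'console'}},
#       info={'context': 'observation', 'from': 'observation', 'columns': ['value']},
#       epochs=2, candidates=1, autopilot=True, name='observation-job')
#   _learner.start()   # Process API: runs Trainer.run (and generation on autopilot)
#   _learner.join()
#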