diff --git a/README.md b/README.md
index f3c92ed..32224c2 100644
--- a/README.md
+++ b/README.md
@@ -13,17 +13,19 @@ This package is designed to generate synthetic data from a dataset from an origi
 After installing the easiest way to get started is as follows (using pandas). The process is as follows:
 
+Read about [data-transport on GitHub](https://github.com/lnyemba/data-transport) or on [healthcareio.the-phi.com/git/code/transport](https://healthcareio.the-phi.com/git/code/transport.git)
+
 **Train the GAN on the original/raw dataset**
 
+1. We define the data sources
+
+The sources consist of a source, a target and a logger.
 
     import pandas as pd
     import data.maker
+    import transport
+    from transport import providers
 
-    df = pd.read_csv('sample.csv')
-    column = 'gender'
-    id = 'id'
-    context = 'demo'
-    data.maker.train(context=context,data=df,column=column,id=id,logs='logs')
 
 The trainer will store the data on disk (for now) in a structured folder that will hold training models that will be used to generate the synthetic data.
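
A minimal sketch of what the three sources mentioned above (source, target and logger) could look like once expressed as data-transport configurations; apart from `provider`, `context`, `db` and `doc`, which appear elsewhere in this patch, the key and table names below are assumptions to be checked against the data-transport documentation:

    import transport

    # hypothetical store definition: read raw data from 'source', write synthetic
    # rows to 'target', and send training/generation logs to 'logger'
    store = {
        'source': {'provider': 'bigquery', 'context': 'read',  'table': 'raw_claims'},
        'target': {'provider': 'bigquery', 'context': 'write', 'table': 'synthetic_claims'},
        'logger': {'provider': 'mongodb',  'context': 'write', 'db': 'logs', 'doc': 'observation'}
    }
    logger = transport.factory.instance(**store['logger'])   # same call used in data/maker/__init__.py
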
diff --git a/data/__init__.py b/data/__init__.py
index 2b4a6aa..91b566d 100644
--- a/data/__init__.py
+++ b/data/__init__.py
@@ -3,3 +3,4 @@ from data.params import SYS_ARGS
 import transport
 from multiprocessing import Process, Queue
 from data.maker import prepare
+from data.maker import state
diff --git a/data/gan.py b/data/gan.py
index 812426a..3727edb 100644
--- a/data/gan.py
+++ b/data/gan.py
@@ -100,6 +100,13 @@ class GNet :
         self.TOTAL_BATCHSIZE = self.BATCHSIZE_PER_GPU * self.NUM_GPUS
         self.STEPS_PER_EPOCH = 256 #int(np.load('ICD9/train.npy').shape[0] / 2000)
         self.MAX_EPOCHS = 10 if 'max_epochs' not in args else int(args['max_epochs'])
+        CHECKPOINT_SKIPS = 10
+        if self.MAX_EPOCHS < 2*CHECKPOINT_SKIPS :
+            CHECKPOINT_SKIPS = 2
+        self.CHECKPOINTS = np.repeat( np.divide(self.MAX_EPOCHS,CHECKPOINT_SKIPS),CHECKPOINT_SKIPS ).cumsum().astype(int).tolist()
+
+
+
         self.ROW_COUNT = args['real'].shape[0] if 'real' in args else 100
         self.CONTEXT = args['context']
         self.ATTRIBUTES = {"id":args['column_id'] if 'column_id' in args else None,"synthetic":args['column'] if 'column' in args else None}
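
For illustration, the checkpoint schedule computed above is MAX_EPOCHS cut into CHECKPOINT_SKIPS evenly spaced epochs, with the guard falling back to 2 checkpoints whenever MAX_EPOCHS < 2*CHECKPOINT_SKIPS:

    import numpy as np

    MAX_EPOCHS, CHECKPOINT_SKIPS = 100, 10
    np.repeat(np.divide(MAX_EPOCHS, CHECKPOINT_SKIPS), CHECKPOINT_SKIPS).cumsum().astype(int).tolist()
    # -> [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]

    MAX_EPOCHS, CHECKPOINT_SKIPS = 10, 2    # 10 < 2*10, so the guard drops CHECKPOINT_SKIPS to 2
    np.repeat(np.divide(MAX_EPOCHS, CHECKPOINT_SKIPS), CHECKPOINT_SKIPS).cumsum().astype(int).tolist()
    # -> [5, 10]
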
[{"path":_name,"epochs":int(epoch),"loss":float(-w_sum/(self.STEPS_PER_EPOCH*2))}] if self.logger : - row = {"module":"gan-train","action":"logs","input":{"partition":self.PARTITION,"logs":logs}} #,"model":pickle.dump(sess)} - self.logger.write(row) - + # row = {"module":"gan-train","action":"epochs","input":{"logs":logs}} #,"model":pickle.dump(sess)} + # self.logger.write(row) + self.logs['epochs'] += logs # # @TODO: # We should upload the files in the checkpoint # This would allow the learnt model to be portable to another system # tf.compat.v1.reset_default_graph() - + # + # let's sort the epochs we've logged thus far (if any) + # + self.logs['epochs'].sort(key=lambda _item: _item['loss']) + if self.logger : + _log = {'module':'gan-train','action':'epochs','input':self.logs['epochs']} + self.logger.write(_log) + + # + # @TODO: + # Make another copy of this on disk to be able to load it should we not have a logger setup + # + self.log_meta() class Predict(GNet): """ This class uses synthetic data given a learned model @@ -565,6 +608,7 @@ class Predict(GNet): self.values = args['values'] self.ROW_COUNT = args['row_count'] self.oROW_COUNT = self.ROW_COUNT + # self.MISSING_VALUES = np.nan_to_num(np.nan) # if 'no_value' in args and args['no_value'] not in ['na','','NA'] : # self.MISSING_VALUES = args['no_value'] @@ -577,9 +621,20 @@ class Predict(GNet): super().load_meta(**args) self.generator.load_meta(**args) self.ROW_COUNT = self.oROW_COUNT + # + # updating the input/output for the generator, so it points properly + # + + for object in [self,self.generator] : + _train_dir = os.sep.join([self.log_dir,'train',self.CONTEXT,str(self.MAX_EPOCHS)]) + _out_dir= os.sep.join([self.log_dir,'output',self.CONTEXT,str(self.MAX_EPOCHS)]) + setattr(object,'train_dir',_train_dir) + setattr(object,'out_dir',_out_dir) def apply(self,**args): suffix = self.CONTEXT #self.get.suffix() model_dir = os.sep.join([self.train_dir,suffix+'-'+str(self.MAX_EPOCHS)]) + # model_dir = os.sep.join([self.train_dir,str(self.MAX_EPOCHS)]) + demo = self._LABEL #np.zeros([self.ROW_COUNT,self.NUM_LABELS]) #args['de"shape":{"LABEL":list(self._LABEL.shape)} mo'] # # setup computational graph diff --git a/data/maker/__init__.py b/data/maker/__init__.py index cdc48e2..dea44eb 100644 --- a/data/maker/__init__.py +++ b/data/maker/__init__.py @@ -15,6 +15,7 @@ import transport # from data.bridge import Binary import threading from data.maker import prepare +from data.maker.state import State import copy import os import nujson as json @@ -25,6 +26,7 @@ from multiprocessing import Queue import time + class Learner(Process): def __init__(self,**_args): @@ -48,7 +50,7 @@ class Learner(Process): if 'network_args' not in _args : self.network_args ={ 'context':self.info['context'] , - 'logs':_args['logpath'] if 'logpath' in _args else 'logs', + 'logs':_args['logs'] if 'logs' in _args else 'logs', 'max_epochs':int(_args['epochs']) if 'epochs' in _args else 2, 'batch_size':int (_args['batch']) if 'batch' in _args else 2000 } @@ -72,6 +74,36 @@ class Learner(Process): self.logger = None if 'logger' in self.store : self.logger = transport.factory.instance(**self.store['logger']) + self.autopilot = False #-- to be set by caller + self._initStateSpace() + def _initStateSpace(self): + """ + Initializing state-space for the data-maker, The state-space functions are used as pre-post processing functions applied to the data accordingly i.e + - Trainer -> pre-processing + - Generation -> post processing + The specifications of a state space in the 
diff --git a/data/maker/__init__.py b/data/maker/__init__.py
index cdc48e2..dea44eb 100644
--- a/data/maker/__init__.py
+++ b/data/maker/__init__.py
@@ -15,6 +15,7 @@ import transport
 # from data.bridge import Binary
 import threading
 from data.maker import prepare
+from data.maker.state import State
 import copy
 import os
 import nujson as json
@@ -25,6 +26,7 @@ from multiprocessing import Queue
 import time
 
 
+
 class Learner(Process):
 
     def __init__(self,**_args):
@@ -48,7 +50,7 @@ class Learner(Process):
         if 'network_args' not in _args :
             self.network_args ={
                 'context':self.info['context'] ,
-                'logs':_args['logpath'] if 'logpath' in _args else 'logs',
+                'logs':_args['logs'] if 'logs' in _args else 'logs',
                 'max_epochs':int(_args['epochs']) if 'epochs' in _args else 2,
                 'batch_size':int (_args['batch']) if 'batch' in _args else 2000
             }
@@ -72,6 +74,36 @@ class Learner(Process):
         self.logger = None
         if 'logger' in self.store :
             self.logger = transport.factory.instance(**self.store['logger'])
+        self.autopilot = False #-- to be set by caller
+        self._initStateSpace()
+    def _initStateSpace(self):
+        """
+        Initialize the state-space for the data-maker. The state-space functions are used as pre/post processing functions applied to the data accordingly, i.e.
+            - Trainer    -> pre-processing
+            - Generation -> post-processing
+        The specification of a state space in the configuration file is as follows:
+            state:{pre:{path,pipeline:[]}, post:{path,pipeline:[]}}
+        """
+        self._states = None
+
+        if 'state' in self.info :
+            try:
+                _config = self.info ['state']
+                self._states = State.instance(_config)
+            except Exception as e:
+                print (e)
+                pass
+            finally:
+                # __info = (pd.DataFrame(self._states)[['name','path','args']]).to_dict(orient='records')
+                if self._states :
+                    __info = {}
+
+                    for key in self._states :
+                        __info[key] = [{"name":_item['name'],"args":_item['args'],"path":_item['path']} for _item in self._states[key]]
+                    self.log(object='state-space',action='load',input=__info)
+
+
+
     def log(self,**_args):
         try:
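
The shape given in the docstring above expands to something like the sketch below; the pipeline item fields mirror the name/path/args attributes logged by _initStateSpace, but the exact schema is defined by data.maker.state.State (not part of this patch), so the keys and function names here are hypothetical:

    # hypothetical 'state' block inside the 'info' configuration
    info = {
        'context': 'demo',
        'state': {
            'pre':  {'path': '/path/to/custom_functions.py',
                     'pipeline': [{'name': 'clean_dates', 'args': {}}]},
            'post': {'path': '/path/to/custom_functions.py',
                     'pipeline': [{'name': 'approximate', 'args': {'columns': ['age']}}]}
        }
    }
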
@@ -108,11 +140,36 @@ class Learner(Process):
         _read_args= self.info
         if self._df is None :
             self._df = reader.read(**_read_args)
+        #
+        # NOTE : PRE
+        # At this point we apply pre-processing to the data if there is a need for it
+        #
+        _log = {}
+        HAS_STATES = self._states is not None and 'pre' in self._states
+        NOT_GENERATING = self.name in ['Trainer','Shuffle']
+        IS_AUTOPILOT = self.autopilot
+        #
+        # allow calling the pre-processing pipeline if either of the conditions is true
+        #   1. we have states and we are not generating
+        #   2. we are generating, we have states and we are not in autopilot
+        _ALLOW_PRE_CALL = (HAS_STATES and NOT_GENERATING) or (NOT_GENERATING is False and HAS_STATES and IS_AUTOPILOT is False)
+        if _ALLOW_PRE_CALL :
+            # if HAS_STATES and NOT_GENERATING or (HAS_STATES and IS_AUTOPILOT is False and NOT_GENERATING is False):
+            _logs = {'action':'status','input':{'pre':self._states['pre']}}
+            _beg = list(self._df.shape)
+            self._df = State.apply(self._df,self._states['pre'])
+            _end = list(self._df.shape)
+            _logs['input']['size'] = _beg,_end
+            self.log(**_logs)
+
+        #
+        #
+
         columns = self.columns if self.columns else self._df.columns
         #
         # Below is a source of inefficiency, unfortunately python's type inference doesn't work well in certain cases
         #   - The code below tries to address the issue (Perhaps better suited for the reading components)
-        _log = {}
+
         for name in columns :
             #
             # randomly sampling 5 elements to make sense of data-types
@@ -201,8 +258,14 @@ class Trainer(Learner):
         # @TODO: At this point we need to generate another some other objects
         #
         _args = {"network_args":self.network_args,"store":self.store,"info":self.info,"candidates":self.candidates,"data":self._df}
+        _args['logs'] = self.network_args['logs']
+        _args['autopilot'] = self.autopilot
         if self.gpu :
             _args['gpu'] = self.gpu
+
+        #
+        # Let us pick the smallest loss; the items are sorted by loss ...
+        _args['epochs'] = gTrain.logs['epochs'][0]['epochs']
         g = Generator(**_args)
         # g.run()
@@ -239,6 +302,7 @@ class Generator (Learner):
                 file.close()
         else:
             self._map = {}
+        self.autopilot = False if 'autopilot' not in _args else _args['autopilot']
     def run(self):
         self.initalize()
         if self._encoder is None :
@@ -416,33 +480,32 @@ class Generator (Learner):
             _df = self._df.copy()
             _df[self.columns] = _iodf[self.columns]
             N += _df.shape[0]
 
-            #
-            #@TODO:
-            # Improve formatting with better post-processing pipeline
-            if 'approximate' in self.info :
-                _df = self.approximate(_df)
-            if 'make_date' in self.info :
-                for name in self.info['make_date'] :
-                    # iname = self.info['make_date']['init_field']
-                    iname = self.info['make_date'][name]
+            if self._states :
+                _df = State.apply(_df,self._states['post'])
+            # #
+            # #@TODO:
+            # # Improve formatting with better post-processing pipeline
+            # if 'approximate' in self.info :
+            #     _df = self.approximate(_df)
+            # if 'make_date' in self.info :
+            #     for name in self.info['make_date'] :
+            #         # iname = self.info['make_date']['init_field']
+            #         iname = self.info['make_date'][name]
 
-                    years = _df[iname]
-                    _dates = [self.make_date(year=_year,field=name) for _year in years]
-                    if _dates :
-                        _df[name] = _dates
+            #         years = _df[iname]
+            #         _dates = [self.make_date(year=_year,field=name) for _year in years]
+            #         if _dates :
+            #             _df[name] = _dates
 
         _schema = self.get_schema()
-        # _schema = [{'name':_item.name,'type':_item.field_type} for _item in _schema]
+
         _df = self.format(_df,_schema)
         _log = [{"name":_schema[i]['name'],"dataframe":_df[_df.columns[i]].dtypes.name,"schema":_schema[i]['type']} for i in np.arange(len(_schema)) ]
         self.log(**{"action":"consolidate","input":_log})
-        # w = transport.factory.instance(doc='observation',provider='mongodb',context='write',db='IOV01_LOGS',auth_file='/home/steve/dev/transport/mongo.json')
-        # w.write(_df)
-        # cols = [name for name in _df.columns if name.endswith('datetime')]
-        # print (_df[cols])
+
         if _store :
             writer = transport.factory.instance(**_store)
             if _store['provider'] == 'bigquery':
@@ -507,8 +570,10 @@ class factory :
     :param info     {columns,sql,from}
     :param autopilot will generate output automatically
     :param batch (default 2k) size of the batch
+
     """
+
     if _args['apply'] in [apply.RANDOM] :
         pthread = Shuffle(**_args)
     elif _args['apply'] == apply.GENERATE :
diff --git a/data/maker/prepare/__init__.py b/data/maker/prepare/__init__.py
index c8331bd..b11be57 100644
--- a/data/maker/prepare/__init__.py
+++ b/data/maker/prepare/__init__.py
@@ -276,7 +276,7 @@ class Input :
         if np.random.choice([0,1],1)[0] :
             novalues = _values[np.random.choice( len(_values),1)[0]].tolist()
         else:
-            novalues = np.repeat(None,len(self._columns))
+            novalues = np.repeat(None,len(self._columns))
 
         x = _matrix.apply(lambda row: _values[row.values == 1].tolist()[0] if (row.values == 1).sum() > 0 else novalues ,axis=1).tolist()
         return pd.DataFrame(x,columns=columns)
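
For reference, the apply call in this last hunk decodes each one-hot row of _matrix back into its original column values, and novalues is the fallback row used when no bit is set (either a row of None as above, or a randomly picked existing combination). A toy run with hypothetical columns:

    import numpy as np
    import pandas as pd

    _values  = np.array([['F', 'no'], ['F', 'yes'], ['M', 'no']])   # candidate column combinations
    _matrix  = pd.DataFrame([[0, 1, 0], [0, 0, 1], [0, 0, 0]])      # one-hot rows; the last row matches nothing
    novalues = np.repeat(None, 2)                                   # len(self._columns) == 2 in this toy

    x = _matrix.apply(lambda row: _values[row.values == 1].tolist()[0]
                      if (row.values == 1).sum() > 0 else novalues, axis=1).tolist()
    print(pd.DataFrame(x, columns=['gender', 'smoker']))
    #   gender smoker
    # 0      F    yes
    # 1      M     no
    # 2   None   None
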