""" (c) 2019 Data Maker, hiplab.mc.vanderbilt.edu version 1.0.0 This package serves as a proxy to the overall usage of the framework. This package is designed to generate synthetic data from a dataset from an original dataset using deep learning techniques @TODO: - Make configurable GPU, EPOCHS """ import pandas as pd import numpy as np import data.gan as gan import transport # from data.bridge import Binary import threading as thread from data.maker import prepare import copy import os import json from multiprocessing import Process, RLock from datetime import datetime, timedelta class Learner(Process): def __init__(self,**_args): super(Learner, self).__init__() self.ndx = 0 self.lock = RLock() if 'gpu' in _args : os.environ['CUDA_VISIBLE_DEVICES'] = str(_args['gpu']) self.gpu = int(_args['gpu']) else: self.gpu = None self.info = _args['info'] self.columns = self.info['columns'] if 'columns' in self.info else None self.store = _args['store'] if 'network_args' not in _args : self.network_args ={ 'context':self.info['context'] , 'logs':_args['logpath'] if 'logpath' in _args else 'logs', 'max_epochs':int(_args['epochs']) if 'epochs' in _args else 2, 'batch_size':int (_args['batch']) if 'batch' in _args else 2000 } else: self.network_args = _args['network_args'] self._encoder = None self._map = None self._df = _args['data'] if 'data' in _args else None self.name = self.__class__.__name__ # # @TODO: allow for verbose mode so we have a sens of what is going on within the newtork # _log = {'action':'init','context':self.info['context'],'gpu':(self.gpu if self.gpu is not None else -1)} self.log(**_log) # self.logpath= _args['logpath'] if 'logpath' in _args else 'logs' # sel.max_epoc def log(self,**_args): # self.lock.acquire() try: logger = transport.factory.instance(**self.store['logger']) if 'logger' in self.store else transport.factory.instance(provider='console',context='write',lock=True) _args = dict({'ndx':self.ndx,'module':self.name,'table':self.info['from'],'info':self.info['context'],**_args}) logger.write(_args) self.ndx += 1 if hasattr(logger,'close') : logger.close() except Exception as e: print () print (_args) print (e) pass finally: # self.lock.release() pass def get_schema(self): if self.store['source']['provider'] != 'bigquery' : return [] #{'name':self._df.dtypes.index.tolist()[i],'type':self._df.dtypes.astype(str).tolist()[i]}for i in range(self._df.dtypes.shape[0])] else: reader = transport.factory.instance(**self.store['source']) return reader.meta(table=self.info['from']) def initalize(self): reader = transport.factory.instance(**self.store['source']) _read_args= self.info if self._df is None : self._df = reader.read(**_read_args) columns = self.columns if self.columns else self._df.columns # # convert the data to binary here ... 
_args = {"schema":self.get_schema(),"data":self._df,"columns":columns} if self._map : _args['map'] = self._map self._encoder = prepare.Input(**_args) if self._df.shape[0] > 0 else None _log = {'action':'data-prep','input':{'rows':int(self._df.shape[0]),'cols':int(self._df.shape[1]) } } self.log(**_log) class Trainer(Learner): """ This will perform training using a GAN """ def __init__(self,**_args): super().__init__(**_args) # self.info = _args['info'] self.limit = int(_args['limit']) if 'limit' in _args else None self.autopilot = _args['autopilot'] if 'autopilot' in _args else False self.generate = None self.candidates = int(_args['candidates']) if 'candidates' in _args else 1 def run(self): self.initalize() if self._encoder is None : # # @TODO Log that the dataset was empty or not statistically relevant return _space,_matrix = self._encoder.convert() _args = self.network_args if self.gpu : _args['gpu'] = self.gpu _args['real'] = _matrix _args['candidates'] = self.candidates # # At this point we have the binary matrix, we can initiate training # beg = datetime.now() #.strftime('%Y-%m-%d %H:%M:%S') gTrain = gan.Train(**_args) gTrain.apply() writer = transport.factory.instance(provider='file',context='write',path=os.sep.join([gTrain.out_dir,'map.json'])) writer.write(self._encoder._map,overwrite=True) writer.close() # # @TODO: At this point we need to generate another some other objects # _args = {"network_args":self.network_args,"store":self.store,"info":self.info,"candidates":self.candidates,"data":self._df} if self.gpu : _args['gpu'] = self.gpu g = Generator(**_args) # g.run() end = datetime.now() #.strftime('%Y-%m-%d %H:%M:%S') _min = float((end-beg).seconds/ 60) _logs = {'action':'train','input':{'start':beg.strftime('%Y-%m-%d %H:%M:%S'),'minutes':_min,"unique_counts":self._encoder._io[0]}} self.log(**_logs) self.generate = g if self.autopilot : self.generate.run() def generate (self): if self.autopilot : print( "Autopilot is set ... No need to call this function") else: raise Exception( "Autopilot has not been, Wait till training is finished. Use is_alive function on process object") class Generator (Learner): def __init__(self,**_args): super().__init__(**_args) # # We need to load the mapping information for the space we are working with ... 
class Generator(Learner):
    def __init__(self,**_args):
        super().__init__(**_args)
        #
        # We need to load the mapping information for the space we are working with ...
        #
        self.network_args['candidates'] = int(_args['candidates']) if 'candidates' in _args else 1
        filename = os.sep.join([self.network_args['logs'],'output',self.network_args['context'],'map.json'])
        self.log(**{'action':'init-map','input':{'filename':filename,'exists':os.path.exists(filename)}})
        if os.path.exists(filename):
            file = open(filename)
            self._map = json.loads(file.read())
            file.close()
        else:
            self._map = {}

    def run(self):
        self.initialize()
        if self._encoder is None :
            #
            # @TODO: Log that the dataset was empty or not statistically relevant
            return
        #
        # The values will be returned because we have provided _map information from the constructor
        #
        values,_matrix = self._encoder.convert()
        _args = self.network_args
        _args['map'] = self._map
        _args['values'] = np.array(values)
        _args['row_count'] = self._df.shape[0]
        if self.gpu :
            _args['gpu'] = self.gpu
        gHandler = gan.Predict(**_args)
        gHandler.load_meta(columns=None)
        _iomatrix = gHandler.apply()
        _candidates = [self._encoder.revert(matrix=_item) for _item in _iomatrix]
        _size = np.sum([len(_item) for _item in _iomatrix])
        _log = {'action':'io-data','input':{'candidates':len(_candidates),'rows':int(_size)}}
        self.log(**_log)
        self.post(_candidates)

    def approximate(self,_df):
        _columns = self.info['approximate']
        for name in _columns :
            if _df[name].size > 100 :
                BATCH_SIZE = 10
            else:
                BATCH_SIZE = 1
            batches = np.array_split(_df[name].fillna(np.nan).values,BATCH_SIZE)
            _type = np.int64 if 'int' in self.info['approximate'][name] else np.float64
            x = []
            _log = {'action':'approximate','input':{'batch':BATCH_SIZE,'col':name}}
            for values in batches :
                index = [_x not in ['',None,np.nan] for _x in values]
                if np.sum(index) == 0 :
                    #
                    # Sometimes messy data has unpleasant surprises
                    continue
                _values = np.random.rand(len(values[index]))
                _values += np.std(values[index]) / 4
                values[index] = list(values[index] + _values) if np.random.randint(0,2) else list(values[index] - _values)
                values[index] = values[index].astype(_type)
                x += values.tolist()
            if x :
                _log['input']['identical_percentage'] = 100 * (np.divide((_df[name].dropna() == x).sum(),_df[name].dropna().size))
                _df[name] = x
            self.log(**_log)
        return _df

    def make_date(self,**_args):
        """
        :param year     initial value
        :param offset   (optional) list of day offsets, applied cumulatively
        :param field    (optional) name of the field, used to look up a format
        """
        if _args['year'] in ['',None,np.nan] :
            return None
        year = int(_args['year'])
        offset = _args['offset'] if 'offset' in _args else 0
        month = np.random.randint(1,13)
        if month == 2 :
            _end = 28 if year % 4 != 0 else 29  # approximate leap-year rule
        else:
            _end = 31 if month in [1,3,5,7,8,10,12] else 30
        day = np.random.randint(1,_end)

        #-- synthetic date
        _date = datetime(year=year,month=month,day=day,minute=0,hour=0,second=0)
        FORMAT = '%Y-%m-%d'
        _name = _args['field'] if 'field' in _args else None
        if 'format' in self.info and _name in self.info['format'] :
            FORMAT = self.info['format'][_name]
        if offset :
            r = [_date.strftime(FORMAT)]
            for _delta in offset :
                _date = _date + timedelta(_delta)
                r.append(_date.strftime(FORMAT))
            return r
        else:
            return _date.strftime(FORMAT)
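    #
    # Example (hypothetical values): make_date(year=1984, field='dob') returns a
    # single formatted date such as '1984-06-17'. With offset=[30,30] it returns
    # the initial date followed by dates 30 and 60 days out (offsets accumulate),
    # e.g. ['1984-06-17','1984-07-17','1984-08-16'].
    #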
    def format(self,_df,_schema):
        r = {}
        for _item in _schema :
            name = _item['name']
            if _item['type'].upper() in ['DATE','DATETIME','TIMESTAMP'] :
                FORMAT = '%Y-%m-%d'
                try:
                    #
                    #-- Sometimes data isn't all it's meant to be
                    SIZE = -1
                    if 'format' in self.info and name in self.info['format'] :
                        FORMAT = self.info['format'][name]
                        SIZE = 10
                    elif _item['type'] in ['DATETIME','TIMESTAMP'] :
                        FORMAT = '%Y-%m-%d %H:%M:%S'
                        SIZE = 19
                    if SIZE > 0 :
                        values = pd.to_datetime(_df[name], format=FORMAT).astype(str)
                        _df[name] = [_date[:SIZE] for _date in values]
                        r[name] = FORMAT
                except Exception as e:
                    pass
            else:
                #
                # Because types are inferred on the basis of the sample being processed they can sometimes be wrong.
                # To help disambiguate we add the schema information
                _type = None
                if 'int' in _df[name].dtypes.name or 'int' in _item['type'].lower() :
                    _type = np.int64
                elif 'float' in _df[name].dtypes.name or 'float' in _item['type'].lower() :
                    _type = np.float64
                if _type :
                    _df[name] = _df[name].fillna(0).replace('',0).astype(_type)
        if r :
            self.log(**{'action':'format','input':r})
        return _df

    def post(self,_candidates):
        _store = self.store['target'] if 'target' in self.store else {'provider':'console'}
        _store['lock'] = True
        _store['context'] = 'write'  #-- Just in case
        if 'table' not in _store :
            _store['table'] = self.info['from']
        N = 0
        for _iodf in _candidates :
            _df = self._df.copy()
            _df[self.columns] = _iodf[self.columns]
            N += _df.shape[0]
            #
            # @TODO: Improve formatting with a better post-processing pipeline
            if 'approximate' in self.info :
                _df = self.approximate(_df)
            if 'make_date' in self.info :
                for name in self.info['make_date'] :
                    # iname = self.info['make_date']['init_field']
                    iname = self.info['make_date'][name]
                    years = _df[iname]
                    _dates = [self.make_date(year=_year,field=name) for _year in years]
                    if _dates :
                        _df[name] = _dates
            _schema = self.get_schema()
            _schema = [{'name':_item.name,'type':_item.field_type} for _item in _schema]
            _df = self.format(_df,_schema)
            writer = transport.factory.instance(**_store)
            writer.write(_df,schema=_schema)
        self.log(**{'action':'write','input':{'rows':N,'candidates':len(_candidates)}})
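    #
    # Note: the schema handed to format() is a list of dicts rebuilt from the
    # reader's metadata, e.g. (hypothetical) [{'name':'dob','type':'DATE'},
    # {'name':'age','type':'INTEGER'}]. Date-like columns are normalized to
    # strings and numeric columns are cast according to the schema type.
    #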
class Shuffle(Generator):
    """
    This class yields data with low utility by shuffling the columns of interest
    """
    def __init__(self,**_args):
        super().__init__(**_args)

    def run(self):
        np.random.seed(1)
        self.initialize()
        _index = np.arange(self._df.shape[0])
        np.random.shuffle(_index)
        np.random.shuffle(_index)
        _iocolumns = self.info['columns']
        _ocolumns = list(set(self._df.columns) - set(_iocolumns))

        _iodf = pd.DataFrame(self._df[_iocolumns].copy(),index=np.arange(_index.size))
        self._df = self._df.loc[_index][_ocolumns]
        self._df.index = np.arange(self._df.shape[0])
        self._df = self._df.join(_iodf)
        #
        # The following is a full shuffle
        self._df = self._df.loc[_index]
        self._df.index = np.arange(self._df.shape[0])

        _log = {'action':'io-data','input':{'candidates':1,'rows':int(self._df.shape[0])}}
        self.log(**_log)
        try:
            self.post([self._df])
            self.log(**{'action':'completed','input':{'candidates':1,'rows':int(self._df.shape[0])}})
        except Exception as e :
            # print (e)
            self.log(**{'action':'failed','input':{'msg':e,'info':self.info}})

class factory :
    _infocache = {}
    @staticmethod
    def instance(**_args):
        """
        Returns an instance of an object that trains and generates candidate datasets
        :param gpu          (optional) index of the GPU to use, if any
        :param store        {source,target} output goes to the console if no target is provided
        :param epochs       (default 2) number of epochs to train
        :param candidates   (default 1) number of candidates to generate
        :param info         {columns,sql,from}
        :param autopilot    generate output automatically once training completes
        :param batch        (default 2000) size of the batch
        """
        if _args['apply'] == 'shuffle' :
            return Shuffle(**_args)
        elif _args['apply'] == 'generate' :
            return Generator(**_args)
        else:
            pthread = Trainer(**_args)
            if 'start' in _args and _args['start'] == True :
                pthread.start()
            return pthread
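
if __name__ == '__main__' :
    #
    # Minimal usage sketch with hypothetical store/info arguments: 'apply'
    # selects Shuffle ('shuffle'), Generator ('generate') or Trainer (any
    # other value), and start=True launches the process immediately.
    #
    _job = factory.instance(
        apply      = 'train',
        store      = {'source':{'provider':'console'},'target':{'provider':'console'}},
        info       = {'context':'demo','from':'demo_table','columns':['x','y']},
        epochs     = 2,
        candidates = 1,
        start      = True
    )
    _job.join()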