bug fix: encoding/decoding to improve correlations between attributes

dev
Steve Nyemba 2 years ago
parent 899db5c036
commit 322b21aaac

@@ -13,13 +13,17 @@ import numpy as np
 import data.gan as gan
 import transport
 # from data.bridge import Binary
-import threading as thread
+import threading
 from data.maker import prepare
 import copy
 import os
-import json
+import nujson as json
 from multiprocessing import Process, RLock
 from datetime import datetime, timedelta
+from multiprocessing import Queue
+import time
 class Learner(Process):
@@ -28,6 +32,7 @@ class Learner(Process):
         super(Learner, self).__init__()
         self.ndx = 0
+        self._queue = Queue()
         self.lock = RLock()
         if 'gpu' in _args :
@@ -61,34 +66,38 @@ class Learner(Process):
         _log = {'action':'init','gpu':(self.gpu if self.gpu is not None else -1)}
         self.log(**_log)
+        self.cache = []
         # self.logpath= _args['logpath'] if 'logpath' in _args else 'logs'
         # sel.max_epoc
     def log(self,**_args):
+        # self.lock.acquire()
         try:
-            _context = self.info['context']
-            _label = self.info['info'] if 'info' in self.info else _context
-            logger = transport.factory.instance(**self.store['logger']) if 'logger' in self.store else transport.factory.instance(provider='console',context='write',lock=True)
-            _args = dict({'ndx':self.ndx,'module':self.name,'table':self.info['from'],'context':_context,'info':_label,**_args})
-            logger.write(_args)
-            self.ndx += 1
-            if hasattr(logger,'close') :
-                logger.close()
+            # _context = self.info['context']
+            # _label = self.info['info'] if 'info' in self.info else _context
+            # logger = transport.factory.instance(**self.store['logger']) if 'logger' in self.store else transport.factory.instance(provider=transport.providers.CONSOLE,context='write',lock=True)
+            # _args = dict({'ndx':self.ndx,'module':self.name,'table':self.info['from'],'context':_context,'info':_label,**_args})
+            # logger.write(_args)
+            # self.ndx += 1
+            # if hasattr(logger,'close') :
+            #     logger.close()
+            pass
         except Exception as e:
             print ()
             print (_args)
             print (e)
             pass
         finally:
+            # self.lock.release()
             pass
     def get_schema(self):
-        if self.store['source']['provider'] != 'bigquery' :
-            return [] #{'name':self._df.dtypes.index.tolist()[i],'type':self._df.dtypes.astype(str).tolist()[i]} for i in range(self._df.dtypes.shape[0])]
-        else:
-            reader = transport.factory.instance(**self.store['source'])
-            return reader.meta(table=self.info['from'])
+        # if self.store['source']['provider'] != 'bigquery' :
+        #     return [] #{'name':self._df.dtypes.index.tolist()[i],'type':self._df.dtypes.astype(str).tolist()[i]} for i in range(self._df.dtypes.shape[0])]
+        # else:
+        #     reader = transport.factory.instance(**self.store['source'])
+        #     return reader.meta(table=self.info['from'])
+        reader = transport.factory.instance(**self.store['source'])
+        return reader.meta(table=self.info['from'])
     def initalize(self):
         reader = transport.factory.instance(**self.store['source'])
         _read_args= self.info
@@ -124,6 +133,25 @@ class Learner(Process):
         self._encoder = prepare.Input(**_args) if self._df.shape[0] > 0 else None
         _log = {'action':'data-prep','input':{'rows':int(self._df.shape[0]),'cols':int(self._df.shape[1]) } }
         self.log(**_log)
+    def get(self):
+        if self.cache :
+            return self.cache
+        else:
+            return self._queue.get() if self._queue.qsize() > 0 else []
+    def listen(self):
+        while True :
+            _info = self._queue.get()
+            self.cache.append(_info)
+            # task_done() exists on JoinableQueue only, not the plain Queue used here
+            # self._queue.task_done()
+    def publish(self,caller):
+        if hasattr(caller,'_queue') :
+            _queue = caller._queue
+            _queue.put(self.cache)
+            # _queue.join()
+        pass
 class Trainer(Learner):
     """
     This will perform training using a GAN
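
The new get/listen/publish methods give every Learner an in-memory hand-off channel: results accumulate in self.cache and can be pushed into another worker's _queue. Below is a minimal sketch of that hand-off, with a hypothetical Worker class standing in for Learner; none of these names are the module's API. get() here polls with a timeout rather than qsize(), since multiprocessing.Queue.qsize() raises NotImplementedError on macOS.

from multiprocessing import Queue
from queue import Empty

class Worker:
    # toy stand-in for Learner: results pile up in cache and are
    # handed over by writing into another worker's _queue
    def __init__(self):
        self._queue = Queue()
        self.cache = []
    def publish(self, caller):
        # mirrors Learner.publish: push our results to the caller
        if hasattr(caller, '_queue'):
            caller._queue.put(self.cache)
    def get(self):
        # mirrors Learner.get: prefer the local cache, else drain the queue
        if self.cache:
            return self.cache
        try:
            return self._queue.get(timeout=0.5)
        except Empty:
            return []

producer, consumer = Worker(), Worker()
producer.cache.append({'rows': 100})   # pretend a run produced something
producer.publish(consumer)             # hand the results over
print(consumer.get())                  # [{'rows': 100}]
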
@@ -157,7 +185,8 @@ class Trainer(Learner):
         gTrain = gan.Train(**_args)
         gTrain.apply()
-        writer = transport.factory.instance(provider='file',context='write',path=os.sep.join([gTrain.out_dir,'map.json']))
+        writer = transport.factory.instance(provider=transport.providers.FILE,context='write',path=os.sep.join([gTrain.out_dir,'map.json']))
         writer.write(self._encoder._map,overwrite=True)
         writer.close()
@@ -174,9 +203,14 @@ class Trainer(Learner):
         _min = float((end-beg).seconds/ 60)
         _logs = {'action':'train','input':{'start':beg.strftime('%Y-%m-%d %H:%M:%S'),'minutes':_min,"unique_counts":self._encoder._io[0]}}
         self.log(**_logs)
-        self.generate = g
+        self._g = g
         if self.autopilot :
-            self.generate.run()
+            self._g.run()
+    #
+    # @TODO Find a way to have the data in the object ....
     def generate (self):
         if self.autopilot :
             print( "Autopilot is set ... No need to call this function")
@@ -224,6 +258,7 @@ class Generator (Learner):
         _size = np.sum([len(_item) for _item in _iomatrix])
         _log = {'action':'io-data','input':{'candidates':len(_candidates),'rows':int(_size)}}
         self.log(**_log)
+        # self.cache = _candidates
         self.post(_candidates)
     def approximate(self,_df):
         _columns = self.info['approximate']
@@ -359,12 +394,14 @@ class Generator (Learner):
         pass
     def post(self,_candidates):
-        _store = self.store['target'] if 'target' in self.store else {'provider':'console'}
-        _store['lock'] = True
-        _store['context'] = 'write' #-- Just in case
-        if 'table' not in _store :
-            _store['table'] = self.info['from']
+        if 'target' in self.store :
+            _store = self.store['target'] if 'target' in self.store else {'provider':'console'}
+            _store['lock'] = True
+            _store['context'] = 'write' #-- Just in case
+            if 'table' not in _store :
+                _store['table'] = self.info['from']
+        else:
+            _store = None
         N = 0
         for _iodf in _candidates :
             _df = self._df.copy()
@@ -397,13 +434,15 @@ class Generator (Learner):
             # w.write(_df)
            # cols = [name for name in _df.columns if name.endswith('datetime')]
            # print (_df[cols])
-            writer = transport.factory.instance(**_store)
-            if _store['provider'] == 'bigquery':
-                writer.write(_df,schema=[],table=self.info['from'])
-            else:
-                writer.write(_df,table=self.info['from'])
+            if _store :
+                writer = transport.factory.instance(**_store)
+                if _store['provider'] == 'bigquery':
+                    writer.write(_df,schema=[],table=self.info['from'])
+                else:
+                    writer.write(_df,table=self.info['from'])
+            else:
+                self.cache.append(_df)
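
The reworked post() routes candidates two ways: when the store has a 'target', rows are written out through transport; when it does not, they are appended to self.cache so a caller can retrieve them in-process via get(). A toy illustration of that branching, using an invented route() helper that is not part of the module:

import pandas as pd

def route(df, store, cache):
    # invented helper mimicking the new post() branching
    if 'target' in store:
        _store = dict(store['target'], lock=True, context='write')
        if 'table' not in _store:
            _store['table'] = 'my_table'   # stand-in for self.info['from']
        print('write', len(df), 'rows via provider', _store['provider'])
    else:
        cache.append(df)                   # no target: keep rows in memory

cache = []
route(pd.DataFrame({'x': [1, 2]}), {}, cache)                                # cached
route(pd.DataFrame({'x': [3]}), {'target': {'provider': 'console'}}, cache)  # written
print(len(cache))                                                            # 1
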
@@ -444,6 +483,8 @@ class Shuffle(Generator):
         except Exception as e :
             # print (e)
             self.log(**{'action':'failed','input':{'msg':e,'info':self.info}})
+class apply :
+    TRAIN,GENERATE,RANDOM = 'train','generate','random'
 class factory :
     _infocache = {}
     @staticmethod
@@ -459,12 +500,12 @@ class factory :
         :param batch (default 2k) size of the batch
         """
-        if _args['apply'] == 'shuffle' :
-            return Shuffle(**_args)
-        elif _args['apply'] == 'generate' :
-            return Generator(**_args)
+        if _args['apply'] in [apply.RANDOM] :
+            pthread = Shuffle(**_args)
+        elif _args['apply'] == apply.GENERATE :
+            pthread = Generator(**_args)
         else:
             pthread= Trainer(**_args)
         if 'start' in _args and _args['start'] == True :
             pthread.start()
         return pthread
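
The factory change is behavioral, not just cosmetic: every branch now assigns pthread, so the optional start logic and the return value apply uniformly, and the shuffle branch matches apply.RANDOM ('random') instead of the old literal 'shuffle'. A self-contained sketch of the dispatch shape, with stand-in classes in place of the real Learner subclasses:

class apply:
    TRAIN, GENERATE, RANDOM = 'train', 'generate', 'random'

class Trainer:
    # stand-ins for the real Learner subclasses, for illustration only
    def __init__(self, **kw): self.kw = kw
    def start(self): print('started', type(self).__name__)
class Generator(Trainer): pass
class Shuffle(Trainer): pass

def instance(**_args):
    # same shape as the patched factory: build, optionally start, always return
    if _args['apply'] in [apply.RANDOM]:
        pthread = Shuffle(**_args)
    elif _args['apply'] == apply.GENERATE:
        pthread = Generator(**_args)
    else:
        pthread = Trainer(**_args)
    if _args.get('start') is True:
        pthread.start()
    return pthread

job = instance(apply=apply.GENERATE, start=True)   # prints: started Generator

One consequence worth flagging: callers that still pass apply='shuffle' no longer match any branch and fall through to Trainer.
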

@@ -47,6 +47,15 @@ class Input :
         :param sql sql query that pulls a representative sample of the data
         """
         self._schema = _args['schema'] if 'schema' in _args else {}
+        #
+        # schema data should be in a hash map for these purposes
+        #
+        if self._schema :
+            r = {}
+            for _item in self._schema :
+                r[_item['name']] = _item['type']
+            self._schema = r
         self.df = _args['data']
         if 'sql' not in _args :
             self._initdata(**_args)
@@ -60,6 +69,7 @@ class Input :
         #
         self._map = {} if 'map' not in _args else _args['map']
     def _initsql(self,**_args):
         """
         This function will initialize the class on the basis of a data-store and optionally pre-defined columns to be synthesized
@@ -73,6 +83,10 @@ class Input :
         self._initcols(data=self.df,columns=_args['columns'])
         pass
+    def _init_map(self,values):
+        self._map = dict(zip(np.arange(len(values)),values))
+        for key in self._map :
+            self._map[key] = self._map[key].tolist()
     def _initcols (self,**_args) :
         """
         This function will initialize the columns to be synthesized and/or determine which ones can be synthesized
@@ -109,7 +123,7 @@ class Input :
         """
         self._initcols(**_args)
-    def convert(self,**_args):
+    def _convert(self,**_args):
         """
         This function will convert a data-frame into a binary matrix and provide a map to be able to map the values back to the matrix
         :param columns in case we specify the columns to account for (just in case the original assumptions don't hold)
@@ -150,7 +164,7 @@ class Input :
         return _values,_m
-    def revert(self,**_args) :
+    def _revert(self,**_args) :
         """
         This function will take in a binary matrix and based on the map of values it will repopulate it with values
         :param _matrix binary matrix
@@ -186,7 +200,9 @@ class Input :
             # r[key] = [columns[np.where(row == 1) [0][0] ] for row in _matrix[:,_beg:_end]]
             r[key] = [columns[np.where(row==1)[0][0]] if np.where(row==1)[0].size > 0 else '' for row in _matrix]
+        #
+        # we should consider decoding the matrix if possible
+        #
         return pd.DataFrame(r)
@@ -217,4 +233,39 @@ class Input :
         return cols,_matrix
+    def convert(self,**_args):
+        if 'columns' in _args or 'column' in _args :
+            columns = _args['columns'] if 'columns' in _args else [_args['column']]
+        else:
+            columns = self._columns
+        _df = self.df if 'data' not in _args else _args['data']
+        _values,_matrix = self.encode(_df,columns)
+        _,_matrix = self.tobinary(_matrix)
+        self._init_map(_values)
+        return _values,_matrix #-- matrix has been updated !
+    def revert(self,**_args):
+        # _columns = _args['column'] if 'column' in _args else None
+        _matrix = _args['matrix']
+        # print (_matrix)
+        return self.decode(_matrix,columns=self._columns)
+    def encode(self,df,columns) :
+        _df = df[columns].drop_duplicates()
+        _values = _df.values.tolist()
+        _encoded = df[columns].apply(lambda row: _values.index(list(row)),axis=1)
+        return np.array(_values),_encoded
+    def decode(self,_matrix,**_args):
+        #
+        # _matrix binary matrix
+        # _values value space given the columns
+        # columns name of the columns ...
+        #
+        columns = _args['columns']
+        _values = np.array(list(self._map.values()))
+        _matrix = pd.DataFrame(_matrix) #-- if type(_matrix) != pd.DataFrame else _matrix
+        x = _matrix.apply(lambda row: _values[row.values == 1].tolist()[0] if row.values.sum() > 0 else np.repeat(None,row.size),axis=1).tolist()
+        return pd.DataFrame(x,columns=columns)
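
This last hunk is the substance of the commit message. The new convert/revert pair encodes whole rows over the joint value space of the selected columns: encode maps each distinct combination of attribute values to an index, tobinary one-hot encodes those indices, and decode can therefore only ever reproduce combinations that actually co-occurred in the source data, which is what improves the correlations between attributes. A round-trip sketch of the scheme in plain numpy/pandas, mirroring encode/tobinary/decode rather than importing the module:

import numpy as np
import pandas as pd

df = pd.DataFrame({'sex': ['M', 'F', 'M'], 'state': ['TN', 'CA', 'TN']})
cols = ['sex', 'state']

# encode: each distinct (sex, state) pair becomes one index in the joint space
values = df[cols].drop_duplicates().values.tolist()    # [['M','TN'], ['F','CA']]
encoded = df[cols].apply(lambda row: values.index(list(row)), axis=1)

# tobinary equivalent: one-hot over joint values, not one block per column
matrix = np.eye(len(values), dtype=int)[encoded.values]

# decode: a 1 in position k can only yield a pair that actually co-occurred
_values = np.array(values)
decoded = pd.DataFrame([_values[row == 1][0].tolist() for row in matrix],
                       columns=cols)
print(decoded.equals(df[cols]))                        # True: lossless round trip
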
