@ -21,181 +21,6 @@ import json
from multiprocessing import Process , RLock
from datetime import datetime , timedelta
class ContinuousToDiscrete :
ROUND_UP = 2
@staticmethod
def binary ( X , n = 4 ) :
"""
This function will convert a continous stream of information into a variety a bit stream of bins
"""
values = np . array ( X ) . astype ( np . float32 )
BOUNDS = ContinuousToDiscrete . bounds ( values , n )
matrix = np . repeat ( np . zeros ( n ) , len ( X ) ) . reshape ( len ( X ) , n )
@staticmethod
def bounds ( x , n ) :
# return np.array_split(x,n)
values = np . round ( x , ContinuousToDiscrete . ROUND_UP )
return list ( pd . cut ( values , n ) . categories )
@staticmethod
def continuous ( X , BIN_SIZE = 4 ) :
"""
This function will approximate a binary vector given boundary information
: X binary matrix
: BIN_SIZE
"""
BOUNDS = ContinuousToDiscrete . bounds ( X , BIN_SIZE )
values = [ ]
# _BINARY= ContinuousToDiscrete.binary(X,BIN_SIZE)
# # # print (BOUNDS)
l = { }
for i in np . arange ( len ( X ) ) : #value in X :
value = X [ i ]
for item in BOUNDS :
if value > = item . left and value < = item . right :
values + = [ np . round ( np . random . uniform ( item . left , item . right ) , ContinuousToDiscrete . ROUND_UP ) ]
break
# values += [ np.round(np.random.uniform(item.left,item.right),ContinuousToDiscrete.ROUND_UP) for item in BOUNDS if value >= item.left and value <= item.right ]
# # values = []
# for row in _BINARY :
# # ubound = BOUNDS[row.index(1)]
# index = np.where(row == 1)[0][0]
# ubound = BOUNDS[ index ].right
# lbound = BOUNDS[ index ].left
# x_ = np.round(np.random.uniform(lbound,ubound),ContinuousToDiscrete.ROUND_UP).astype(float)
# values.append(x_)
# lbound = ubound
# values = [np.random.uniform() for item in BOUNDS]
return values
def train ( * * _args ) :
"""
: params sql
: params store
"""
_inputhandler = prepare . Input ( * * _args )
values , _matrix = _inputhandler . convert ( )
args = { " real " : _matrix , " context " : _args [ ' context ' ] }
_map = { }
if ' store ' in _args :
#
# This
args [ ' store ' ] = copy . deepcopy ( _args [ ' store ' ] [ ' logs ' ] )
if ' args ' in _args [ ' store ' ] :
args [ ' store ' ] [ ' args ' ] [ ' doc ' ] = _args [ ' context ' ]
else :
args [ ' store ' ] [ ' doc ' ] = _args [ ' context ' ]
logger = transport . factory . instance ( * * args [ ' store ' ] )
args [ ' logger ' ] = logger
for key in _inputhandler . _map :
beg = _inputhandler . _map [ key ] [ ' beg ' ]
end = _inputhandler . _map [ key ] [ ' end ' ]
values = _inputhandler . _map [ key ] [ ' values ' ] . tolist ( )
_map [ key ] = { " beg " : beg , " end " : end , " values " : np . array ( values ) . astype ( str ) . tolist ( ) }
info = { " rows " : _matrix . shape [ 0 ] , " cols " : _matrix . shape [ 1 ] , " map " : _map }
print ( )
# print ([_args['context'],_inputhandler._io])
logger . write ( { " module " : " gan-train " , " action " : " data-prep " , " context " : _args [ ' context ' ] , " input " : _inputhandler . _io } )
args [ ' logs ' ] = _args [ ' logs ' ] if ' logs ' in _args else ' logs '
args [ ' max_epochs ' ] = _args [ ' max_epochs ' ]
args [ ' matrix_size ' ] = _matrix . shape [ 0 ]
args [ ' batch_size ' ] = 2000
if ' partition ' in _args :
args [ ' partition ' ] = _args [ ' partition ' ]
if ' gpu ' in _args :
args [ ' gpu ' ] = _args [ ' gpu ' ]
# os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) if 'gpu' in args else '0'
trainer = gan . Train ( * * args )
#
# @TODO: Write the map.json in the output directory for the logs
#
# f = open(os.sep.join([_args['logs'],'output',_args['context'],'map.json']),'w')
f = open ( os . sep . join ( [ trainer . out_dir , ' map.json ' ] ) , ' w ' )
f . write ( json . dumps ( _map ) )
f . close ( )
trainer . apply ( )
pass
def get ( * * args ) :
"""
This function will restore a checkpoint from a persistant storage on to disk
"""
pass
def generate ( * * _args ) :
"""
This function will generate a set of records , before we must load the parameters needed
: param data
: param context
: param logs
"""
_args [ ' logs ' ] = _args [ ' logs ' ] if ' logs ' in _args else ' logs '
partition = _args [ ' partition ' ] if ' partition ' in _args else None
if not partition :
MAP_FOLDER = os . sep . join ( [ _args [ ' logs ' ] , ' output ' , _args [ ' context ' ] ] )
# f = open(os.sep.join([_args['logs'],'output',_args['context'],'map.json']))
else :
MAP_FOLDER = os . sep . join ( [ _args [ ' logs ' ] , ' output ' , _args [ ' context ' ] , str ( partition ) ] )
# f = open(os.sep.join([_args['logs'],'output',_args['context'],str(partition),'map.json']))
f = open ( os . sep . join ( [ MAP_FOLDER , ' map.json ' ] ) )
_map = json . loads ( f . read ( ) )
f . close ( )
#
#
# if 'file' in _args :
# df = pd.read_csv(_args['file'])
# else:
# df = _args['data'] if not isinstance(_args['data'],str) else pd.read_csv(_args['data'])
args = { " context " : _args [ ' context ' ] , " max_epochs " : _args [ ' max_epochs ' ] , " candidates " : _args [ ' candidates ' ] }
args [ ' logs ' ] = _args [ ' logs ' ] if ' logs ' in _args else ' logs '
args [ ' max_epochs ' ] = _args [ ' max_epochs ' ]
# args['matrix_size'] = _matrix.shape[0]
args [ ' batch_size ' ] = 2000
args [ ' partition ' ] = 0 if ' partition ' not in _args else _args [ ' partition ' ]
args [ ' row_count ' ] = _args [ ' data ' ] . shape [ 0 ]
#
# @TODO: perhaps get the space of values here ... (not sure it's a good idea)
#
_args [ ' map ' ] = _map
_inputhandler = prepare . Input ( * * _args )
values , _matrix = _inputhandler . convert ( )
args [ ' values ' ] = np . array ( values )
if ' gpu ' in _args :
args [ ' gpu ' ] = _args [ ' gpu ' ]
handler = gan . Predict ( * * args )
lparams = { ' columns ' : None }
if partition :
lparams [ ' partition ' ] = partition
handler . load_meta ( * * lparams )
#
# Let us now format the matrices by reverting them to a data-frame with values
#
candidates = handler . apply ( candidates = args [ ' candidates ' ] )
return [ _inputhandler . revert ( matrix = _matrix ) for _matrix in candidates ]
class Learner ( Process ) :
def __init__ ( self , * * _args ) :
@ -211,7 +36,7 @@ class Learner(Process):
self . info = _args [ ' info ' ]
self . columns = self . info [ ' columns ' ] if ' columns ' in self . info else None
self . store = _args [ ' store ' ]
self . logger = transport . factory . instance ( _args [ ' logger ' ] ) if ' logger ' in self . store else transport . factory . instance ( provider = ' console ' , context = ' write ' , lock = True )
if ' network_args ' not in _args :
self . network_args = {
' context ' : self . info [ ' context ' ] ,
@ -228,12 +53,18 @@ class Learner(Process):
#
# @TODO: allow for verbose mode so we have a sens of what is going on within the newtork
#
if self . logger :
_args = { ' module ' : self . name , ' action ' : ' init ' , ' context ' : self . info [ ' context ' ] , ' gpu ' : ( self . gpu if self . gpu is not None else - 1 ) }
self . logger . write ( _args )
_log = { ' module ' : self . name , ' action ' : ' init ' , ' context ' : self . info [ ' context ' ] , ' gpu ' : ( self . gpu if self . gpu is not None else - 1 ) }
self . log ( * * _log )
# self.logpath= _args['logpath'] if 'logpath' in _args else 'logs'
# sel.max_epoc
def log ( self , * * _args ) :
logger = transport . factory . instance ( * * self . store [ ' logger ' ] ) if ' logger ' in self . store else transport . factory . instance ( provider = ' console ' , context = ' write ' , lock = True )
logger . write ( _args )
if hasattr ( logger , ' close ' ) :
logger . close ( )
def get_schema ( self ) :
if self . store [ ' source ' ] [ ' provider ' ] != ' bigquery ' :
return [ { ' name ' : self . _df . dtypes . index . tolist ( ) [ i ] , ' type ' : self . _df . dtypes . astype ( str ) . tolist ( ) [ i ] } for i in range ( self . _df . dtypes . shape [ 0 ] ) ]
@ -253,9 +84,9 @@ class Learner(Process):
if self . _map :
_args [ ' map ' ] = self . _map
self . _encoder = prepare . Input ( * * _args ) if self . _df . shape [ 0 ] > 0 else None
if self . logger :
_args = { ' module ' : self . name , ' action ' : ' data-prep ' , ' input ' : { ' rows ' : self . _df . shape [ 0 ] , ' cols ' : self . _df . shape [ 1 ] } }
self . logger . write ( _args )
_log = { ' module ' : self . name , ' action ' : ' data-prep ' , ' input ' : { ' rows ' : self . _df . shape [ 0 ] , ' cols ' : self . _df . shape [ 1 ] } }
self . log ( * * _log )
class Trainer ( Learner ) :
"""
This will perform training using a GAN
@ -301,10 +132,10 @@ class Trainer(Learner):
_args [ ' gpu ' ] = self . gpu
g = Generator ( * * _args )
# g.run()
if self . logger :
end = datetime . now ( ) . strftime ( ' % Y- % m- %d % H: % M: % S ' )
logs = { ' module ' : self . name , ' action ' : ' train ' , ' input ' : { ' start ' : beg , ' end ' : end } }
self . log ger. write ( logs)
end = datetime . now ( ) . strftime ( ' % Y- % m- %d % H: % M: % S ' )
_ logs = { ' module ' : self . name , ' action ' : ' train ' , ' input ' : { ' start ' : beg , ' end ' : end } }
self . log ( * * _ logs)
self . generate = g
if self . autopilot :
self . generate . run ( )
@ -347,10 +178,10 @@ class Generator (Learner):
gHandler . load_meta ( columns = None )
_iomatrix = gHandler . apply ( )
_candidates = [ self . _encoder . revert ( matrix = _item ) for _item in _iomatrix ]
if self . logger :
_size = np . sum ( [ len ( _item ) for _item in _iomatrix ] )
_log = { ' module ' : self . name , ' action ' : ' io-data ' , ' input ' : { ' candidates ' : len ( _candidates ) , ' rows ' : _size } }
self . logger . write ( _log )
_size = np . sum ( [ len ( _item ) for _item in _iomatrix ] )
_log = { ' module ' : self . name , ' action ' : ' io-data ' , ' input ' : { ' candidates ' : len ( _candidates ) , ' rows ' : int ( _size ) } }
self . log ( * * _log )
self . post ( _candidates )
def approximate ( self , _df ) :
_columns = self . info [ ' approximate ' ]
@ -373,10 +204,10 @@ class Generator (Learner):
values [ index ] = values [ index ] . astype ( _type )
x + = values . tolist ( )
if x :
_log [ ' input ' ] [ ' diff ' ] = 1 - np . divide ( ( _df [ name ] . dropna ( ) == x ) . sum ( ) , _df [ name ] . dropna ( ) . size )
_log [ ' input ' ] [ ' diff _pct ' ] = 100 * ( 1 - np . divide ( ( _df [ name ] . dropna ( ) == x ) . sum ( ) , _df [ name ] . dropna ( ) . size ) )
_df [ name ] = x #np.array(x,dtype=np.int64) if 'int' in _type else np.arry(x,dtype=np.float64)
if self . logger :
self . logger . write ( _log )
self . log ( * * _log )
return _df
def make_date ( self , * * _args ) :
"""
@ -446,8 +277,8 @@ class Generator (Learner):
_schema = [ { ' name ' : _item . name , ' type ' : _item . field_type } for _item in _schema ]
writer . write ( _df , schema = _schema )
if self . logger :
self . logger . write ( { ' module ' : self . name , ' action ' : ' write ' , ' input ' : { ' rows ' : N , ' candidates ' : len ( _candidates ) } } )
self . log ( * * { ' module ' : self . name , ' action ' : ' write ' , ' input ' : { ' rows ' : N , ' candidates ' : len ( _candidates ) } } )
class factory :
_infocache = { }
@staticmethod