@ -28,6 +28,7 @@ class Learner(Process):
super ( Learner , self ) . __init__ ( )
self . ndx = 0
self . lock = RLock ( )
if ' gpu ' in _args :
os . environ [ ' CUDA_VISIBLE_DEVICES ' ] = str ( _args [ ' gpu ' ] )
@ -63,13 +64,21 @@ class Learner(Process):
# self.logpath= _args['logpath'] if 'logpath' in _args else 'logs'
# sel.max_epoc
def log(self, **_args):
    """
    Write one log record to the configured logger, or to the console when
    the store has no 'logger' entry.

    The record merges bookkeeping fields (running index, module name, source
    table and context) with the caller-supplied keyword arguments; on a key
    collision the caller-supplied value wins. Logging is best-effort: any
    failure is printed and swallowed so that it never interrupts the caller.

    :param _args    key/value pairs to add to the log record
    """
    # The context manager guarantees the lock is released on every path.
    with self.lock:
        logger = None
        try:
            if 'logger' in self.store:
                logger = transport.factory.instance(**self.store['logger'])
            else:
                logger = transport.factory.instance(provider='console', context='write', lock=True)
            _args = dict({'ndx': self.ndx, 'module': self.name, 'table': self.info['from'], 'info': self.info['context'], **_args})
            logger.write(_args)
            # Only count records that were actually written.
            self.ndx += 1
        except Exception as e:
            print()
            print(_args)
            print(e)
        finally:
            # Close the logger even when the write failed; previously a failed
            # write skipped close() and left the logger open.
            if logger is not None and hasattr(logger, 'close'):
                try:
                    logger.close()
                except Exception as e:
                    print(e)
def get_schema ( self ) :
if self . store [ ' source ' ] [ ' provider ' ] != ' bigquery ' :
return [ { ' name ' : self . _df . dtypes . index . tolist ( ) [ i ] , ' type ' : self . _df . dtypes . astype ( str ) . tolist ( ) [ i ] } for i in range ( self . _df . dtypes . shape [ 0 ] ) ]
@ -89,8 +98,7 @@ class Learner(Process):
if self . _map :
_args [ ' map ' ] = self . _map
self . _encoder = prepare . Input ( * * _args ) if self . _df . shape [ 0 ] > 0 else None
_log = { ' action ' : ' data-prep ' , ' input ' : { ' rows ' : self . _df . shape [ 0 ] , ' cols ' : self . _df . shape [ 1 ] } }
_log = { ' action ' : ' data-prep ' , ' input ' : { ' rows ' : int ( self . _df . shape [ 0 ] ) , ' cols ' : int ( self . _df . shape [ 1 ] ) } }
self . log ( * * _log )
class Trainer ( Learner ) :
"""
@ -139,7 +147,7 @@ class Trainer(Learner):
# g.run()
end = datetime . now ( ) #.strftime('%Y-%m-%d %H:%M:%S')
_min = float ( timedelta ( end , beg ) . seconds / 60 )
_min = float ( ( end - beg ) . seconds / 60 )
_logs = { ' action ' : ' train ' , ' input ' : { ' start ' : beg . strftime ( ' % Y- % m- %d % H: % M: % S ' ) , ' minutes ' : _min , " unique_counts " : self . _encoder . _io [ 0 ] } }
self . log ( * * _logs )
self . generate = g
@ -293,12 +301,27 @@ class Generator (Learner):
writer . write ( _df , schema = _schema )
self . log ( * * { ' action ' : ' write ' , ' input ' : { ' rows ' : N , ' candidates ' : len ( _candidates ) } } )
class Shuffle ( Traine r) :
class Shuffle(Generator):
    """
    Produces output by shuffling existing data instead of generating it.

    The columns listed in info['columns'] are permuted row-wise as a group
    while the remaining columns keep their original row order. This breaks the
    association between the two groups of columns, so the resulting data has
    low utility.
    """
    def __init__(self, **_args):
        """
        :param _args    configuration forwarded unchanged to the parent class
        """
        # Forward the keyword arguments; the previous call passed `self`
        # positionally and silently dropped the whole configuration.
        super().__init__(**_args)

    def run(self):
        """
        Shuffle the configured columns, log the operation and post the result.
        """
        # NOTE(review): spelling kept as-is; presumably it matches the name of
        # the inherited method -- confirm before renaming.
        self.initalize()
        _count = self._df.shape[0]
        _index = np.arange(_count)
        np.random.shuffle(_index)
        _iocolumns = list(self.info['columns'])
        # Keep the frame's own column order (a set difference is unordered).
        _ocolumns = [_name for _name in self._df.columns if _name not in _iocolumns]
        # Select rows by position and renumber them 0..N-1 so the join below
        # pairs row i of the untouched columns with the i-th shuffled row,
        # whatever index the original frame carried.
        _iodf = self._df[_iocolumns].iloc[_index].reset_index(drop=True)
        self._df = self._df[_ocolumns].reset_index(drop=True).join(_iodf)
        _log = {'action': 'io-data', 'input': {'candidates': 1, 'rows': int(self._df.shape[0])}}
        self.log(**_log)
        self.post([self._df])
class factory :
_infocache = { }
@staticmethod
@ -313,4 +336,9 @@ class factory :
: param autopilot will generate output automatically
: param batch ( default 2 k ) size of the batch
"""
if ' apply ' not in _args :
return Trainer ( * * _args )
elif _args [ ' apply ' ] == ' shuffe ' :
return Shuffle ( * * _args )
elif _args [ ' apply ' ] == ' generate ' :
return Generator ( * * _args )