@ -16,7 +16,12 @@ from data.params import SYS_ARGS
DATASET = 'combined20191004v2_deid'
class Components :
    class KEYS :
        PIPELINE_KEY = 'pipeline'
        SQL_FILTER = 'filter'
    @staticmethod
    def get_logger(**args) :
        return factory.instance(type='mongo.MongoWriter', args={'dbname': 'aou', 'doc': args['context']})
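    # get_logger returns the mongo-backed writer (database 'aou', keyed by the pipeline context);
    # the same writer configuration is used for logging elsewhere in this module.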
    @staticmethod
    def get(args) :
        """
@ -26,15 +31,19 @@ class Components :
        :condition optional condition and filters
        """
        SQL = args['sql']
        if 'condition' in args :
            condition = ' '.join([args['condition']['field'], args['condition']['qualifier'], '(', args['condition']['value'], ')'])
        if Components.KEYS.SQL_FILTER in args :
            SQL_FILTER = Components.KEYS.SQL_FILTER
            condition = ' '.join([args[SQL_FILTER]['field'], args[SQL_FILTER]['qualifier'], '(', args[SQL_FILTER]['value'], ')'])
            SQL = " ".join([SQL, 'WHERE', condition])
        SQL = SQL.replace(':dataset', args['dataset']) #+ " LI "
        if 'limit' in args :
            SQL = SQL + ' LIMIT ' + args['limit']
        #
        # log the SQL query being submitted to BigQuery
        logger = factory.instance(type='mongo.MongoWriter', args={'dbname': 'aou', 'doc': args['context']})
        logger.write({"module": "bigquery", "action": "read", "input": {"sql": SQL}})
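        # NOTE: pd.read_gbq relies on the pandas-gbq package; the service-account key path below is environment specific.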
        credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
        df = pd.read_gbq(SQL, credentials=credentials, dialect='standard').astype(object)
        return df
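    # Hypothetical usage (values are illustrative only):
    #   Components.get({'sql': 'SELECT COUNT(*) FROM :dataset.person', 'dataset': DATASET, 'context': 'person'})
    # substitutes the dataset name, logs the query to mongo and returns the result as an all-object DataFrame.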
@ -131,6 +140,7 @@ class Components :
        _args['num_gpu'] = 1
        os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu'])
        _args['no_value'] = args['no_value']
        # MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0
        PART_SIZE = int(args['part_size']) if 'part_size' in args else 8
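        # PART_SIZE is the number of chunks the data is split into (see the np.array_split reference below); it defaults to 8.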
@ -166,19 +176,27 @@ class Components :
        #
        # performing basic analytics on the synthetic data generated (easy to assess quickly)
        #
        info = {"module": "generate", "action": "io-stats", "input": {"rows": data_comp.shape[0], "partition": partition, "logs": []}}
        logs = []
        for name in data_comp.columns.tolist() :
            g = pd.DataFrame(data_comp.groupby([name]).size())
            g.columns = ['counts']
            g[name] = g.index.tolist()
            g.index = np.arange(g.shape[0])
            logs.append({"name": name, "counts": g.to_dict(orient='records')})
        info['input']['logs'] = logs
        info = {"module": "generate", "action": "io.metrics", "input": {"rows": data_comp.shape[0], "partition": partition, "logs": []}}
        x = {}
        for name in args['columns'] :
            ident = data_comp.apply(lambda row: 1 * (row[name] == row[name + '_io']), axis=1).sum()
            count = data_comp[name].unique().size
            _ident = data_comp.shape[0] - ident
            _count = data_comp[name + '_io'].unique().size
            info['input']['logs'] += [{"name": name, "identical": int(ident), "no_identical": int(_ident), "original_count": count, "synthetic_count": _count}]
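        # Each log entry compares a column with its synthesized counterpart (suffix '_io'):
        # 'identical' counts rows where the two values match, 'no_identical' is the remaining rows,
        # and original_count / synthetic_count compare the number of distinct values on each side.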
        # for name in data_comp.columns.tolist() :
        #     g = pd.DataFrame(data_comp.groupby([name]).size())
        #     g.columns = ['counts']
        #     g[name] = g.index.tolist()
        #     g.index = np.arange(g.shape[0])
        #     logs.append({"name": name, "counts": g.to_dict(orient='records')})
        # info['input']['logs'] = logs
        logger.write(info)
        base_cols = list(set(_args['data'].columns) - set(args['columns'])) #-- rebuild the dataset (and store it)
        cols = _dc.columns.tolist()
        for name in cols :
            _args['data'][name] = _dc[name]
            info = {"module": "generate", "action": "io", "input": {"rows": _dc[name].shape[0], "name": name}}
@ -223,43 +241,14 @@ class Components :
        info['partition'] = int(partition)
        logger.write({"module": "generate", "action": "write", "input": info})
    @staticmethod
    def callback(channel, method, header, stream) :
        if stream.decode('utf8') in ['QUIT', 'EXIT', 'END'] :
            channel.close()
            channel.connection.close()
        info = json.loads(stream)
        logger = factory.instance(type='mongo.MongoWriter', args={'dbname': 'aou', 'doc': SYS_ARGS['context']})
        logger.write({'module': 'process', 'action': 'read-partition', 'input': info['input']})
        df = pd.DataFrame(info['data'])
        args = info['args']
        if args['num_gpu'] > 1 :
            args['gpu'] = int(info['input']['partition']) if info['input']['partition'] < 8 else np.random.choice(np.arange(8)).astype(int)
        else :
            args['gpu'] = 0
            args['num_gpu'] = 1
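        # GPU assignment: use the partition index as the device id when it is below 8, otherwise pick one of the 8 assumed devices at random.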
        # if int(args['num_gpu']) > 1 and args['gpu'] > 0 :
        #     args['gpu'] = args['gpu'] + args['num_gpu'] if args['gpu'] + args['num_gpu'] < 8 else args['gpu'] #-- 8 max gpus
        args['reader'] = lambda: df
        #
        # @TODO: Fix
        # There is an inconsistency in column/columns ... fix this!
        #
        channel.close()
        channel.connection.close()
        args['columns'] = args['column']
        (Components()).train(**args)
        logger.write({"module": "process", "action": "exit", "input": info["input"]})
        pass
if __name__ == '__main__' :
    filename = SYS_ARGS['config'] if 'config' in SYS_ARGS else 'config.json'
    f = open(filename)
    PIPELINE = json.loads(f.read())
    _config = json.loads(f.read())
    f.close()
    PIPELINE = _config['pipeline']
    index = SYS_ARGS['index']
    if index.isnumeric() :
        index = int(SYS_ARGS['index'])
@ -274,10 +263,17 @@ if __name__ == '__main__' :
    # print
    print("..::: ", PIPELINE[index]['context'])
    args = (PIPELINE[index])
    for key in _config :
        if key == 'pipeline' or key in args :
            #
            # skip the 'pipeline' key and any key already set in the selected pipeline entry (provided by index)
            #
            continue
        args[key] = _config[key]
    args = dict(args, **SYS_ARGS)
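    # Precedence: command-line arguments (SYS_ARGS) override both the selected pipeline entry and the global configuration defaults.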
    args['logs'] = args['logs'] if 'logs' in args else 'logs'
    args['batch_size'] = 2000 if 'batch_size' not in args else int(args['batch_size'])
    if 'dataset' not in args :
        args['dataset'] = 'combined20191004v2_deid'
@ -340,38 +336,14 @@ if __name__ == '__main__' :
        else :
            generator.generate(args)
        # Components.generate(args)
    elif 'listen' in args :
    elif 'finalize' in args :
        #
        # This will start a worker listening on a queue
        SYS_ARGS = dict(args) #-- things get lost in context
        if 'read' in SYS_ARGS :
            QUEUE_TYPE = 'queue.QueueReader'
            pointer = lambda qreader: qreader.read()
        else :
            QUEUE_TYPE = 'queue.QueueListener'
            pointer = lambda qlistener: qlistener.listen()
        N = int(SYS_ARGS['jobs']) if 'jobs' in SYS_ARGS else 1
        qhandlers = [factory.instance(type=QUEUE_TYPE, args={'queue': 'aou.io'}) for i in np.arange(N)]
        jobs = []
        for qhandler in qhandlers :
            qhandler.callback = Components.callback
            job = Process(target=pointer, args=(qhandler,))
            job.start()
            jobs.append(job)
        # This will finalize a given set of synthetic operations into a table
        #
        # let us wait for the jobs
        print(["Started ", len(jobs), " trainers"])
        while len(jobs) > 0 :
            idataset = args['input'] if 'input' in args else 'io' #-- input dataset
            odataset = args['output'] #-- output dataset
            labels = [name.strip() for name in args['labels'].split(',')]
            jobs = [job for job in jobs if job.is_alive()]
            time.sleep(2)
        # pointer(qhandler)
        # qreader.read(1)
        pass
    else :
        # DATA = np.array_split(DATA,PART_SIZE)