#!/usr/bin/env python3
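
# NOTE (assumed context): the fragment below references names defined earlier
# in the full module. The imports it relies on are, as an assumption:
#
#   import os, json
#   import numpy as np
#   import pandas as pd
#   from google.oauth2 import service_account
#   import data.maker
#
# along with the project-specific `factory` writer factory, the SYS_ARGS dict,
# and the enclosing class `Components` (which defines a `split` static method).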
        # bounds = Components.split(df,MAX_ROWS,PART_SIZE)
        #
        # Log information about the training process for this partition (if any)
        #
        info = {"rows":df.shape[0],"cols":df.shape[1],"partition":partition,"logs":_args['logs']}
        logger.write({"module":"train","action":"train","input":info})
        data.maker.train(**_args)

    # @staticmethod
    def generate(self,args):
        """
        This function will generate synthetic data and store it to a given target:
        a local CSV file when 'file' is passed in args, otherwise a BigQuery table.
        """
        logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
        log_folder = args['logs'] if 'logs' in args else 'logs'
        partition = args['partition'] if 'partition' in args else ''
        log_folder = os.sep.join([log_folder,args['context'],partition])
        _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
        _args['max_epochs'] = 150 if 'max_epochs' not in args else int(args['max_epochs'])
        _args['num_gpu'] = int(args['num_gpu']) if 'num_gpu' in args else 1
        os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu']) if 'gpu' in args else '0'
        _args['no_value'] = args['no_value']
        MAX_ROWS = args['max_rows'] if 'max_rows' in args else 0
        PART_SIZE = args['part_size'] if 'part_size' in args else 0
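        # MAX_ROWS and PART_SIZE feed Components.split below; both default to 0
        # when the caller does not constrain partitioning.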

        # credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
        # _args['data'] = pd.read_gbq(SQL,credentials=credentials,dialect='standard').dropna()
        reader = args['reader']
        df = reader()
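        # df now holds the full source dataset loaded by the caller-supplied
        # reader (the commented-out lines above suggest it typically wraps a
        # pandas read_gbq call).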
        if 'partition' in args :
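            # Components.split is expected to return interval-like bounds (objects
            # with .left/.right attributes, e.g. pandas Interval categories as in
            # the commented-out pd.cut alternative below); the partition index
            # selects the slice of rows this worker will handle.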
            bounds = Components.split(df,MAX_ROWS,PART_SIZE)
            # bounds = list(pd.cut( np.arange(df.shape[0]+1),PART_SIZE).categories)
            lbound = int(bounds[int(partition)].left)
            ubound = int(bounds[int(partition)].right)
            df = df.iloc[lbound:ubound]
        _args['data'] = df
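
        # A minimal sketch of what Components.split might return, assuming it
        # follows the pd.cut alternative shown above (hypothetical; the real
        # implementation is defined elsewhere in this module):
        #
        #   @staticmethod
        #   def split(df,MAX_ROWS,PART_SIZE):
        #       return list(pd.cut(np.arange(df.shape[0]+1),PART_SIZE).categories)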
        # _args['data'] = reader()
        # _args['data'] = _args['data'].astype(object)
        _dc = data.maker.generate(**_args)
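        # _dc now holds the synthetic records produced by the generator, with the
        # same column names as the trained columns.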
        #
        # We need to post-process the generated data in order to:
        # 1. compare it immediately against the original
        # 2. persist the synthetic copy
        #
        cols = _dc.columns.tolist()
        data_comp = _args['data'][args['columns']].join(_dc[args['columns']],rsuffix='_io') #-- will be used for comparison (store this in BigQuery)
        base_cols = list(set(_args['data'].columns) - set(args['columns'])) #-- rebuild the dataset (and store it)
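        # The '_io' suffix distinguishes the synthetic columns from the originals
        # in data_comp; base_cols holds the untouched columns needed to rebuild
        # the full dataset.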

        for name in cols :
            _args['data'][name] = _dc[name]
            info = {"module":"generate","action":"io","input":{"rows":_dc[name].shape[0],"name":name}}
            if partition != '' :
                info['partition'] = partition
            logger.write(info)
            # filename = os.sep.join([log_folder,'output',name+'.csv'])
            # data_comp[[name]].to_csv(filename,index=False)
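
        # At this point _args['data'] is the original dataset with the trained
        # columns replaced by their synthetic counterparts.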

        #
        #-- Let us store all of this into BigQuery
        prefix = args['notify']+'.'+_args['context']
        table = '_'.join([prefix,partition,'io']).replace('__','_')
        folder = os.sep.join([args['logs'],args['context'],partition,'output'])
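        # The resulting table id has the shape '<notify>.<context>_<partition>_io';
        # replace('__','_') collapses the double underscore left when partition is
        # an empty string.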
        if 'file' in args :
            _fname = os.sep.join([folder,table.replace('_io','_full_io.csv')])
            _pname = os.sep.join([folder,table])+'.csv'
            data_comp.to_csv(_pname,index=False)
            _args['data'].to_csv(_fname,index=False)
        else:
            credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
            _pname = os.sep.join([folder,table+'.csv'])
            _fname = table.replace('_io','_full_io')
            data_comp.to_gbq(if_exists='replace',destination_table=table,credentials=credentials,chunksize=50000)
            data_comp.to_csv(_pname,index=False)
            # append when writing one partition of many so partitions accumulate
            # in the same table; replace when the whole dataset is written at once
            INSERT_FLAG = 'replace' if 'partition' not in args else 'append'
            _args['data'].to_gbq(if_exists=INSERT_FLAG,destination_table=_fname,credentials=credentials,chunksize=50000)

        info = {"full":{"path":_fname,"rows":_args['data'].shape[0]},"compare":{"name":_pname,"rows":data_comp.shape[0]}}
        if partition :
            info['partition'] = partition
        logger.write({"module":"generate","action":"write","info":info})

    @staticmethod
    def callback(channel,method,header,stream):
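        # The signature has the shape of a message-queue consumer callback in the
        # style of pika/RabbitMQ (an assumption based on the naming); `stream`
        # carries the JSON-encoded job description.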
        info = json.loads(stream)
        logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':SYS_ARGS['context']})
        logger.write({'module':'process','action':'read-partition','input':info['info']})
        df = pd.DataFrame(info['data'])
        args = info['args']
        # Log what was initiated so we have context of this processing ...
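        # NOTE: `jobs` below is assumed to be a list of multiprocessing.Process
        # objects populated elsewhere in the module; it is not defined in this
        # fragment. The loop polls it until every worker has exited.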
        while len(jobs) > 0 :
            jobs = [job for job in jobs if job.is_alive()]
        # pointer(qhandler)