You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
data-maker/pipeline.py

384 lines
14 KiB
Python

#!/usr/bin/env python3
import json
from transport import factory
import numpy as np
import time
import os
from multiprocessing import Process
import pandas as pd
from google.oauth2 import service_account
import data.maker
from data.params import SYS_ARGS
#
# Default dataset identifier.
# NOTE(review): the entry point at the bottom of this file falls back to this same value when the
# configuration has no 'dataset' entry -- confirm the two are meant to stay in sync
DATASET='combined20191004v2_deid'
class Components :
    """
    Steps of the pipeline: reading the source data from bigquery, training a model on a
    partition of the data, generating synthetic data for a partition and handling messages
    received from a queue.
    """
    #
    # Service account key used for every bigquery call made by this class
    CREDENTIALS_FILE = '/home/steve/dev/aou/accounts/curation-prod.json'
    #
    # Upper bound (exclusive) on the gpu index that can be requested explicitly
    MAX_GPU = 8

    @staticmethod
    def _credentials():
        """
        This function loads the service account credentials used for bigquery reads/writes
        """
        return service_account.Credentials.from_service_account_file(Components.CREDENTIALS_FILE)

    @staticmethod
    def _logger(context):
        """
        This function returns a mongodb writer that logs to the collection named after the context
        :context    name of the context being processed
        """
        return factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':context})

    @staticmethod
    def _pick_gpu(num_gpu,gpu):
        """
        This function returns the gpu index a process should use
        :num_gpu    number of gpus available, with one (or fewer) the index is always 0
        :gpu        requested index, any value of MAX_GPU or more is replaced by a random index below MAX_GPU
        """
        if int(num_gpu) > 1 :
            gpu = int(gpu)
            return gpu if gpu < Components.MAX_GPU else np.random.choice(np.arange(Components.MAX_GPU)).astype(int)
        return 0

    @staticmethod
    def _build_sql(args):
        """
        This function builds the sql statement submitted by Components.get
        :sql        basic sql statement (with the placeholder :dataset)
        :dataset    name of the dataset that replaces the placeholder
        :condition  optional {field,qualifier,value} used to build the WHERE clause
        :limit      optional maximum number of rows (string or integer)
        """
        SQL = args['sql']
        if 'condition' in args :
            _cond = args['condition']
            condition = ' '.join([_cond['field'],_cond['qualifier'],'(',_cond['value'],')'])
            SQL = " ".join([SQL,'WHERE',condition])
        SQL = SQL.replace(':dataset',args['dataset'])
        if 'limit' in args :
            #
            # The limit comes from a json configuration and can be an integer, hence the cast
            SQL = SQL + ' LIMIT ' + str(args['limit'])
        return SQL

    @staticmethod
    def get(args):
        """
        This function returns a data-frame provided a bigquery sql statement with conditions (and limits for testing purposes)
        Wrapping the call in a lambda makes testing easier and changing data stores transparent to the rest of the code.
        :sql        basic sql statement
        :dataset    name of the dataset substituted to :dataset in the sql statement
        :condition  optional condition and filters
        :limit      optional maximum number of rows
        """
        SQL = Components._build_sql(args)
        credentials = Components._credentials()
        return pd.read_gbq(SQL,credentials=credentials,dialect='standard').astype(object)

    @staticmethod
    def split(X,MAX_ROWS=3,PART_SIZE=3):
        """
        This function returns the list of intervals obtained by cutting the range [0,number of rows of X] in equal bins
        :X          object with a shape attribute (e.g a data-frame)
        :MAX_ROWS   not used, kept for backward compatibility
        :PART_SIZE  number of bins
        """
        return list(pd.cut( np.arange(X.shape[0]+1),PART_SIZE).categories)

    def train(self,**args):
        """
        This function will perform training on a partition of the data and log what was submitted
        :data       data-frame to train on (alternatively :reader, a function that returns the data-frame)
        :context    name of the context, used for the logs and the data-store
        :columns    columns passed to the trainer
        :partition  index of the partition being trained
        :num_gpu    number of gpus available
        :gpu        index of the gpu requested for this partition
        :logs       optional log folder (default 'logs')
        :max_epochs optional (default 150)
        :batch_size optional (default 10000)
        :continuous optional list passed to the trainer as is (default [])
        """
        #
        # @TODO: we need to log something here about the parameters being passed
        # The data is either given as is or through a reader (this is how the queue callback provides it)
        df = args['reader']() if 'reader' in args else args['data']
        logger = Components._logger(args['context'])
        partition = args['partition']
        log_folder = args['logs'] if 'logs' in args else 'logs'
        log_folder = os.sep.join([log_folder,args['context'],str(partition)])

        _args = {"batch_size":10000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
        if 'max_epochs' in args :
            _args['max_epochs'] = int(args['max_epochs'])
        if 'batch_size' in args :
            _args['batch_size'] = int(args['batch_size'])
        #
        # We ask the process to assume 1 gpu given the system number of GPU and that these tasks can run in parallel
        # NOTE(review): the original indentation was lost, num_gpu is assumed to be set to 1 in all cases -- confirm
        #
        _args['gpu'] = Components._pick_gpu(args['num_gpu'],args['gpu'])
        _args['num_gpu'] = 1
        # NOTE(review): the device made visible is the one requested in args, not the one stored in _args['gpu'] -- confirm this is intended
        os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu'])
        _args['partition'] = int(partition)
        _args['continuous']= args['continuous'] if 'continuous' in args else []
        _args['store'] = {'type':'mongo.MongoWriter','args':{'dbname':'aou','doc':args['context']}}
        _args['data'] = df
        #
        # @log :
        #   Logging information about the training process for this partition (or not)
        #
        info = {"rows":df.shape[0],"cols":df.shape[1], "partition":int(partition),"logs":_args['logs']}
        logger.write({"module":"train","action":"train","input":info})
        data.maker.train(**_args)

    def generate(self,args):
        """
        This function will generate data for a partition, then store both a comparison of the original/generated columns
        and the full dataset (with the generated columns) either on disk or in bigquery
        :context    name of the context, used for the logs and the names of the outputs
        :columns    columns that are generated
        :notify     prefix of the name of the outputs
        :no_value   passed to the generator as is
        :num_gpu    number of gpus available
        :gpu        index of the gpu requested for this partition
        :data       data-frame to work from (alternatively :reader, a function that returns the data-frame)
        :partition  optional index of the partition
        :file       if present the outputs are written to csv files only, otherwise they are written to bigquery
        :logs       optional log folder (default 'logs')
        :part_size  optional number of partitions (default 8), only used for logging
        :max_epochs optional (default 150)
        :batch_size optional (default 2000)
        :continuous optional list passed to the generator as is (default [])
        """
        logger = Components._logger(args['context'])
        partition = args['partition'] if 'partition' in args else ''
        #
        # The partition is optional, it must not be cast to an integer when it was not provided
        has_partition = partition != ''
        log_folder = args['logs'] if 'logs' in args else 'logs'
        log_folder = os.sep.join([log_folder,args['context'],str(partition)])

        _args = {"batch_size":2000,"logs":log_folder,"context":args['context'],"max_epochs":150,"column":args['columns'],"id":"person_id","logger":logger}
        if 'max_epochs' in args :
            _args['max_epochs'] = int(args['max_epochs'])
        if 'batch_size' in args :
            _args['batch_size'] = int(args['batch_size'])
        #
        # NOTE(review): the original indentation was lost, num_gpu is assumed to be set to 1 in all cases -- confirm
        _args['gpu'] = Components._pick_gpu(args['num_gpu'],args['gpu'])
        _args['num_gpu'] = 1
        os.environ['CUDA_VISIBLE_DEVICES'] = str(args['gpu'])
        _args['no_value']= args['no_value']
        PART_SIZE = int(args['part_size']) if 'part_size' in args else 8

        df = args['reader']() if 'reader' in args else args['data']
        #
        # NOTE(review): the key "parition" is misspelled, it is left as is because stored logs may be queried with it
        info = {"parition":int(partition) if has_partition else None,"gpu":_args["gpu"],"rows":df.shape[0],"cols":df.shape[1],"part_size":PART_SIZE}
        logger.write({"module":"generate","action":"partition","input":info})
        if has_partition :
            _args['partition'] = int(partition)
        _args['continuous']= args['continuous'] if 'continuous' in args else []
        _args['data'] = df
        _dc = data.maker.generate(**_args)
        #
        # We need to post the generate the data in order to :
        #   1. compare immediately
        #   2. synthetic copy
        #
        columns = args['columns']
        data_comp = _args['data'][columns].join(_dc[columns],rsuffix='_io') #-- will be used for comparison
        for name in _dc.columns.tolist() :
            #
            # The generated column overwrites the original one in order to rebuild the full dataset
            _args['data'][name] = _dc[name]
            info = {"module":"generate","action":"io","input":{"rows":_dc[name].shape[0],"name":name}}
            if has_partition :
                info['partition'] = int(partition)
            logger.write(info)
        #
        #-- Let us store all of this (files or bigquery)
        prefix = args['notify']+'.'+_args['context']
        partition = str(partition)
        table = '_'.join([prefix,partition,'io']).replace('__','_')
        #
        # The output folder lives under the log folder computed above (that has a default when 'logs' is not provided)
        folder = os.sep.join([log_folder,'output'])
        os.makedirs(folder,exist_ok=True)
        if 'file' in args :
            _fname = os.sep.join([folder,table.replace('_io','_full_io.csv')])
            _pname = os.sep.join([folder,table])+'.csv'
            data_comp.to_csv( _pname,index=False)
            _args['data'].to_csv(_fname,index=False)
            _id = 'path'
        else:
            credentials = Components._credentials()
            _pname = os.sep.join([folder,table+'.csv'])
            _fname = table.replace('_io','_full_io')
            partial = '.'.join(['io',args['context']+'_partial_io'])
            complete= '.'.join(['io',args['context']+'_full_io'])
            #
            # Every partition appends to the same pair of tables
            data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=50000)
            data_comp.to_csv(_pname,index=False)
            _args['data'].to_gbq(if_exists='append',destination_table=complete,credentials=credentials,chunksize=50000)
            _id = 'dataset'
        info = {"full":{_id:_fname,"rows":_args['data'].shape[0]},"partial":{"path":_pname,"rows":data_comp.shape[0]} }
        if partition :
            info ['partition'] = int(partition)
        logger.write({"module":"generate","action":"write","input":info} )

    @staticmethod
    def callback(channel,method,header,stream):
        """
        This function handles a message received from the queue: it closes the channel and trains on the data found in the message
        The messages QUIT, EXIT and END only close the channel (nothing is trained)
        :channel    channel the message was received on (it is closed along with its connection)
        :method     not used
        :header     not used
        :stream     bytes of either a control word or a json document with the attributes input, data and args
        """
        if stream.decode('utf8') in ['QUIT','EXIT','END'] :
            channel.close()
            channel.connection.close()
            #
            # A control word is not a json document, there is nothing left to process
            return
        info = json.loads(stream)
        logger = Components._logger(SYS_ARGS['context'])
        logger.write({'module':'process','action':'read-partition','input':info['input']})
        df = pd.DataFrame(info['data'])
        args = info['args']
        if int(args['num_gpu']) > 1 :
            args['gpu'] = Components._pick_gpu(args['num_gpu'],info['input']['partition'])
        else:
            # NOTE(review): the original indentation was lost, num_gpu is assumed to be reset in this branch only -- confirm
            args['gpu'] = 0
            args['num_gpu'] = 1
        args['reader'] = lambda: df
        #
        # The trainer requires a partition, we fall back on the one of the message if the arguments have none
        args.setdefault('partition',info['input']['partition'])
        #
        # @TODO: Fix
        # There is an inconsistency in column/columns between the message and the trainer
        #
        channel.close()
        channel.connection.close()
        args['columns'] = args['column']
        (Components()).train(**args)
        logger.write({"module":"process","action":"exit","input":info["input"]})
if __name__ == '__main__' :
    #
    # Entry point: the configuration file holds a list of pipeline entries, the entry designated by
    # --index (default 0) is merged with the command line arguments (that take precedence) then
    # one of the following is executed:
    #   generate    generates data, one process per partition if the log folder has partitions
    #   listen      starts workers that train on the data received from a queue
    #   otherwise   trains, one process per partition
    #
    filename = SYS_ARGS['config'] if 'config' in SYS_ARGS else 'config.json'
    with open(filename) as f :
        PIPELINE = json.loads(f.read())
    index = int(SYS_ARGS['index']) if 'index' in SYS_ARGS else 0
    args = dict(PIPELINE[index],**SYS_ARGS)
    args['logs'] = args['logs'] if 'logs' in args else 'logs'
    if 'dataset' not in args :
        args['dataset'] = DATASET
    PART_SIZE = int(args['part_size']) if 'part_size' in args else 8
    #
    # @TODO:
    #   Log what was initiated so we have context of this processing ...
    #
    if 'listen' not in SYS_ARGS :
        #
        # The data is read once (file or bigquery) and split in PART_SIZE partitions
        if 'file' in args :
            DATA = pd.read_csv(args['file'])
        else:
            DATA = Components().get(args)
        DATA = np.array_split(DATA,PART_SIZE)

    if 'generate' in SYS_ARGS :
        #
        # Let us see if we have partitions given the log folder
        content = os.listdir( os.sep.join([args['logs'],args['context']]))
        generator = Components()
        if ''.join(content).isnumeric() :
            #
            # we have partitions we are working with
            jobs = []
            for part in range(0,PART_SIZE) :
                if 'focus' in args and int(args['focus']) != part :
                    #
                    # This handles failures/recoveries for whatever reason
                    # If we are only interested in generating data for a given partition
                    continue
                args['partition'] = part
                args['data'] = DATA[part]
                args['gpu'] = part if int(args['num_gpu']) > 1 else 0
                #
                # Every process is given its own copy of the arguments
                job = Process(target=Components().generate,args=(dict(args),))
                job.name = 'generator # '+str(part)
                job.start()
                jobs.append(job)
            print (["Started ",len(jobs),"generators" if len(jobs)>1 else "generator" ])
            for job in jobs :
                job.join()
        else:
            generator.generate(args)
    elif 'listen' in args :
        #
        # This will start a worker just in case to listen to a queue
        SYS_ARGS = dict(args) #-- things get lost in context
        if 'read' in SYS_ARGS :
            QUEUE_TYPE = 'queue.QueueReader'
            METHOD = 'read'
        else:
            QUEUE_TYPE = 'queue.QueueListener'
            METHOD = 'listen'
        N = int(SYS_ARGS['jobs']) if 'jobs' in SYS_ARGS else 1
        qhandlers = [factory.instance(type=QUEUE_TYPE,args={'queue':'aou.io'}) for _ in range(N)]
        jobs = []
        for qhandler in qhandlers :
            qhandler.callback = Components.callback
            job = Process(target=getattr(qhandler,METHOD))
            job.start()
            jobs.append(job)
        #
        # let us wait for the jobs
        print (["Started ",len(jobs)," trainers"])
        for job in jobs :
            job.join()
    else:
        jobs = []
        for part in range(0,PART_SIZE) :
            if 'focus' in args and int(args['focus']) != part :
                continue
            args['part_size'] = PART_SIZE
            args['partition'] = part
            args['data'] = DATA[part]
            #
            # A copy of the partition is kept on disk (the extension was missing its dot)
            args['data'].to_csv('aou-'+str(part)+'.csv',index=False)
            args['gpu'] = part if int(args['num_gpu']) > 1 else 0
            job = Process(target=Components().train,kwargs=dict(args))
            job.name = 'Trainer # ' + str(part)
            job.start()
            jobs.append(job)
        print (["Started ",len(jobs),"trainers" if len(jobs)>1 else "trainer" ])
        for job in jobs :
            job.join()