You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
data-maker/pipeline.py

311 lines
12 KiB
Python

#!/usr/bin/env python3
5 years ago
df = pd.read_gbq(SQL,credentials=credentials,dialect='standard')
for b in bounds :
part_index = bounds.index(b)
ubound = int(b.right)
_data = df.iloc[lbound:ubound][args['columns']]
lbound = ubound
# _args['logs'] = os.sep.join([log_folder,str(part_index)])
_args['partition'] = str(part_index)
_args['logger'] = {'args':{'dbname':'aou','doc':args['context']},'type':'mongo.MongoWriter'}
pass
base_cols = list(set(_args['data'].columns) - set(args['columns'])) #-- rebuilt the dataset (and store it)
for name in cols :
_args['data'][name] = _dc[name]
info = {"module":"generate","action":"io","input":{"rows":_dc[name].shape[0],"name":name}}
if partition != '' :
info['partition'] = partition
logger.write(info)
# filename = os.sep.join([log_folder,'output',name+'.csv'])
# data_comp[[name]].to_csv(filename,index=False)
#
#-- Let us store all of this into bigquery
prefix = args['notify']+'.'+_args['context']
table = '_'.join([prefix,partition,'io']).replace('__','_')
folder = os.sep.join([args['logs'],args['context'],partition,'output'])
if 'file' in args :
_fname = os.sep.join([folder,table.replace('_io','_full_io.csv')])
_pname = os.sep.join([folder,table])+'.csv'
data_comp.to_csv( _pname,index=False)
_args['data'].to_csv(_fname,index=False)
else:
credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
_pname = os.sep.join([folder,table+'.csv'])
_fname = table.replace('_io','_full_io')
data_comp.to_gbq(if_exists='replace',destination_table=_pname,credentials='credentials',chunk_size=50000)
data_comp.to_csv(_pname,index=False)
INSERT_FLAG = 'replace' if 'partition' not in args else 'append'
_args['data'].to_gbq(if_exists=INSERT_FLAG,destination_table=_fname,credentials='credentials',chunk_size=50000)
info = {"full":{"path":_fname,"rows":_args['data'].shape[0]},"compare":{"name":_pname,"rows":data_comp.shape[0]} }
if partition :
info ['partition'] = partition
logger.write({"module":"generate","action":"write","info":info} )
@staticmethod
def callback(channel,method,header,stream):
info = json.loads(stream)
logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':SYS_ARGS['context']})
logger.write({'module':'process','action':'read-partition','input':info['info']})
df = pd.DataFrame(info['data'])
args = info['args']
args['reader'] = reader
# Components.train(**args)