Bug fix: serialize BigQuery writes with a multiprocessing lock so that parallel generator processes no longer cause errors when writing to BigQuery

dev
Steve L. Nyemba 5 years ago
parent 205adf8fa6
commit e8906d1646

@ -4,7 +4,7 @@ from transport import factory
import numpy as np import numpy as np
import time import time
import os import os
from multiprocessing import Process from multiprocessing import Process, Lock
import pandas as pd import pandas as pd
from google.oauth2 import service_account from google.oauth2 import service_account
import data.maker import data.maker
@ -16,9 +16,11 @@ from data.params import SYS_ARGS
DATASET='combined20191004v2_deid' DATASET='combined20191004v2_deid'
class Components : class Components :
lock = Lock()
class KEYS : class KEYS :
PIPELINE_KEY = 'pipeline' PIPELINE_KEY = 'pipeline'
SQL_FILTER = 'filter' SQL_FILTER = 'filter'
@staticmethod @staticmethod
def get_logger(**args) : def get_logger(**args) :
return factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']}) return factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
@ -232,10 +234,12 @@ class Components :
if 'dump' in args : if 'dump' in args :
print (_args['data'].head()) print (_args['data'].head())
else: else:
Components.lock.acquire()
data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000) data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000)
INSERT_FLAG = 'replace' if 'partition' not in args or 'segment' not in args else 'append' INSERT_FLAG = 'replace' if 'partition' not in args or 'segment' not in args else 'append'
_args['data'].to_gbq(if_exists='append',destination_table=complete,credentials=credentials,chunksize=90000) _args['data'].to_gbq(if_exists='append',destination_table=complete,credentials=credentials,chunksize=90000)
Components.lock.release()
_id = 'dataset' _id = 'dataset'
info = {"full":{_id:_fname,"rows":_args['data'].shape[0]},"partial":{"path":_pname,"rows":data_comp.shape[0]} } info = {"full":{_id:_fname,"rows":_args['data'].shape[0]},"partial":{"path":_pname,"rows":data_comp.shape[0]} }
if partition : if partition :
@ -327,8 +331,8 @@ if __name__ == '__main__' :
job.name = 'generator # '+str(index) job.name = 'generator # '+str(index)
job.start() job.start()
jobs.append(job) jobs.append(job)
if len(jobs) == 1 : # if len(jobs) == 1 :
job.join() # job.join()
print (["Started ",len(jobs),"generators" if len(jobs)>1 else "generator" ]) print (["Started ",len(jobs),"generators" if len(jobs)>1 else "generator" ])
while len(jobs)> 0 : while len(jobs)> 0 :

Loading…
Cancel
Save