From e8906d1646720294e08a8524587def89c36ce375 Mon Sep 17 00:00:00 2001
From: Steve Nyemba
Date: Fri, 27 Mar 2020 00:34:05 -0500
Subject: [PATCH] bug fix: process causing error when writing to bigquery

---
 pipeline.py | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/pipeline.py b/pipeline.py
index 7620afd..6e847fb 100644
--- a/pipeline.py
+++ b/pipeline.py
@@ -4,7 +4,7 @@ from transport import factory
 import numpy as np
 import time
 import os
-from multiprocessing import Process
+from multiprocessing import Process, Lock
 import pandas as pd
 from google.oauth2 import service_account
 import data.maker
@@ -16,9 +16,11 @@ from data.params import SYS_ARGS
 DATASET='combined20191004v2_deid'

 class Components :
+	lock = Lock()
 	class KEYS :
 		PIPELINE_KEY = 'pipeline'
 		SQL_FILTER = 'filter'
+	@staticmethod
 	def get_logger(**args) :
 		return factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})

@@ -232,10 +234,12 @@ class Components :
 			if 'dump' in args :
 				print (_args['data'].head())
 			else:
+				Components.lock.acquire()
 				data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000)
 				INSERT_FLAG = 'replace' if 'partition' not in args or 'segment' not in args else 'append'
 				_args['data'].to_gbq(if_exists='append',destination_table=complete,credentials=credentials,chunksize=90000)
+				Components.lock.release()
 				_id = 'dataset'
 				info = {"full":{_id:_fname,"rows":_args['data'].shape[0]},"partial":{"path":_pname,"rows":data_comp.shape[0]} }
 				if partition :
@@ -327,8 +331,8 @@ if __name__ == '__main__' :
 			job.name = 'generator # '+str(index)
 			job.start()
 			jobs.append(job)
-			if len(jobs) == 1 :
-				job.join()
+			# if len(jobs) == 1 :
+			#	job.join()

 	print (["Started ",len(jobs),"generators" if len(jobs)>1 else "generator" ])
 	while len(jobs)> 0 :
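
Note: the fix serializes the two to_gbq() appends behind a multiprocessing.Lock created at class-definition time. Because the generator workers are started with Process (fork on Linux), each child inherits that same lock, so only one process writes to BigQuery at a time. Below is a minimal, self-contained sketch of the pattern, not the project's actual code: fake_bigquery_write is a hypothetical stand-in for the guarded to_gbq calls, and the polling loop mirrors the pipeline's "while len(jobs)> 0" shutdown.

    from multiprocessing import Process, Lock
    import time

    class Components :
        # Created once at class-definition time; fork-started workers
        # inherit the same underlying semaphore.
        lock = Lock()

        @staticmethod
        def post(name) :
            Components.lock.acquire()
            try:
                # Critical section: one process writes at a time, mirroring
                # the guarded data_comp.to_gbq(...) and _args['data'].to_gbq(...)
                fake_bigquery_write(name)
            finally:
                Components.lock.release()

    def fake_bigquery_write(name) :
        # Hypothetical stand-in for a slow append to a BigQuery table
        print ([name, 'writing'])
        time.sleep(0.5)
        print ([name, 'done'])

    if __name__ == '__main__' :
        jobs = []
        for index in range(4) :
            job = Process(target=Components.post, args=('generator # '+str(index),))
            job.start()
            jobs.append(job)
        # Poll until every generator has finished, as the pipeline does
        while len(jobs) > 0 :
            jobs = [job for job in jobs if job.is_alive()]
            time.sleep(0.1)

Two caveats worth flagging. First, the patch pairs acquire()/release() directly, so an exception raised inside to_gbq() would leave the lock held and stall the remaining generators; the try/finally above (or "with Components.lock:") avoids that. Second, this inheritance trick only works under fork-based process start: on spawn-based platforms (Windows, and macOS by default since Python 3.8) the module is re-imported in each child and a fresh, unshared Lock is created, so the lock would have to be passed to each Process explicitly.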