From 677239585c4520fbca494ba25d670815a11f768e Mon Sep 17 00:00:00 2001
From: Steve Nyemba
Date: Tue, 2 Apr 2024 16:58:58 -0500
Subject: [PATCH] bug fix, with bigquery write

---
 transport/cloud/bigquery.py | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/transport/cloud/bigquery.py b/transport/cloud/bigquery.py
index 479c060..ba720af 100644
--- a/transport/cloud/bigquery.py
+++ b/transport/cloud/bigquery.py
@@ -104,12 +104,12 @@ class Writer (BigQuery):
         """
         try:
             if self.parallel or 'lock' in _args :
-                Write.lock.acquire()
+                Writer.lock.acquire()
             _args['table'] = self.table if 'table' not in _args else _args['table']
             self._write(_data,**_args)
         finally:
             if self.parallel:
-                Write.lock.release()
+                Writer.lock.release()
     def submit(self,_sql):
         """
         Write the output of a massive query to a given table, biquery will handle this as a job
@@ -144,13 +144,16 @@ class Writer (BigQuery):
         # Let us insure that the types are somewhat compatible ...
         # _map = {'INTEGER':np.int64,'DATETIME':'datetime64[ns]','TIMESTAMP':'datetime64[ns]','FLOAT':np.float64,'DOUBLE':np.float64,'STRING':str}
         # _mode = copy.deepcopy(self.mode)
-        _mode = self.mode
+        # _mode = self.mode
         # _df.to_gbq(**self.mode) #if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000)
         #
         # Let us adjust the chunking here
+        if 'if_exists' in _args :
+            self.mode['if_exists'] = _args['if_exists']
         self._chunks = 10 if _df.shape[0] > MAX_CHUNK and self._chunks == 1 else self._chunks
         _indexes = np.array_split(np.arange(_df.shape[0]),self._chunks)
         for i in _indexes :
-            _df.iloc[i].to_gbq(**self.mode)
+            # _df.iloc[i].to_gbq(**self.mode)
+            pd_gbq.to_gbq(_df.iloc[i],**self.mode)
             time.sleep(1)
         pass
\ No newline at end of file
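
The first hunk fixes a `NameError`: the class is declared as `Writer`, so the class-level lock must be referenced as `Writer.lock`, not `Write.lock`. Below is a minimal sketch of the pattern the patch relies on; the lock type and the `_write` stub are assumptions, since the diff only shows the acquire/release calls.

```python
import threading


class Writer:
    # Class-level lock shared by every Writer instance, so parallel
    # writers serialize access to the destination table.
    # (threading.Lock is an assumption; the diff does not show the type.)
    lock = threading.Lock()

    def __init__(self, parallel=False):
        self.parallel = parallel

    def _write(self, _data, **_args):
        pass  # stub standing in for the real BigQuery write

    def write(self, _data, **_args):
        try:
            if self.parallel or 'lock' in _args:
                # The class is named Writer, so the lookup must be
                # Writer.lock; the pre-patch Write.lock raised NameError.
                Writer.lock.acquire()
            self._write(_data, **_args)
        finally:
            # Mirrors the patch: release is guarded by self.parallel only,
            # while acquire also fires when 'lock' is passed in _args.
            if self.parallel:
                Writer.lock.release()
```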
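
The second hunk lets callers override `if_exists` through `_args` and swaps the deprecated `DataFrame.to_gbq` method for the module-level `pandas_gbq.to_gbq` call (imported as `pd_gbq`). Here is a minimal standalone sketch of that chunked write path, assuming `pandas_gbq` is installed; the `chunked_write` helper and the `MAX_CHUNK` value are illustrative, as the real constant is defined elsewhere in the module and not shown in the diff.

```python
import time

import numpy as np
import pandas_gbq as pd_gbq

MAX_CHUNK = 2_000_000  # assumption: stands in for the module's constant


def chunked_write(_df, mode, _chunks=1, **_args):
    """Push a DataFrame to BigQuery in slices, mirroring the patched loop.

    `mode` is a dict of pandas_gbq.to_gbq keyword arguments, e.g.
    {'destination_table': 'dataset.table', 'project_id': 'my-project',
     'if_exists': 'append'} -- the values here are illustrative.
    """
    # The patch lets callers override if_exists on a per-call basis.
    if 'if_exists' in _args:
        mode['if_exists'] = _args['if_exists']
    # Large frames are split into 10 chunks unless a count was already set.
    _chunks = 10 if _df.shape[0] > MAX_CHUNK and _chunks == 1 else _chunks
    _indexes = np.array_split(np.arange(_df.shape[0]), _chunks)
    for i in _indexes:
        # Module-level call replacing the deprecated DataFrame.to_gbq method.
        # Note: with if_exists='replace' each chunk would overwrite the last,
        # so 'append' is the safe setting when _chunks > 1.
        pd_gbq.to_gbq(_df.iloc[i], **mode)
        time.sleep(1)  # brief pause between chunk uploads, as in the patch
```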