diff --git a/transport/sql.py b/transport/sql.py
index 7be3900..3176cf7 100644
--- a/transport/sql.py
+++ b/transport/sql.py
@@ -25,6 +25,8 @@ else:
 import json
 from google.oauth2 import service_account
 from google.cloud import bigquery as bq
+# import constants.bq_utils as bq_consts
+
 from multiprocessing import Lock, RLock
 import pandas as pd
 import numpy as np
@@ -462,7 +464,7 @@ class BQWriter(BigQuery,Writer):
         self.table = _args['table'] if 'table' in _args else None
         self.mode = {'if_exists':'append','chunksize':900000,'destination_table':self.table,'credentials':self.credentials}
         self._chunks = 1 if 'chunks' not in _args else int(_args['chunks'])
-
+        self._location = 'US' if 'location' not in _args else _args['location']
     def write(self,_info,**_args) :
         try:
             if self.parallel or 'lock' in _args :
@@ -472,6 +474,21 @@ class BQWriter(BigQuery,Writer):
         finally:
             if self.parallel:
                 BQWriter.lock.release()
+    def submit(self,_sql):
+        """
+        Write the output of a large query to the configured table; BigQuery will handle this as a job.
+        This function returns the job identifier.
+        """
+        _config = bq.QueryJobConfig()
+        _config.destination = self.client.dataset(self.dataset).table(self.table)
+        _config.allow_large_results = True
+        # _config.write_disposition = bq.bq_consts.WRITE_APPEND
+        _config.dry_run = False
+        # _config.priority = 'BATCH'
+        _resp = self.client.query(_sql,location=self._location,job_config=_config)
+        return _resp.job_id
+    def status(self,_id):
+        return self.client.get_job(_id,location=self._location)
     def _write(self,_info,**_args) :
         _df = None
         if type(_info) in [list,pd.DataFrame] :
diff --git a/transport/version.py b/transport/version.py
index 5ad4744..2b34f5b 100644
--- a/transport/version.py
+++ b/transport/version.py
@@ -1,2 +1,2 @@
 __author__ = 'The Phi Technology'
-__version__= '1.9.3'
+__version__= '1.9.4'
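
For reviewers, a minimal sketch of how the new submit/status pair might be driven. The constructor arguments below are assumptions for illustration, since BQWriter's full __init__ is not part of this diff; only the 'table' and 'location' keys appear above.

import time
from transport.sql import BQWriter

# hypothetical configuration; 'table' and 'location' are the keys read in
# __init__ above, and 'dataset' mirrors the self.dataset used by submit()
_writer = BQWriter(table='events_copy', dataset='analytics', location='US')

# submit() queues the query as a BigQuery job whose results land in the
# configured destination table, and returns the job identifier immediately
_id = _writer.submit("SELECT * FROM analytics.events")

# status() wraps client.get_job(); poll until BigQuery reports completion
while _writer.status(_id).state != 'DONE':
    time.sleep(5)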