diff --git a/transport/sql.py b/transport/sql.py index 180b962..542b535 100644 --- a/transport/sql.py +++ b/transport/sql.py @@ -185,6 +185,7 @@ class BigQuery: self.credentials = service_account.Credentials.from_service_account_file(path) self.dataset = _args['dataset'] if 'dataset' in _args else None self.path = path + self.dtypes = _args['dtypes'] if 'dtypes' in _args else None def meta(self,**_args): """ This function returns meta data for a given table or query with dataset/table properly formatted @@ -225,7 +226,13 @@ class BQReader(BigQuery,Reader) : SQL += " LIMIT "+str(_args['limit']) if (':dataset' in SQL or ':DATASET' in SQL) and self.dataset: SQL = SQL.replace(':dataset',self.dataset).replace(':DATASET',self.dataset) - return pd.read_gbq(SQL,credentials=self.credentials,dialect='standard') if SQL else None + _info = {'credentials':self.credentials,'dialect':'standard'} + if 'dtypes' in _args or self.dtypes : + self.dtypes = _args ['dtypes'] if 'dtypes' in self.dtypes else None + if self.dtypes : + _info['dtypes'] = self.dtypes + return pd.read_gbq(SQL,*_info) if SQL else None + # return pd.read_gbq(SQL,credentials=self.credentials,dialect='standard') if SQL else None class BQWriter(BigQuery,Writer): lock = Lock() def __init__(self,**_args): @@ -237,7 +244,7 @@ class BQWriter(BigQuery,Writer): def write(self,_info,**_args) : try: - if self.parallel : + if self.parallel or 'lock' in _args : BQWriter.lock.acquire() self._write(_info,**_args) finally: