You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
160 lines
6.8 KiB
Python
160 lines
6.8 KiB
Python
8 months ago
|
"""
|
||
|
Implementing support for google's bigquery
|
||
|
- cloud.bigquery.Read
|
||
|
- cloud.bigquery.Write
|
||
|
"""
|
||
|
import json
|
||
|
from google.oauth2 import service_account
|
||
|
from google.cloud import bigquery as bq
|
||
|
|
||
|
from multiprocessing import Lock, RLock
|
||
|
import pandas as pd
|
||
|
import pandas_gbq as pd_gbq
|
||
|
import numpy as np
|
||
|
import time
|
||
|
|
||
|
MAX_CHUNK = 2000000
|
||
|
class BigQuery:
|
||
8 months ago
|
# return self.client.query(SQL).to_dataframe() if SQL else None
|
||
|
|
||
|
class Writer (BigQuery):
|
||
|
"""
|
||
|
This class implements support for writing against bigquery
|
||
|
"""
|
||
|
lock = RLock()
|
||
|
def __init__(self,**_args):
|
||
|
super().__init__(**_args)
|
||
|
|
||
|
self.parallel = False if 'lock' not in _args else _args['lock']
|
||
|
self.table = _args['table'] if 'table' in _args else None
|
||
|
self.mode = {'if_exists':'append','chunksize':900000,'destination_table':self.table,'credentials':self.credentials}
|
||
|
self._chunks = 1 if 'chunks' not in _args else int(_args['chunks'])
|
||
|
self._location = 'US' if 'location' not in _args else _args['location']
|
||
|
def write(self,_data,**_args) :
|
||
|
"""
|
||
|
This function will perform a write to bigquery
|
||
|
:_data data-frame to be written to bigquery
|
||
|
"""
|
||
|
try:
|
||
|
if self.parallel or 'lock' in _args :
|