From afcc5ed690e7c73a14d412d64ef4891fcfec5274 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Mon, 29 Mar 2021 23:30:54 -0500 Subject: [PATCH] bug fix ... --- setup.py | 4 ++-- transport/sql.py | 45 ++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 42 insertions(+), 7 deletions(-) diff --git a/setup.py b/setup.py index 5f09279..f316e24 100644 --- a/setup.py +++ b/setup.py @@ -8,12 +8,12 @@ def read(fname): return open(os.path.join(os.path.dirname(__file__), fname)).read() args = { "name":"data-transport", - "version":"1.3.8", + "version":"1.3.8.1", "author":"The Phi Technology LLC","author_email":"info@the-phi.com", "license":"MIT", "packages":["transport"]} args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite'] -args["install_requires"] = ['pymongo','numpy','cloudant','pika','boto','google-cloud-bigquery','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python'] +args["install_requires"] = ['pymongo','numpy','cloudant','pika','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python'] args["url"] = "https://healthcareio.the-phi.com/git/code/transport.git" if sys.version_info[0] == 2 : diff --git a/transport/sql.py b/transport/sql.py index 5e817ca..dcf9e15 100644 --- a/transport/sql.py +++ b/transport/sql.py @@ -18,9 +18,13 @@ else: from common import Reader,Writer import json from google.oauth2 import service_account +from google.cloud import bigquery as bq from multiprocessing import Lock import pandas as pd import numpy as np +import copy + + class SQLRW : PROVIDERS = {"postgresql":"5432","redshift":"5432","mysql":"3306","mariadb":"3306"} DRIVERS = {"postgresql":pg,"redshift":pg,"mysql":my,"mariadb":my} @@ -177,9 +181,32 @@ class SQLWriter(SQLRW,Writer): pass class BigQuery: def __init__(self,**_args): - path = _args['service_key'] + path = _args['service_key'] if 'service_key' in _args else _args['private_key'] self.credentials = service_account.Credentials.from_service_account_file(path) - + self.dataset = _args['dataset'] if 'dataset' in _args else None + self.path = path + def meta(self,**_args): + """ + This function returns meta data for a given table or query with dataset/table properly formatted + :param table name of the name WITHOUT including dataset + :param sql sql query to be pulled, + """ + #if 'table' in _args : + # sql = "SELECT * from :dataset."+ _args['table']" limit 1" + #else: + # sql = _args['sql'] + # if 'limit' not in sql.lower() : + # sql = sql + ' limit 1' + + #sql = sql.replace(':dataset',self.dataset) if ':dataset' in args else sql + + # + # Let us return the schema information now for a given table + # + table = _args['table'] + client = bq.Client.from_service_account_json(self.path) + ref = client.dataset(self.dataset).table(table) + return client.get_table(ref).schema class BQReader(BigQuery,Reader) : def __init__(self,**_args): @@ -196,7 +223,8 @@ class BQReader(BigQuery,Reader) : SQL = "SELECT * FROM :table ".replace(":table",table) if SQL and 'limit' in _args: SQL += " LIMIT "+str(_args['limit']) - + if (':dataset' in SQL or ':DATASET' in SQL) and self.dataset: + SQL = SQL.replace(':dataset',self.dataset).replace(':DATASET',self.dataset) return pd.read_gbq(SQL,credentials=self.credentials,dialect='standard') if SQL else None class BQWriter(BigQuery,Writer): Lock = Lock() @@ -223,7 +251,14 @@ class BQWriter(BigQuery,Writer): elif type(_info) == pd.DataFrame : _df = _info - self.mode['destination_table'] = _args['table'].strip() + if '.' not in _args['table'] : + self.mode['destination_table'] = '.'.join([self.dataset,_args['table']]) + else: + + self.mode['destination_table'] = _args['table'].strip() + if 'schema' in _args : + self.mode['table_schema'] = _args['schema'] + _mode = copy.deepcopy(self.mode) _df.to_gbq(**self.mode) #if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000) pass @@ -237,4 +272,4 @@ class BQWriter(BigQuery,Writer): # # # w = SQLWriter(**_args) # # # w.write({"name":"kalara.io","email":"ceo@kalara.io","age":10}) # r = SQLReader(**_args) -# print (r.read(filter='age > 0',limit = 20)) \ No newline at end of file +# print (r.read(filter='age > 0',limit = 20))