From 89ed5d5d46ae0d109cdd1d7b5415c1778f30bce6 Mon Sep 17 00:00:00 2001
From: Steve Nyemba
Date: Wed, 1 Apr 2020 00:22:21 -0500
Subject: [PATCH] simplify the CLI interface to leverage existing configuration

---
 finalize.py | 142 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 142 insertions(+)
 create mode 100644 finalize.py

diff --git a/finalize.py b/finalize.py
new file mode 100644
index 0000000..a375b37
--- /dev/null
+++ b/finalize.py
@@ -0,0 +1,142 @@
+"""
+This file performs basic tasks that finalize the GAN process :
+    - basic stats & analytics
+    - rebuild io to another dataset
+"""
+import pandas as pd
+import numpy as np
+from google.oauth2 import service_account
+from google.cloud import bigquery as bq
+from data.params import SYS_ARGS
+import json
+class Analytics :
+    """
+    This class compiles basic analytics about a given dataset i.e. comparing original/synthetic data
+    """
+    @staticmethod
+    def distribution(**args):
+        context = args['context']
+        df = args['data']
+        #
+        #-- This data frame counts unique values for each feature (space)
+        df_counts = pd.DataFrame(df.apply(lambda col: col.unique().size),columns=['counts']).T # unique counts
+        #
+        #-- Get the distributions for common values
+        #
+        names = [name for name in df_counts.columns.tolist() if not name.endswith('_io')]
+        ddf = df.apply(lambda col: pd.DataFrame(col.values,columns=[col.name]).groupby([col.name]).size()).fillna(0)
+        ddf[context] = ddf.index
+        return ddf
+    @staticmethod
+    def distance(**args):
+        """
+        This function will measure the distance between the original and synthetic distributions (not yet implemented)
+        """
+        df = args['data']
+        names = [name for name in df.columns.tolist() if not name.endswith('_io')]
+        pass
+class Utils :
+    class get :
+        @staticmethod
+        def config(**args) :
+            contexts = args['contexts'].split(',') if type(args['contexts']) == str else args['contexts']
+            pipeline = args['pipeline']
+            return [item for item in pipeline if item['context'] in contexts]
+        @staticmethod
+        def sql(**args) :
+            """
+            This function builds the SQL query for the remainder of the table that was not synthesized
+            :contexts   configuration entries associated with the table
+            :from       name of the source table
+            :dataset    name of the source dataset
+            """
+            SQL = ["SELECT * FROM :from "]
+            SQL_FILTER = []
+            NO_FILTERS_FOUND = True
+            pipeline = Utils.get.config(**args)
+            REVERSE_QUALIFIER = {'IN':'NOT IN','NOT IN':'IN','=':'<>','<>':'='}
+            for item in pipeline :
+                if 'filter' in item :
+                    if NO_FILTERS_FOUND :
+                        NO_FILTERS_FOUND = False
+                        SQL += ['WHERE']
+                    #
+                    # Reverse the qualifier so we select the rows that were NOT synthesized
+                    FILTER = item['filter']
+                    QUALIFIER = REVERSE_QUALIFIER[FILTER['qualifier'].upper()]
+                    SQL_FILTER += [" ".join([FILTER['field'],QUALIFIER,'(',FILTER['value'],')'])]
+            src = ".".join([args['dataset'],args['from']])
+            if SQL_FILTER :
+                SQL += [" AND ".join(SQL_FILTER)]
+            return " ".join(SQL).replace(":from",src)
+def mk(**args) :
+    dataset = args['dataset']
+    client = args['client'] if 'client' in args else bq.Client.from_service_account_json(args['private_key'])
+    #
+    # let us see if we have the dataset handy here, otherwise create it
+    #
+    datasets = list(client.list_datasets())
+    found = [item for item in datasets if item.dataset_id == dataset]
+    if not found :
+        return client.create_dataset(dataset)
+    return found[0]
+def move (**args):
+    """
+    This function moves a table from the synthetic dataset into a designated location
+    This is the simplest case for finalizing a synthetic data set
+    :private_key    path to the service account key file
+    """
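+    #
+    # Expected keys in args (supplied by the pipeline configuration and the CLI) :
+    #   :dataset    name of the source dataset
+    #   :from       name of the table to finalize
+    #   :contexts   pipeline contexts that produced synthetic partitions
+    #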
+    private_key = args['private_key']
+    client = bq.Client.from_service_account_json(private_key)
+    config = Utils.get.config(**args)
+    dataset = args['dataset']
+    SQL = [''.join(["SELECT * FROM io.",item['context'],'_full_io']) for item in config]
+    SQL += [Utils.get.sql(**args)]
+    SQL = '\n UNION ALL \n'.join(SQL).replace(':dataset','io')
+    #
+    # At this point we have gathered all the synthetic tables in the io dataset ;
+    # the UNION ALL above merges them with the remainder of the original table
+    #
+    odataset = mk(dataset=dataset+'_io',client=client)
+    job_config = bq.QueryJobConfig()
+    job_config.destination = client.dataset(odataset.dataset_id).table(args['from'])
+    job_config.use_query_cache = True
+    job_config.allow_large_results = True
+    job_config.priority = 'INTERACTIVE'
+    #
+    # let us pull the field schemas out of the table definition and cast each field
+    # explicitly (INTEGER/FLOAT become INT64/FLOAT64 in standard SQL)
+    #
+    schema = client.get_table(client.dataset(args['dataset']).table(args['from'])).schema
+    fields = [" ".join(["CAST (",item.name,"AS",item.field_type.replace("INTEGER","INT64").replace("FLOAT","FLOAT64"),") ",item.name]) for item in schema]
+    SQL = SQL.replace("*"," , ".join(fields))
+    out = client.query(SQL,location='US',job_config=job_config)
+    print (out.job_id)
+
+f = open('config.json')
+config = json.loads(f.read())
+f.close()
+
+if __name__ == '__main__' :
+    """
+    Usage :
+        finalize --move --contexts <context1,context2,...> --from <table>
+    """
+    if 'move' in SYS_ARGS :
+        table = SYS_ARGS['from']
+        contexts = [item['context'] for item in config['pipeline'] if item['from'] == table]
+        args = dict(config,**{"private_key":"../curation-prod.json"})
+        args = dict(args,**SYS_ARGS)
+        args['contexts'] = contexts
+        move(**args)
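+#
+# Example (hypothetical table name ; config.json must define a matching pipeline
+# entry and data.params.SYS_ARGS handles the flag parsing) :
+#
+#   python finalize.py --move --from mytable --private_key ../curation-prod.json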