#!/usr/bin/env python3 """ This file will perform basic tasks to finalize the GAN process by performing the following : - basic stats & analytics - rebuild io to another dataset """ import pandas as pd import numpy as np from multiprocessing import Process, Lock from google.oauth2 import service_account from google.cloud import bigquery as bq import transport from data.params import SYS_ARGS import json class Analytics : """ This class will compile basic analytics about a given dataset i.e compare original/synthetic """ @staticmethod def distribution(**args): context = args['context'] df = args['data'] # #-- This data frame counts unique values for each feature (space) df_counts = pd.DataFrame(df.apply(lambda col: col.unique().size),columns=['counts']).T # unique counts # #-- Get the distributions for common values # names = [name for name in df_counts.columns.tolist() if name.endswith('_io') == False] ddf = df.apply(lambda col: pd.DataFrame(col.values,columns=[col.name]).groupby([col.name]).size() ).fillna(0) ddf[context] = ddf.index pass def distance(**args): """ This function will measure the distance between """ pass class Utils : @staticmethod def log(**args): logger = transport.factory.instance(type="mongo.MongoWriter",args={"dbname":"aou","doc":"logs"}) logger.write(args) logger.close() class get : @staticmethod def pipeline(table,path) : # contexts = args['contexts'].split(',') if type(args['contexts']) == str else args['contexts'] config = json.loads((open(path)).read()) pipeline = config['pipeline'] # return [ item for item in pipeline if item['context'] in contexts] pipeline = [item for item in pipeline if 'from' in item and item['from'].strip() == table] Utils.log(module=table,action='init',input={"pipeline":pipeline}) return pipeline @staticmethod def sql(**args) : """ This function is intended to build SQL query for the remainder of the table that was not synthesized :config configuration entries :from source of the table name :dataset name of the source dataset """ SQL = ["SELECT * FROM :from "] SQL_FILTER = [] NO_FILTERS_FOUND = True # pipeline = Utils.get.config(**args) pipeline = args['pipeline'] REVERSE_QUALIFIER = {'IN':'NOT IN','NOT IN':'IN','=':'<>','<>':'='} for item in pipeline : if 'filter' in item : if NO_FILTERS_FOUND : NO_FILTERS_FOUND = False SQL += ['WHERE'] # # Let us load the filter in the SQL Query FILTER = item['filter'] QUALIFIER = REVERSE_QUALIFIER[FILTER['qualifier'].upper()] SQL_FILTER += [" ".join([FILTER['field'], QUALIFIER,'(',FILTER['value'],')']).replace(":dataset",args['dataset'])] src = ".".join([args['dataset'],args['from']]) SQL += [" AND ".join(SQL_FILTER)] # # let's pull the field schemas out of the table definition # Utils.log(module=args['from'],action='sql',input={"sql":" ".join(SQL) }) return " ".join(SQL).replace(":from",src) def mk(**args) : dataset = args['dataset'] client = args['client'] if 'client' in args else bq.Client.from_service_account_file(args['private_key']) # # let us see if we have a dataset handy here # datasets = list(client.list_datasets()) found = [item for item in datasets if item.dataset_id == dataset] if not found : return client.create_dataset(dataset) return found[0] def move (args): """ This function will move a table from the synthetic dataset into a designated location This is the simplest case for finalizing a synthetic data set :private_key """ pipeline = Utils.get.pipeline(args['from'],args['config']) _args = json.loads((open(args['config'])).read()) _args['pipeline'] = pipeline # del _args['pipeline'] args = dict(args,**_args) # del args['pipeline'] # private_key = args['private_key'] client = bq.Client.from_service_account_json(args['private_key']) dataset = args['dataset'] if pipeline : SQL = [ ''.join(["SELECT * FROM io.",item['context'],'_full_io']) for item in pipeline] SQL += [Utils.get.sql(**args)] SQL = ('\n UNION ALL \n'.join(SQL).replace(':dataset','io')) else: # # moving a table to a designated location tablename = args['from'] if 'sql' not in args : SQL = "SELECT * FROM :dataset.:table" else: SQL = args['sql'] SQL = SQL.replace(":dataset",dataset).replace(":table",tablename) Utils.log(module=args['from'],action='sql',input={'sql':SQL}) # # At this point we have gathered all the tables in the io folder and we should now see if we need to merge with the remainder from the original table # odataset = mk(dataset=dataset+'_io',client=client) # SQL = "SELECT * FROM io.:context_full_io".replace(':context',context) config = bq.QueryJobConfig() config.destination = client.dataset(odataset.dataset_id).table(args['from']) config.use_query_cache = True config.allow_large_results = True config.priority = 'INTERACTIVE' # # schema = client.get_table(client.dataset(args['dataset']).table(args['from'])).schema fields = [" ".join(["CAST (",item.name,"AS",item.field_type.replace("INTEGER","INT64").replace("FLOAT","FLOAT64"),") ",item.name]) for item in schema] SQL = SQL.replace("*"," , ".join(fields)) # print (SQL) out = client.query(SQL,location='US',job_config=config) Utils.log(module=args['from'],action='move',input={'job':out.job_id}) return (out.job_id) import pandas as pd import numpy as np from google.oauth2 import service_account import json # path = '../curation-prod.json' # credentials = service_account.Credentials.from_service_account_file(path) # df = pd.read_gbq("SELECT * FROM io.icd10_partial_io",credentials=credentials,dialect='standard') filename = 'config.json' if 'config' not in SYS_ARGS else SYS_ARGS['config'] f = open(filename) config = json.loads(f.read()) args = config['pipeline'] f.close() if __name__ == '__main__' : """ Usage : finalize -- --contexts --from """ if 'move' in SYS_ARGS : if 'init' in SYS_ARGS : dep = config['dep'] if 'dep' in config else {} info = [] if 'queries' in dep : info += dep['queries'] print ('________') if 'tables' in dep : info += dep['tables'] args = {} jobs = [] for item in info : args = {} if type(item) == str : args['from'] = item name = item else: args = item name = item['from'] args['config'] = SYS_ARGS['config'] # args['pipeline'] = [] job = Process(target=move,args=(args,)) job.name = name jobs.append(job) job.start() # while len(jobs) > 0 : # jobs = [job for job in jobs if job.is_alive()] # time.sleep(1) else: move(SYS_ARGS) # # table = SYS_ARGS['from'] # # args = dict(config,**{"private_key":"../curation-prod.json"}) # args = dict(args,**SYS_ARGS) # contexts = [item['context'] for item in config['pipeline'] if item['from'] == SYS_ARGS['from']] # log = [] # if contexts : # args['contexts'] = contexts # log = move(**args) # else: # tables = args['from'].split(',') # for name in tables : # name = name.strip() # args['from'] = name # log += [move(**args)] # print ("\n".join(log)) else: print ("NOT YET READY !")