From 5960caa7f89d98d45356ea1a998ba07b8cd412db Mon Sep 17 00:00:00 2001
From: Steve Nyemba
Date: Mon, 7 Oct 2019 11:49:12 -0500
Subject: [PATCH] aou-2-pgphx

---
 bridge.py | 251 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 251 insertions(+)
 create mode 100644 bridge.py

diff --git a/bridge.py b/bridge.py
new file mode 100644
index 0000000..ea7215e
--- /dev/null
+++ b/bridge.py
@@ -0,0 +1,251 @@
+"""
+    Create a pseudonym map with the following columns:
+        table, field, value, enc, filter
+"""
+import pandas as pd
+import numpy as np
+from google.oauth2 import service_account
+from google.cloud import bigquery as bq
+import json
+import threading
+import sys
+import os
+import itertools
+
+DATASET_SUFFIX = '_pseudo'
+PSEUDO_TABLENAME = 'map'
+
+#
+# Minimal command-line parsing: every "--name value" pair is stored in SYS_ARGS
+SYS_ARGS = {'context': ''}
+if len(sys.argv) > 1:
+    N = len(sys.argv)
+    for i in range(1, N):
+        if not sys.argv[i].startswith('--'):
+            continue
+        key = sys.argv[i].strip('-')
+        value = sys.argv[i + 1].strip() if i + 1 < N else None
+        if key and value:
+            SYS_ARGS[key] = value
+        if key == 'context' and value:
+            SYS_ARGS[key] = ('/' + value).replace('//', '/')
+
+
+class void:
+    pass
+
+class pseudonym:
+    @staticmethod
+    def meta(**args):
+        """
+        Return the column names of the input table
+        :key        BigQuery private key (service account file)
+        :dataset    dataset of the input table
+        :table      table name
+        :filter     optional filter (SQL condition)
+        """
+        credentials = service_account.Credentials.from_service_account_file(args['key'])
+        SQL = ["SELECT * FROM :dataset.:table"]
+        if 'filter' in args:
+            SQL += ['WHERE', args['filter']]
+        dataset = args['dataset']
+        table = args['table']
+        SQL = " ".join(SQL + ["LIMIT 1"]).replace(":dataset", dataset).replace(":table", table)
+
+        df = pd.read_gbq(SQL, credentials=credentials, dialect='standard')
+        return df.columns
+    @staticmethod
+    def apply(**args):
+        """
+        This function applies pseudonymization to every column of the input table:
+        it creates the target dataset (<dataset>_pseudo) if needed and posts one
+        mapping query per column
+        """
+        columns = pseudonym.meta(**args)
+        #
+        # we need to make the target schema here
+        client = bq.Client.from_service_account_json(args['key'])
+        datasets = list(client.list_datasets())
+        dataset_name = args['dataset'] + DATASET_SUFFIX
+        if np.sum([1 * (item.dataset_id == dataset_name) for item in datasets]) == 0:
+            #-- make the target dataset
+            dataset = bq.Dataset(client.dataset(dataset_name))
+            client.create_dataset(dataset)
+
+        for name in columns:
+            p = dict(args, **{"field": name})
+            p['filter'] = '' if 'filter' not in args else args['filter']
+            # thread = threading.Thread(target=pseudonym.post, args=(p,))
+            # thread.start()
+            # if columns.tolist().index(name) == 0 :
+            #     thread.join()
+            pseudonym.post(**p)
+    @staticmethod
+    def post(**args):
+        """
+        This function will submit a query to BigQuery and append the resulting
+        value/pseudonym pairs to the map table
+        """
+        SQL = " ".join(['SELECT DISTINCT CAST(', args['field'], " AS STRING) AS values, COUNT(*) as counts FROM :dataset.:table :filter"]).replace(':dataset', args['dataset'])
+        SQL = SQL.replace(':table', args['table'])
+        if args['filter'].strip() != '':
+            SQL = SQL.replace(":filter", "WHERE " + args['filter'])
+        else:
+            SQL = SQL.replace(":filter", "")
+        SQL += " ".join([' GROUP BY ', args['field'], 'ORDER BY 1 '])
+        TABLE_NAME = ".".join([args['dataset'], args['table']])
+        credentials = service_account.Credentials.from_service_account_file(args['key'])
+        df = pd.read_gbq(SQL, credentials=credentials, dialect='standard')
+        df['table'] = args['table']
+        df['field'] = args['field']
+        # df['filter']= args['filter']
+        #
+        # assign every distinct value a unique random integer code
+        N = df.shape[0] + 10000
+        beg = np.random.randint(11, 200)
+        df['encoded'] = np.random.choice(np.arange(beg, N), df.shape[0], replace=False)
+        df = df[['table', 'field', 'values', 'counts', 'encoded']]
+        # print (df.head()[:5])
+        # sys.stdout.flush()
+        TABLE_NAME = ".".join([args['dataset'] + DATASET_SUFFIX, PSEUDO_TABLENAME])
+        df.to_gbq(TABLE_NAME, credentials=credentials, if_exists='append')
+        # df.to_gbq(TABLE_NAME.replace('.','_pseudo.'),credentials=credentials,if_exists='append')
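+
+#
+# Illustrative sketch only (the dataset, table and key names below are hypothetical):
+#
+#   pseudonym.apply(dataset='my_dataset', table='person', key='./service-account.json')
+#
+# would populate my_dataset_pseudo.map with one row per distinct value, e.g.
+#   table  | field  | values  | counts | encoded
+#   person | gender | <value> | <n>    | <random int>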
+
+class Builder:
+    """
+    This class will build a dataset from encoded values
+    """
+    def encode(self, **args):
+        """
+        This function will generate the SQL that replaces every field of a given
+        table with its pseudonym from the mapping table
+        """
+        SQL = "SELECT * FROM :dataset.:table limit 1".replace(':dataset', args['dataset']).replace(":table", args['table'])
+        credentials = service_account.Credentials.from_service_account_file(args['key'])
+        columns = pd.read_gbq(SQL, credentials=credentials, dialect='standard').columns.tolist()
+        TEMPLATE = ['(SELECT encoded FROM :dataset' + DATASET_SUFFIX + '.' + PSEUDO_TABLENAME, "WHERE table=':table' AND field = ':name' AND CAST(values AS STRING)=CAST(:table.:name AS STRING ) ) as :name"]
+        SQL = ["SELECT"]
+        FIELDS = []
+        for field in columns:
+            FIELDS += [" ".join(TEMPLATE).replace(":name", field)]
+        SQL += [",\n\t".join(FIELDS)]
+        SQL += ['FROM :dataset.:table']
+        return ("\n".join(SQL).replace(":dataset", args['dataset']).replace(':table', args['table']))
+
+    def process(self, **args):
+        """
+        Create the pseudonym map for the given table (delegates to pseudonym.apply)
+        :dataset
+        :table
+        :key
+        """
+        pseudonym.apply(**args)
+    def decode(self, **args):
+        """
+        This function should be able to take a pseudonymized data frame and convert
+        it back to the original values (not implemented yet)
+        """
+        pass
+class Binary:
+    """
+    This is a utility class to import/export data to/from a binary (one-hot) matrix
+    """
+    def __stream(self, column):
+        """
+        This function will convert a column into a binary matrix, with one matrix
+        column per distinct value of the input column
+        :column a column vector i.e every item is a row
+        """
+        values = np.unique(column)
+        values.sort()
+
+        row_count, col_count = column.size, values.size
+        matrix = [np.zeros(col_count) for i in np.arange(row_count)]
+        #
+        # let's create a binary matrix of the feature that was passed in
+        # The indices of the matrix are inspired by classical x,y axis
+        for yi in np.arange(row_count):
+            value = column[yi]
+            xi = np.where(values == value)[0][0]  #-- column index
+            matrix[yi][xi] = 1
+
+        return matrix
+    def Export(self, df):
+        """
+        This function will convert a data-frame to a binary matrix
+        :return _map,matrix
+        """
+        #
+        # This will give us a map of how each column was mapped to a bitstream
+        _map = df.apply(lambda column: self.__stream(column.values), axis=0)
+        #
+        # We will merge this to have a healthy matrix
+        _matrix = _map.apply(lambda row: list(list(itertools.chain(*row.values.tolist()))), axis=1)
+        _matrix = np.matrix([list(item) for item in _matrix])
+        #
+        # let's format the map so we don't have an unreasonable amount of data:
+        # only the start/end offsets of each field are kept
+        columns = _map.columns.tolist()
+        beg = 0
+        end = 0
+        _map = _map.loc[0]
+        _m = {}
+        for name in columns:
+            end += _map[name].size
+            _m[name] = {"start": beg, "end": end}
+            beg = end
+
+        return _m, _matrix
+
+    def Import(self, df, values, _map):
+        """
+        This function will convert a binary matrix back into a data-frame of values
+        :values original/pseudonymized values of each field
+        :_map   field map of the binary matrix (start/end offsets)
+        """
+        r = pd.DataFrame(None, columns=_map.keys())
+        for key in _map:
+            i = np.arange(_map[key]['start'], _map[key]['end'])
+            columns = values[key]
+            r[key] = df[i].apply(lambda row: np.array(columns)[row == 1][0], axis=1)
+        return r
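+
+#
+# Illustrative sketch only (not executed): Binary.Export() one-hot encodes every
+# column and returns the field offsets alongside the matrix; Binary.Import()
+# reverses it given the original value space. The names below are hypothetical:
+#
+#   binary = Binary()
+#   _map, _matrix = binary.Export(df)
+#   values = {name: np.unique(df[name].values) for name in df.columns}
+#   original = binary.Import(pd.DataFrame(_matrix), values, _map)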
+
+# has_basic = 'dataset' in SYS_ARGS.keys() and 'table' in SYS_ARGS.keys() and 'key' in SYS_ARGS.keys()
+# has_action= 'export' in SYS_ARGS.keys() or 'pseudo' in SYS_ARGS.keys()
+
+# sample data frame for ad-hoc testing
+df = pd.DataFrame({"fname": ['james', 'james', 'steve', 'kevin', 'kevin'], "lname": ["bond", "dean", "nyemba", 'james', 'johnson']})
+df['age'] = (np.random.sample(df.shape[0]) * 100).astype(np.int32)
+if __name__ == '__main__':
+    """
+    Run the program from the command line with the following arguments (--filter is optional)
+        python bridge.py [--pseudo|--export <path>] --dataset <dataset> --table <table> --key <service-account.json> [--filter <condition>]
+    --pseudo    will create pseudonyms for a given table
+    --export    will export the pseudonymized table to the specified location
+    """
+    has_basic = 'dataset' in SYS_ARGS.keys() and 'table' in SYS_ARGS.keys() and 'key' in SYS_ARGS.keys()
+    has_action = 'export' in SYS_ARGS.keys() or 'pseudo' in SYS_ARGS.keys()
+    if has_basic and has_action:
+        builder = Builder()
+        if 'export' in SYS_ARGS:
+            print ()
+            print ("exporting ....")
+            if not os.path.exists(SYS_ARGS['export']):
+                os.mkdir(SYS_ARGS['export'])
+            SQL = builder.encode(**SYS_ARGS)
+            credentials = service_account.Credentials.from_service_account_file(SYS_ARGS['key'])
+            df = pd.read_gbq(SQL, credentials=credentials, dialect='standard')
+            FILENAME = os.sep.join([SYS_ARGS['export'], SYS_ARGS['table'] + '.csv'])
+            #
+            # This would allow us to export it to wherever we see fit
+            print (FILENAME)
+            df.to_csv(FILENAME, index=False)
+        elif 'pseudo' in SYS_ARGS:
+            builder.process(**SYS_ARGS)
+    else:
+        print ("")
+        print ("has basic ", has_basic)
+        print ("has action ", has_action)
+# pseudonym.apply(table='person',dataset='wgan_original',key='./curation-test-2.json')
+# args = {"dataset":"wgan_original","table":"observation","key":"./curation-test-2.json"}
+# builder = Builder()
+# # builder.encode(dataset='wgan_original',table='person',key='./curation-test-2.json')
+# builder.process(**args)
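+#
+# Example invocations (hypothetical dataset/table/key names; the value passed
+# to --pseudo is a throw-away placeholder required by the simple parser above):
+#   python bridge.py --pseudo 1 --dataset my_dataset --table person --key ./service-account.json
+#   python bridge.py --export ./out --dataset my_dataset --table person --key ./service-account.json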