commit
5960caa7f8
@@ -0,0 +1,251 @@
"""
    Create the pseudonym map table with the following columns :
        table, field, values, counts, encoded
"""
import pandas as pd
import numpy as np
from google.oauth2 import service_account
from google.cloud import bigquery as bq
import json
import threading
import sys
import os
import itertools

DATASET_SUFFIX = '_pseudo'
PSEUDO_TABLENAME = 'map'

SYS_ARGS = {'context':''}
if len(sys.argv) > 1:

    N = len(sys.argv)
    for i in range(1,N):
        value = None
        if sys.argv[i].startswith('--'):
            key = sys.argv[i].replace('-','')

            if i + 1 < N:
                value = sys.argv[i + 1] = sys.argv[i+1].strip()
            if key and value:
                SYS_ARGS[key] = value
                if key == 'context':
                    SYS_ARGS[key] = ('/'+value).replace('//','/')

        i += 2  #-- no-op: the for-loop reassigns i; non-flag tokens are skipped by the startswith test above

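#
# Illustration only : an invocation such as
#   python bridge.py --pseudo --dataset wgan_original --table person --key ./curation-test-2.json
# leaves SYS_ARGS as
#   {'context':'', 'pseudo':'--dataset', 'dataset':'wgan_original', 'table':'person', 'key':'./curation-test-2.json'}
# (a bare flag such as --pseudo picks up the next token as its value; only its presence is tested later)
#
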
class void :
    pass

class pseudonym :
    @staticmethod
    def meta(**args) :
        """
        Return the column names of the input table (via a LIMIT 1 query)
        :key     path to the BigQuery service account key file
        :dataset dataset of the input table
        :table   table name
        :filter  optional filter (SQL WHERE clause, without the WHERE keyword)
        """
        credentials = service_account.Credentials.from_service_account_file(args['key'])
        SQL = ["SELECT * FROM :dataset.:table"]
        if 'filter' in args :
            SQL += ['WHERE',args['filter']]
        dataset = args['dataset']
        table = args['table']
        SQL = " ".join(SQL+["LIMIT 1"]).replace(":dataset",dataset).replace(":table",table)

        df = pd.read_gbq(SQL,credentials=credentials,dialect='standard')
        return df.columns
    @staticmethod
    def apply(**args):
        """
        This function applies the pseudonymization to every column of the input table,
        appending each field's map to the pseudonym map table
        """
        columns = pseudonym.meta(**args)
        #
        # we need to make the schema here
        client = bq.Client.from_service_account_json(args['key'])
        datasets = list(client.list_datasets())
        dataset_name = args['dataset']+DATASET_SUFFIX
        if np.sum( [ 1*(item.dataset_id == dataset_name) for item in datasets]) == 0:
            #-- make the target dataset
            dataset = bq.Dataset(client.dataset(dataset_name))
            client.create_dataset(dataset)

        for name in columns :
            p = dict(args,**{"field":name})
            p['filter'] = '' if 'filter' not in args else args['filter']
            # thread = threading.Thread(target=pseudonym.post, args=(p,))
            # thread.start()
            # if columns.tolist().index(name) == 0 :
            #     thread.join()
            pseudonym.post(**p)
        #
        # let us submit the query
        pass
    @staticmethod
    def post(**args) :
        """
        This function will submit a query to BigQuery and append the resulting field map
        (distinct values, counts and random codes) to the pseudonym map table
        """
        SQL = " ".join(['SELECT DISTINCT CAST(',args['field']," AS STRING) AS values, COUNT(*) as counts FROM :dataset.:table :filter"]).replace(':dataset',args['dataset'])
        SQL = SQL.replace(':table',args['table'])
        if args['filter'].strip() != '' :
            SQL = SQL.replace(":filter", "WHERE "+args['filter'])
        else :
            SQL = SQL.replace(":filter", "")
        SQL += " ".join([' GROUP BY',args['field'],'ORDER BY 1 '])
        TABLE_NAME = ".".join([args['dataset'],args['table']])
        credentials = service_account.Credentials.from_service_account_file(args['key'])
        df = pd.read_gbq(SQL,credentials=credentials,dialect='standard')
        df['table'] = args['table']
        df['field'] = args['field']
        # df['filter']= args['filter']
        N = df.shape[0] + 10000
        beg = np.random.randint(11,200)
        df['encoded'] = np.random.choice(np.arange(beg,N),df.shape[0],replace=False)
        df = df[['table','field','values','counts','encoded']]
        # print (df.head()[:5])
        # sys.stdout.flush()
        TABLE_NAME = ".".join([args['dataset']+DATASET_SUFFIX,PSEUDO_TABLENAME])
        df.to_gbq(TABLE_NAME,credentials=credentials,if_exists='append')
        # df.to_gbq(TABLE_NAME.replace('.','_pseudo.'),credentials=credentials,if_exists='append')

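#
# Illustration only (not executed) : for the field fname of table person in dataset
# wgan_original with no filter, pseudonym.post builds a query of the shape
#
#   SELECT DISTINCT CAST( fname  AS STRING) AS values, COUNT(*) as counts
#   FROM wgan_original.person  GROUP BY fname ORDER BY 1
#
# and appends the rows (plus the table, field and encoded columns) to wgan_original_pseudo.map
#
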
class Builder :
    """
    This class will build a dataset from encoded values
    """
    def encode(self,**args):
        """
        This function builds and returns the SQL that pseudonymizes a given table from the mapping table
        """
        SQL = "SELECT * FROM :dataset.:table limit 1".replace(':dataset',args['dataset']).replace(":table",args['table'])
        credentials = service_account.Credentials.from_service_account_file(args['key'])
        columns = pd.read_gbq(SQL,credentials=credentials,dialect='standard').columns.tolist()
        TEMPLATE = ['(SELECT encoded FROM :dataset'+DATASET_SUFFIX+'.'+PSEUDO_TABLENAME,"WHERE table=':table' AND field = ':name' AND CAST(values AS STRING)=CAST(:table.:name AS STRING ) ) as :name"]
        SQL = ["SELECT"]
        FIELDS = []
        for field in columns :
            FIELDS += [" ".join(TEMPLATE).replace(":name",field)]
        SQL += [",\n\t".join(FIELDS)]
        SQL += ['FROM :dataset.:table']
        return ("\n".join(SQL).replace(":dataset",args['dataset']).replace(':table',args['table']) )

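    #
    # Illustration only (not executed) : for a person table with columns fname and lname in
    # dataset wgan_original, encode() returns a query of the shape
    #
    #   SELECT
    #   (SELECT encoded FROM wgan_original_pseudo.map WHERE table='person' AND field = 'fname'
    #       AND CAST(values AS STRING)=CAST(person.fname AS STRING ) ) as fname,
    #   (SELECT encoded FROM wgan_original_pseudo.map WHERE table='person' AND field = 'lname'
    #       AND CAST(values AS STRING)=CAST(person.lname AS STRING ) ) as lname
    #   FROM wgan_original.person
    #
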
    def process(self,**args):
        """
        :dataset dataset of the input table
        :table   table name
        :key     path to the BigQuery service account key file
        """
        pseudonym.apply(**args)
    def decode(self,**args):
        """
        This function should be able to take a pseudonymized data frame and convert it to original values
        ...
        """

        pass
class Binary :
    """
    This is a utility class to import and export data to/from a binary matrix
    """
    def __stream(self,column) :
        """
        This function will convert a column into a binary matrix, with the value-space of the column
        providing the columns of the resulting matrix
        :column a column vector i.e every item is a row
        """
        values = np.unique(column)
        values.sort()

        row_count,col_count = column.size,values.size
        matrix = [ np.zeros(col_count) for i in np.arange(row_count)]
        #
        # let's create a binary matrix of the feature that was passed in
        # The indices of the matrix are inspired by the classical x,y axes
        for yi in np.arange(row_count) :
            value = column[yi]
            xi = np.where(values == value)[0][0] #-- column index
            matrix[yi][xi] = 1

        return matrix
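    #
    # For illustration : a column ['b','a','b'] has the value-space ['a','b'] and streams to
    #   [[0,1],[1,0],[0,1]]
    # i.e. one row per item, one column per distinct value, a single 1 marking the item's value
    #
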
    def Export(self,df) :
        """
        This function will convert a data-frame to a binary matrix
        :return _map,matrix
        """
        #
        # This will give us a map of how each column was mapped to a bitstream
        _map = df.apply(lambda column: self.__stream(column.values),axis=0)
        #
        # We merge the per-column bitstreams into a single matrix (one row per record)
        _matrix = _map.apply(lambda row: list(list(itertools.chain(*row.values.tolist()))),axis=1)
        _matrix = np.matrix([list(item) for item in _matrix])
        #
        # let's reduce the map to start/end offsets per field so we don't keep an unreasonable amount of data
        #
        columns = _map.columns.tolist()
        beg = 0
        end = 0
        _map = _map.loc[0]
        _m = {}
        for name in columns :
            end += _map[name].size
            _m[name] = {"start":beg,"end":end}
            beg = end

        return _m,_matrix

    def Import(self,df,values,_map):
        """
        This function will convert a binary matrix back into a data-frame of original/pseudonymized values
        :values original/pseudonymized values per field
        :_map   field map (start/end offsets) of the binary matrix
        """
        r = pd.DataFrame(None,columns=_map.keys())
        for key in _map:
            i = np.arange(_map[key]['start'],_map[key]['end'])
            columns = values[key]
            r[key] = df[i].apply(lambda row: np.array( columns)[row==1][0], axis=1 )
        return r
    pass

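#
# Illustration only (hypothetical round-trip, not executed) : export the small test frame df
# defined below to a binary matrix and rebuild it, assuming the per-field value lists are the
# sorted unique values (the same ordering __stream uses)
#
# _bin = Binary()
# _map,_matrix = _bin.Export(df)
# _values = {name:np.unique(df[name].values).tolist() for name in df.columns}
# _restored = _bin.Import(pd.DataFrame(np.array(_matrix)),_values,_map)
#
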
# has_basic = 'dataset' in SYS_ARGS.keys() and 'table' in SYS_ARGS.keys() and 'key' in SYS_ARGS.keys()
# has_action= 'export' in SYS_ARGS.keys() or 'pseudo' in SYS_ARGS.keys()
df = pd.DataFrame({"fname":['james','james','steve','kevin','kevin'],"lname":["bond","dean","nyemba",'james','johnson']})
df['age'] = (np.random.sample(df.shape[0]) * 100).astype(np.int32)
if __name__ == '__main__' :
    """
    Run the program from the command line passing the following mandatory arguments
        python bridge.py <[--pseudo|--export <PATH>]> --dataset <dataset> --table <tablename> --key <service account key file> [--filter <table filter>]
    --pseudo will create pseudonyms for a given table
    --export will export the pseudonymized data to a specified location
    """
    has_basic = 'dataset' in SYS_ARGS.keys() and 'table' in SYS_ARGS.keys() and 'key' in SYS_ARGS.keys()
    has_action= 'export' in SYS_ARGS.keys() or 'pseudo' in SYS_ARGS.keys()
    if has_basic and has_action :
        builder = Builder()
        if 'export' in SYS_ARGS :
            print ()
            print ("exporting ....")
            if not os.path.exists(SYS_ARGS['export']) :
                os.mkdir(SYS_ARGS['export'])
            SQL = builder.encode(**SYS_ARGS)
            credentials = service_account.Credentials.from_service_account_file(SYS_ARGS['key'])
            df = pd.read_gbq(SQL,credentials=credentials,dialect='standard')
            FILENAME = os.sep.join([SYS_ARGS['export'],SYS_ARGS['table']+'.csv'])
            #
            # This would allow us to export it to wherever we see fit
            print (FILENAME)
            df.to_csv(FILENAME,index=False)
        elif 'pseudo' in SYS_ARGS :
            builder.process(**SYS_ARGS)
    else:
        print ("")
        print ("has basic ",has_basic)
        print ("has action ",has_action)
# pseudonym.apply(table='person',dataset='wgan_original',key='./curation-test-2.json')
# args = {"dataset":"wgan_original","table":"observation","key":"./curation-test-2.json"}
# builder = Builder()
# # builder.encode(dataset='wgan_original',table='person',key='./curation-test-2.json')
# builder.process(**args)