"""
Create pseudonyms map as follows :
table , field , value , enc , filter
"""
import pandas as pd
import numpy as np
from google . oauth2 import service_account
from google . cloud import bigquery as bq
import json
import threading
import sys
import os
import itertools
# Suffix appended to the source dataset name to form the pseudonym dataset name.
DATASET_SUFFIX = '_pseudo'
# Name of the table that holds the pseudonym map.
PSEUDO_TABLENAME = 'map'

# Command-line arguments parsed into a dict: "--key value" pairs become
# SYS_ARGS[key] = value.  'context' gets a normalized leading slash.
SYS_ARGS = {'context': ''}
if len(sys.argv) > 1:
    N = len(sys.argv)
    for i in range(1, N):
        value = None
        if sys.argv[i].startswith('--'):
            # strip the leading dashes to obtain the bare option name
            key = sys.argv[i].replace('-', '')
            if i + 1 < N:
                value = sys.argv[i + 1] = sys.argv[i + 1].strip()
            if key and value:
                SYS_ARGS[key] = value
                if key == 'context':
                    # normalize to a single leading '/'
                    SYS_ARGS[key] = ('/' + value).replace('//', '/')
            # NOTE(review): the original ended with `i += 2`, which is both a
            # syntax error as extracted and a no-op inside a `for` loop; removed.
class void:
    """Empty placeholder type, usable as a plain attribute container."""
    pass
class pseudonym:
    """
    Builds a pseudonym map for a BigQuery table, laid out as:
        table, field, value, enc, filter
    """

    @staticmethod
    def meta(**args):
        """
        Return the column index (schema) of the input table.

        :key     path to a BigQuery service-account (private key) file
        :dataset dataset of the input table
        :table   table name
        :filter  optional filter (SQL WHERE predicate)
        """
        credentials = service_account.Credentials.from_service_account_file(args['key'])
        SQL = ["SELECT * FROM :dataset.:table"]
        if 'filter' in args:
            SQL += ['WHERE', args['filter']]
        dataset = args['dataset']
        table = args['table']
        # LIMIT 1: we only need the schema, not the data
        SQL = " ".join(SQL + ["LIMIT 1"]).replace(":dataset", dataset).replace(":table", table)
        df = pd.read_gbq(SQL, credentials=credentials, dialect='standard')
        return df.columns

    @staticmethod
    def apply(**args):
        """
        Create the target pseudonym dataset if it does not exist, then build
        one distinct-value/count query per column of the input table.
        """
        columns = pseudonym.meta(**args)
        #
        # we need to make the schema here
        client = bq.Client.from_service_account_json(args['key'])
        datasets = list(client.list_datasets())
        dataset_name = args['dataset'] + DATASET_SUFFIX
        if not any(item.dataset_id == dataset_name for item in datasets):
            # -- make the target dataset
            dataset = bq.Dataset(client.dataset(dataset_name))
            client.create_dataset(dataset)
        for name in columns:
            p = dict(args, **{"field": name})
            p['filter'] = '' if 'filter' not in args else args['filter']
            pseudonym.post(**p)

    @staticmethod
    def post(**args):
        """
        Build the distinct-value/count query for one field and return it.

        :field   column to pseudonymize
        :dataset source dataset
        :table   source table
        :filter  SQL predicate, or '' for none

        NOTE(review): the original built this SQL and silently discarded it
        (the insertion submit was never written); returning the statement is
        backward-compatible (callers ignored the previous None) and makes the
        function usable and testable.
        """
        SQL = " ".join(['SELECT DISTINCT CAST(', args['field'],
                        'AS STRING) AS values, COUNT(*) as counts FROM :dataset.:table :filter'])
        SQL = SQL.replace(':dataset', args['dataset'])
        SQL = SQL.replace(':table', args['table'])
        if args['filter'].strip() != '':
            SQL = SQL.replace(":filter", "WHERE " + args['filter'])
        else:
            SQL = SQL.replace(":filter", "")
        return SQL

    def process(self, **args):
        """
        Build the pseudonym map for a table.

        :dataset source dataset
        :table   source table
        :key     service-account key file
        """
        pseudonym.apply(**args)

    def decode(self, **args):
        """
        Convert a pseudonymized data frame back to original values.
        Not implemented.
        """
        pass
class Binary:
    """
    Utility class to export a data frame to a binary (one-hot) matrix and
    import such a matrix back into values.
    """

    def __stream(self, column):
        """
        Convert a column into a binary matrix, with the value-space
        representing each column of the resulting matrix.

        :column a column vector, i.e. every item is a row
        :return list of per-row bit vectors (one slot per distinct value)
        """
        values = np.unique(column)
        values.sort()
        row_count, col_count = column.size, values.size
        matrix = [np.zeros(col_count) for i in np.arange(row_count)]
        #
        # Build the one-hot rows; indices follow classical x,y axes:
        # yi is the row, xi the column of the bit to set.
        for yi in np.arange(row_count):
            value = column[yi]
            xi = np.where(values == value)[0][0]  # -- column index
            matrix[yi][xi] = 1
        return matrix

    def Export(self, df):
        """
        Convert a data frame to a binary matrix.

        :return _map, matrix — _map gives each field's [start, end) column
                span inside the matrix; matrix is the merged one-hot rows.
        """
        #
        # This gives a map of how each column was turned into a bitstream:
        # same shape as df, each cell holding that cell's bit vector.
        _map = df.apply(lambda column: self.__stream(column.values), axis=0)
        #
        # Merge each row's per-column bit vectors into one flat binary row.
        _matrix = _map.apply(lambda row: list(list(itertools.chain(*row.values.tolist()))), axis=1)
        # NOTE(review): np.matrix is pending deprecation in numpy; kept to
        # preserve the return type for existing callers.
        _matrix = np.matrix([list(item) for item in _matrix])
        #
        # Compact the map so we don't carry an unreasonable amount of data:
        # only the [start, end) offsets per field are kept.
        columns = _map.columns.tolist()
        beg = 0
        end = 0
        _map = _map.loc[0]
        _m = {}
        for name in columns:
            end += _map[name].size
            _m[name] = {"start": beg, "end": end}
            beg = end
        return _m, _matrix

    def Import(self, df, values, _map):
        """
        Convert a binary matrix back into a data frame of values.

        :df     data frame holding the binary matrix (one bit per column)
        :values original/pseudonymized values per field (sorted uniques)
        :_map   field map of the binary matrix ({field: {start, end}})
        """
        r = pd.DataFrame(None, columns=_map.keys())
        for key in _map:
            i = np.arange(_map[key]['start'], _map[key]['end'])
            columns = values[key]
            # For each row, pick the value whose bit is set in this field's span.
            r[key] = df[i].apply(lambda row: np.array(columns)[row == 1][0], axis=1)
        return r
# Small in-memory sample frame (ad-hoc test data) with a random integer age.
df = pd.DataFrame({"fname": ['james', 'james', 'steve', 'kevin', 'kevin'],
                   "lname": ["bond", "dean", "nyemba", 'james', 'johnson']})
df['age'] = (np.random.sample(df.shape[0]) * 100).astype(np.int32)
if __name__ == '__main__':
    """
    Run the program from the command line passing the following mandatory arguments:
        python bridge.py <[--pseudo|--export <PATH>]> --dataset <dataset> --table <tablename> [--filter <table filter>]
    --pseudo will create pseudonyms for a given table
    --export will export data to a specified location
    """
    has_basic = 'dataset' in SYS_ARGS and 'table' in SYS_ARGS and 'key' in SYS_ARGS
    has_action = 'export' in SYS_ARGS or 'pseudo' in SYS_ARGS
    if has_basic and has_action:
        # NOTE(review): `Builder` is not defined anywhere in this module —
        # reaching this line raises NameError. It is presumably a facade over
        # `pseudonym` (with encode/process methods); confirm against the rest
        # of the project.
        builder = Builder()
        if 'export' in SYS_ARGS:
            print()
            print("exporting ....")
            if not os.path.exists(SYS_ARGS['export']):
                os.mkdir(SYS_ARGS['export'])
            SQL = builder.encode(**SYS_ARGS)
            credentials = service_account.Credentials.from_service_account_file(SYS_ARGS['key'])
            df = pd.read_gbq(SQL, credentials=credentials, dialect='standard')
            FILENAME = os.sep.join([SYS_ARGS['export'], SYS_ARGS['table'] + '.csv'])
            #
            # This allows us to export it to wherever we see fit
            print(FILENAME)
            df.to_csv(FILENAME, index=False)
        elif 'pseudo' in SYS_ARGS:
            builder.process(**SYS_ARGS)
    else:
        print("")
        print("has basic", has_basic)
        print("has action", has_action)