From 11a247d0ea5555e5e7360be0172b8a5a1d24c3b0 Mon Sep 17 00:00:00 2001
From: "Steve L. Nyemba -- The Architect"
Date: Mon, 22 Oct 2018 13:21:40 -0500
Subject: [PATCH] bug fix and enhancements

---
 src/pandas_risk.py | 87 +++++++++++++++++++++++-----------------
 src/risk.py        | 98 ++++++++++++++++++++++++++++++++++------------
 2 files changed, 125 insertions(+), 60 deletions(-)

diff --git a/src/pandas_risk.py b/src/pandas_risk.py
index 282d289..355c2fb 100644
--- a/src/pandas_risk.py
+++ b/src/pandas_risk.py
@@ -40,61 +40,76 @@ class deid :
         id = args['id']
         if 'quasi_id' in args :
-            num_runs = 1
-            columns = list(set(args['quasi_id'])- set(id) )
-        else :
-            num_runs = args['num_runs'] if 'num_runs' in args else 100
-            columns = list(set(self._df.columns) - set([id]))
-        r = pd.DataFrame()
+            num_runs = 1
+            columns = list(set(args['quasi_id'])- set(id) )
+        else :
+            num_runs = args['num_runs'] if 'num_runs' in args else 100
+            columns = list(set(self._df.columns) - set([id]))
+        r = pd.DataFrame()
         k = len(columns)
+        N = self._df.shape[0]
+        tmp = self._df.fillna(' ')
+        np.random.seed(1)
         for i in range(0,num_runs) :
+
             #
             # let's choose a random number of columns and compute marketer and prosecutor risk
             # Once the fields are selected we run a groupby clause
             #
-            if 'quasi_id' not in args :
-                n = np.random.randint(2,k) #-- number of random fields we are picking
-                ii = np.random.choice(k,n,replace=False)
-                cols = np.array(columns)[ii].tolist()
+            if 'quasi_id' not in args :
+                if 'field_count' in args :
+                    #
+                    # We choose to limit how many fields we pass in
+                    n = np.random.randint(2,int(args['field_count'])) #-- number of random fields we are picking
+                else :
+                    n = np.random.randint(2,k) #-- number of random fields we are picking
+                ii = np.random.choice(k,n,replace=False)
+                cols = np.array(columns)[ii].tolist()
+                policy = np.zeros(k)
+                policy [ii] = 1
+                policy = pd.DataFrame(policy).T
+
             else:
-                cols = columns
-                n = len(cols)
-            x_ = self._df.groupby(cols).count()[id].values
+                cols = columns
+                policy = np.ones(k)
+                policy = pd.DataFrame(policy).T
+            n = len(cols)
+            policy.columns = columns
+            N = tmp.shape[0]
+
+            x_ = tmp.groupby(cols).size().values
+            # print [id,i,n,k,self._df.groupby(cols).count()]
             r = r.append(
                 pd.DataFrame(
                     [
                         {
-                            "selected":n,
+                            "group_count":x_.size,
+                            "patient_count":N,
+                            "field_count":n,
                             "marketer": x_.size / np.float64(np.sum(x_)),
                             "prosecutor":1 / np.float64(np.min(x_))
                         }
                     ]
-                )
+                ).join(policy)
             )
-            g_size = x_.size
-            n_ids = np.float64(np.sum(x_))
+            # g_size = x_.size
+            # n_ids = np.float64(np.sum(x_))
+            # sql = """
+            #    SELECT COUNT(g_size) as group_count, :patient_count as patient_count,SUM(g_size) as rec_count, COUNT(g_size)/SUM(g_size) as marketer, 1/ MIN(g_size) as prosecutor, :n as field_count
+            # FROM (
+            #    SELECT COUNT(*) as g_size,:key,:fields
+            #    FROM :full_name
+            #    GROUP BY :fields
+            # """.replace(":n",str(n)).replace(":fields",",".join(cols)).replace(":key",id).replace(":patient_count",str(N))
+            # r.append(self._df.query(sql.replace("\n"," ").replace("\r"," ") ))
         return r
-import pandas as pd
-import numpy as np
-from io import StringIO
-csv = """
-id,sex,age,profession,drug_test
-1,M,37,doctor,-
-2,F,28,doctor,+
-3,M,37,doctor,-
-4,M,28,doctor,+
-5,M,28,doctor,-
-6,M,37,doctor,-
-"""
-f = StringIO()
-f.write(unicode(csv))
-f.seek(0)
-df = pd.read_csv(f)
-print df.deid.risk(id='id',num_runs=2)
-print " *** "
-print df.deid.risk(id='id',quasi_id=['sex','age','profession'])
+# df = pd.read_gbq("select * from deid_risk.risk_30k",private_key='/home/steve/dev/google-cloud-sdk/accounts/curation-test.json')
+# r = df.deid.risk(id='person_id',num_runs=200)
+# print r[['field_count','patient_count','marketer','prosecutor']]
+
+
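The reworked loop above boils down to group-size statistics over the chosen quasi-identifier columns: marketer risk is the number of distinct groups over the number of records, and prosecutor risk is the reciprocal of the smallest group size. A minimal sketch of that computation, reusing the toy records from the inline test harness this patch removes (the column names come from that harness, not from the patch's runtime inputs):

# Sketch only: mirrors the groupby/size computation in deid.risk above.
import pandas as pd
import numpy as np

df = pd.DataFrame({
    'sex':        ['M','F','M','M','M','M'],
    'age':        [37, 28, 37, 28, 28, 37],
    'profession': ['doctor']*6
})
x_ = df.groupby(['sex','age','profession']).size().values  # group sizes, here [1, 2, 3]
marketer   = x_.size / np.float64(np.sum(x_))  # 3 groups / 6 records = 0.5
prosecutor = 1 / np.float64(np.min(x_))        # smallest group has 1 record -> 1.0
print(marketer, prosecutor)

Note that a single singleton group saturates prosecutor risk at 1.0, which is why the minimum group size alone drives that measure.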
diff --git a/src/risk.py b/src/risk.py
index 81784a3..0719fdb 100644
--- a/src/risk.py
+++ b/src/risk.py
@@ -51,9 +51,10 @@ class utils :
         r = []
         ref = client.dataset(dataset)
         tables = list(client.list_tables(ref))
+        TERMS = ['type','unit','count','refills','stop','supply','quantity']
         for table in tables :
-            if table.table_id.strip() in ['people_seed']:
+            if table.table_id.strip() in ['people_seed','measurement','drug_exposure','procedure_occurrence','visit_occurrence','condition_occurrence','device_exposure']:
                 print ' skipping ...'
                 continue
             ref = table.reference
@@ -62,12 +63,15 @@ class utils :
             rows = table.num_rows
             if rows == 0 :
                 continue
-            names = [f.name for f in schema]
+
+            names = [f.name for f in schema if len (set(TERMS) & set(f.name.strip().split("_"))) == 0 ]
+
             x = list(set(names) & set([key]))
             if x :
                 full_name = ".".join([dataset,table.table_id])
                 r.append({"name":table.table_id,"fields":names,"row_count":rows,"full_name":full_name})
         return r
+
     def get_field_name(self,alias,field_name,index):
         """
         This function will format a field name given an index (the number of times it has occurred in the projection)
@@ -82,6 +86,25 @@ class utils :
             return ".".join(name)+" AS :field_name:index".replace(":field_name",field_name).replace(":index",str(index))
         else:
             return ".".join(name)
+    def get_filtered_table(self,table,key):
+        """
+        This function will return a table with a single record per individual patient
+        """
+        return """
+
+            SELECT :table.* FROM (
+                SELECT row_number() over () as top, * FROM :full_name ) as :table
+
+
+            INNER JOIN (
+                SELECT MAX(top) as top, :key FROM (
+                    SELECT row_number() over () as top,:key from :full_name ) GROUP BY :key
+
+            ) as filter
+            ON filter.top = :table.top and filter.:key = :table.:key
+
+        """.replace(":key",key).replace(":full_name",table['full_name']).replace(":table",table['name'])
     def get_sql(self,**args):
         """
         This function will generate the SQL that will join a list of tables given a key and a limit of records
@@ -91,7 +114,7 @@ class utils :
         """
         tables = args['tables']
         key = args['key']
-        limit = args['limit'] if 'limit' in args else 300000
+        limit = args['limit'] if 'limit' in args else 10000
         limit = str(limit)
         SQL = [
             """
@@ -105,9 +128,10 @@ class utils :
             alias= table['name']
             index = tables.index(table)
             sql_ = """
-                (select * from :name limit :limit) as :alias
+                (select * from :name ) as :alias
             """.replace(":limit",limit)
-            sql_ = sql_.replace(":name",name).replace(":alias",alias)
+            # sql_ = " ".join(["(",self.get_filtered_table(table,key)," ) as :alias"])
+            sql_ = sql_.replace(":name",name).replace(":alias",alias).replace(":limit",limit)
             fields += [self.get_field_name(alias,field_name,index) for field_name in table['fields'] if field_name != key or (field_name==key and tables.index(table) == 0) ]
             if tables.index(table) > 0 :
                 join = """
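The new get_filtered_table above is worth unpacking before the next hunk: it numbers every row, then joins back on the highest row number per key, keeping exactly one record per patient. An illustrative rendering for a hypothetical table {'name':'people','full_name':'mydataset.people'} and key 'person_id' (names are illustrative, not from the patch):

# Sketch only: what get_filtered_table(...) renders after the .replace() calls.
sql = """
    SELECT people.* FROM (
        SELECT row_number() over () as top, * FROM mydataset.people ) as people
    INNER JOIN (
        SELECT MAX(top) as top, person_id FROM (
            SELECT row_number() over () as top, person_id FROM mydataset.people ) GROUP BY person_id
    ) as filter
    ON filter.top = people.top and filter.person_id = people.person_id
"""

The migrate action added later in this patch materializes exactly this query per table, which is what allows the risk SQL below to stop counting DISTINCT keys.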
@@ -139,20 +163,23 @@ class risk :
         fields = list(set(table['fields']) - set([key])) #-- We need to select n-fields max 64
         k = len(fields)
-        n = np.random.randint(2,64) #-- how many random fields are we processing
+        if 'field_count' in args :
+            n = np.random.randint(2, int(args['field_count']) ) #-- number of random fields we are picking
+        else:
+            n = np.random.randint(2,k) #-- how many random fields are we processing
         ii = np.random.choice(k,n,replace=False)
         stream = np.zeros(len(fields) + 1)
-        stream[ii] = 1 
-        stream = pd.DataFrame(stream.tolist()).T
-        stream.columns = args['table']['fields']
-        fields = list(np.array(fields)[ii])
+        stream[ii] = 1
+        stream = pd.DataFrame(stream.tolist()).T
+        stream.columns = args['table']['fields']
+        fields = list(np.array(fields)[ii])
         sql = """
-            SELECT COUNT(g_size) as group_count, COUNT( DISTINCT :key) as patient_count,SUM(g_size) as rec_count, COUNT(g_size)/SUM(g_size) as marketer, 1/ MIN(g_size) as prosecutor, :n as field_count
+            SELECT COUNT(g_size) as group_count,SUM(g_size) as patient_count, COUNT(g_size)/SUM(g_size) as marketer, 1/ MIN(g_size) as prosecutor, :n as field_count
             FROM (
-                SELECT COUNT(*) as g_size,:key,:fields
+                SELECT COUNT(*) as g_size,:fields
                 FROM :full_name
-                GROUP BY :key,:fields
+                GROUP BY :fields
             )
         """.replace(":fields", ",".join(fields)).replace(":full_name",table['full_name']).replace(":key",key).replace(":n",str(n))
         return {"sql":sql,"stream":stream}
@@ -161,7 +188,7 @@ class risk :
 
 
 
-if 'action' in SYS_ARGS and SYS_ARGS['action'] in ['create','compute'] :
+if 'action' in SYS_ARGS and SYS_ARGS['action'] in ['create','compute','migrate'] :
     path = SYS_ARGS['path']
     client = bq.Client.from_service_account_json(path)
@@ -195,26 +222,49 @@ if 'action' in SYS_ARGS and SYS_ARGS['action'] in ['create','compute'] :
         r = client.query(create_sql,location='US',job_config=job)
         print [r.job_id,' ** ',r.state]
+    elif SYS_ARGS['action'] == 'migrate' :
+        #
+        #
+
+        o_dataset = SYS_ARGS['o_dataset']
+        for table in tables:
+            sql = " ".join(["SELECT ",",".join(table['fields'])," FROM (",mytools.get_filtered_table(table,key),") as ",table['name']])
+
+            job = bq.QueryJobConfig()
+            job.destination = client.dataset(o_dataset).table(table['name'])
+            job.use_query_cache = True
+            job.allow_large_results = True
+            job.priority = 'INTERACTIVE'
+            job.time_partitioning = bq.table.TimePartitioning(type_=bq.table.TimePartitioningType.DAY)
+
+            r = client.query(sql,location='US',job_config=job)
+
+            print [table['full_name'],' ** ',r.job_id,' ** ',r.state]
+
+
+        pass
     else:
         #
        #
-        tables = [tab for tab in tables if tab['name'] == SYS_ARGS['table'] ]
-        limit = int(SYS_ARGS['limit']) if 'limit' in SYS_ARGS else 1
+        tables = [tab for tab in tables if tab['name'] == SYS_ARGS['table'] ]
+        limit = int(SYS_ARGS['limit']) if 'limit' in SYS_ARGS else 1
         if tables :
-            risk = risk()
-            df = pd.DataFrame()
-            dfs = pd.DataFrame()
+            risk= risk()
+            df = pd.DataFrame()
+            dfs = pd.DataFrame()
+            np.random.seed(1)
             for i in range(0,limit) :
                 r = risk.get_sql(key=SYS_ARGS['key'],table=tables[0])
-                sql = r['sql']
-                dfs = dfs.append(r['stream'])
-                df = df.append(pd.read_gbq(query=sql,private_key=path,dialect='standard'))
+                sql = r['sql']
+                dfs = dfs.append(r['stream'],sort=True)
+                df = df.append(pd.read_gbq(query=sql,private_key=path,dialect='standard').join(dfs))
+                # df = df.join(dfs,sort=True)
                 df.to_csv(SYS_ARGS['table']+'.csv')
-                dfs.to_csv(SYS_ARGS['table']+'_stream.csv')
+                # dfs.to_csv(SYS_ARGS['table']+'_stream.csv')
                 print [i,' ** ',df.shape[0],pd.DataFrame(r['stream']).shape]
                 time.sleep(2)
-            pass
+
         else:
             print 'ERROR'
         pass
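The rewritten query in risk.get_sql is the SQL analogue of the pandas computation in the first file. A sketch of what it emits, with illustrative field and table names (the real code picks the quasi-identifier fields at random, and assumes the table has already been reduced to one record per patient, e.g. by the migrate action above):

# Sketch only: rendering of the new query for hypothetical fields ['sex','age']
# on a hypothetical table 'mydataset.people'. With one row per patient,
# SUM(g_size) equals the patient count, which is why the old
# COUNT(DISTINCT :key) could be dropped from the SELECT and GROUP BY.
fields = ['sex', 'age']
sql = """
    SELECT COUNT(g_size) as group_count, SUM(g_size) as patient_count,
           COUNT(g_size)/SUM(g_size) as marketer, 1/MIN(g_size) as prosecutor,
           :n as field_count
    FROM (
        SELECT COUNT(*) as g_size, :fields
        FROM mydataset.people
        GROUP BY :fields
    )
""".replace(":fields", ",".join(fields)).replace(":n", str(len(fields)))
print(sql)

Dropping :key from the inner GROUP BY also means each group size now counts records rather than (record, patient) pairs, so the marketer and prosecutor figures line up with the pandas path in pandas_risk.py.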