parent
							
								
									4c297679dc
								
							
						
					
					
						commit
						89ed5d5d46
					
				@ -0,0 +1,159 @@
 | 
				
			|||||||
 | 
					"""
 | 
				
			||||||
 | 
					This file will perform basic tasks to finalize the GAN process by performing the following :
 | 
				
			||||||
 | 
					    - basic stats & analytics
 | 
				
			||||||
 | 
					    - rebuild io to another dataset
 | 
				
			||||||
 | 
					"""
 | 
				
			||||||
 | 
					import pandas as pd
 | 
				
			||||||
 | 
					import numpy as np
 | 
				
			||||||
 | 
					from google.oauth2 import service_account
 | 
				
			||||||
 | 
					from google.cloud import bigquery as bq
 | 
				
			||||||
 | 
					from data.params import SYS_ARGS 
 | 
				
			||||||
 | 
					import json
 | 
				
			||||||
 | 
					class Analytics :
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    This class will compile basic analytics about a given dataset i.e compare original/synthetic
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    @staticmethod
 | 
				
			||||||
 | 
					    def distribution(**args):
 | 
				
			||||||
 | 
					        context = args['context']
 | 
				
			||||||
 | 
					        df = args['data']
 | 
				
			||||||
 | 
					        #
 | 
				
			||||||
 | 
					        #-- This data frame counts unique values for each feature (space)
 | 
				
			||||||
 | 
					        df_counts = pd.DataFrame(df.apply(lambda col: col.unique().size),columns=['counts']).T  # unique counts
 | 
				
			||||||
 | 
					        #
 | 
				
			||||||
 | 
					        #-- Get the distributions for common values
 | 
				
			||||||
 | 
					        #
 | 
				
			||||||
 | 
					        names   = [name for name in df_counts.columns.tolist() if name.endswith('_io') == False]
 | 
				
			||||||
 | 
					        ddf     = df.apply(lambda col: pd.DataFrame(col.values,columns=[col.name]).groupby([col.name]).size() ).fillna(0)
 | 
				
			||||||
 | 
					        ddf[context] = ddf.index
 | 
				
			||||||
 | 
					          
 | 
				
			||||||
 | 
					        pass
 | 
				
			||||||
 | 
					    def distance(**args):
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        This function will measure the distance between 
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        df      = args['data']
 | 
				
			||||||
 | 
					        names   = [name for name in df_counts.columns.tolist() if name.endswith('_io') == False]
 | 
				
			||||||
 | 
					class Utils :
 | 
				
			||||||
 | 
					    class get :
 | 
				
			||||||
 | 
					        @staticmethod
 | 
				
			||||||
 | 
					        def config(**args) :
 | 
				
			||||||
 | 
					            contexts    = args['contexts'].split(',') if type(args['contexts']) == str else args['contexts']
 | 
				
			||||||
 | 
					            pipeline    = args['pipeline']
 | 
				
			||||||
 | 
					            return [ item for item in pipeline if item['context'] in contexts]
 | 
				
			||||||
 | 
					        @staticmethod
 | 
				
			||||||
 | 
					        def sql(**args) :
 | 
				
			||||||
 | 
					            """
 | 
				
			||||||
 | 
					            This function is intended to build SQL query for the remainder of the table that was not synthesized
 | 
				
			||||||
 | 
					            :config configuration entries
 | 
				
			||||||
 | 
					            :from   source of the table name
 | 
				
			||||||
 | 
					            :dataset    name of the source dataset
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            """
 | 
				
			||||||
 | 
					            SQL = ["SELECT * FROM :from "]
 | 
				
			||||||
 | 
					            SQL_FILTER = []
 | 
				
			||||||
 | 
					            NO_FILTERS_FOUND = True
 | 
				
			||||||
 | 
					            pipeline = Utils.get.config(**args)
 | 
				
			||||||
 | 
					            REVERSE_QUALIFIER = {'IN':'NOT IN','NOT IN':'IN','=':'<>','<>':'='}
 | 
				
			||||||
 | 
					            for item in pipeline :
 | 
				
			||||||
 | 
					                
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                if 'filter' in item :
 | 
				
			||||||
 | 
					                    if NO_FILTERS_FOUND :
 | 
				
			||||||
 | 
					                        NO_FILTERS_FOUND = False
 | 
				
			||||||
 | 
					                        SQL  += ['WHERE']
 | 
				
			||||||
 | 
					                    #
 | 
				
			||||||
 | 
					                    # Let us load the filter in the SQL Query
 | 
				
			||||||
 | 
					                    FILTER = item['filter']
 | 
				
			||||||
 | 
					                    QUALIFIER = REVERSE_QUALIFIER[FILTER['qualifier'].upper()]
 | 
				
			||||||
 | 
					                    SQL_FILTER += [" ".join([FILTER['field'], QUALIFIER,'(',FILTER['value'],')'])]
 | 
				
			||||||
 | 
					            src = ".".join([args['dataset'],args['from']])
 | 
				
			||||||
 | 
					            SQL += [" AND ".join(SQL_FILTER)]
 | 
				
			||||||
 | 
					            #
 | 
				
			||||||
 | 
					            # let's pull the field schemas out of the table definition
 | 
				
			||||||
 | 
					            #
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            return " ".join(SQL).replace(":from",src)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					def mk(**args) :
 | 
				
			||||||
 | 
					    dataset  = args['dataset']
 | 
				
			||||||
 | 
					    client  = args['client'] if 'client' in args else bq.Client.from_service_account_file(args['private_key'])
 | 
				
			||||||
 | 
					    #
 | 
				
			||||||
 | 
					    # let us see if we have a dataset handy here 
 | 
				
			||||||
 | 
					    #
 | 
				
			||||||
 | 
					    datasets = list(client.list_datasets())
 | 
				
			||||||
 | 
					    found = [item for item in datasets if item.dataset_id == dataset]
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    if not found :
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        return client.create_dataset(dataset)
 | 
				
			||||||
 | 
					    return found[0] 
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					def move (**args):
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    This function will move a table from the synthetic dataset into a designated location
 | 
				
			||||||
 | 
					    This is the simplest case for finalizing a synthetic data set
 | 
				
			||||||
 | 
					    :private_key        
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    private_key = args['private_key']
 | 
				
			||||||
 | 
					    client      = bq.Client.from_service_account_json(private_key)
 | 
				
			||||||
 | 
					    config      = Utils.get.config(**args)
 | 
				
			||||||
 | 
					    dataset     = args['dataset']
 | 
				
			||||||
 | 
					    SQL         = [ ''.join(["SELECT * FROM io.",item['context'],'_full_io']) for item in config]
 | 
				
			||||||
 | 
					    SQL         += [Utils.get.sql(**args)]
 | 
				
			||||||
 | 
					    SQL         =  ('\n UNION ALL \n'.join(SQL).replace(':dataset','io'))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    #
 | 
				
			||||||
 | 
					    # At this point we have gathered all the tables in the io folder and we should now see if we need to merge with the remainder from the original table
 | 
				
			||||||
 | 
					    #
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    odataset    = mk(dataset=dataset+'_io',client=client)
 | 
				
			||||||
 | 
					    # SQL =       "SELECT * FROM io.:context_full_io".replace(':context',context)
 | 
				
			||||||
 | 
					    config = bq.QueryJobConfig()
 | 
				
			||||||
 | 
					    config.destination = client.dataset(odataset.dataset_id).table(args['from'])
 | 
				
			||||||
 | 
					    config.use_query_cache = True
 | 
				
			||||||
 | 
					    config.allow_large_results = True
 | 
				
			||||||
 | 
					    config.priority = 'INTERACTIVE'
 | 
				
			||||||
 | 
					    #
 | 
				
			||||||
 | 
					    #
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    schema = client.get_table(client.dataset(args['dataset']).table(args['from'])).schema
 | 
				
			||||||
 | 
					    fields = [" ".join(["CAST (",item.name,"AS",item.field_type.replace("INTEGER","INT64").replace("FLOAT","FLOAT64"),") ",item.name]) for item in schema]
 | 
				
			||||||
 | 
					    SQL = SQL.replace("*"," , ".join(fields))
 | 
				
			||||||
 | 
					    # print (SQL)
 | 
				
			||||||
 | 
					    out = client.query(SQL,location='US',job_config=config)
 | 
				
			||||||
 | 
					    print (dir (out))
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import pandas as pd
 | 
				
			||||||
 | 
					import numpy as np
 | 
				
			||||||
 | 
					from google.oauth2 import service_account
 | 
				
			||||||
 | 
					import json
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# path = '../curation-prod.json'
 | 
				
			||||||
 | 
					# credentials = service_account.Credentials.from_service_account_file(path)
 | 
				
			||||||
 | 
					# df = pd.read_gbq("SELECT * FROM io.icd10_partial_io",credentials=credentials,dialect='standard')
 | 
				
			||||||
 | 
					f = open('config.json')
 | 
				
			||||||
 | 
					config = json.loads(f.read())
 | 
				
			||||||
 | 
					args = config['pipeline']
 | 
				
			||||||
 | 
					f.close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					if __name__ == '__main__' :
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    Usage :
 | 
				
			||||||
 | 
					        finalize --<move|stats> --contexts <c1,c2,...c3> --from <table>
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    if 'move' in SYS_ARGS :
 | 
				
			||||||
 | 
					        table = SYS_ARGS['from']
 | 
				
			||||||
 | 
					        contexts = [item['context'] for item in config['pipeline'] if item['from'] == args['from']]
 | 
				
			||||||
 | 
					        args = dict(config,**{"private_key":"../curation-prod.json"})
 | 
				
			||||||
 | 
					        args = dict(args,**SYS_ARGS)
 | 
				
			||||||
 | 
					        args['contexts'] = contexts
 | 
				
			||||||
 | 
					        move(**args)
 | 
				
			||||||
					Loading…
					
					
				
		Reference in new issue