diff --git a/data/maker/__init__.py b/data/maker/__init__.py
index a7d8d69..bf388a6 100644
--- a/data/maker/__init__.py
+++ b/data/maker/__init__.py
@@ -11,13 +11,15 @@ This package is designed to generate synthetic data from a dataset from an origi
 import pandas as pd
 import numpy as np
 import data.gan as gan
-from transport import factory
+import transport
 from data.bridge import Binary
 import threading as thread
 from data.maker import prepare
 import copy
 import os
 import json
+from multiprocessing import Process, RLock
+
 
 class ContinuousToDiscrete :
     ROUND_UP = 2
@@ -101,7 +103,7 @@ def train (**_args):
         else:
             args['store']['doc'] = _args['context']
         
-        logger = factory.instance(**args['store'])
+        logger = transport.factory.instance(**args['store'])
         args['logger'] = logger
         
         for key in _inputhandler._map :
@@ -193,4 +195,173 @@ def generate(**_args):
     candidates = handler.apply(candidates=args['candidates'])
     return [_inputhandler.revert(matrix=_matrix) for _matrix in candidates]
-    
+    
+class Learner(Process):
+    """
+    Base class: loads the source data, infers its schema and builds the binary encoder
+    shared by Trainer and Generator. Runs as a separate process.
+    """
+    def __init__(self,**_args):
+        super(Learner, self).__init__()
+        if 'gpu' in _args :
+            print (_args['gpu'])
+            os.environ['CUDA_VISIBLE_DEVICES'] = str(_args['gpu'])
+            self.gpu = int(_args['gpu'])
+        else:
+            self.gpu = None
+        self.info = _args['info']
+        self.columns = self.info['columns'] if 'columns' in self.info else None
+        self.store = _args['store']
+        if 'network_args' not in _args :
+            self.network_args = {
+                'context':_args['context'] if 'context' in _args else 'GENERAL',
+                'logs':_args['logpath'] if 'logpath' in _args else 'logs',
+                'max_epochs':int(_args['epochs']) if 'epochs' in _args else 2,
+                'batch_size':int(_args['batch']) if 'batch' in _args else 2000
+            }
+        else:
+            self.network_args = _args['network_args']
+        self._encoder = None
+        self._map = None
+        self._df = _args['data'] if 'data' in _args else None
+        #
+        # @TODO: allow for verbose mode so we have a sense of what is going on within the network
+        #
+
+        # self.logpath= _args['logpath'] if 'logpath' in _args else 'logs'
+        # sel.max_epoc
+    def get_schema(self):
+        return [{'name':self._df.dtypes.index.tolist()[i],'type':self._df.dtypes.astype(str).tolist()[i]} for i in range(self._df.dtypes.shape[0])]
+    def initalize(self):
+        reader = transport.factory.instance(**self.store['source'])
+        _read_args = self.info
+        if self._df is None :
+            self._df = reader.read(**_read_args)
+        columns = self.columns if self.columns else self._df.columns
+        #
+        # convert the data to binary here ...
+ + _args = {"schema":self.get_schema(),"data":self._df,"columns":columns} + if self._map : + _args['map'] = self._map + self._encoder = prepare.Input(**_args) +class Trainer(Learner): + """ + This will perform training using a GAN + """ + def __init__(self,**_args): + super().__init__(**_args) + # self.info = _args['info'] + self.limit = int(_args['limit']) if 'limit' in _args else None + self.name = _args['name'] + self.autopilot = _args['autopilot'] if 'autopilot' in _args else False + self.generate = None + self.candidates = int(_args['candidates']) if 'candidates' in _args else 1 + def run(self): + self.initalize() + _space,_matrix = self._encoder.convert() + + _args = self.network_args + if self.gpu : + _args['gpu'] = self.gpu + _args['real'] = _matrix + _args['candidates'] = self.candidates + # + # At this point we have the binary matrix, we can initiate training + # + + gTrain = gan.Train(**_args) + gTrain.apply() + + writer = transport.factory.instance(provider='file',context='write',path=os.sep.join([gTrain.out_dir,'map.json'])) + writer.write(self._encoder._map,overwrite=True) + writer.close() + + # + # @TODO: At this point we need to generate another some other objects + # + _args = {"network_args":self.network_args,"store":self.store,"info":self.info,"candidates":self.candidates,"data":self._df} + if self.gpu : + _args['gpu'] = self.gpu + g = Generator(**_args) + # g.run() + self.generate = g + if self.autopilot : + self.generate.run() + def generate (self): + if self.autopilot : + print( "Autopilot is set ... No need to call this function") + else: + raise Exception( "Autopilot has not been, Wait till training is finished. Use is_alive function on process object") + +class Generator (Learner): + def __init__(self,**_args): + super().__init__(**_args) + # + # We need to load the mapping information for the space we are working with ... 
+        #
+        self.network_args['candidates'] = int(_args['candidates']) if 'candidates' in _args else 1
+        filename = os.sep.join([self.network_args['logs'],'output',self.network_args['context'],'map.json'])
+        file = open(filename)
+        self._map = json.loads(file.read())
+        file.close()
+    def run(self):
+        self.initalize()
+        #
+        # The values will be returned because we have provided _map information from the constructor
+        #
+        values,_matrix = self._encoder.convert()
+        _args = self.network_args
+        _args['map'] = self._map
+        _args['values'] = np.array(values)
+        _args['row_count'] = self._df.shape[0]
+
+        gHandler = gan.Predict(**_args)
+        gHandler.load_meta(columns=None)
+        _iomatrix = gHandler.apply()
+        _candidates = [self._encoder.revert(matrix=_item) for _item in _iomatrix]
+        self.post(_candidates)
+    def appriximate(self,_df):
+        """
+        Perturb the columns listed under info['approximate'] with Dirichlet-distributed noise
+        so the generated values do not replicate the source values exactly
+        """
+        _columns = self.info['approximate']
+        _schema = {}
+        for _info in self.get_schema() :
+            _schema[_info['name']] = _info['type']
+
+        for name in _columns :
+            batches = np.array_split(_df[name].values,10)
+            x = []
+            for values in batches :
+                _values = np.random.dirichlet(values)
+                x += list(values + _values) if np.random.randint(0,2) else list(values - _values)
+            _df[name] = np.int64(x) if 'int' in _schema[name] else np.float64(x)
+        return _df
+    def format(self,_df):
+        pass
+    def post(self,_candidates):
+        _store = self.store['target'] if 'target' in self.store else {'provider':'console'}
+        _store['lock'] = True
+        writer = transport.factory.instance(**_store)
+
+        for _iodf in _candidates :
+            _df = self._df.copy()
+            _df[self.columns] = _iodf[self.columns]
+            if 'approximate' in self.info :
+                _df = self.appriximate(_df)
+            writer.write(_df,schema=self.get_schema())
+        pass
+class factory :
+    _infocache = {}
+    @staticmethod
+    def instance(**_args):
+        """
+        An instance of an object that trains and generates candidate datasets
+        :param gpu        (optional) index of the gpu to be used, if any
+        :param store      {source,target}; if no target is provided, output goes to the console
+        :param epochs     (default 2) number of epochs to train
+        :param candidates (default 1) number of candidate datasets to generate
+        :param info       {columns,sql,from}
+        :param autopilot  if set, output is generated automatically once training completes
+        :param batch      (default 2000) batch size
+        """
+        return Trainer(**_args)
\ No newline at end of file
diff --git a/data/maker/prepare/__init__.py b/data/maker/prepare/__init__.py
index 6e67cb2..478d435 100644
--- a/data/maker/prepare/__init__.py
+++ b/data/maker/prepare/__init__.py
@@ -128,7 +128,7 @@ class Input :
             cols, _matrix = self.tobinary(_df[name],values)
             _beg,_end = i,i+len(cols)
             if name not in self._map :
-                self._map[name] = {"beg":_beg,"end":_end ,"values":cols}
+                self._map[name] = {"beg":_beg,"end":_end ,"values":cols.tolist()}
             i += len(cols)
             if not _m.shape[0]:
                 _m = _matrix ;
@@ -196,7 +196,7 @@ class Input :
         # In the advent the sample rows do NOT have the values of the
         cols = rows.unique()
         cols = np.array(cols)
-        row_count = len(rows)
+        row_count = np.int64(len(rows))
        # if 'GPU' not in os.environ :
        #   _matrix = np.zeros([row_count,cols.size],dtype=int)
        #
diff --git a/finalize.py b/finalize.py
deleted file mode 100644
index d420d7d..0000000
--- a/finalize.py
+++ /dev/null
@@ -1,240 +0,0 @@
-#!/usr/bin/env python3
-"""
-This file will perform basic tasks to finalize the GAN process by performing the following :
-    - basic stats & analytics
-    - rebuild io to another dataset
-"""
-import pandas as pd
-import numpy as np
-from multiprocessing import Process, Lock
-from google.oauth2 import service_account
-from google.cloud import bigquery as bq
-import transport
-from data.params import SYS_ARGS
-import json
-
-class Analytics :
-    """
-    This class will compile basic analytics about a given dataset i.e compare original/synthetic
-    """
-    @staticmethod
-    def distribution(**args):
-        context = args['context']
-        df = args['data']
-        #
-        #-- This data frame counts unique values for each feature (space)
-        df_counts = pd.DataFrame(df.apply(lambda col: col.unique().size),columns=['counts']).T # unique counts
-        #
-        #-- Get the distributions for common values
-        #
-        names = [name for name in df_counts.columns.tolist() if name.endswith('_io') == False]
-        ddf = df.apply(lambda col: pd.DataFrame(col.values,columns=[col.name]).groupby([col.name]).size() ).fillna(0)
-        ddf[context] = ddf.index
-
-        pass
-    def distance(**args):
-        """
-        This function will measure the distance between
-        """
-        pass
-class Utils :
-    @staticmethod
-    def log(**args):
-        logger = transport.factory.instance(type="mongo.MongoWriter",args={"dbname":"aou","doc":"logs"})
-        logger.write(args)
-        logger.close()
-    class get :
-        @staticmethod
-        def pipeline(table,path) :
-            # contexts = args['contexts'].split(',') if type(args['contexts']) == str else args['contexts']
-            config = json.loads((open(path)).read())
-            pipeline = config['pipeline']
-            # return [ item for item in pipeline if item['context'] in contexts]
-            pipeline = [item for item in pipeline if 'from' in item and item['from'].strip() == table]
-            Utils.log(module=table,action='init',input={"pipeline":pipeline})
-            return pipeline
-        @staticmethod
-        def sql(**args) :
-            """
-            This function is intended to build SQL query for the remainder of the table that was not synthesized
-            :config configuration entries
-            :from source of the table name
-            :dataset name of the source dataset
-
-            """
-            SQL = ["SELECT * FROM :from "]
-            SQL_FILTER = []
-            NO_FILTERS_FOUND = True
-            # pipeline = Utils.get.config(**args)
-            pipeline = args['pipeline']
-            REVERSE_QUALIFIER = {'IN':'NOT IN','NOT IN':'IN','=':'<>','<>':'='}
-            for item in pipeline :
-
-
-                if 'filter' in item :
-                    if NO_FILTERS_FOUND :
-                        NO_FILTERS_FOUND = False
-                        SQL += ['WHERE']
-                    #
-                    # Let us load the filter in the SQL Query
-                    FILTER = item['filter']
-                    QUALIFIER = REVERSE_QUALIFIER[FILTER['qualifier'].upper()]
-                    SQL_FILTER += [" ".join([FILTER['field'], QUALIFIER,'(',FILTER['value'],')']).replace(":dataset",args['dataset'])]
-            src = ".".join([args['dataset'],args['from']])
-            SQL += [" AND ".join(SQL_FILTER)]
-            #
-            # let's pull the field schemas out of the table definition
-            #
-            Utils.log(module=args['from'],action='sql',input={"sql":" ".join(SQL) })
-            return " ".join(SQL).replace(":from",src)
-
-
-def mk(**args) :
-    dataset = args['dataset']
-    client = args['client'] if 'client' in args else bq.Client.from_service_account_file(args['private_key'])
-    #
-    # let us see if we have a dataset handy here
-    #
-    datasets = list(client.list_datasets())
-    found = [item for item in datasets if item.dataset_id == dataset]
-
-    if not found :
-
-        return client.create_dataset(dataset)
-    return found[0]
-
-def move (args):
-    """
-    This function will move a table from the synthetic dataset into a designated location
-    This is the simplest case for finalizing a synthetic data set
-    :private_key
-    """
-    pipeline = Utils.get.pipeline(args['from'],args['config'])
-    _args = json.loads((open(args['config'])).read())
-    _args['pipeline'] = pipeline
-    # del _args['pipeline']
-    args = dict(args,**_args)
-    # del args['pipeline']
-    # private_key = args['private_key']
-    client = bq.Client.from_service_account_json(args['private_key'])
-
-    dataset = args['dataset']
-    if pipeline :
-        SQL = [ ''.join(["SELECT * FROM io.",item['context'],'_full_io']) for item in pipeline]
-        SQL += [Utils.get.sql(**args)]
-        SQL = ('\n UNION ALL \n'.join(SQL).replace(':dataset','io'))
-    else:
-        #
-        # moving a table to a designated location
-        tablename = args['from']
-        if 'sql' not in args :
-            SQL = "SELECT * FROM :dataset.:table"
-        else:
-            SQL = args['sql']
-        SQL = SQL.replace(":dataset",dataset).replace(":table",tablename)
-    Utils.log(module=args['from'],action='sql',input={'sql':SQL})
-    #
-    # At this point we have gathered all the tables in the io folder and we should now see if we need to merge with the remainder from the original table
-    #
-
-
-
-    odataset = mk(dataset=dataset+'_io',client=client)
-    # SQL = "SELECT * FROM io.:context_full_io".replace(':context',context)
-    config = bq.QueryJobConfig()
-    config.destination = client.dataset(odataset.dataset_id).table(args['from'])
-    config.use_query_cache = True
-    config.allow_large_results = True
-    config.priority = 'INTERACTIVE'
-    #
-    #
-
-    schema = client.get_table(client.dataset(args['dataset']).table(args['from'])).schema
-    fields = [" ".join(["CAST (",item.name,"AS",item.field_type.replace("INTEGER","INT64").replace("FLOAT","FLOAT64"),") ",item.name]) for item in schema]
-    SQL = SQL.replace("*"," , ".join(fields))
-    # print (SQL)
-    out = client.query(SQL,location='US',job_config=config)
-    Utils.log(module=args['from'],action='move',input={'job':out.job_id})
-    return (out.job_id)
-
-
-
-
-import pandas as pd
-import numpy as np
-from google.oauth2 import service_account
-import json
-
-# path = '../curation-prod.json'
-# credentials = service_account.Credentials.from_service_account_file(path)
-# df = pd.read_gbq("SELECT * FROM io.icd10_partial_io",credentials=credentials,dialect='standard')
-filename = 'config.json' if 'config' not in SYS_ARGS else SYS_ARGS['config']
-f = open(filename)
-config = json.loads(f.read())
-args = config['pipeline']
-f.close()
-
-
-if __name__ == '__main__' :
-    """
-    Usage :
-        finalize -- --contexts --from
-    """
-
-    if 'move' in SYS_ARGS :
-
-        if 'init' in SYS_ARGS :
-            dep = config['dep'] if 'dep' in config else {}
-            info = []
-
-            if 'queries' in dep :
-                info += dep['queries']
-                print ('________')
-            if 'tables' in dep :
-                info += dep['tables']
-            args = {}
-            jobs = []
-            for item in info :
-                args = {}
-                if type(item) == str :
-                    args['from'] = item
-                    name = item
-                else:
-                    args = item
-                    name = item['from']
-                args['config'] = SYS_ARGS['config']
-                # args['pipeline'] = []
-                job = Process(target=move,args=(args,))
-                job.name = name
-                jobs.append(job)
-                job.start()
-
-
-            # while len(jobs) > 0 :
-            #     jobs = [job for job in jobs if job.is_alive()]
-            #     time.sleep(1)
-
-
-        else:
-            move(SYS_ARGS)
-        # # table = SYS_ARGS['from']
-        # # args = dict(config,**{"private_key":"../curation-prod.json"})
-        # args = dict(args,**SYS_ARGS)
-        # contexts = [item['context'] for item in config['pipeline'] if item['from'] == SYS_ARGS['from']]
-        # log = []
-        # if contexts :
-        #     args['contexts'] = contexts
-        #     log = move(**args)
-
-        # else:
-        #     tables = args['from'].split(',')
-        #     for name in tables :
-        #         name = name.strip()
-        #         args['from'] = name
-        #         log += [move(**args)]
-        # print ("\n".join(log))
-
-
-
-    else:
-        print ("NOT YET READY !")
\ No newline at end of file
diff --git a/pipeline.py b/pipeline.py
index 5fb62fe..9d095d9 100644
--- a/pipeline.py
+++ b/pipeline.py
@@ -486,7 +486,7 @@ class Components :
             # Let us merge the dataset here and and have a comprehensive dataset
             _df = pd.DataFrame.join(df,_df)
-            _params = {'data':_df,'store' : ostore}
+            _params = {'data':_df,'store' : ostore,'from':args['from']}
             if _schema :
                 _params ['schema'] = _schema
             _info = {"module":"gan-prep","action":"write","input":{"rows":_df.shape[0],"cols":_df.shape[1]}}
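
Reviewer note: a minimal usage sketch of the Learner/Trainer/Generator API added in data/maker/__init__.py, following the parameters documented in factory.instance. It is illustrative only and not part of the patch; the store and info values (provider keys, table and column names) are hypothetical placeholders for whatever the transport readers/writers in your deployment expect.

    # sketch.py -- illustrative only, not part of the patch
    from data.maker import factory

    _args = {
        'name'      : 'observation',                                   # hypothetical; Trainer requires a name
        'store'     : {
            'source': {'provider':'bigquery','table':'observation'},   # hypothetical reader spec for transport.factory.instance
            'target': {'provider':'console'}                           # console is the fallback sink used by Generator.post
        },
        'info'      : {'columns':['value_source_value']},              # hypothetical; columns to synthesize
        'epochs'    : 2,          # -> network_args['max_epochs']
        'batch'     : 2000,       # -> network_args['batch_size']
        'candidates': 1,          # number of candidate datasets
        'autopilot' : True        # Trainer.run() calls Generator.run() once training completes
    }

    trainer = factory.instance(**_args)   # returns a Trainer, which is a multiprocessing.Process
    trainer.start()                       # training (and, with autopilot, generation) runs in the child process
    trainer.join()

With autopilot left unset, the caller would instead poll trainer.is_alive() and drive the Generator separately once training has finished, as the exception message in Trainer.generate indicates.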