You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
699 lines
23 KiB
Python
699 lines
23 KiB
Python
#!/usr/bin/env python3
|
|
import json
|
|
from transport import factory
|
|
import numpy as np
|
|
import time
|
|
import os
|
|
from multiprocessing import Process, Lock
|
|
import pandas as pd
|
|
from google.oauth2 import service_account
|
|
from google.cloud import bigquery as bq
|
|
import data.maker
|
|
import copy
|
|
from data.params import SYS_ARGS
|
|
|
|
#
|
|
# The configuration array is now loaded and we will execute the pipe line as follows
|
|
|
|
class Components :
|
|
lock = Lock()
|
|
class KEYS :
|
|
PIPELINE_KEY = 'pipeline'
|
|
SQL_FILTER = 'filter'
|
|
@staticmethod
|
|
def get_filter (**args):
|
|
if args['qualifier'] == 'IN' :
|
|
return ' '.join([args['field'],args['qualifier'],'(',args['value'],')'])
|
|
else:
|
|
return ' '.join([args['field'],args['qualifier'],args['value']])
|
|
@staticmethod
|
|
def get_logger(**args) :
|
|
return factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
|
|
@staticmethod
|
|
def get(args):
|
|
"""
|
|
This function returns a data-frame provided a bigquery sql statement with conditions (and limits for testing purposes)
|
|
The function must be wrapped around a lambda this makes testing easier and changing data stores transparent to the rest of the code. (Vital when testing)
|
|
:sql basic sql statement
|
|
:condition optional condition and filters
|
|
"""
|
|
SQL = args['sql']
|
|
if Components.KEYS.SQL_FILTER in args :
|
|
FILTER_KEY = Components.KEYS.SQL_FILTER
|
|
SQL_FILTER = args[FILTER_KEY] if type(args[FILTER_KEY]) == list else [args[FILTER_KEY]]
|
|
# condition = ' '.join([args[FILTER_KEY]['field'],args[FILTER_KEY]['qualifier'],'(',args[FILTER_KEY]['value'],')'])
|
|
|
|
condition = ' AND '.join([Components.get_filter(**item) for item in SQL_FILTER])
|
|
SQL = " ".join([SQL,'WHERE',condition])
|
|
|
|
SQL = SQL.replace(':dataset',args['dataset']) #+ " LI "
|
|
|
|
if 'limit' in args :
|
|
SQL = SQL + ' LIMIT ' + args['limit']
|
|
#
|
|
# let's log the sql query that has been performed here
|
|
logger = factory.instance(type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
|
|
logger.write({"module":"bigquery","action":"read","input":{"sql":SQL}})
|
|
credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
|
|
df = pd.read_gbq(SQL,credentials=credentials,dialect='standard')
|
|
return df
|
|
|
|
# return lambda: pd.read_gbq(SQL,credentials=credentials,dialect='standard')[args['columns']].dropna()
|
|
@staticmethod
|
|
def split(X,MAX_ROWS=3,PART_SIZE=3):
|
|
|
|
return list(pd.cut( np.arange(X.shape[0]+1),PART_SIZE).categories)
|
|
def format_schema(self,schema):
|
|
_schema = {}
|
|
for _item in schema :
|
|
_type = int
|
|
_value = 0
|
|
if _item.field_type == 'FLOAT' :
|
|
_type =float
|
|
elif _item.field_type != 'INTEGER' :
|
|
_type = str
|
|
_value = ''
|
|
_schema[_item.name] = _type
|
|
return _schema
|
|
def get_ignore(self,**_args) :
|
|
if 'columns' in _args and 'data' in _args :
|
|
_df = _args['data']
|
|
terms = _args['columns']
|
|
return [name for name in _df.columns if np.sum( [int(field in name )for field in terms ]) ]
|
|
|
|
return []
|
|
def set_gpu(self,**_args) :
|
|
if 'gpu' in _args :
|
|
gpu = _args['gpu'] if type(_args['gpu']) != str else [_args['gpu']]
|
|
_index = str(gpu[0])
|
|
os.environ['CUDA_VISIBLE_DEVICES'] = _index
|
|
return gpu
|
|
else :
|
|
return None
|
|
def train(self,**args):
|
|
"""
|
|
This function will perform training on the basis of a given pointer that reads data
|
|
|
|
"""
|
|
schema = None
|
|
if 'file' in args :
|
|
|
|
df = pd.read_csv(args['file'])
|
|
del args['file']
|
|
elif 'data' not in args :
|
|
reader = factory.instance(**args['store']['source'])
|
|
if 'row_limit' in args :
|
|
df = reader.read(sql=args['sql'],limit=args['row_limit'])
|
|
else:
|
|
df = reader.read(sql=args['sql'])
|
|
schema = reader.meta(table=args['from']) if hasattr(reader,'meta') and 'from' in args else None
|
|
else:
|
|
df = args['data']
|
|
|
|
#
|
|
#
|
|
# df = df.fillna('')
|
|
if schema :
|
|
_schema = []
|
|
for _item in schema :
|
|
_type = int
|
|
_value = 0
|
|
if _item.field_type == 'FLOAT' :
|
|
_type =float
|
|
elif _item.field_type != 'INTEGER' :
|
|
_type = str
|
|
_value = ''
|
|
_schema += [{"name":_item.name,"type":_item.field_type}]
|
|
df[_item.name] = df[_item.name].fillna(_value).astype(_type)
|
|
args['schema'] = _schema
|
|
# df[_item.name] = df[_item.name].astype(_type)
|
|
_args = copy.deepcopy(args)
|
|
# _args['store'] = args['store']['source']
|
|
_args['data'] = df
|
|
#
|
|
# The columns that are continuous should also be skipped because they don't need to be synthesied (like-that)
|
|
if 'continuous' in args :
|
|
x_cols = args['continuous']
|
|
else:
|
|
x_cols = []
|
|
|
|
if 'ignore' in args and 'columns' in args['ignore'] :
|
|
_cols = self.get_ignore(data=df,columns=args['ignore']['columns'])
|
|
_args['data'] = df[ list(set(df.columns)- set(_cols))]
|
|
#
|
|
# We need to make sure that continuous columns are removed
|
|
if x_cols :
|
|
_args['data'] = _args['data'][list(set(_args['data'].columns) - set(x_cols))]
|
|
if 'gpu' in args :
|
|
_args['gpu'] = self.set_gpu(gpu=args['gpu'])
|
|
if 'partition' in args :
|
|
_args['partition'] = args['partition']
|
|
if df.shape[0] and df.shape[0] :
|
|
#
|
|
# We have a full blown matrix to be processed
|
|
data.maker.train(**_args)
|
|
else:
|
|
print ("... skipping training !!")
|
|
|
|
if 'autopilot' in ( list(args.keys())) :
|
|
|
|
args['data'] = df
|
|
print (['autopilot mode enabled ....',args['context']])
|
|
self.generate(args)
|
|
|
|
pass
|
|
|
|
def approximate(self,values):
|
|
"""
|
|
:param values array of values to be approximated
|
|
"""
|
|
if values.dtype in [int,float] :
|
|
#
|
|
# @TODO: create bins?
|
|
r = np.random.dirichlet(values+.001) #-- dirichlet doesn't work on values with zeros
|
|
_sd = values[values > 0].std()
|
|
_me = values[values > 0].mean()
|
|
_mi = values.min()
|
|
x = []
|
|
_type = values.dtype
|
|
for index in np.arange(values.size) :
|
|
|
|
if np.random.choice([0,1],1)[0] :
|
|
value = values[index] + (values[index] * r[index])
|
|
|
|
else :
|
|
value = values[index] - (values[index] * r[index])
|
|
#
|
|
# randomly shifting the measurements
|
|
if np.random.choice([0,1],1)[0] and _me > _sd :
|
|
if np.random.choice([0,1],1)[0] :
|
|
value = value * np.divide(_me,_sd)
|
|
else:
|
|
value = value + (np.divide(_me,_sd))
|
|
value = int(value) if _type == int else np.round(value,2)
|
|
x.append( value)
|
|
np.random.shuffle(x)
|
|
return np.array(x)
|
|
else:
|
|
return values
|
|
pass
|
|
|
|
def shuffle(self,_args):
|
|
if 'data' in args :
|
|
df = data['data']
|
|
else:
|
|
reader = factory.instance(**args['store']['source'])
|
|
if 'file' in args :
|
|
df = pd.read_csv(args['file'])
|
|
elif 'data' in _args :
|
|
df = _args['data']
|
|
else:
|
|
if 'row_limit' in args and 'sql' in args:
|
|
df = reader.read(sql=args['sql'],limit=args['row_limit'])
|
|
else:
|
|
df = reader.read(sql=args['sql'])
|
|
schema = None
|
|
if 'schema' not in args and hasattr(reader,'meta') and 'file' not in args:
|
|
schema = reader.meta(table=args['from'])
|
|
schema = [{"name":_item.name,"type":_item.field_type} for _item in schema]
|
|
#
|
|
# We are shufling designated colmns and will be approximating the others
|
|
#
|
|
x_cols = [] #-- coumns tobe approximated.
|
|
_cols = [] #-- columns to be ignored
|
|
if 'continuous' in args :
|
|
x_cols = args['continuous']
|
|
if 'ignore' in args and 'columns' in args['ignore'] :
|
|
_cols = self.get_ignore(data=df,columns=args['ignore']['columns'])
|
|
|
|
columns = args['columns'] if 'columns' in args else df.columns
|
|
columns = list(set(columns) - set(_cols))
|
|
for name in columns:
|
|
i = np.arange(df.shape[0])
|
|
np.random.shuffle(i)
|
|
if name in x_cols :
|
|
if df[name].unique().size > 0 :
|
|
df[name] = self.approximate(df.iloc[i][name].fillna(0).values)
|
|
# df[name] = df[name].astype(str)
|
|
# pass
|
|
|
|
df.index = np.arange(df.shape[0])
|
|
self.post(data=df,schema=schema,store=args['store']['target'])
|
|
def post(self,**_args) :
|
|
_schema = _args['schema'] if 'schema' in _args else None
|
|
writer = factory.instance(**_args['store'])
|
|
_df = _args['data']
|
|
if _schema :
|
|
columns = []
|
|
for _item in _schema :
|
|
name = _item['name']
|
|
_type = str
|
|
_value = 0
|
|
if _item['type'] in ['DATE','TIMESTAMP','DATETIMESTAMP','DATETIME'] :
|
|
if _item['type'] == 'DATE' :
|
|
_df[name] = _df[name].dt.date
|
|
|
|
|
|
|
|
else:
|
|
if _item['type'] == 'INTEGER' :
|
|
_type = np.int64
|
|
elif _item['type'] in ['FLOAT','NUMERIC']:
|
|
_type = np.float64
|
|
else:
|
|
_value = ''
|
|
_df[name] = _df[name].fillna(_value).astype(_type)
|
|
columns.append(name)
|
|
writer.write(_df,schema=_schema,table=args['from'])
|
|
else:
|
|
writer.write(_df,table=args['from'])
|
|
|
|
def finalize(self,args):
|
|
"""
|
|
This function performs post-processing opertions on a synthetic table i.e :
|
|
- remove duplicate keys
|
|
- remove orphaned keys i.e
|
|
"""
|
|
reader = factory.instance(**args['store']['source'])
|
|
logger = factory.instance(**args['store']['logs'])
|
|
target = args['store']['target']['args']['dataset']
|
|
source = args['store']['source']['args']['dataset']
|
|
table = args['from']
|
|
schema = reader.meta(table=args['from'])
|
|
#
|
|
# keys :
|
|
unique_field = "_".join([args['from'],'id']) if 'unique_fields' not in args else args['unique_fields']
|
|
fields = [ item.name if item.name != unique_field else "y."+item.name for item in schema]
|
|
SQL = [
|
|
"SELECT :fields FROM ",
|
|
"(SELECT ROW_NUMBER() OVER() AS row_number,* FROM :target.:table) x","INNER JOIN",
|
|
"(SELECT ROW_NUMBER() OVER() AS row_number, :unique_field FROM :source.:table ORDER BY RAND()) y",
|
|
"ON y.row_number = x.row_number"
|
|
]
|
|
SQL = " ".join(SQL).replace(":fields",",".join(fields)).replace(":table",table).replace(":source",source).replace(":target",target)
|
|
SQL = SQL.replace(":unique_field",unique_field)
|
|
#
|
|
# Use a native job to get this done ...
|
|
#
|
|
client = bq.Client.from_service_account_json(args['store']['source']['args']["private_key"])
|
|
job = bq.QueryJobConfig()
|
|
job.destination = client.dataset(target).table(table)
|
|
job.use_query_cache = True
|
|
job.allow_large_results = True
|
|
# job.time_partitioning = bq.table.TimePartitioning(type_=bq.table.TimePartitioningType.DAY)
|
|
job.write_disposition = "WRITE_TRUNCATE"
|
|
job.priority = 'BATCH'
|
|
r = client.query(SQL,location='US',job_config=job)
|
|
logger.write({"job":r.job_id,"action":"finalize", "args":{"sql":SQL,"source":"".join([source,table]),"destimation":".".join([target,table])}})
|
|
#
|
|
# Keep a log of what just happened...
|
|
#
|
|
otable = ".".join([args['store']['source']['args']['dataset'],args['from']])
|
|
dtable = ".".join([args['store']['target']['args']['dataset'],args['from']])
|
|
def generate(self,args):
|
|
"""
|
|
This function will generate data and store it to a given,
|
|
"""
|
|
store = args['store']['logs']
|
|
store['args']['doc'] = args['context']
|
|
logger = factory.instance(**store) #type='mongo.MongoWriter',args={'dbname':'aou','doc':args['context']})
|
|
|
|
ostore = args['store']['target']
|
|
writer = factory.instance(**ostore)
|
|
|
|
schema = args['schema'] if 'schema' in args else None
|
|
if 'data' in args :
|
|
|
|
df = args['data']
|
|
else:
|
|
|
|
reader = factory.instance(**args['store']['source'])
|
|
if 'row_limit' in args :
|
|
df = reader.read(sql=args['sql'],limit=args['row_limit'])
|
|
else:
|
|
df = reader.read(sql=args['sql'])
|
|
if 'schema' not in args and hasattr(reader,'meta'):
|
|
schema = reader.meta(table=args['from'])
|
|
schema = [{"name":_item.name,"type":_item.field_type} for _item in schema]
|
|
|
|
|
|
# else:
|
|
# #
|
|
# # This will account for autopilot mode ...
|
|
# df = args['data']
|
|
_cast = {}
|
|
if schema :
|
|
for _item in schema :
|
|
dtype = str
|
|
name = _item['name']
|
|
novalue = -1
|
|
if _item['type'] in ['INTEGER','NUMERIC']:
|
|
dtype = np.int64
|
|
|
|
elif _item['type'] == 'FLOAT' :
|
|
dtype = np.float64
|
|
else:
|
|
novalue = ''
|
|
# _cast[schema['name']] = dtype
|
|
df[name] = df[name].fillna(novalue).astype(dtype)
|
|
|
|
_info = {"module":"gan-prep","action":"read","shape":{"rows":df.shape[0],"columns":df.shape[1]},"schema":schema}
|
|
logger.write(_info)
|
|
|
|
|
|
_dc = pd.DataFrame()
|
|
# for mdf in df :
|
|
args['data'] = df.copy()
|
|
#
|
|
# The columns that are continuous should also be skipped because they don't need to be synthesied (like-that)
|
|
if 'continuous' in args :
|
|
x_cols = args['continuous']
|
|
else:
|
|
x_cols = []
|
|
|
|
if 'ignore' in args and 'columns' in args['ignore'] :
|
|
_cols = self.get_ignore(data=df,columns=args['ignore']['columns'])
|
|
args['data'] = args['data'][ list(set(df.columns)- set(_cols))]
|
|
#
|
|
# We need to remove the continuous columns from the data-frame
|
|
# @TODO: Abstract this !!
|
|
#
|
|
real_df = pd.DataFrame()
|
|
if x_cols :
|
|
args['data'] = args['data'][list(set(args['data'].columns) - set(x_cols))]
|
|
real_df = df[x_cols].copy()
|
|
|
|
args['candidates'] = 1 if 'candidates' not in args else int(args['candidates'])
|
|
if 'gpu' in args :
|
|
args['gpu'] = self.set_gpu(gpu=args['gpu'])
|
|
# if 'partition' in args :
|
|
# args['logs'] = os.sep.join([args['logs'],str(args['partition'])])
|
|
|
|
_info = {"module":"gan-prep","action":"prune","shape":{"rows":args['data'].shape[0],"columns":args['data'].shape[1]}}
|
|
logger.write(_info)
|
|
if args['data'].shape[0] > 0 and args['data'].shape[1] > 0 :
|
|
candidates = (data.maker.generate(**args))
|
|
else:
|
|
candidates = [df]
|
|
if 'sql.BQWriter' in ostore['type'] :
|
|
#table = ".".join([ostore['['dataset'],args['context']])
|
|
# writer = factory.instance(**ostore)
|
|
_columns = None
|
|
skip_columns = []
|
|
_schema = schema
|
|
if schema :
|
|
cols = [_item['name'] for _item in _schema]
|
|
else:
|
|
cols = df.columns
|
|
for _df in candidates :
|
|
#
|
|
# we need to format the fields here to make sure we have something cohesive
|
|
#
|
|
|
|
if not skip_columns :
|
|
# _columns = set(df.columns) - set(_df.columns)
|
|
if 'ignore' in args and 'columns' in args['ignore'] :
|
|
skip_columns = self.get_ignore(data=_df,columns=args['ignore']['columns'])
|
|
# for name in args['ignore']['columns'] :
|
|
# for _name in _df.columns:
|
|
# if _name in name:
|
|
# skip_columns.append(_name)
|
|
#
|
|
# We perform a series of set operations to insure that the following conditions are met:
|
|
# - the synthetic dataset only has fields that need to be synthesized
|
|
# - The original dataset has all the fields except those that need to be synthesized
|
|
#
|
|
|
|
_df = _df[list(set(_df.columns) - set(skip_columns))].copy()
|
|
if x_cols :
|
|
_approx = {}
|
|
for _col in x_cols :
|
|
if real_df[_col].unique().size > 0 :
|
|
|
|
|
|
_df[_col] = self.approximate(real_df[_col].values)
|
|
_approx[_col] = {
|
|
"io":{"min":_df[_col].min().astype(float),"max":_df[_col].max().astype(float),"mean":_df[_col].mean().astype(float),"sd":_df[_col].values.std().astype(float),"missing": _df[_col].where(_df[_col] == -1).dropna().count().astype(float),"zeros":_df[_col].where(_df[_col] == 0).dropna().count().astype(float)},
|
|
"real":{"min":real_df[_col].min().astype(float),"max":real_df[_col].max().astype(float),"mean":real_df[_col].mean().astype(float),"sd":real_df[_col].values.std().astype(float),"missing": real_df[_col].where(_df[_col] == -1).dropna().count().astype(float),"zeros":real_df[_col].where(_df[_col] == 0).dropna().count().astype(float)}
|
|
}
|
|
else:
|
|
_df[_col] = -1
|
|
logger.write({"module":"gan-generate","action":"approximate","status":_approx})
|
|
if set(df.columns) & set(_df.columns) :
|
|
_columns = set(df.columns) - set(_df.columns)
|
|
df = df[_columns]
|
|
|
|
#
|
|
# Let us merge the dataset here and and have a comprehensive dataset
|
|
|
|
_df = pd.DataFrame.join(df,_df)
|
|
|
|
# if _schema :
|
|
# for _item in _schema :
|
|
# if _item['type'] in ['DATE','TIMESTAMP','DATETIME'] :
|
|
# _df[_item['name']] = _df[_item['name']].astype(str)
|
|
|
|
# pass
|
|
_params = {'data':_df,'store' : ostore}
|
|
if _schema :
|
|
_params ['schema'] = _schema
|
|
self.post(**_params)
|
|
# if _schema :
|
|
# writer.write(_df[cols],schema=_schema,table=args['from'])
|
|
# self.post(data=_df,schema=)
|
|
# else:
|
|
# writer.write(_df[cols],table=args['from'])
|
|
|
|
pass
|
|
# else:
|
|
# pass
|
|
|
|
|
|
# #
|
|
# # We need to post the generate the data in order to :
|
|
# # 1. compare immediately
|
|
# # 2. synthetic copy
|
|
# #
|
|
|
|
# cols = _dc.columns.tolist()
|
|
|
|
# data_comp = _args['data'][args['columns']].join(_dc[args['columns']],rsuffix='_io') #-- will be used for comparison (store this in big query)
|
|
# #
|
|
# # performing basic analytics on the synthetic data generated (easy to quickly asses)
|
|
# #
|
|
# info = {"module":"generate","action":"io.metrics","input":{"rows":data_comp.shape[0],"partition":partition,"logs":[]}}
|
|
|
|
# #
|
|
# # @TODO: Send data over to a process for analytics
|
|
|
|
# base_cols = list(set(_args['data'].columns) - set(args['columns'])) #-- rebuilt the dataset (and store it)
|
|
# cols = _dc.columns.tolist()
|
|
# for name in cols :
|
|
# _args['data'][name] = _dc[name]
|
|
|
|
# #
|
|
# #-- Let us store all of this into bigquery
|
|
# prefix = args['notify']+'.'+_args['context']
|
|
# partition = str(partition)
|
|
# table = '_'.join([prefix,partition,'io']).replace('__','_')
|
|
# folder = os.sep.join([args['logs'],args['context'],partition,'output'])
|
|
# if 'file' in args :
|
|
|
|
# _fname = os.sep.join([folder,table.replace('_io','_full_io.csv')])
|
|
# _pname = os.sep.join([folder,table])+'.csv'
|
|
# data_comp.to_csv( _pname,index=False)
|
|
# _args['data'].to_csv(_fname,index=False)
|
|
|
|
# _id = 'path'
|
|
# else:
|
|
|
|
# credentials = service_account.Credentials.from_service_account_file('/home/steve/dev/aou/accounts/curation-prod.json')
|
|
# _pname = os.sep.join([folder,table+'.csv'])
|
|
# _fname = table.replace('_io','_full_io')
|
|
# partial = '.'.join(['io',args['context']+'_partial_io'])
|
|
# complete= '.'.join(['io',args['context']+'_full_io'])
|
|
# data_comp.to_csv(_pname,index=False)
|
|
# if 'dump' in args :
|
|
# print (_args['data'].head())
|
|
# else:
|
|
# Components.lock.acquire()
|
|
# data_comp.to_gbq(if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000)
|
|
# _args['data'].to_gbq(if_exists='append',destination_table=complete,credentials=credentials,chunksize=90000)
|
|
# Components.lock.release()
|
|
# _id = 'dataset'
|
|
# info = {"full":{_id:_fname,"rows":_args['data'].shape[0]},"partial":{"path":_pname,"rows":data_comp.shape[0]} }
|
|
# if partition :
|
|
# info ['partition'] = int(partition)
|
|
# logger.write({"module":"generate","action":"write","input":info} )
|
|
|
|
|
|
|
|
if __name__ == '__main__' :
|
|
filename = SYS_ARGS['config'] if 'config' in SYS_ARGS else 'config.json'
|
|
f = open (filename)
|
|
_config = json.loads(f.read())
|
|
f.close()
|
|
PIPELINE = _config['pipeline']
|
|
index = SYS_ARGS['index']
|
|
if index.isnumeric() :
|
|
index = int(SYS_ARGS['index'])
|
|
else:
|
|
#
|
|
# The index provided is a key to a pipeline entry mainly the context
|
|
#
|
|
N = len(PIPELINE)
|
|
f = [i for i in range(0,N) if PIPELINE[i]['context'] == index]
|
|
index = f[0] if f else 0
|
|
#
|
|
|
|
print ("..::: ",PIPELINE[index]['context'])
|
|
args = (PIPELINE[index])
|
|
for key in _config :
|
|
if key == 'pipeline' or key in args:
|
|
#
|
|
# skip in case of pipeline or if key exists in the selected pipeline (provided by index)
|
|
#
|
|
continue
|
|
args[key] = _config[key]
|
|
|
|
args = dict(args,**SYS_ARGS)
|
|
if 'matrix_size' in args :
|
|
args['matrix_size'] = int(args['matrix_size'])
|
|
if 'batch_size' not in args :
|
|
args['batch_size'] = 2000 #if 'batch_size' not in args else int(args['batch_size'])
|
|
if 'dataset' not in args :
|
|
args['dataset'] = 'combined20191004v2_deid'
|
|
PART_SIZE = int(args['part_size']) if 'part_size' in args else 8
|
|
#
|
|
# @TODO:
|
|
# Log what was initiated so we have context of this processing ...
|
|
#
|
|
|
|
GPU_CHIPS = args['gpu'] if 'gpu' in args else None
|
|
if GPU_CHIPS and type(GPU_CHIPS) != list :
|
|
GPU_CHIPS = [int(_id.strip()) for _id in GPU_CHIPS.split(',')] if type(GPU_CHIPS) == str else [GPU_CHIPS]
|
|
if 'gpu' in SYS_ARGS :
|
|
args['gpu'] = GPU_CHIPS
|
|
jobs = []
|
|
if 'generate' in SYS_ARGS :
|
|
#
|
|
# Let us see if we have partitions given the log folder
|
|
|
|
content = os.listdir( os.sep.join([args['logs'],'train',args['context']]))
|
|
if 'all-chips' in SYS_ARGS and GPU_CHIPS:
|
|
index = 0
|
|
jobs = []
|
|
for _gpu in GPU_CHIPS :
|
|
_args = copy.deepcopy(args)
|
|
_args['gpu'] = [int(_gpu)]
|
|
_args['partition'] = int(_gpu) #index
|
|
index += 1
|
|
make = lambda _params: (Components()).generate(_params)
|
|
job = Process(target=make,args=( dict(_args),))
|
|
job.name = 'Trainer # ' + str(index)
|
|
job.start()
|
|
jobs.append(job)
|
|
pass
|
|
else:
|
|
generator = Components()
|
|
generator.generate(args)
|
|
elif 'shuffle' in SYS_ARGS :
|
|
index = 0
|
|
if GPU_CHIPS and 'all-chips' in SYS_ARGS:
|
|
|
|
for index in GPU_CHIPS :
|
|
publisher = lambda _params: ( Components() ).shuffle(_params)
|
|
job = Process (target = publisher,args=( args,))
|
|
job.name = 'Shuffler #' + str(index)
|
|
job.start()
|
|
jobs.append(job)
|
|
else:
|
|
shuffler = Components()
|
|
shuffler.shuffle(args)
|
|
pass
|
|
elif 'train' in SYS_ARGS:
|
|
|
|
# DATA = np.array_split(DATA,PART_SIZE)
|
|
#
|
|
# Let us create n-jobs across n-gpus, The assumption here is the data that is produced will be a partition
|
|
# @TODO: Find better name for partition
|
|
#
|
|
if GPU_CHIPS and 'all-chips' in SYS_ARGS:
|
|
index = 0
|
|
print (['... launching ',len(GPU_CHIPS),' jobs',args['context']])
|
|
for _gpu in GPU_CHIPS :
|
|
_args = copy.deepcopy(args)
|
|
_args['gpu'] = [int(_gpu)]
|
|
_args['partition'] = int(_gpu) #index
|
|
index += 1
|
|
make = lambda _params: (Components()).train(**_params)
|
|
job = Process(target=make,args=( _args,))
|
|
job.name = 'Trainer # ' + str(index)
|
|
job.start()
|
|
jobs.append(job)
|
|
|
|
|
|
|
|
|
|
else:
|
|
#
|
|
# The choice of the chip will be made internally
|
|
agent = Components()
|
|
agent.train(**args)
|
|
#
|
|
# If we have any obs we should wait till they finish
|
|
#
|
|
DIRTY = 0
|
|
while len(jobs)> 0 :
|
|
DIRTY =1
|
|
jobs = [job for job in jobs if job.is_alive()]
|
|
time.sleep(2)
|
|
if DIRTY:
|
|
print (["..:: jobs finished "])
|
|
#
|
|
# We need to harmonize the keys if any at all in this case we do this for shuffle or generate operations
|
|
#
|
|
print (['finalize' in SYS_ARGS, ('generate' in SYS_ARGS or 'shuffle' in SYS_ARGS) ])
|
|
if 'finalize' in SYS_ARGS or ('generate' in SYS_ARGS or 'shuffle' in SYS_ARGS) :
|
|
#
|
|
# We should pull all the primary keys and regenerate them in order to insure some form of consistency
|
|
#
|
|
|
|
(Components()).finalize(args)
|
|
# finalize(args)
|
|
pass
|
|
# jobs = []
|
|
# for index in range(0,PART_SIZE) :
|
|
# if 'focus' in args and int(args['focus']) != index :
|
|
# continue
|
|
# args['part_size'] = PART_SIZE
|
|
# args['partition'] = index
|
|
# args['data'] = DATA[index]
|
|
# if int(args['num_gpu']) > 1 :
|
|
# args['gpu'] = index
|
|
# else:
|
|
# args['gpu']=0
|
|
|
|
# make = lambda _args: (Components()).train(**_args)
|
|
# job = Process(target=make,args=( dict(args),))
|
|
# job.name = 'Trainer # ' + str(index)
|
|
# job.start()
|
|
# jobs.append(job)
|
|
# # args['gpu']
|
|
# print (["Started ",len(jobs),"trainers" if len(jobs)>1 else "trainer" ])
|
|
# while len(jobs)> 0 :
|
|
# jobs = [job for job in jobs if job.is_alive()]
|
|
# time.sleep(2)
|
|
|
|
# trainer = Components()
|
|
# trainer.train(**args)
|
|
|
|
|
|
# Components.train(**args)
|
|
#for args in PIPELINE :
|
|
#args['dataset'] = 'combined20190510'
|
|
#process = Process(target=Components.train,args=(args,))
|
|
#process.name = args['context']
|
|
#process.start()
|
|
# Components.train(args)
|