From 0afbb1c0726a47f7600aaa1eee4ed99012b684b1 Mon Sep 17 00:00:00 2001 From: "Steve L. Nyemba" Date: Sun, 29 Jan 2017 20:10:05 -0600 Subject: [PATCH] Enabling preconditions to be effective (variance) @TODO: dimensionality reduction --- src/utils/ml.py | 51 +++++++++++++++++++++++++++++------------- src/utils/transport.py | 5 ++++- src/utils/workers.py | 20 +++++++++++++---- 3 files changed, 55 insertions(+), 21 deletions(-) diff --git a/src/utils/ml.py b/src/utils/ml.py index 9a76162..9339760 100644 --- a/src/utils/ml.py +++ b/src/utils/ml.py @@ -16,7 +16,7 @@ class ML: # @TODO: Make sure this approach works across all transport classes # We may have a potential issue of how the data is stored ... it may not scale # - + value = ML.CleanupName(value) #return [item[0] for item in data if item and attr in item[0] and item[0][attr] == value] return [[item for item in row if item[attr] == value][0] for row in data] @staticmethod @@ -24,7 +24,9 @@ class ML: if isinstance(lattr,basestring): lattr = [lattr] return [[row[id] for id in lattr] for row in data] - + @staticmethod + def CleanupName(value) : + return value.replace('$','').replace('.+','') """ Implements a multivariate anomaly detection @@ -32,7 +34,7 @@ class ML: """ class AnomalyDetection: - def split(self,data,index=-1,threshold=0.9) : + def split(self,data,index=-1,threshold=0.65) : N = len(data) # if N < LIMIT: # return None @@ -52,13 +54,13 @@ class AnomalyDetection: @TODO: Map/Reduce does a good job at filtering """ def learn(self,data,key,value,features,label): - xo = ML.Filter(key,value,data) - if not xo or len(xo) < 100: - return None - #if len(xo) < 100 : - #return None + if len(data) < 10: + return None + xo = ML.Filter(key,value,data) + if len(xo) < 10 : + return None # attr = conf['features'] # label= conf['label'] @@ -69,9 +71,10 @@ class AnomalyDetection: xo = self.split(xo) yo = self.split(yo) p = self.gParameters(xo['train']) - has_cov = np.linalg.det(p['cov']) #-- making sure the matrix is 
invertible + has_cov = np.linalg.det(p['cov']) if p else False #-- making sure the matrix is invertible if xo['train'] and has_cov : E = 0.001 + ACCEPTABLE_FSCORE = 0.6 fscore = 0 # # We need to find an appropriate epsilon for the predictions @@ -94,22 +97,31 @@ class AnomalyDetection: __operf__ = self.gPerformance(px,yo['test']) - print __operf__ + if __operf__['fscore'] == 1 : - break + continue if perf is None : - perf = __operf__['fscore'] - elif perf['fscore'] < __perf__['fscore'] and __operf__['fscore']> 0.5 : perf = __operf__ - + elif perf['fscore'] < __operf__['fscore'] and __operf__['fscore'] > ACCEPTABLE_FSCORE : + perf = __operf__ perf['epsilon'] = Epsilon + # + # At this point we are assuming we came out of the whole thing with an acceptable performance + # The understanding is that error drives performance thus we reject fscore==1 + # - - if perf and perf['fscore'] > 0.5 : + if perf and perf['fscore'] > ACCEPTABLE_FSCORE : return {"label":value,"parameters":p,"performance":perf} else: return None return None + """ + This function determines if the preconditions for learning are met + For that parameters are passed to the function + p + """ + def canLearn(self,p) : + pass def getLabel(self,yo,label_conf): return [ int(len(set(item) & set(label_conf["1"]))>0) for item in yo ] @@ -188,8 +200,15 @@ class AnomalyDetection: return None r = np.array([ np.sqrt(np.var(m[i,:])) for i in range(0,n)]) # + # Before we normalize the data we must ensure there is some level of movement in this application + # A lack of movement suggests we may not have enough information to do anything + # + if 0 in r : + return None + # #-- Normalizing the matrix then we will compute covariance matrix # + m = np.array([ (m[i,:] - u[i])/r[i] for i in range(0,n)]) sigma = np.cov(m) sigma = [ list(row) for row in sigma] diff --git a/src/utils/transport.py b/src/utils/transport.py index 5b451b6..3c696af 100644 --- a/src/utils/transport.py +++ b/src/utils/transport.py @@ -521,7 +521,10 
@@ class CouchdbWriter(Couchdb,Writer): def __init__(self,**args): uri = args['uri'] self.uid = args['uid'] - self.filename = args['filename'] + if 'filename' in args: + self.filename = args['filename'] + else: + self.filename = None dbname = args['dbname'] self.server = Server(uri=uri) self.dbase = self.server.get_db(dbname) diff --git a/src/utils/workers.py b/src/utils/workers.py index 2948a65..82d1a71 100644 --- a/src/utils/workers.py +++ b/src/utils/workers.py @@ -1,6 +1,7 @@ #import multiprocessing from threading import Thread, RLock -from utils import transport +#from utils import transport +from utils.transport import * from utils.ml import AnomalyDetection,ML import time import monitor @@ -17,7 +18,7 @@ class Top(Thread): self.reader_class = _config['store']['class']['read'] self.write_class = _config['store']['class']['write'] self.rw_args = _config['store']['args'] - self.factory = transport.DataSourceFactory() + self.factory = DataSourceFactory() self.name = 'Zulu-Top' self.quit = False @@ -36,6 +37,7 @@ class Top(Thread): apps = self.config[label] self.handler.init(apps) r = self.handler.composite() + gwriter.write(label=label,row=r) time.sleep(5) self.lock.release() @@ -44,7 +46,7 @@ class Top(Thread): # This suggests we are in development mode # break - ELLAPSED_TIME = 60*30 + ELLAPSED_TIME = 60*20 time.sleep(ELLAPSED_TIME) print "Exiting ",self.name @@ -64,7 +66,7 @@ class Learner(Thread) : self.features = config['learner']['anomalies']['features'] self.yo = config['learner']['anomalies']['label'] self.apps = config['learner']['anomalies']['apps'] - self.factory = transport.DataSourceFactory() + self.factory = DataSourceFactory() self.quit = False def stop(self): @@ -192,3 +194,13 @@ class Factory : else: return None + +if __name__ =='__main__' : + import utils.params as SYS_ARGS + import json + PARAMS = SYS_ARGS.PARAMS + f = open(PARAMS['path']) + CONFIG = json.loads(f.read()) + f.close() + ThreadManager.start(CONFIG) + \ No newline at end of file