From 54744253ca298a77be43dcbb7775a92e48b37131 Mon Sep 17 00:00:00 2001
From: steve
Date: Wed, 18 Jan 2017 14:24:13 -0600
Subject: [PATCH] Bug fix with learner, @TODO: Determine epsilon, i.e. the
 right one to get a good f-score

---
 src/utils/ml.py           | 35 ++++++++++++++++++++++-------------
 src/utils/workers.py      | 29 +++++++++++++++++++++++------
 test/TestServerMonitor.py | 12 ++++++++++--
 3 files changed, 55 insertions(+), 21 deletions(-)

diff --git a/src/utils/ml.py b/src/utils/ml.py
index 2bd0a44..8c722bd 100644
--- a/src/utils/ml.py
+++ b/src/utils/ml.py
@@ -16,7 +16,7 @@ class ML:
         # @TODO: Make sure this approach works across all transport classes
         # We may have a potential issue of how the data is stored ... it may not scale
         #
-        return [item[0] for item in data if item[0][attr] == value]
+        return [item[0] for item in data if item and attr in item[0] and item[0][attr] == value]
     @staticmethod
     def Extract(lattr,data):
         return [[row[id] for id in lattr] for row in data]
@@ -27,7 +27,7 @@ class ML:
    @TODO: determine computationally determine epsilon
 """
 class AnomalyDetection:
-    def split(self,data,index=-1,threshold=0.7) :
+    def split(self,data,index=-1,threshold=0.8) :
         N = len(data)
         # if N < LIMIT:
         #   return None
@@ -47,10 +47,15 @@ class AnomalyDetection:
     """
     def learn(self,data,key,value,features,label):
         xo = ML.Filter(key,value,data)
-        if len(xo) < 100 :
+
+        if not xo :
             return None
+
+        #if len(xo) < 100 :
+            #return None
         # attr = conf['features']
         # label= conf['label']
+
         yo= ML.Extract([label['name']],xo)
         xo = ML.Extract(features,xo)
         yo = self.getLabel(yo,label)
@@ -58,12 +63,14 @@ class AnomalyDetection:
         xo = self.split(xo)
         yo = self.split(yo)
 
-        p = self.gParameters(xo['train'])
-
-        px = self.gPx(p['mean'],p['cov'],xo['test'])
-
-        perf = self.gPerformance(px,yo['test'])
-        return {"parameters":p,"performance":perf}
+        if xo['train'] :
+            p = self.gParameters(xo['train'])
+
+            px = self.gPx(p['mean'],p['cov'],xo['test'])
+
+            perf = self.gPerformance(px,yo['test'])
+            return {"parameters":p,"performance":perf}
+        return None
 
     def getLabel(self,yo,label_conf):
         return [ int(len(set(item) & set(label_conf["1"]))>0) for item in yo ]
@@ -72,7 +79,7 @@ class AnomalyDetection:
         This function will compute the probability density function given a particular event/set of events
         @pre xu.shape[0] == sigma[0] == sigma[1]
     """
-    def gPx(self,xu,sigma,data,EPSILON=0.05):
+    def gPx(self,xu,sigma,data,EPSILON=0.25):
         n = len(data[0])
 
         r = []
@@ -84,6 +91,7 @@ class AnomalyDetection:
             d = np.matrix(row - xu)
             d.shape = (n,1)
             b = np.exp((-0.5*np.transpose(d)) * (np.linalg.inv(sigma)*d))
+
             px = float(b/a)
             r.append([px,int(px < EPSILON)])
         return r
@@ -103,8 +111,8 @@ class AnomalyDetection:
             fp += 1 if (test[i][1] != labels[i] and test[i][1] == 1) else 0
             fn += 1 if (test[i][1] != labels[i] and test[i][1] == 0) else 0
             tn += 1 if (test[i][1] == labels[i] and test[i][1] == 0) else 0
 
-        precision = tp / (tp + fp)
-        recall = tp / (tp + fn)
+        precision = tp / (tp + fp) if tp + fp > 0 else 1
+        recall = tp / (tp + fn) if tp + fn > 0 else 1
         fscore = (2 * precision * recall)/ (precision + recall)
         return {"precision":precision,"recall":recall,"fscore":fscore}
@@ -124,4 +132,5 @@ class AnomalyDetection:
         # m = np.array([ (m[i,:] - u[i])/r[i] for i in range(0,n)])
 
         sigma = np.cov(m)
-        return {"cov":sigma,"mean":u}
+        sigma = [ list(row) for row in sigma]
+        return {"cov":sigma,"mean":list(u)}
diff --git a/src/utils/workers.py b/src/utils/workers.py
index 391e14b..e88ddc0 100644
--- a/src/utils/workers.py
+++ b/src/utils/workers.py
@@ -1,6 +1,7 @@
 #import multiprocessing
 from threading import Thread, Lock
 from utils import transport
+from utils.ml import AnomalyDetection
 import time
 import monitor
 import sys
@@ -78,17 +79,33 @@ class Learner(Thread) :
             r = {}
             for key in data :
                 logs = data[key]
-                r[key] = {}
+
                 for app in self.apps:
                     handler = AnomalyDetection()
-                    r[key][app] = lhandler.learn(data,'label',app,self.features,self.yo)
+                    value = handler.learn(logs,'label',app,self.features,self.yo)
+
+                    if value is not None:
+                        print value
+                        if key not in r:
+                            r[key] = {}
+                        r[key][app] = value
             #
             # At this point we've already learnt every thing we need to learn
             #
-            self.lock.aquire()
-            writer = sef.factory.instance(type.self.write_class,args=self.rw_args)
-            writer.write('learn',r)
-            self.lock.release()
+
+            if r.keys() :
+
+                self.lock.acquire()
+                writer = self.factory.instance(type=self.write_class,args=self.rw_args)
+                writer.write(label='learn',row=r)
+                self.lock.release()
+
+            if 'MONITOR_CONFIG_PATH' in os.environ:
+                #
+                # This suggests we are in development mode
+                #
+                break
+
             TIME_ELLAPSED = 60*120    #-- Every 2 hours
             time.sleep(TIME_ELLAPSED)
 
diff --git a/test/TestServerMonitor.py b/test/TestServerMonitor.py
index c9130b1..4143dc8 100644
--- a/test/TestServerMonitor.py
+++ b/test/TestServerMonitor.py
@@ -5,7 +5,8 @@ import monitor
 import os
 import json
 from utils.workers import Top, Learner
-from multiprocessing import Lock
+#from multiprocessing import Lock
+from threading import Lock
 path = os.environ['MONITOR_CONFIG_PATH']
 f = open(path)
 CONFIG = json.loads( f.read())
@@ -46,7 +47,14 @@ class TestMonitorServer(unittest.TestCase):
     def test_StartTop(self):
         lock = Lock()
         p = Top(CONFIG,lock)
+        #p.start()
+
+        #p.join()
+    def test_StartLearner(self):
+        lock = Lock()
+        p = Learner(CONFIG,lock)
         p.start()
-        p.join()
+
+
 if __name__ == '__main__' :
     unittest.main()
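
Note on the @TODO in the subject line: EPSILON in gPx is the density threshold below which a record is flagged as an anomaly (px < EPSILON), and gPerformance already reports the precision/recall/f-score needed to judge a given choice. A common way to determine epsilon computationally is to sweep candidate thresholds over the densities of a labeled validation split and keep the value that maximizes the f-score. The sketch below only illustrates that idea; select_epsilon, f_score, and the toy data are hypothetical names and are not part of this patch or repository.

    # Hypothetical sketch (not part of this patch): pick EPSILON by maximizing
    # the f-score on a labeled validation split instead of hard-coding 0.25.
    import numpy as np

    def f_score(predictions, labels):
        # Same bookkeeping convention as AnomalyDetection.gPerformance: 1 == anomaly.
        tp = sum(1 for p, y in zip(predictions, labels) if p == y == 1)
        fp = sum(1 for p, y in zip(predictions, labels) if p == 1 and y == 0)
        fn = sum(1 for p, y in zip(predictions, labels) if p == 0 and y == 1)
        precision = tp / float(tp + fp) if tp + fp > 0 else 1.0
        recall = tp / float(tp + fn) if tp + fn > 0 else 1.0
        if precision + recall == 0:
            return 0.0
        return 2 * precision * recall / (precision + recall)

    def select_epsilon(pvals, labels, steps=1000):
        # Sweep candidate thresholds between the smallest and largest density
        # values on the validation split; keep the one with the best f-score.
        best_eps, best_f = 0.0, -1.0
        for eps in np.linspace(min(pvals), max(pvals), steps):
            preds = [int(p < eps) for p in pvals]
            f = f_score(preds, labels)
            if f > best_f:
                best_eps, best_f = eps, f
        return best_eps, best_f

    if __name__ == '__main__':
        # Toy example: low densities are the anomalies (label 1).
        pvals = [0.90, 0.85, 0.80, 0.02, 0.01, 0.75, 0.03]
        labels = [0, 0, 0, 1, 1, 0, 1]
        eps, f = select_epsilon(pvals, labels)
        print('epsilon=%.4f f-score=%.4f' % (eps, f))

The pvals fed to select_epsilon would correspond to the px values that gPx computes for a held-out split, so the chosen epsilon could then replace the default argument of gPx.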