learner testing

master
Steve L. Nyemba 8 years ago
parent b2a1ed9165
commit f3d9e03b95

@ -23,30 +23,41 @@ class ML:
class AnomalyDetection:
def split(self,data,index=-1,threshold=0.7) :
N = len(data)
if N < LIMIT:
return None
# if N < LIMIT:
# return None
end = int(N*threshold)
train = data[:end]
test = data[end:]
if index > 0:
return {"train":train,"test":test,"labels":[]}
def learn(self,data,conf):
if 'filter' in conf:
filter = conf['filter']
data = ML.Filter(filter['key'],filter['value'],data)
attr = conf['features']
label= conf['label']
labels= ML.Extract([label],data)
data = ML.Extract(attr,data)
r = self.split(data)
labels = self.split(labels)
return {"train":train,"test":test}
"""
p = self.gParameters(r['train'])
test = self.gPx(p['mean'],p['cov'],r['test'])
return self.gPerformance(test,labels['test'])
@param key field name by which the data will be filtered
@param value field value for the filter
@param features features to be used in the analysis
@param labels used to assess performance
@TODO: Map/Reduce does a good job at filtering
"""
def learn(self,data,key,value,features,label):
xo = ML.Filter(key,value,data)
# attr = conf['features']
# label= conf['label']
yo= ML.Extract([label['name']],xo)
xo = ML.Extract(features,xo)
yo = self.getLabel(yo,label)
xo = self.split(xo)
yo = self.split(yo)
p = self.gParameters(xo['train'])
px = self.gPx(p['mean'],p['cov'],xo['test'])
print self.gPerformance(px,yo['test'])
def getLabel(self,yo,label_conf):
return [ int(len(set(item) & set(label_conf["1"]))>0) for item in yo ]
"""
@ -80,10 +91,10 @@ class AnomalyDetection:
fn = 0 # false negative
tn = 0 # true negative
for i in range(0,N):
tp += 1 if test[i][1]==labels[i] and test[i][1] == 1
fp += 1 if test[i][1] != labels[i] and test[i][1] == 1
fn += 1 if test[i][1] != labels[i] and test[i][1] == 0
tn += 1 if test[i][1] == labels[i] and test[i][1] == 0
tp += 1 if (test[i][1]==labels[i] and test[i][1] == 1) else 0
fp += 1 if (test[i][1] != labels[i] and test[i][1] == 1) else 0
fn += 1 if (test[i][1] != labels[i] and test[i][1] == 0) else 0
tn += 1 if (test[i][1] == labels[i] and test[i][1] == 0) else 0
precision = tp / (tp + fp)
recall = tp / (tp + fn)
fscore = (2 * precision * recall)/ (precision + recall)

@ -250,7 +250,8 @@ class MessageQueue:
resp = self.connection is not None and self.connection.is_open
self.close()
return resp
def close(self):
def close(self):
self.channel.close()
self.connection.close()
"""

@ -1,5 +1,5 @@
from utils import transport
from utils.ml import ML
from utils.ml import ML, AnomalyDetection
import unittest
import json
import os
@ -10,7 +10,6 @@ f = open(path)
CONFIG = json.loads( f.read())
f.close()
factory = transport.DataSourceFactory()
#greader = factory.instance(type=ref,args=p)
class TestML(unittest.TestCase):
def setUp(self):
@ -34,9 +33,21 @@ class TestML(unittest.TestCase):
r = r['apps']
x = ML.Filter('label','Google Chrome',r)
x_ = ML.Extract(['cpu_usage','memory_usage'], x)
print x[0]
print x_
self.assertTrue (len (x) == len(x_))
pass
def test_Learn(self):
ref = CONFIG['store']['class']['read']
p = CONFIG['store']['args']
greader = factory.instance(type=ref,args=p)
data = greader.read()
data = data['apps']
lhandler = AnomalyDetection()
features = CONFIG['learner']['anomalies']['features']
label = CONFIG['learner']['anomalies']['label']
lhandler.learn(data,'label','Google Chrome',features,label)
if __name__ == '__main__' :

Loading…
Cancel
Save