fixing learning bugs on integration

master
Steve L. Nyemba 8 years ago
parent b787ee0608
commit cbef913877

@ -23,40 +23,17 @@ import monitor
import Queue import Queue
from utils.transport import * from utils.transport import *
from utils.workers import ThreadManager, Factory from utils.workers import ThreadManager, Factory
from utils.ml import ML,AnomalyDetection
import utils.params as SYS_ARGS
import atexit import atexit
PARAMS = {'context':''}
if len(sys.argv) > 1:
N = len(sys.argv)
for i in range(1,N):
value = None
if sys.argv[i].startswith('--'):
key = sys.argv[i].replace('-','')
if i + 1 < N:
value = sys.argv[i + 1] = sys.argv[i+1].strip()
if key and value:
PARAMS[key] = value
i += 2
app = Flask(__name__) app = Flask(__name__)
app.config['SECRET_KEY'] = '!h8-[0v8]247-4-360' app.config['SECRET_KEY'] = '!h8-[0v8]247-4-360'
#app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX=?RT' #app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX=?RT'
PARAMS = SYS_ARGS.PARAMS
f = open(PARAMS['path']) f = open(PARAMS['path'])
CONFIG = json.loads(f.read()) CONFIG = json.loads(f.read())
#HANDLERS= {}
#for key in CONFIG['monitor'] :
#className = CONFIG['monitor'][key]['class']
#ref = "".join(["monitor.",className,"()"])
#ref = eval(ref)
#HANDLERS[key] = {"class":ref,"config":CONFIG['monitor'][key]["config"]}
f.close() f.close()
# #
@ -66,12 +43,7 @@ p = CONFIG['store']['args']
class_read = CONFIG['store']['class']['read'] class_read = CONFIG['store']['class']['read']
class_write= CONFIG['store']['class']['write'] class_write= CONFIG['store']['class']['write']
factory = DataSourceFactory() factory = DataSourceFactory()
#gWriter = factory.instance(type='CouchdbWritera',args=p)
#gReader = factory.instance(type='CouchdbReader',args=p)
#p['qid'] = HANDLERS['processes']['config'].keys()
gReader = factory.instance(type=class_read,args=p) gReader = factory.instance(type=class_read,args=p)
#gWriter = factory.instance(type=class_write,args=p)
#mthread = monitor.Monitor(HANDLERS,gWriter,'processes',)
atexit.register(ThreadManager.stop) atexit.register(ThreadManager.stop)
@app.route('/get/<id>') @app.route('/get/<id>')
@ -82,8 +54,12 @@ def procs(id):
r = {} r = {}
for label in d : for label in d :
if label not in ['learn'] :
index = len(d[label]) - 1 index = len(d[label]) - 1
r[label] = d[label][index] r[label] = d[label][index]
#for row in r[label] :
#yo = ML.Extract(['status'],row)
#xo = ML.Extract(['cpu_usage','memory_usage'],row)
except Exception, e: except Exception, e:
print e print e
@ -122,13 +98,14 @@ def trends ():
p = CONFIG['store']['args'] p = CONFIG['store']['args']
class_read = CONFIG['store']['class']['read'] class_read = CONFIG['store']['class']['read']
p['qid'] =[id] #HANDLERS['processes']['config'].keys()
gReader = factory.instance(type=class_read,args=p) gReader = factory.instance(type=class_read,args=p)
r = gReader.read() r = gReader.read()
if id in r: if id in r:
r = r[id] #--matrix r = r[id] #--matrix
series = [] series = []
for row in r: for row in r:
series += [item for item in row if str(item['label'])== app] series += [item for item in row if str(item['label'])== app]
@ -156,19 +133,37 @@ def dashboard():
""" """
@app.route('/learn') @app.route('/learn')
def learn(): def learn():
app = request.args.get('app') global CONFIG
id = request.args.get('id')
p = CONFIG['store']['args'] p = CONFIG['store']['args']
class_read = CONFIG['store']['class']['read'] class_read = CONFIG['store']['class']['read']
p['qid'] =[id] #HANDLERS['processes']['config'].keys()
gReader = factory.instance(type=class_read,args=p) gReader = factory.instance(type=class_read,args=p)
d = gReader.read()
if 'learn' in d :
logs = d['learn']
del d['learn']
else :
logs = []
r = []
if 'id' in request.args:
id = request.args['id']
d = d[id]
print CONFIG['monitor']['processes']['config'][id]
print (apps)
#apps = list(set(ML.Extract(['label'],d)))
p = AnomalyDetection()
#for row in d :
#xo = ML.Filter('label',app,d)
#info = ML.Filter('label',app,logs)
#value = p.predict(xo,info)
#print app,value
#if value is not None:
# r.append(value)
print r
return json.dumps("[]")
r = gReader.read()
r = r[id]
r = ML.Filter('label',app,r)
label = ML.Extract(['status'],r)
r = ML.Extract(['cpu_usage','memory_usage'],r)
@app.route('/anomalies/status') @app.route('/anomalies/status')
def anomalies_status(): def anomalies_status():
@ -180,7 +175,7 @@ def anomalies_get():
if __name__== '__main__': if __name__== '__main__':
ThreadManager.start(CONFIG) #ThreadManager.start(CONFIG)
app.run(host='0.0.0.0',debug=True,threaded=True) app.run(host='0.0.0.0',debug=True,threaded=True)

@ -16,9 +16,12 @@ class ML:
# @TODO: Make sure this approach works across all transport classes # @TODO: Make sure this approach works across all transport classes
# We may have a potential issue of how the data is stored ... it may not scale # We may have a potential issue of how the data is stored ... it may not scale
# #
return [item[0] for item in data if item and attr in item[0] and item[0][attr] == value] return [item[0] for item in data if item and attr in item[0] and item[0][attr] == value]
@staticmethod @staticmethod
def Extract(lattr,data): def Extract(lattr,data):
if isinstance(lattr,basestring):
lattr = [lattr]
return [[row[id] for id in lattr] for row in data] return [[row[id] for id in lattr] for row in data]
@ -27,6 +30,7 @@ class ML:
@TODO: determine computationally determine epsilon @TODO: determine computationally determine epsilon
""" """
class AnomalyDetection: class AnomalyDetection:
def split(self,data,index=-1,threshold=0.8) : def split(self,data,index=-1,threshold=0.8) :
N = len(data) N = len(data)
# if N < LIMIT: # if N < LIMIT:
@ -64,12 +68,21 @@ class AnomalyDetection:
yo = self.split(yo) yo = self.split(yo)
if xo['train'] : if xo['train'] :
E = 0.01
for i in range(0,10):
Epsilon = E + (2*E*i)
p = self.gParameters(xo['train']) p = self.gParameters(xo['train'])
px = self.gPx(p['mean'],p['cov'],xo['test']) px = self.gPx(p['mean'],p['cov'],xo['test'],Epsilon)
perf = self.gPerformance(px,yo['test']) perf = self.gPerformance(px,yo['test'])
return {"parameters":p,"performance":perf} if perf['fscore'] > 0 :
perf['epsilon'] = Epsilon
break
return {"label":value,"parameters":p,"performance":perf}
return None return None
def getLabel(self,yo,label_conf): def getLabel(self,yo,label_conf):
return [ int(len(set(item) & set(label_conf["1"]))>0) for item in yo ] return [ int(len(set(item) & set(label_conf["1"]))>0) for item in yo ]
@ -77,9 +90,10 @@ class AnomalyDetection:
""" """
This function will compute the probability density function given a particular event/set of events This function will compute the probability density function given a particular event/set of events
The return value is [px,yo]
@pre xu.shape[0] == sigma[0] == sigma[1] @pre xu.shape[0] == sigma[0] == sigma[1]
""" """
def gPx(self,xu,sigma,data,EPSILON=0.25): def gPx(self,xu,sigma,data,EPSILON=0.01):
n = len(data[0]) n = len(data[0])
r = [] r = []
@ -95,6 +109,21 @@ class AnomalyDetection:
px = float(b/a) px = float(b/a)
r.append([px,int(px < EPSILON)]) r.append([px,int(px < EPSILON)])
return r return r
"""
This function uses stored learnt information to predict on raw data
In this case it will determin if we have an anomaly or not
@param xo raw observations (matrix)
@param info stored information about this
"""
def predict(self,xo,info):
xo = ML.Filter(info['extract'],xo)
if not xo :
return None
sigma = info['parameters']['cov']
xu = info['parameters']['mean']
epsilon = info['performance']['epsilon']
return self.getPx(xu,sigma,xo,epsilon)
""" """
This function computes performance metrics i.e precision, recall and f-score This function computes performance metrics i.e precision, recall and f-score
for details visit https://en.wikipedia.org/wiki/Precision_and_recall for details visit https://en.wikipedia.org/wiki/Precision_and_recall
@ -134,3 +163,13 @@ class AnomalyDetection:
sigma = np.cov(m) sigma = np.cov(m)
sigma = [ list(row) for row in sigma] sigma = [ list(row) for row in sigma]
return {"cov":sigma,"mean":list(u)} return {"cov":sigma,"mean":list(u)}
class Regression:
parameters = {}
@staticmethod
def predict(xo):
pass
def __init__(self,config):
pass

@ -1,7 +1,7 @@
#import multiprocessing #import multiprocessing
from threading import Thread, RLock from threading import Thread, RLock
from utils import transport from utils import transport
from utils.ml import AnomalyDetection from utils.ml import AnomalyDetection,ML
import time import time
import monitor import monitor
import sys import sys
@ -76,16 +76,38 @@ class Learner(Thread) :
def run(self): def run(self):
reader = self.factory.instance(type=self.reader_class,args=self.rw_args) reader = self.factory.instance(type=self.reader_class,args=self.rw_args)
data = reader.read() data = reader.read()
#
# Let's make sure we extract that which has aleady been learnt
#
if 'learn' in data:
r = data['learn']
del data['learn']
r = ML.Extract('label',r)
logs = [row[0] for row in r]
logs = list(set(logs))
else:
logs = []
# #
# This is the motherload of innefficiency ... # In order to address the inefficiencies below, we chose to adopt the following policy
# We don't learn that which is already learnt, This measure consists in filtering out the list of the apps that already have learning data
# #
self.apps = list(set(self.apps) - set(logs))
while self.quit == False: while self.quit == False:
r = {} r = {}
lapps = list(self.apps)
for key in data : for key in data :
logs = data[key] logs = data[key]
#
for app in self.apps: # There poor design at this point, we need to make sure things tested don't get tested again
# This creates innefficiencies (cartesian product)
#
for app in lapps:
handler = AnomalyDetection() handler = AnomalyDetection()
value = handler.learn(logs,'label',app,self.features,self.yo) value = handler.learn(logs,'label',app,self.features,self.yo)
@ -94,17 +116,23 @@ class Learner(Thread) :
if key not in r: if key not in r:
r[key] = {} r[key] = {}
r[key][app] = value r[key][app] = value
i = lapps.index(app)
del lapps[i]
# #
# At this point we've already learnt every thing we need to learn # This offers a clean write to the data store upon value retrieved
# The removal of the application enables us to improve efficiency (among other things)
# #
value = dict(value,**{"features":self.features})
if r.keys() :
self.lock.acquire() self.lock.acquire()
writer = self.factory.instance(type=self.write_class,args=self.rw_args) writer = self.factory.instance(type=self.write_class,args=self.rw_args)
writer.write(label='learn',row=r) writer.write(label='learn',row=value)
self.lock.release() self.lock.release()
#
# Usually this is used for development
# @TODO : Remove this and find a healthy way to stop the server
#
if 'MONITOR_CONFIG_PATH' in os.environ: if 'MONITOR_CONFIG_PATH' in os.environ:
# #
# This suggests we are in development mode # This suggests we are in development mode

Loading…
Cancel
Save