process engines and learners @TODO: Bootup & revisit detailprocess class

master
Steve L. Nyemba 8 years ago
parent f3d9e03b95
commit b0cd031b07

@ -2,13 +2,17 @@
This is a RESTful interface implemented using Flask micro framework.
The API is driven by configuration that is organized in terms of the monitoring classes
The API is both restful and websocket/socketio enabled.
We designed the classes to be reusable (and powered by labels):
'monitoring-type':
'class':'<class-name>'
'config':<labeled-class-specific-configuration>'
"""
from flask import Flask, session, request, redirect, Response
from flask.templating import render_template
from flask_session import Session
import time
import sys
@ -18,6 +22,7 @@ import re
import monitor
import Queue
from utils.transport import *
PARAMS = {'context':''}
if len(sys.argv) > 1:
@ -36,6 +41,10 @@ if len(sys.argv) > 1:
# Flask application bootstrap: secret key + JSON configuration load.
app = Flask(__name__)
app.config['SECRET_KEY'] = '!h8-[0v8]247-4-360'
#app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX=?RT'
# NOTE(review): the file handle is never closed; config is read once at startup.
# PARAMS['path'] is presumably supplied via the --path CLI flag -- confirm against utils/params.
f = open(PARAMS['path'])
CONFIG = json.loads(f.read())
HANDLERS= {}
@ -152,11 +161,17 @@ def learn():
label = ML.Extract(['status'],r)
r = ML.Extract(['cpu_usage','memory_usage'],r)
# TODO: stub endpoint -- intended to report the status of anomaly learning.
@app.route('/anomalies/status')
def anomalies_status():
pass
# TODO: stub endpoint -- intended to return detected anomalies.
@app.route('/anomalies/get')
def anomalies_get():
pass
# Entry point: mthread is defined earlier in this file (outside this view),
# presumably the background monitor thread -- started before Flask serves.
if __name__== '__main__':
mthread.start()
app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX=?RT'
# threaded=True so socket/long-poll requests don't block each other.
app.run(host='0.0.0.0',debug=True,threaded=True)

@ -0,0 +1,9 @@
# Standalone websocket endpoint: a bare Flask app wrapped with Flask-SocketIO.
from flask import Flask, render_template
from flask_socketio import SocketIO
app = Flask(__name__)
app.config['SECRET_KEY'] = '[0v8-247]-4qdm-h8r5!'
socketio = SocketIO(app)
# Run through socketio.run (not app.run) so the websocket transport is enabled.
if __name__ == '__main__':
socketio.run(app)

@ -1,5 +1,6 @@
<link type="text/css" rel="stylesheet" href="{{ context }}/js/jsgrid/jsgrid.min.css" />
<link type="text/css" rel="stylesheet" href="{{ context }}/js/jsgrid/jsgrid-theme.min.css" />
<script src="https://cdn.socket.io/socket.io-1.4.5.js"></script>
<script src="{{ context }}/static/js/jsgrid.js"></script>
<script src="{{ context }}/static/js/jquery/jquery.min.js"></script>

@ -334,3 +334,7 @@ monitor.sandbox.render = function (logs) {
jx.dom.show('inspect_sandbox')
}
/**
* Socket handler, check for learning status
*/

@ -26,7 +26,9 @@
<div id="menu" class="menu"></div>
</div>
<div class="left">
{% include "prediction.html" %}
</div>
<div class="left info ">
<div class="" style="text-transform:capitalize; ">

@ -1,31 +1,29 @@
<meta charset="UTF-8">
<meta http-equiv="cache-control" content="no-cache">
<meta name="viewport" content="width=device-width, initial-scale=1,maximum-scale=1">
<script src="https://cdn.socket.io/socket.io-1.4.5.js"></script>
<!--
*
* This section is to handle predictions, i.e multivariate anomaly detection
*
-->
<div class="small">
</div>
<div>
<div class="left border info">
<div id="predict_chart" class="left"></div>
<div class="class">
<div>
<div></div>
<div>Accuracy</div>
</div>
<div>
<div class="left">
<div id="precision">00</div>
<div class="small">Precision</div>
</div>
<div>
<div class="left">
<div id="recall">00</div>
<div class="small">Recall</div>
</div>
<div>
<div class="left">
<div id="fscore">00</div>
<div class="small">F-Score</div>
</div>

@ -221,68 +221,24 @@ class Monitor (Thread):
self.keep_running = True
lock = RLock()
while self.keep_running:
lock.acquire()
for label in self.mconfig:
lock.acquire()
self.handler.init(self.mconfig[label])
r = self.handler.composite()
self.writer.write(label=label,row = r)
lock.release()
time.sleep(2)
time.sleep(2)
lock.release()
self.prune()
HALF_HOUR = 60*25
time.sleep(HALF_HOUR)
print "Stopped ..."
def prune(self):
    """
    Trim self.logs so only the most recent MAX_ENTRIES rows are retained.

    Fix: the original computed the cut point from an undefined name
    MAX_SIZE, raising NameError whenever the threshold was exceeded;
    it now keeps exactly the trailing MAX_ENTRIES entries.
    """
    MAX_ENTRIES = 100
    if len(self.logs) > MAX_ENTRIES:
        # keep the tail of the list, i.e. the most recent entries
        self.logs = self.logs[-MAX_ENTRIES:]
class mapreducer:
    """
    Minimal in-memory map/reduce helper: mapper callbacks emit (key, value)
    pairs into self.store, which run() hands back to the caller.
    """
    def __init__(self):
        # key -> list of values emitted under that key
        self.store = {}
    def filter(self, key, dataset):
        # project a single field out of every record that carries it
        return [record[key] for record in dataset if key in record]
    def run(self, dataset, mapper, reducer):
        """
        Drive the map phase over *dataset* (a list of rows) and return the
        accumulated store; returns None when no mapper is supplied.
        The reduce phase is currently not applied -- the store is returned
        as-is whether or not a reducer is given.
        """
        result = None
        if mapper is not None:
            if isinstance(dataset, list):
                for record in dataset:
                    mapper(record, self.emit)
            if reducer is not None:
                result = self.store
                # result = [reducer(self.store[key]) for key in self.store]
            else:
                result = self.store
        return result
    def mapper(self, row, emit):
        # default mapper: each row is a list of matrices keyed by 'label'
        for _matrix in row:
            emit(_matrix['label'], _matrix)
    def reducer(self, values):
        # default reducer: keep only the trailing window of values
        start = len(values) - 101 if len(values) > 100 else 0
        return values[start:]
    def emit(self, key, content):
        # accumulate content under key, creating the bucket on first use
        self.store.setdefault(key, []).append(content)
# #
# # We need to generate the appropriate dataset here
# # map/reduce is a well documented technique for generating datasets
# #
# def map(self,key,id,rows):
# #r = [row[key] for row in rows if key in row]
# for row in rows:
# if key in row :
# for xr in row[key]:
# self.emit(xr['label'],xr)
# def reduce(keys,values):
# print values[0]
# return r

@ -1,6 +1,10 @@
"""
This file is intended to perform certain machine learning tasks based on numpy.
We are trying to keep it lean; that's why no sklearn is involved yet.
@TODO:
Create factory method for the learners implemented here
Improve preconditions (size of the dataset, labels)
"""
from __future__ import division
import numpy as np
@ -16,6 +20,8 @@ class ML:
@staticmethod
def Extract(lattr,data):
return [[row[id] for id in lattr] for row in data]
"""
Implements a multivariate anomaly detection
@TODO: computationally determine epsilon
@ -41,7 +47,8 @@ class AnomalyDetection:
"""
def learn(self,data,key,value,features,label):
xo = ML.Filter(key,value,data)
if len(xo) < 100 :
return None
# attr = conf['features']
# label= conf['label']
yo= ML.Extract([label['name']],xo)
@ -55,7 +62,8 @@ class AnomalyDetection:
px = self.gPx(p['mean'],p['cov'],xo['test'])
print self.gPerformance(px,yo['test'])
perf = self.gPerformance(px,yo['test'])
return {"parameters":p,"performance":perf}
def getLabel(self, yo, label_conf):
    """
    Binarize label rows: 1 when a row shares at least one value with
    label_conf["1"] (the positive class), 0 otherwise.
    """
    positive = set(label_conf["1"])
    flags = []
    for row in yo:
        flags.append(1 if positive & set(row) else 0)
    return flags

@ -0,0 +1,17 @@
import sys

# Runtime parameters parsed from the command line; 'context' is the URL
# prefix used by the web application.
PARAMS = {'context': ''}
if len(sys.argv) > 1:
    N = len(sys.argv)
    # Fix: 'key' must be bound before the loop, otherwise a leading
    # non-flag token raised NameError at 'if key and value'.
    key = None
    for i in range(1, N):
        value = None
        if sys.argv[i].startswith('--'):
            # NOTE(review): replace('-','') strips interior dashes too,
            # so '--dry-run' becomes key 'dryrun' -- confirm intended.
            key = sys.argv[i].replace('-', '')
            if i + 1 < N:
                # normalize the value in-place in argv as well
                value = sys.argv[i + 1] = sys.argv[i + 1].strip()
        if key and value:
            PARAMS[key] = value
            i += 2  # NOTE(review): no effect -- range() rebinds i each pass

@ -0,0 +1,96 @@
import multiprocessing
from utils import transport
import time
import monitor
import sys
"""
This class is intended to collect data given a configuration
"""
class Top(multiprocessing.Process):
def __init__(self,_config,lock):
multiprocessing.Process.__init__(self)
self.lock = lock
self.reader_class = _config['store']['class']['read']
self.write_class = _config['store']['class']['write']
self.rw_args = _config['store']['args']
self.factory = transport.DataSourceFactory()
self.name = 'Zulu-Top'
self.quit = False
print sys.argv
sys.argv[0] = self.name
print sys.argv
# multiprocessing.current_process().name = 'Zulu-Top'
self.exit = multiprocessing.Event()
className = ''.join(['monitor.',_config['monitor']['processes']['class'],'()'])
self.handler = eval(className)
self.config = _config['monitor']['processes']['config']
def stop(self):
self.quit = True
def run(self):
while self.quit == False:
for label in self.config :
self.lock.acquire()
gwriter = self.factory.instance(type=self.write_class,args=self.rw_args)
for app in self.config[label] :
self.handler.init(app)
r = self.handler.composite()
gwriter.write(label=label,row=r)
time.sleep(5)
self.lock.release()
ELLAPSED_TIME = 60*30
time.sleep(ELLAPSED_TIME)
print "Exiting ",self.name
class Learner(multiprocessing.Process):
    """
    Training worker. Expects the platform configuration (store, learner):
    it periodically reads the collected logs, runs anomaly detection for
    each configured application and writes the learned parameters back to
    the store.
    """
    def __init__(self, config, lock):
        multiprocessing.Process.__init__(self)
        self.name = 'Zulu-Learner'
        self.lock = lock
        self.reader_class = config['store']['class']['read']
        self.write_class = config['store']['class']['write']
        self.rw_args = config['store']['args']
        self.features = config['learner']['anomalies']['features']
        self.yo = config['learner']['anomalies']['label']
        self.apps = config['learner']['anomalies']['apps']
        self.factory = transport.DataSourceFactory()
    def run(self):
        """
        Initiate learning every two hours; if there is nothing to learn the
        worker simply sleeps until the next cycle.
        """
        reader = self.factory.instance(type=self.reader_class, args=self.rw_args)
        data = reader.read()
        #
        # NOTE(review): highly inefficient -- the dataset is read once and
        # every cycle re-learns everything from that same snapshot.
        #
        while True:
            r = {}
            for key in data:
                logs = data[key]
                r[key] = {}
                for app in self.apps:
                    # NOTE(review): AnomalyDetection lives in the ML module;
                    # confirm it is imported in this file's scope.
                    handler = AnomalyDetection()
                    # Fixes: 'lhandler' was an undefined name (typo for
                    # 'handler'); pass the per-key 'logs' (computed above but
                    # previously unused) rather than the whole dataset.
                    r[key][app] = handler.learn(logs, 'label', app, self.features, self.yo)
            #
            # At this point we have learnt everything we need to learn
            #
            # Fixes: 'aquire' -> 'acquire', 'sef' -> 'self',
            # 'type.self.write_class' -> 'type=self.write_class'.
            self.lock.acquire()
            writer = self.factory.instance(type=self.write_class, args=self.rw_args)
            writer.write('learn', r)
            self.lock.release()
            TIME_ELLAPSED = 60 * 120  # -- every 2 hours
            time.sleep(TIME_ELLAPSED)

@ -0,0 +1,14 @@
import os
import json
from utils.workers import *
from utils.params import PARAMS

# Load the platform configuration from the file given via --path.
# Fix: use a context manager so the handle is closed even if json parsing fails.
with open(PARAMS['path']) as f:
    config = json.loads(f.read())
from multiprocessing import Lock
lock = Lock()
# Start the data-collection worker and block until it exits.
p = Top(config, lock)
p.daemon = True
p.start()
p.join()
Loading…
Cancel
Save