diff --git a/src/api/index.py b/src/api/index.py index 54a2625..81e5c42 100644 --- a/src/api/index.py +++ b/src/api/index.py @@ -1,14 +1,18 @@ """ This is a RESTful interface implemented using Flask micro framework. The API is driven by configuration that is organized in terms of the monitoring classes + + The API is both restful and websocket/socketio enabled. We designed the classes to be reusable (and powered by labels): 'monitoring-type': 'class':'' 'config':' """ + from flask import Flask, session, request, redirect, Response from flask.templating import render_template + from flask_session import Session import time import sys @@ -18,6 +22,7 @@ import re import monitor import Queue from utils.transport import * + PARAMS = {'context':''} if len(sys.argv) > 1: @@ -36,6 +41,10 @@ if len(sys.argv) > 1: app = Flask(__name__) +app.config['SECRET_KEY'] = '!h8-[0v8]247-4-360' +#app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX=?RT' + + f = open(PARAMS['path']) CONFIG = json.loads(f.read()) HANDLERS= {} @@ -151,12 +160,18 @@ def learn(): r = ML.Filter('label',app,r) label = ML.Extract(['status'],r) r = ML.Extract(['cpu_usage','memory_usage'],r) + +@app.route('/anomalies/status') +def anomalies_status(): + pass + +@app.route('/anomalies/get') +def anomalies_get(): + pass if __name__== '__main__': mthread.start() - - app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX=?RT' app.run(host='0.0.0.0',debug=True,threaded=True) diff --git a/src/api/ml-index.py b/src/api/ml-index.py new file mode 100644 index 0000000..866b442 --- /dev/null +++ b/src/api/ml-index.py @@ -0,0 +1,9 @@ +from flask import Flask, render_template +from flask_socketio import SocketIO + +app = Flask(__name__) +app.config['SECRET_KEY'] = '[0v8-247]-4qdm-h8r5!' 
+socketio = SocketIO(app) + +if __name__ == '__main__': + socketio.run(app) \ No newline at end of file diff --git a/src/api/static/index.html b/src/api/static/index.html index 24836c5..44b668b 100644 --- a/src/api/static/index.html +++ b/src/api/static/index.html @@ -1,5 +1,6 @@ + diff --git a/src/api/static/js/dashboard.js b/src/api/static/js/dashboard.js index db2a843..6bf0772 100644 --- a/src/api/static/js/dashboard.js +++ b/src/api/static/js/dashboard.js @@ -334,3 +334,7 @@ monitor.sandbox.render = function (logs) { jx.dom.show('inspect_sandbox') } + +/** + * Socket handler, check for learning status + */ \ No newline at end of file diff --git a/src/api/templates/dashboard.html b/src/api/templates/dashboard.html index 6ba9e2a..a3a1d8d 100644 --- a/src/api/templates/dashboard.html +++ b/src/api/templates/dashboard.html @@ -26,7 +26,9 @@ - +
+ {% include "prediction.html" %} +
diff --git a/src/api/templates/prediction.html b/src/api/templates/prediction.html index ff38447..8852ad3 100644 --- a/src/api/templates/prediction.html +++ b/src/api/templates/prediction.html @@ -1,31 +1,29 @@ + -
- -
-
+
Accuracy
-
+
00
Precision
-
+
00
Recall
-
+
00
diff --git a/src/monitor.py b/src/monitor.py index d0b0eb3..4343ee5 100755 --- a/src/monitor.py +++ b/src/monitor.py @@ -221,68 +221,24 @@ class Monitor (Thread): self.keep_running = True lock = RLock() while self.keep_running: - + lock.acquire() for label in self.mconfig: - lock.acquire() + self.handler.init(self.mconfig[label]) r = self.handler.composite() self.writer.write(label=label,row = r) - lock.release() + time.sleep(2) - + lock.release() self.prune() HALF_HOUR = 60*25 time.sleep(HALF_HOUR) print "Stopped ..." def prune(self) : + MAX_ENTRIES = 100 if len(self.logs) > MAX_ENTRIES : BEG = len(self.logs) - MAX_SIZE -1 self.logs = self.logs[BEG:] -class mapreducer: - def __init__(self): - self.store = {} - def filter (self,key,dataset): - return [row[key] for row in dataset if key in row] - def run(self,dataset,mapper,reducer): - r = None - if mapper is not None: - if isinstance(dataset,list) : - [mapper(row,self.emit) for row in dataset] - - - if reducer is not None: - r = self.store - # r = [reducer(self.store[key]) for key in self.store] - else: - r = self.store - return r - def mapper(self,row,emit): - [emit(_matrix['label'],_matrix) for _matrix in row ] - def reducer(self,values): - beg = len(values)-101 if len(values) > 100 else 0 - return values[beg:] - - def emit(self,key,content): - if key not in self.store: - self.store[key] = [] - self.store[key].append(content) -# # -# # We need to generate the appropriate dataset here -# # map/reduce is a well documented technique for generating datasets -# # -# def map(self,key,id,rows): - -# #r = [row[key] for row in rows if key in row] -# for row in rows: -# if key in row : -# for xr in row[key]: -# self.emit(xr['label'],xr) - - - -# def reduce(keys,values): -# print values[0] -# return r \ No newline at end of file diff --git a/src/utils/ml.py b/src/utils/ml.py index 8125dbf..2bd0a44 100644 --- a/src/utils/ml.py +++ b/src/utils/ml.py @@ -1,6 +1,10 @@ """ This file is intended to perfom certain machine learning 
tasks based on numpy We are trying to keep it lean that's why no sklearn involved yet + + @TODO: + Create factory method for the learners implemented here + Improve preconditions (size of the dataset, labels) """ from __future__ import division import numpy as np @@ -16,6 +20,8 @@ class ML: @staticmethod def Extract(lattr,data): return [[row[id] for id in lattr] for row in data] + + """ Implements a multivariate anomaly detection @TODO: determine computationally determine epsilon @@ -41,7 +47,8 @@ class AnomalyDetection: """ def learn(self,data,key,value,features,label): xo = ML.Filter(key,value,data) - + if len(xo) < 100 : + return None # attr = conf['features'] # label= conf['label'] yo= ML.Extract([label['name']],xo) @@ -55,7 +62,8 @@ class AnomalyDetection: px = self.gPx(p['mean'],p['cov'],xo['test']) - print self.gPerformance(px,yo['test']) + perf = self.gPerformance(px,yo['test']) + return {"parameters":p,"performance":perf} def getLabel(self,yo,label_conf): return [ int(len(set(item) & set(label_conf["1"]))>0) for item in yo ] diff --git a/src/utils/params.py b/src/utils/params.py new file mode 100644 index 0000000..72b67a0 --- /dev/null +++ b/src/utils/params.py @@ -0,0 +1,17 @@ +import sys +PARAMS = {'context':''} +if len(sys.argv) > 1: + + N = len(sys.argv) + for i in range(1,N): + value = None + if sys.argv[i].startswith('--'): + key = sys.argv[i].replace('-','') + + if i + 1 < N: + value = sys.argv[i + 1] = sys.argv[i+1].strip() + if key and value: + PARAMS[key] = value + + i += 2 + diff --git a/src/utils/workers.py b/src/utils/workers.py new file mode 100644 index 0000000..eed5a89 --- /dev/null +++ b/src/utils/workers.py @@ -0,0 +1,96 @@ +import multiprocessing +from utils import transport +import time +import monitor +import sys +""" + This class is intended to collect data given a configuration + +""" +class Top(multiprocessing.Process): + def __init__(self,_config,lock): + multiprocessing.Process.__init__(self) + self.lock = lock + self.reader_class 
= _config['store']['class']['read'] + self.write_class = _config['store']['class']['write'] + self.rw_args = _config['store']['args'] + self.factory = transport.DataSourceFactory() + + self.name = 'Zulu-Top' + self.quit = False + print sys.argv + sys.argv[0] = self.name + print sys.argv + # multiprocessing.current_process().name = 'Zulu-Top' + self.exit = multiprocessing.Event() + + + className = ''.join(['monitor.',_config['monitor']['processes']['class'],'()']) + self.handler = eval(className) + self.config = _config['monitor']['processes']['config'] + def stop(self): + self.quit = True + def run(self): + while self.quit == False: + for label in self.config : + self.lock.acquire() + gwriter = self.factory.instance(type=self.write_class,args=self.rw_args) + for app in self.config[label] : + self.handler.init(app) + r = self.handler.composite() + gwriter.write(label=label,row=r) + time.sleep(5) + self.lock.release() + ELLAPSED_TIME = 60*30 + time.sleep(ELLAPSED_TIME) + print "Exiting ",self.name + +class Learner(multiprocessing.Process) : + + """ + This function expects paltform config (store,learner) + It will leverage store and learner in order to operate + """ + def __init__(self,config,lock): + multiprocessing.Process.__init__(self) + self.name='Zulu-Learner' + self.lock = lock + self.reader_class = config['store']['class']['read'] + self.write_class = config['store']['class']['write'] + self.rw_args = config['store']['args'] + self.features = config['learner']['anomalies']['features'] + self.yo = config['learner']['anomalies']['label'] + self.apps = config['learner']['anomalies']['apps'] + self.factory = transport.DataSourceFactory() + """ + This function will initiate learning every (x-hour) + If there is nothing to learn the app will simply go to sleep + """ + def run(self): + reader = self.factory.instance(type=self.reader_class,args=self.rw_args) + data = reader.read() + # + # This is the motherload of innefficiency ... 
+ # + while True: + r = {} + for key in data : + logs = data[key] + r[key] = {} + for app in self.apps: + handler = AnomalyDetection() + r[key][app] = handler.learn(data,'label',app,self.features,self.yo) + # + # At this point we've already learnt everything we need to learn + # + self.lock.acquire() + writer = self.factory.instance(type=self.write_class,args=self.rw_args) + writer.write('learn',r) + self.lock.release() + + TIME_ELAPSED = 60*120 #-- Every 2 hours + time.sleep(TIME_ELAPSED) + + + + diff --git a/src/utils/zulu-learn/__init__.py b/src/utils/zulu-learn/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/utils/zulu-learn/__init__.py @@ -0,0 +1 @@ + diff --git a/src/utils/zulu-learn/__main__.py b/src/utils/zulu-learn/__main__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/utils/zulu-learn/__main__.py @@ -0,0 +1 @@ + diff --git a/src/utils/zulu-top/__main__.py b/src/utils/zulu-top/__main__.py new file mode 100644 index 0000000..800db99 --- /dev/null +++ b/src/utils/zulu-top/__main__.py @@ -0,0 +1,14 @@ + +import os +import json +from utils.workers import * +from utils.params import PARAMS +f = open(PARAMS['path']) +config = json.loads(f.read()) +f.close() +from multiprocessing import Lock +lock = Lock() +p = Top(config,lock) +p.daemon = True +p.start() +p.join()