You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

376 lines
8.5 KiB
Python

"""
This is a RESTful interface implemented using Flask micro framework.
The API is driven by configuration that is organized in terms of the monitoring classes
The API is both restful and websocket/socketio enabled.
We designed the classes to be reusable (and powered by labels):
'monitoring-type':
'class':'<class-name>'
'config':<labeled-class-specific-configuration>'
@TODO:
- In order to make this Saas we need to have the configuration be session driven
- Add socketio, so that each section of the dashboard updates independently
"""
from flask import Flask, session, request, redirect, Response
from flask.templating import render_template
from flask_session import Session
import time
import sys
import os
import json
import re
import monitor
import Queue
from utils.transport import *
from utils.workers import ThreadManager, Factory
from utils.ml import ML,AnomalyDetection,AnalyzeAnomaly
import utils.params as SYS_ARGS
import atexit
app = Flask(__name__)
app.config['SECRET_KEY'] = '!h8-[0v8]247-4-360'
#app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX=?RT'
PARAMS = SYS_ARGS.PARAMS
f = open(PARAMS['path'])
CONFIG = json.loads(f.read())
f.close()
#
#
#from threading import Thread, RLock
p = CONFIG['store']['args']
class_read = CONFIG['store']['class']['read']
class_write= CONFIG['store']['class']['write']
factory = DataSourceFactory()
# gReader = factory.instance(type=class_read,args=p)
@app.route('/1/get/apps')
def get_apps():
r = []
try:
gReader = factory.instance(type=class_read,args=p)
r = gReader.view('summary/app_names',key=p['uid'])
except Exception,e:
print (e)
return json.dumps(r)
@app.route('/1/get/summary/<id>')
def get_summary(id):
r = []
try:
gReader = factory.instance(type=class_read,args=p)
#if id == 'apps_resources' :
# r = gReader.view('summary/app_resources',key=p['uid'])
#else:
# r = gReader.view('summary/folder_size',key=p['uid'])
id='summary/'+id.strip()
print p
print id
r = gReader.view(id,key=p['uid'])
except Exception,e:
print (e)
return json.dumps(r)
@app.route("/1/sys/usage/trend")
def get_usage_trend():
"""
This function returns trends for the 24 most recent observations in the logs
"""
r = {}
try:
gReader = factory.instance(type=class_read,args=p)
r = gReader.view('summary/resource_usage_trend',key=p['uid'])
except Exception,e:
print (e)
return json.dumps(r)
@app.route("/1/app/usage/trend")
def get_usage_detail():
"""
This function returns trends for the 24 most recent observations in the logs
"""
r = {}
try:
gReader = factory.instance(type=class_read,args=p)
r = gReader.view('summary/app_resource_usage_details',key=p['uid'])
except Exception,e:
print (e)
return json.dumps(r)
@app.route('/get/<id>')
def procs(id):
try:
gReader = factory.instance(type=class_read,args=p)
data = gReader.read()
ahandler = AnalyzeAnomaly()
learn = {}
if 'learn' in data :
for row in data['learn'] :
label = row['label']
learn[label] = row
r = {}
for label in data :
if label not in ['learn','folders'] :
index = len(data[label]) - 1
row = data[label][index]
r[label] = row
#
# Let us determine if this is a normal operation or not
# We will update the status of the information ...
#
for row in r[label] :
index = r[label].index(row)
if row['label'] in learn:
id = row['label']
px = ahandler.predict([row],learn[id])
if px :
# row['anomaly'] = px[1]==1
print ""
print label,' *** ',index
row = dict(row,**px)
r[label][index] =row
#
# @TODO:
# Compile a report here that will be sent to the mailing list
#
except Exception, e:
print e
r = []
return json.dumps(r)
"""
This function/endpoint will assess n-virtual environments and return the results
@TODO: Should this be stored for future mining (I don't think so but could be wrong)
"""
@app.route('/sandbox')
def sandbox():
global CONFIG
if 'sandbox' in CONFIG: #CONFIG['monitor']:
#handler = HANDLERS['sandbox']['class']
#conf = HANDLERS['sandbox']['config']
r = []
# p = Factory.instance('sandbox',CONFIG)
handler = monitor.Sandbox()
conf = CONFIG['sandbox']
for id in conf:
try:
handler.init(conf[id])
r.append (dict(handler.composite(),**{"label":id}))
except Exception,e:
pass
else:
r = []
return json.dumps(r)
@app.route('/trends')
def trends ():
id = request.args.get('id')
app = request.args.get('app').strip()
p = CONFIG['store']['args']
class_read = CONFIG['store']['class']['read']
gReader = factory.instance(type=class_read,args=p)
r = gReader.read()
if id in r:
r = r[id] #--matrix
series = []
for row in r:
series += [item for item in row if str(item['label'])== app]
if len(series) > 12 :
beg = len(series) - 8
series = series[beg:]
return json.dumps(series)
else:
return "[]"
@app.route('/download',methods=['POST'])
def requirements():
stream = request.form['missing']
print stream
stream = "\n".join(json.loads(stream))
headers = {"content-disposition":"attachment; filename=requirements.txt"}
return Response(stream,mimetype='text/plain',headers=headers)
@app.route('/dashboard')
def dashboard():
context = PARAMS['context']
if 'title' in PARAMS :
title = PARAMS['title']
else:
title = 'Dashboard'
apps = []
try:
gReader = factory.instance(type=class_read,args=p)
apps = gReader.view('summary/app_names',key=p['uid'])
except Exception, e:
print (e)
return render_template('dashboard.html',context=context,title=title,app_names=apps)
@app.route('/upgrade')
def upgrade():
context = PARAMS['context']
if 'title' in PARAMS :
title = PARAMS['title']
else:
title = 'Upgrade'
return render_template('upgrade.html',context=context,title=title)
@app.route('/user')
def user():
context = PARAMS['context']
if 'title' in PARAMS :
title = PARAMS['title']
else:
title = 'Upgrade'
return render_template('user.html',context=context,title=title)
"""
This function is designed to trigger learning for anomaly detection
@TODO: forward this to a socket i.e non-blocking socket
"""
@app.route('/anomalies/get')
def learn():
global CONFIG
p = CONFIG['store']['args']
class_read = CONFIG['store']['class']['read']
gReader = factory.instance(type=class_read,args=p)
d = gReader.read()
if 'learn' in d :
info = d['learn']
del d['learn']
else :
info = []
r = []
if 'id' in request.args:
id = request.args['id']
d = d[id]
params = {}
for item in info:
label = item['label']
params[label] = item
#apps = list(set(ML.Extract(['label'],d)))
r = []
if params :
#
# If we have parameters available
p = AnomalyDetection()
apps = params.keys()
for name in apps :
if name not in params:
continue
_info = params[name]
try:
xo = ML.Filter('label',name,d)
except Exception,e:
xo = []
#print name,e
if len(xo) == 0:
continue
xo = [xo[ len(xo) -1]]
value = p.predict(xo,_info)[0]
if len(value):
report = dict(_info,**{'predicton':value})
r.append(report)
#print app,value
#if value is not None:
# r.append(value)
return json.dumps(r)
"""
This function returns anomalies for a given context or group of processes
The information returned is around precision/recall and f-score and parameters
"""
@app.route('/anomalies/status')
def anomalies_status():
global CONFIG
p = CONFIG['store']['args']
class_read = CONFIG['store']['class']['read']
gReader = factory.instance(type=class_read,args=p)
d = gReader.read()
if 'learn' in d :
info = d['learn']
del d['learn']
else :
info = []
print info
r = []
if 'id' in request.args:
id = request.args['id']
r = info
return json.dumps(r)
@app.route('/folders')
def get_folders():
global CONFIG
p = CONFIG['store']['args']
class_read = CONFIG['store']['class']['read']
gReader = factory.instance(type=class_read,args=p)
d = gReader.read()
if 'folders' in d:
d = d['folders']
hosts = set([row[0]['id'] for row in d])
m = {}
for id in hosts:
for row in d:
if id == row[0]['id'] :
m[id] = row
d = m.values()
for row in d:
print row[0]['id']
# index = len(d) - 1
# d = d[index]
# m = {}
# for row in d :
# key = row.keys()[0]
# row = row[key]
# if key not in m:
# r.append(row)
# m[key] = len(r) -1
# else:
# index = m[key]
# r[index] = row
# d = r
else:
d = []
return json.dumps(d)
if __name__== '__main__':
# ThreadManager.start(CONFIG)
if 'port' not in SYS_ARGS.PARAMS :
SYS_ARGS.PARAMS['port'] = 8484
PORT = int(SYS_ARGS.PARAMS['port'])
app.run(host='0.0.0.0' ,port=PORT,debug=True,threaded=True)