You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

451 lines
11 KiB
Python

"""
This is a RESTful interface implemented using Flask micro framework.
The API is driven by configuration that is organized in terms of the monitoring classes
The API is both restful and websocket/socketio enabled.
We designed the classes to be reusable (and powered by labels):
'monitoring-type':
'class':'<class-name>'
'config':<labeled-class-specific-configuration>'
@TODO:
- In order to make this Saas we need to have the configuration be session driven
- Add socketio, so that each section of the dashboard updates independently
"""
from flask import Flask, session, request, redirect, Response
from flask.templating import render_template
from flask_session import Session
import time
import sys
import os
import json
import re
import monitor
import Queue
from utils.transport import *
from utils.workers import ThreadManager, Factory
from utils.ml import ML,AnomalyDetection,AnalyzeAnomaly
import utils.params as SYS_ARGS
import atexit
app = Flask(__name__)
app.config['SECRET_KEY'] = '!h8-[0v8]247-4-360'
#app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX=?RT'
PARAMS = SYS_ARGS.PARAMS
f = open(PARAMS['path'])
CONFIG = json.loads(f.read())
f.close()
#
#
#from threading import Thread, RLock
p = CONFIG['store']['args']
class_read = CONFIG['store']['class']['read']
class_write= CONFIG['store']['class']['write']
factory = DataSourceFactory()
# gReader = factory.instance(type=class_read,args=p)
@app.route('/')
def home():
context = PARAMS['context']
if 'title' in PARAMS :
title = PARAMS['title']
else:
title = 'Dashboard'
apps = []
try:
gReader = factory.instance(type=class_read,args=p)
apps = gReader.view('summary/nodes',key=p['uid'])
except Exception, e:
print (e)
return render_template('dashboard.html',context=context,title=title,app_names=apps)
@app.route('/1/get/nodes')
def get_nodes():
"""
This function returns the labels of applications for every node registered
@param None
e.g: apps@zulu.org
"""
r = []
try:
gReader = factory.instance(type=class_read,args=p)
r = gReader.view('summary/nodes',key=p['uid'])
except Exception,e:
print (e)
return json.dumps(r)
@app.route('/1/get/apps')
def get_apps():
"""
This function returns the applications for a given node
@param node identifier e.g: apps@zulu.org
"""
r = []
try:
node_id = request.args.get('node')
gReader = factory.instance(type=class_read,args=p)
r = gReader.view('summary/app_names',key=p['uid'])
r = r[node_id]
print r
except Exception,e:
print (e)
return json.dumps(r)
@app.route('/1/get/summary/<id>')
def get_summary(id):
"""
This function returns the summary i.e an overall assessment of resource usage
It will pull information out of the user's data-store (database & document) specified in the configuration
@param id {app_resources|app_status|folder_size}
"""
r = []
try:
gReader = factory.instance(type=class_read,args=p)
#if id == 'apps_resources' :
# r = gReader.view('summary/app_resources',key=p['uid'])
#else:
# r = gReader.view('summary/folder_size',key=p['uid'])
id='summary/'+id.strip()
print p
print id
r = gReader.view(id,key=p['uid'])
except Exception,e:
print (e)
return json.dumps(r)
@app.route("/1/sys/usage/trend")
def get_usage_trend():
"""
This function returns cpu/memory usage for the entire system being monitored. It will return the 24 most recent observations in the logs
@param None
@return {memory_usage:[],cpu_usage:[],app_count:value,memory_available:[]}
"""
r = {}
try:
gReader = factory.instance(type=class_read,args=p)
r = gReader.view('summary/resource_usage_trend',key=p['uid'])
except Exception,e:
print (e)
return json.dumps(r)
@app.route("/1/app/usage/trend")
def get_usage_detail():
"""
This function returns detailed information about usage per application monitored. It will return the 24 most recent observations in the logs
@param node node identifier e.g: apps@zulu.io
@return {node_x:{app_1:{memory_usage:[],cpu_usage:[]}},...}
"""
r = {}
try:
id = request.args.get('node')
app_id = request.args.get('app')
gReader = factory.instance(type=class_read,args=p)
r = gReader.view('summary/app_resource_usage_details',key=p['uid'])
r = r[id][app_id]
except Exception,e:
print (e)
return json.dumps(r)
@app.route('/1/app/status')
def app_status() :
"""
This function aggregates the number of crashes/running/idle instances found in the past 24 log entries
@param nid node identifier e.g: app@zulu.io
@param aid application identifier e.g: kate, firefox, chrome ... specified in the configuraiton
"""
r = []
try:
nid = request.args.get('node') # Node identifier
aid = request.args.get('app') # application identifier
gReader = factory.instance(type=class_read,args=p)
r = gReader.view('summary/app_status_details',key=p['uid'])
#
#@TODO: Once the back-end enables the nodes in which the application is running, uncomment the line below
#
r = r[nid][aid]
except Exception,e:
print e
return json.dumps(r)
#@app.route('/get/<id>')
#def procs(id):
#try:
#gReader = factory.instance(type=class_read,args=p)
#data = gReader.read()
#ahandler = AnalyzeAnomaly()
#learn = {}
#if 'learn' in data :
#for row in data['learn'] :
#label = row['label']
#learn[label] = row
#r = {}
#for label in data :
#if label not in ['learn','folders'] :
#index = len(data[label]) - 1
#row = data[label][index]
#r[label] = row
##
## Let us determine if this is a normal operation or not
## We will update the status of the information ...
##
#for row in r[label] :
#index = r[label].index(row)
#if row['label'] in learn:
#id = row['label']
#px = ahandler.predict([row],learn[id])
#if px :
## row['anomaly'] = px[1]==1
#print ""
#print label,' *** ',index
#row = dict(row,**px)
#r[label][index] =row
##
## @TODO:
## Compile a report here that will be sent to the mailing list
##
#except Exception, e:
#print e
#r = []
#return json.dumps(r)
"""
This function/endpoint will assess n-virtual environments and return the results
@TODO: Should this be stored for future mining (I don't think so but could be wrong)
"""
@app.route('/sandbox')
def sandbox():
global CONFIG
if 'sandbox' in CONFIG: #CONFIG['monitor']:
#handler = HANDLERS['sandbox']['class']
#conf = HANDLERS['sandbox']['config']
r = []
# p = Factory.instance('sandbox',CONFIG)
handler = monitor.Sandbox()
conf = CONFIG['sandbox']
for id in conf:
try:
handler.init(conf[id])
r.append (dict(handler.composite(),**{"label":id}))
except Exception,e:
pass
else:
r = []
return json.dumps(r)
#@app.route('/trends')
#def trends ():
#id = request.args.get('id')
#app = request.args.get('app').strip()
#p = CONFIG['store']['args']
#class_read = CONFIG['store']['class']['read']
#gReader = factory.instance(type=class_read,args=p)
#r = gReader.read()
#if id in r:
#r = r[id] #--matrix
#series = []
#for row in r:
#series += [item for item in row if str(item['label'])== app]
#if len(series) > 12 :
#beg = len(series) - 8
#series = series[beg:]
#return json.dumps(series)
#else:
#return "[]"
#@app.route('/download',methods=['POST'])
#def requirements():
#stream = request.form['missing']
#print stream
#stream = "\n".join(json.loads(stream))
#headers = {"content-disposition":"attachment; filename=requirements.txt"}
#return Response(stream,mimetype='text/plain',headers=headers)
@app.route('/dashboard')
def dashboard():
context = PARAMS['context']
if 'title' in PARAMS :
title = PARAMS['title']
else:
title = 'Dashboard'
apps = []
try:
gReader = factory.instance(type=class_read,args=p)
apps = gReader.view('summary/app_names',key=p['uid'])
except Exception, e:
print (e)
return render_template('dashboard.html',context=context,title=title,app_names=apps)
@app.route('/upgrade')
def upgrade():
context = PARAMS['context']
if 'title' in PARAMS :
title = PARAMS['title']
else:
title = 'Upgrade'
return render_template('upgrade.html',context=context,title=title)
@app.route('/user')
def user():
context = PARAMS['context']
if 'title' in PARAMS :
title = PARAMS['title']
else:
title = 'Upgrade'
return render_template('user.html',context=context,title=title)
#"""
#This function is designed to trigger learning for anomaly detection
#@TODO: forward this to a socket i.e non-blocking socket
#"""
#@app.route('/anomalies/get')
#def learn():
#global CONFIG
#p = CONFIG['store']['args']
#class_read = CONFIG['store']['class']['read']
#gReader = factory.instance(type=class_read,args=p)
#d = gReader.read()
#if 'learn' in d :
#info = d['learn']
#del d['learn']
#else :
#info = []
#r = []
#if 'id' in request.args:
#id = request.args['id']
#d = d[id]
#params = {}
#for item in info:
#label = item['label']
#params[label] = item
##apps = list(set(ML.Extract(['label'],d)))
#r = []
#if params :
##
## If we have parameters available
#p = AnomalyDetection()
#apps = params.keys()
#for name in apps :
#if name not in params:
#continue
#_info = params[name]
#try:
#xo = ML.Filter('label',name,d)
#except Exception,e:
#xo = []
##print name,e
#if len(xo) == 0:
#continue
#xo = [xo[ len(xo) -1]]
#value = p.predict(xo,_info)[0]
#if len(value):
#report = dict(_info,**{'predicton':value})
#r.append(report)
##print app,value
##if value is not None:
## r.append(value)
#return json.dumps(r)
"""
This function returns anomalies for a given context or group of processes
The information returned is around precision/recall and f-score and parameters
"""
#@app.route('/anomalies/status')
#def anomalies_status():
#global CONFIG
#p = CONFIG['store']['args']
#class_read = CONFIG['store']['class']['read']
#gReader = factory.instance(type=class_read,args=p)
#d = gReader.read()
#if 'learn' in d :
#info = d['learn']
#del d['learn']
#else :
#info = []
#print info
#r = []
#if 'id' in request.args:
#id = request.args['id']
#r = info
#return json.dumps(r)
#@app.route('/folders')
#def get_folders():
#global CONFIG
#p = CONFIG['store']['args']
#class_read = CONFIG['store']['class']['read']
#gReader = factory.instance(type=class_read,args=p)
#d = gReader.read()
#if 'folders' in d:
#d = d['folders']
#hosts = set([row[0]['id'] for row in d])
#m = {}
#for id in hosts:
#for row in d:
#if id == row[0]['id'] :
#m[id] = row
#d = m.values()
#for row in d:
#print row[0]['id']
## index = len(d) - 1
## d = d[index]
## m = {}
## for row in d :
## key = row.keys()[0]
## row = row[key]
## if key not in m:
## r.append(row)
## m[key] = len(r) -1
## else:
## index = m[key]
## r[index] = row
## d = r
#else:
#d = []
#return json.dumps(d)
if __name__== '__main__':
# ThreadManager.start(CONFIG)
if 'port' not in SYS_ARGS.PARAMS :
SYS_ARGS.PARAMS['port'] = 8484
PORT = int(SYS_ARGS.PARAMS['port'])
app.run(host='0.0.0.0' ,port=PORT,debug=True,threaded=True)