You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			182 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Python
		
	
			
		
		
	
	
			182 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Python
		
	
"""
 | 
						|
	This is a RESTful interface implemented using the Flask micro-framework.
 | 
						|
	The API is driven by configuration that is organized in terms of the monitoring classes
 | 
						|
	
 | 
						|
	The API is both restful and websocket/socketio enabled.
 | 
						|
 | 
						|
	We designed the classes to be reusable (and powered by labels):
 | 
						|
	'monitoring-type':
 | 
						|
		'class':'<class-name>'
 | 
						|
		'config':'<labeled-class-specific-configuration>'
 | 
						|
"""
 | 
						|
 | 
						|
from flask import Flask, session, request, redirect, Response
 | 
						|
from flask.templating import render_template
 | 
						|
 | 
						|
from flask_session import Session
 | 
						|
import time
 | 
						|
import sys
 | 
						|
import os
 | 
						|
import json
 | 
						|
import re
 | 
						|
import monitor
 | 
						|
import Queue
 | 
						|
from utils.transport import *
 | 
						|
from utils.workers import ThreadManager, Factory
 | 
						|
from utils.ml import ML,AnomalyDetection
 | 
						|
import utils.params as SYS_ARGS
 | 
						|
import atexit
 | 
						|
 | 
						|
# Flask application object; every route below is registered against it.
app = Flask(__name__)
# NOTE(review): the Flask secret key is hard-coded in source control; consider
# loading it from the environment or from the configuration file instead.
app.config['SECRET_KEY'] = '!h8-[0v8]247-4-360'
#app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX=?RT'

# PARAMS comes from utils.params; PARAMS['path'] is the JSON configuration file.
PARAMS = SYS_ARGS.PARAMS
# The context manager closes the file even when the JSON is malformed.
with open(PARAMS['path']) as f:
	CONFIG 	= json.load(f)

#
#
#from threading import Thread, RLock
# Data-store settings read from the configuration and reused by the endpoints.
p = CONFIG['store']['args']
class_read = CONFIG['store']['class']['read']
class_write= CONFIG['store']['class']['write']
factory = DataSourceFactory()
gReader = factory.instance(type=class_read,args=p)

# Have ThreadManager.stop called when the interpreter exits.
atexit.register(ThreadManager.stop)
 | 
						|
@app.route('/get/<id>')
def procs(id):
	"""
		Return, as JSON, the most recent entry stored under every label of the data store.

		The 'learn' label is skipped, and so is any label whose series is empty
		(an empty series used to raise and void the whole response).
		@param id	value captured from the URL; accepted for the route but not used here
		@return		JSON object {label: latest entry}; a JSON empty list if reading fails
	"""
	r = {}
	try:
		reader = factory.instance(type=class_read,args=p)
		d = reader.read()
		for label in d :
			if label == 'learn' :
				continue
			series = d[label]
			if len(series) == 0 :
				# nothing recorded yet for this label
				continue
			r[label] = series[-1]
	except Exception as e:
		# Best effort: report the failure on stdout and answer with an empty list
		print(e)
		r = []
	return json.dumps(r)
 | 
						|
 | 
						|
"""
 | 
						|
	This function/endpoint will assess n-virtual environments and return the results
 | 
						|
	@TODO: Should this be stored for future mining (I don't think so but could be wrong)
 | 
						|
"""
 | 
						|
@app.route('/sandbox')
def sandbox():
	"""
		Assess each configured virtual environment and return the results as JSON.

		When the configuration has no 'sandbox' monitor an empty JSON list is returned.
		Otherwise the handler obtained from Factory is initialised with each entry of
		its configuration in turn, and every composite result is tagged with the
		entry's key under "label".
	"""
	if 'sandbox' not in CONFIG['monitor']:
		return json.dumps([])

	spec = Factory.instance('sandbox',CONFIG)
	handler, conf = spec['class'], spec['config']

	results = []
	for name in conf:
		handler.init(conf[name])
		results.append(dict(handler.composite(), label=name))
	return json.dumps(results)
 | 
						|
@app.route('/trends')
def trends ():
	"""
		Return, as JSON, the most recent entries (at most 13) recorded for one application.

		Query parameters:
			id	key of the series to look up in the data store
			app	label of the application whose entries are wanted
		An empty JSON list is returned when 'app' is missing or 'id' is not in the store.
	"""
	key = request.args.get('id')
	label = request.args.get('app')
	if label is None:
		# A missing 'app' parameter used to crash on None.strip()
		return "[]"
	label = label.strip()

	reader = factory.instance(type=CONFIG['store']['class']['read'],args=CONFIG['store']['args'])
	r = reader.read()
	if key not in r:
		return "[]"

	series = []
	for row in r[key]: #--matrix
		series += [item for item in row if str(item['label']) == label]
	# Keep only the tail of the series (same window as before: the last 13 entries)
	return json.dumps(series[-13:])
 | 
						|
@app.route('/download',methods=['POST'])
def requirements():
	"""
		Turn the posted list of missing packages into a downloadable requirements.txt.

		The 'missing' form field holds a JSON array of strings; the entries are
		written one per line and served as a plain-text attachment.
	"""
	missing = json.loads(request.form['missing'])
	body = "\n".join(missing)
	return Response(
		body,
		mimetype='text/plain',
		headers={"content-disposition":"attachment; filename=requirements.txt"})
 | 
						|
@app.route('/dashboard')
def dashboard():
	"""
		Render the dashboard page, handing PARAMS['context'] to the template as 'context'.
	"""
	return render_template('dashboard.html',context=PARAMS['context'])
 | 
						|
 | 
						|
"""
 | 
						|
	This function is designed to trigger learning for anomaly detection
 | 
						|
	@TODO: forward this to a socket i.e non-blocking socket
 | 
						|
"""
 | 
						|
@app.route('/learn')
def learn():
	"""
		Trigger learning for anomaly detection and return the findings as JSON.

		Query parameters:
			id	(optional) key of the series in the data store to learn from
		The prediction step is currently disabled (see the TODO below), so the
		result is always an empty JSON list. Unlike before, a request carrying an
		'id' no longer fails on an undefined name or an unknown key, and the
		result is a JSON array rather than a JSON-encoded string.
	"""
	reader = factory.instance(type=CONFIG['store']['class']['read'],args=CONFIG['store']['args'])
	d = reader.read()
	# Previously learned logs live under 'learn'; they are not observations
	if 'learn' in d :
		logs = d['learn']
		del d['learn']
	else :
		logs = []

	r = []
	key = request.args.get('id')
	if key is not None and key in d:
		observations = d[key]
		# TODO: re-enable prediction once the list of application labels is available:
		#	apps = list(set(ML.Extract(['label'],observations)))
		#	detector = AnomalyDetection()
		#	for app_label in apps:
		#		xo	= ML.Filter('label',app_label,observations)
		#		info	= ML.Filter('label',app_label,logs)
		#		value	= detector.predict(xo,info)
		#		if value is not None:
		#			r.append(value)
	return json.dumps(r)
 | 
						|
		
 | 
						|
 | 
						|
 | 
						|
@app.route('/anomalies/status')
def anomalies_status():
	"""
		Placeholder for the anomaly status endpoint.

		A Flask view must not return None, so until this is implemented the
		endpoint answers with an explicit 501 (Not Implemented) and an empty JSON list.
	"""
	return Response("[]",status=501,mimetype='application/json')
 | 
						|
 | 
						|
@app.route('/anomalies/get')
def anomalies_get():
	"""
		Placeholder for the anomaly retrieval endpoint.

		A Flask view must not return None, so until this is implemented the
		endpoint answers with an explicit 501 (Not Implemented) and an empty JSON list.
	"""
	return Response("[]",status=501,mimetype='application/json')
 | 
						|
	
 | 
						|
if __name__== '__main__':

	#ThreadManager.start(CONFIG)
	# The server listens on every interface, and the interactive debugger that
	# comes with debug mode lets a client execute arbitrary code. Debug mode is
	# therefore opt-in: start the process with FLASK_DEBUG=1 to enable it.
	debug = os.environ.get('FLASK_DEBUG','0') == '1'
	app.run(host='0.0.0.0',debug=debug,threaded=True)
 | 
						|
 | 
						|
	
 |