You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			296 lines
		
	
	
		
			6.7 KiB
		
	
	
	
		
			Python
		
	
			
		
		
	
	
			296 lines
		
	
	
		
			6.7 KiB
		
	
	
	
		
			Python
		
	
"""
 | 
						|
	This is a RESTful interface implemented using Flask micro framework.
 | 
						|
	The API is driven by configuration that is organized in terms of the monitoring classes
 | 
						|
	
 | 
						|
	The API is both restful and websocket/socketio enabled.
 | 
						|
 | 
						|
	We designed the classes to be reusable (and powered by labels):
 | 
						|
	'monitoring-type':
 | 
						|
		'class':'<class-name>'
 | 
						|
		'config':<labeled-class-specific-configuration>'
 | 
						|
"""
 | 
						|
 | 
						|
from flask import Flask, session, request, redirect, Response
 | 
						|
from flask.templating import render_template
 | 
						|
 | 
						|
from flask_session import Session
 | 
						|
import time
 | 
						|
import sys
 | 
						|
import os
 | 
						|
import json
 | 
						|
import re
 | 
						|
import monitor
 | 
						|
import Queue
 | 
						|
from utils.transport import *
 | 
						|
from utils.workers import ThreadManager, Factory
 | 
						|
from utils.ml import ML,AnomalyDetection,AnalyzeAnomaly
 | 
						|
import utils.params as SYS_ARGS
 | 
						|
import atexit
 | 
						|
 | 
						|
app = Flask(__name__)
 | 
						|
app.config['SECRET_KEY'] = '!h8-[0v8]247-4-360'
 | 
						|
#app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX=?RT'
 | 
						|
 | 
						|
PARAMS = SYS_ARGS.PARAMS
 | 
						|
f = open(PARAMS['path'])
 | 
						|
CONFIG 	= json.loads(f.read())
 | 
						|
f.close()
 | 
						|
 | 
						|
#
 | 
						|
#
 | 
						|
#from threading import Thread, RLock
 | 
						|
p = CONFIG['store']['args']
 | 
						|
class_read = CONFIG['store']['class']['read']
 | 
						|
class_write= CONFIG['store']['class']['write']
 | 
						|
factory = DataSourceFactory()
 | 
						|
# gReader = factory.instance(type=class_read,args=p)
 | 
						|
 | 
						|
atexit.register(ThreadManager.stop)
 | 
						|
@app.route('/get/<id>')
 | 
						|
def procs(id):	
 | 
						|
	try:
 | 
						|
		gReader = factory.instance(type=class_read,args=p)
 | 
						|
		data =  gReader.read()
 | 
						|
		ahandler = AnalyzeAnomaly()
 | 
						|
		learn = {}
 | 
						|
		if 'learn' in data :
 | 
						|
			for row in data['learn'] :
 | 
						|
				label = row['label']
 | 
						|
				learn[label] = row
 | 
						|
		r = {}
 | 
						|
		for label in data :
 | 
						|
			if label not in ['learn','folders'] :
 | 
						|
				index = len(data[label]) - 1
 | 
						|
				row = data[label][index]
 | 
						|
				r[label] = row
 | 
						|
				#
 | 
						|
				# Let us determine if this is a normal operation or not
 | 
						|
				# We will update the status of the information ...
 | 
						|
				#
 | 
						|
				
 | 
						|
				for row in r[label] :	
 | 
						|
					index = r[label].index(row)			
 | 
						|
					if row['label'] in learn:
 | 
						|
						id = row['label']
 | 
						|
						px = ahandler.predict([row],learn[id])
 | 
						|
						if px :
 | 
						|
							
 | 
						|
							# row['anomaly'] = px[1]==1
 | 
						|
							print ""
 | 
						|
							print label,' *** ',index
 | 
						|
							row = dict(row,**px)
 | 
						|
							r[label][index] =row
 | 
						|
							#
 | 
						|
							# @TODO:
 | 
						|
							# Compile a report here that will be sent to the mailing list
 | 
						|
							# 
 | 
						|
					
 | 
						|
	except Exception, e:
 | 
						|
		print e
 | 
						|
		r = []
 | 
						|
	
 | 
						|
	return json.dumps(r)
 | 
						|
 | 
						|
"""
 | 
						|
	This function/endpoint will assess n-virtual environments and return the results
 | 
						|
	@TODO: Should this be stored for future mining (I don't think so but could be wrong)
 | 
						|
"""
 | 
						|
@app.route('/sandbox')
 | 
						|
def sandbox():
 | 
						|
	global CONFIG
 | 
						|
	
 | 
						|
	if 'sandbox' in CONFIG: #CONFIG['monitor']:
 | 
						|
		#handler = HANDLERS['sandbox']['class']
 | 
						|
		#conf = HANDLERS['sandbox']['config']
 | 
						|
		r = []
 | 
						|
		# p = Factory.instance('sandbox',CONFIG)
 | 
						|
		handler = monitor.Sandbox()
 | 
						|
		conf	= CONFIG['sandbox']
 | 
						|
		
 | 
						|
		for id in conf:
 | 
						|
			try:
 | 
						|
				handler.init(conf[id])
 | 
						|
				r.append (dict(handler.composite(),**{"label":id}))
 | 
						|
			except Exception,e:
 | 
						|
				pass
 | 
						|
	else:
 | 
						|
		
 | 
						|
		r = []
 | 
						|
 | 
						|
 | 
						|
	return json.dumps(r)
 | 
						|
@app.route('/trends') 
 | 
						|
def trends ():
 | 
						|
	id = request.args.get('id')
 | 
						|
	app = request.args.get('app').strip()
 | 
						|
	p = CONFIG['store']['args']
 | 
						|
	class_read = CONFIG['store']['class']['read']
 | 
						|
 | 
						|
	
 | 
						|
	gReader = factory.instance(type=class_read,args=p)	
 | 
						|
	r = gReader.read()
 | 
						|
	if id in r:
 | 
						|
		r = r[id] #--matrix
 | 
						|
		series = []
 | 
						|
 | 
						|
		for row in r:
 | 
						|
			
 | 
						|
			series += [item for item in row if str(item['label'])== app]
 | 
						|
		if len(series) > 12 :
 | 
						|
			beg = len(series) - 13
 | 
						|
			series = series[beg:]
 | 
						|
		return json.dumps(series)
 | 
						|
	else:
 | 
						|
		return "[]"
 | 
						|
@app.route('/download',methods=['POST'])
 | 
						|
def requirements():
 | 
						|
	stream = request.form['missing']
 | 
						|
	print stream
 | 
						|
	stream = "\n".join(json.loads(stream))
 | 
						|
	headers = {"content-disposition":"attachment; filename=requirements.txt"}
 | 
						|
	return Response(stream,mimetype='text/plain',headers=headers)
 | 
						|
@app.route('/dashboard')
 | 
						|
def dashboard():
 | 
						|
	context = PARAMS['context']
 | 
						|
	if 'title' in PARAMS :
 | 
						|
		title = PARAMS['title']
 | 
						|
	else:
 | 
						|
		title = 'Zulu OverWatch'
 | 
						|
	return render_template('dashboard.html',context=context,title=title)
 | 
						|
 | 
						|
"""
 | 
						|
	This function is designed to trigger learning for anomaly detection
 | 
						|
	@TODO: forward this to a socket i.e non-blocking socket
 | 
						|
"""
 | 
						|
@app.route('/anomalies/get')
 | 
						|
def learn():
 | 
						|
	global CONFIG
 | 
						|
	p = CONFIG['store']['args']
 | 
						|
	class_read = CONFIG['store']['class']['read']	
 | 
						|
	gReader = factory.instance(type=class_read,args=p)
 | 
						|
	d =  gReader.read()
 | 
						|
	
 | 
						|
	if 'learn' in d :
 | 
						|
		info = d['learn']
 | 
						|
		
 | 
						|
		del d['learn']
 | 
						|
	else :
 | 
						|
		info = []
 | 
						|
	r = []
 | 
						|
	if 'id' in request.args:
 | 
						|
		id = request.args['id']
 | 
						|
		d = d[id]
 | 
						|
		params = {}
 | 
						|
		for item in info:
 | 
						|
			
 | 
						|
			label = item['label']
 | 
						|
			params[label] = item
 | 
						|
		
 | 
						|
		#apps = list(set(ML.Extract(['label'],d)))
 | 
						|
		r = []
 | 
						|
		if params :
 | 
						|
			#
 | 
						|
			# If we have parameters available 
 | 
						|
			p = AnomalyDetection()
 | 
						|
			apps = params.keys()			
 | 
						|
			for name in apps :
 | 
						|
				if name not in params:
 | 
						|
					continue
 | 
						|
				_info	= params[name]		
 | 
						|
				try:
 | 
						|
					xo 	= ML.Filter('label',name,d)	
 | 
						|
				except Exception,e:
 | 
						|
					xo = []
 | 
						|
					#print name,e
 | 
						|
				if len(xo) == 0:					
 | 
						|
					continue	
 | 
						|
				xo 	= [xo[ len(xo) -1]]
 | 
						|
				
 | 
						|
				value	= p.predict(xo,_info)[0]
 | 
						|
				
 | 
						|
				if len(value):
 | 
						|
					report = dict(_info,**{'predicton':value})
 | 
						|
					r.append(report)
 | 
						|
				
 | 
						|
				
 | 
						|
				
 | 
						|
			#print app,value
 | 
						|
			#if value is not None:
 | 
						|
			#	r.append(value)
 | 
						|
	
 | 
						|
	return json.dumps(r)
 | 
						|
		
 | 
						|
 | 
						|
"""
 | 
						|
	This function returns anomalies for a given context or group of processes
 | 
						|
	The information returned is around precision/recall and f-score and parameters
 | 
						|
"""
 | 
						|
@app.route('/anomalies/status')
 | 
						|
def anomalies_status():
 | 
						|
	global CONFIG
 | 
						|
	p = CONFIG['store']['args']
 | 
						|
	class_read = CONFIG['store']['class']['read']	
 | 
						|
	gReader = factory.instance(type=class_read,args=p)
 | 
						|
	d =  gReader.read()
 | 
						|
	if 'learn' in d :
 | 
						|
		info = d['learn']
 | 
						|
		
 | 
						|
		del d['learn']
 | 
						|
	else :
 | 
						|
		info = []
 | 
						|
	print info
 | 
						|
	r = []
 | 
						|
	if 'id' in request.args:
 | 
						|
		id = request.args['id']
 | 
						|
		r = info
 | 
						|
	return json.dumps(r)
 | 
						|
@app.route('/folders')
 | 
						|
def get_folders():
 | 
						|
	global CONFIG
 | 
						|
	p = CONFIG['store']['args']
 | 
						|
	class_read = CONFIG['store']['class']['read']	
 | 
						|
	gReader = factory.instance(type=class_read,args=p)
 | 
						|
	d =  gReader.read()
 | 
						|
	if 'folders' in d:
 | 
						|
		d = d['folders']
 | 
						|
		hosts = set([row[0]['id'] for row in d])
 | 
						|
		m = {}
 | 
						|
		for id in hosts:
 | 
						|
			for row in d:				
 | 
						|
				if id == row[0]['id'] :
 | 
						|
					m[id] = row
 | 
						|
		d = m.values()
 | 
						|
		for row in d:
 | 
						|
			print row[0]['id']
 | 
						|
		# index = len(d) - 1
 | 
						|
		# d = d[index]
 | 
						|
		
 | 
						|
			
 | 
						|
		# m = {}
 | 
						|
		# for row in d :
 | 
						|
			
 | 
						|
		# 	key = row.keys()[0]
 | 
						|
 | 
						|
		# 	row = row[key]
 | 
						|
		# 	if key not in m:
 | 
						|
		# 		r.append(row)
 | 
						|
		# 		m[key] = len(r) -1
 | 
						|
		# 	else:
 | 
						|
		# 		index = m[key]
 | 
						|
		# 		r[index] = row
 | 
						|
		# d = r
 | 
						|
	else:
 | 
						|
		d = []
 | 
						|
	
 | 
						|
	return json.dumps(d)
 | 
						|
	
 | 
						|
if __name__== '__main__':
 | 
						|
	
 | 
						|
#	ThreadManager.start(CONFIG)	
 | 
						|
	if 'port' not in SYS_ARGS.PARAMS :
 | 
						|
		SYS_ARGS.PARAMS['port'] = 5000
 | 
						|
	PORT = int(SYS_ARGS.PARAMS['port'])
 | 
						|
	app.run(host='0.0.0.0',port=PORT,debug=True,threaded=True)
 | 
						|
 | 
						|
	
 |