master
Steve L. Nyemba 8 years ago
parent eead099963
commit 3f8f975528

@ -196,7 +196,7 @@ def anomalies_status():
if __name__== '__main__': if __name__== '__main__':
#ThreadManager.start(CONFIG) ThreadManager.start(CONFIG)
app.run(host='0.0.0.0',debug=True,threaded=True) app.run(host='0.0.0.0',debug=True,threaded=True)

@ -86,7 +86,7 @@ monitor.processes.render = function(label,data) {
{ name: 'label', type: 'text', title: "Process", headercss: "small bold", css: "small"}, { name: 'label', type: 'text', title: "Process", headercss: "small bold", css: "small"},
{ name: "cpu_usage", type: "number", title: "CPU", headercss: "small bold" , width:'64px'}, { name: "cpu_usage", type: "number", title: "CPU", headercss: "small bold" , width:'64px'},
{ name: "memory_usage", type: "text", title: "Mem. Used", type: "number", headercss: "small bold" }, { name: "memory_usage", type: "text", title: "Mem. Used", type: "number", headercss: "small bold" },
{ name: "memory_available", type: "number", title: "Mem. Avail", headercss: "small bold" }, { name: "proc_count", type: "number", title: "Proc Count", headercss: "small bold" },
{name:"status",type:"text",title:"Status",headercss:"small bold",align:"center", width:'64px'} {name:"status",type:"text",title:"Status",headercss:"small bold",align:"center", width:'64px'}
] ]
var grid = $('#latest_processes').jsGrid(options) ; var grid = $('#latest_processes').jsGrid(options) ;
@ -127,7 +127,10 @@ monitor.processes.trend.render = function (logs, key,label) {
conf.data = {} conf.data = {}
conf.options = { legend: { position: 'bottom' } } conf.options = { legend: { position: 'bottom' } }
conf.options.scales = {} conf.options.scales = {}
conf.options.scales.yAxes = [ {scaleLabel:{display:true,labelString:'CPU & MEMORY USAGE'},ticks:{min:0,beginAtZero:true},gridLines: {display:false}}] conf.options.scales.yAxes = [
{id:'0',scaleLabel:{display:true,labelString:'CPU & MEMORY USAGE'},ticks:{min:0,beginAtZero:true},gridLines: {display:false}},
{id:'1',position:'right',scaleLabel:{display:true,labelString:'PROCESS COUNT'},ticks:{min:0,stepSize:1,beginAtZero:true},gridLines: {display:false}}
]
conf.options.scales.xAxes = [ conf.options.scales.xAxes = [
{ {
@ -145,8 +148,9 @@ monitor.processes.trend.render = function (logs, key,label) {
var x_axis = [] var x_axis = []
var _x = {} var _x = {}
// var _y = {} // var _y = {}
var cpu = { label: 'CPU Usage (%)', data: [] ,backgroundColor:'transparent',borderColor:COLORS[187],fill:false,borderWidth:1} var cpu = {yAxisID:'0', label: 'CPU Usage (%)', data: [] ,backgroundColor:'transparent',borderColor:COLORS[187],fill:false,borderWidth:1}
var mem = {label : 'Memory Usage(%)',data:[],backgroundColor:'transparent',borderColor:COLORS[32],fill:false,borderWidth:1} var mem = {yAxisID:'0',label : 'Memory Usage(%)',data:[],backgroundColor:'transparent',borderColor:COLORS[32],fill:false,borderWidth:1}
var proc= {yAxisID:'1',label : 'Proc Count',data:[],backgroundColor:'transparent',borderColor:COLORS[42],fill:false,borderWidth:1}
jx.utils.patterns.visitor(logs,function(item){ jx.utils.patterns.visitor(logs,function(item){
x = new Date(item.year,item.month-1,item.day,item.hour,item.minute) x = new Date(item.year,item.month-1,item.day,item.hour,item.minute)
y = item[key] y = item[key]
@ -156,6 +160,7 @@ monitor.processes.trend.render = function (logs, key,label) {
x_axis.push(x) x_axis.push(x)
cpu.data.push({ x: x, y: item.cpu_usage }) cpu.data.push({ x: x, y: item.cpu_usage })
mem.data.push({x:x,y:item.memory_usage}) mem.data.push({x:x,y:item.memory_usage})
proc.data.push({x:x,y:item.proc_count})
// return {x:x,y:y} // return {x:x,y:y}
} }
@ -164,7 +169,7 @@ monitor.processes.trend.render = function (logs, key,label) {
conf.data.datasets = [cpu,mem] conf.data.datasets = [cpu,mem,proc]
x_axis = jx.utils.unique(x_axis) x_axis = jx.utils.unique(x_axis)
conf.data.labels = x_axis conf.data.labels = x_axis
// console.log(conf) // console.log(conf)

@ -146,17 +146,17 @@ class DetailProcess(Analysis):
pattern = "(\d+.{0,1}\d*)\x20*(\d+.{0,1}\d*)\x20*(\d+.{0,1}\d*)".replace(":name",name).strip() pattern = "(\d+.{0,1}\d*)\x20*(\d+.{0,1}\d*)\x20*(\d+.{0,1}\d*)".replace(":name",name).strip()
g = re.match(pattern,stream.strip()) g = re.match(pattern,stream.strip())
if g : if g :
return list(g.groups())+[name] return list(g.groups())+['1']+[name]
else: else:
return '' return ''
def evaluate(self,name) : def evaluate(self,name) :
cmd = "ps -eo pmem,pcpu,vsize,comm|grep -E \":app\"" cmd = "ps -eo pmem,pcpu,vsize,command|grep -E \":app\""
handler = subprocess.Popen(cmd.replace(":app",name),shell=True,stdout=subprocess.PIPE) handler = subprocess.Popen(cmd.replace(":app",name),shell=True,stdout=subprocess.PIPE)
ostream = handler.communicate()[0].split('\n') ostream = handler.communicate()[0].split('\n')
#xstr = ostream
ostream = [ self.split(name,row) for row in ostream if row != ''] ostream = [ self.split(name,row) for row in ostream if row != '' and 'grep' not in row]
if len(ostream) == 0 or len(ostream[0]) < 4 : if len(ostream) == 0 or len(ostream[0]) < 4 :
ostream = [['0','0','0',name]] ostream = [['0','0','0','0',name]]
r = [] r = []
for row in ostream : for row in ostream :
# #
@ -164,8 +164,26 @@ class DetailProcess(Analysis):
# On OSX it has been observed that the fully qualified path is sometimes returned (go figure) # On OSX it has been observed that the fully qualified path is sometimes returned (go figure)
# #
row = [float(value) for value in row if value.strip() != '' and name not in value ] +[re.sub('\$|^','',name)] row = [float(value) for value in row if value.strip() != '' and name not in value ] +[re.sub('\$|^','',name)]
r.append(row) r.append(row)
#
# At this point we should aggregate results
# The aggregation is intended for applications with several processes (e.g: apache2)
#
if len(r) > 1:
m = None
for row in r:
if m is None:
m = row
else:
m[3] += row[3]
m[0] += row[0]
m[1] += row[1]
m[2] += row[2]
m[0] = round((m[0] / m[3]),2)
m[1] = round((m[1] / m[3]),2)
m[2] = round((m[2] / m[3]),2)
r = [m]
return r return r
def status(self,row): def status(self,row):
x = row['memory_usage'] x = row['memory_usage']
@ -178,7 +196,7 @@ class DetailProcess(Analysis):
else: else:
return "crash" return "crash"
def format(self,row): def format(self,row):
r= {"memory_usage":row[0],"cpu_usage":row[1],"memory_available":row[2]/1000,"label":row[3]} r= {"memory_usage":row[0],"cpu_usage":row[1],"memory_available":row[2]/1000,"proc_count":row[3],"label":row[4]}
status = self.status(r) status = self.status(r)
r['status'] = status r['status'] = status
return r return r
@ -201,6 +219,31 @@ class DetailProcess(Analysis):
#return [{"memory_usage":row[0],"cpu_usage":row[1],"memory_available":row[2]/1000,"label":row[3]} for row in ma] #return [{"memory_usage":row[0],"cpu_usage":row[1],"memory_available":row[2]/1000,"label":row[3]} for row in ma]
return ma return ma
class FileWatch(Analysis):
def __init__(self,conf):
pass
def split(self,row):
x = row.split(' ')
r = {}
months = ['Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec']
if 'K' in x[0]:
size = x[0].replace('K','').replace('KB','') / 1000
elif 'MB' in x[0] :
size = x[0].replace('MB','')
elif 'GB' in x[0] :
size = x[0].replace('GB','') * 1000
month = months.index(m[1]) + 1
day = x[2]
hour,minute = x[3].split(':')
year = x[4]
return {"size":size,"age":age}
def evaluate(self,path):
cmd = "find :path|xargs ls -lh |awk '{print $5,$6,$7,$8,$9}'".replace(":path",path)
handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE)
ostream = handler.communicate()[0].split('\n')
[self.split(stream) for stream in ostream if stream.strip() != '']
pass
class Monitor (Thread): class Monitor (Thread):
def __init__(self,pConfig,pWriter,id='processes') : def __init__(self,pConfig,pWriter,id='processes') :
@ -232,8 +275,8 @@ class Monitor (Thread):
lock.release() lock.release()
self.prune() self.prune()
HALF_HOUR = 60*25 TIME_LAPSE = 60*2
time.sleep(HALF_HOUR) time.sleep(TIME_LAPSE)
print "Stopped ..." print "Stopped ..."
def prune(self) : def prune(self) :

@ -17,7 +17,8 @@ class ML:
# We may have a potential issue of how the data is stored ... it may not scale # We may have a potential issue of how the data is stored ... it may not scale
# #
return [item[0] for item in data if item and attr in item[0] and item[0][attr] == value] #return [item[0] for item in data if item and attr in item[0] and item[0][attr] == value]
return [[item for item in row if item[attr] == value] for row in data]
@staticmethod @staticmethod
def Extract(lattr,data): def Extract(lattr,data):
if isinstance(lattr,basestring): if isinstance(lattr,basestring):
@ -41,6 +42,7 @@ class AnomalyDetection:
test = data[end:] test = data[end:]
return {"train":train,"test":test} return {"train":train,"test":test}
""" """
@param key field name by which the data will be filtered @param key field name by which the data will be filtered
@ -51,8 +53,9 @@ class AnomalyDetection:
""" """
def learn(self,data,key,value,features,label): def learn(self,data,key,value,features,label):
xo = ML.Filter(key,value,data) xo = ML.Filter(key,value,data)
print key,value, len(xo)
if not xo : if not xo or len(xo) < 100:
return None return None
#if len(xo) < 100 : #if len(xo) < 100 :
@ -69,19 +72,21 @@ class AnomalyDetection:
if xo['train'] : if xo['train'] :
E = 0.01 E = 0.01
fscore = 0
for i in range(0,10): for i in range(0,10):
Epsilon = E + (2*E*i) Epsilon = E + (2*E*i)
p = self.gParameters(xo['train']) p = self.gParameters(xo['train'])
if p is None :
return None
px = self.gPx(p['mean'],p['cov'],xo['test'],Epsilon) px = self.gPx(p['mean'],p['cov'],xo['test'],Epsilon)
perf = self.gPerformance(px,yo['test']) perf = self.gPerformance(px,yo['test'])
if perf['fscore'] > 0 : if fscore == 0 :
fscore = perf['fscore']
elif perf['fscore'] > fscore and perf['fscore'] > 0.5 :
perf['epsilon'] = Epsilon perf['epsilon'] = Epsilon
break
return {"label":value,"parameters":p,"performance":perf} return {"label":value,"parameters":p,"performance":perf}
return None return None
def getLabel(self,yo,label_conf): def getLabel(self,yo,label_conf):
@ -157,6 +162,8 @@ class AnomalyDetection:
m = np.transpose(np.array(train)) m = np.transpose(np.array(train))
u = np.array([ np.mean(m[i][:]) for i in range(0,n)]) u = np.array([ np.mean(m[i][:]) for i in range(0,n)])
if np.sum(u) == 0:
return None
r = np.array([ np.sqrt(np.var(m[i,:])) for i in range(0,n)]) r = np.array([ np.sqrt(np.var(m[i,:])) for i in range(0,n)])
# #
#-- Normalizing the matrix then we will compute covariance matrix #-- Normalizing the matrix then we will compute covariance matrix

@ -251,7 +251,6 @@ class MessageQueue:
self.close() self.close()
return resp return resp
def close(self): def close(self):
print "closing ..."
self.channel.close() self.channel.close()
self.connection.close() self.connection.close()
""" """

@ -27,8 +27,10 @@ class TestMonitorServer(unittest.TestCase):
self.assertTrue(p.evaluate('PATH') == 0) self.assertTrue(p.evaluate('PATH') == 0)
def test_RunningProcess(self): def test_RunningProcess(self):
p = DetailProcess() p = DetailProcess()
p.init(['kate','firefox']) #['rabbitmq-server','python','apache2','firefox']) p.init(['kate','rabbitmq-server','python','apache2','firefox'])
r = p.composite() r = p.composite()
for row in r:
print row['label'],row['status'],row['proc_count']
self.assertTrue(r) self.assertTrue(r)
def test_ProcessCount(self): def test_ProcessCount(self):

Loading…
Cancel
Save