You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
smart-top/src/monitor.py

288 lines
8.1 KiB
Python

"""
This program is designed to inspect an application environment.
It should only be run on Unix-like systems.
The analysis engines can each be run against several configurations,
similar to what a visitor design pattern would do.
"""
from __future__ import division
import os
import subprocess
from sets import Set
import re
import datetime
import Queue
from threading import Thread, RLock
import time
class Analysis:
    """
    Base class of the analysis engines.

    It holds an in-memory list of posted entries and produces the
    minute-resolution timestamp dictionaries used by the engines.
    """

    def __init__(self):
        # Entries handed to post() accumulate here, in arrival order.
        self.logs = []

    def _timestamp(self):
        """Return the current local date/time as a dictionary of integers."""
        moment = datetime.datetime.now()
        fields = ("month", "year", "day", "hour", "minute")
        return {field: getattr(moment, field) for field in fields}

    def post(self, object):
        """Append `object` to the in-memory log."""
        self.logs.append(object)

    def init(self):
        """Record the current timestamp in `self.now`."""
        self.now = self._timestamp()

    def getNow(self):
        """
        Return a fresh timestamp dictionary with the keys
        month, year, day, hour and minute.
        """
        return self._timestamp()
"""
This class is designed to analyze environment variables. Environment variables can either be folders, files or simple values
The class returns a quantifiable assessment of the environment variables (expected 100%)
"""
class Env(Analysis):
    """
    Scores a list of environment variables.

    Each variable scores 1 or 0 (see evaluate); composite() turns the
    scores into a percentage together with the list of failing names.
    """

    def __init__(self):
        Analysis.__init__(self)

    def init(self, values):
        """
        Set the environment variable names to inspect.

        :param values: sequence of environment variable names
        """
        self.values = values

    def evaluate(self, id):
        """
        Score one environment variable as 1 (valid) or 0 (invalid).

        Two propositions are combined:
          - (p) the value looks like a folder or file name: it contains
                the path separator or ends with a ".ext" style suffix
          - (q) the value exists on the file system
        The score is the conjunction of p and q.

        NOTE(review): because of the conjunction, a variable holding a
        plain value that does not look like a path scores 0 even though
        it is set -- confirm this is intended.

        :param id: name of the environment variable
        :return: 1 if the variable is set, looks like a path and that path
                 exists; 0 otherwise (including when the variable is unset)
        """
        value = os.environ.get(id)
        if value is None:
            return 0
        # A substring test is equivalent to the regex search on os.sep on
        # POSIX, and does not break where os.sep is a regex metacharacter.
        looks_like_path = (
            os.sep in value or re.search(r'(\.\w+)$', value) is not None
        )
        return int(looks_like_path and os.path.exists(value))

    def composite(self):
        """
        Score every configured variable.

        :return: the current timestamp dictionary extended with
                 "value"   -- percentage of variables scoring 1
                 "missing" -- names of the variables scoring 0
                 With no variables configured the value is 100.0 and
                 nothing is missing (previously a ZeroDivisionError).
        """
        scores = [self.evaluate(name) for name in self.values]
        total = len(scores)
        if total:
            # float() keeps this a true division even without the
            # file-level `from __future__ import division`.
            value = 100 * round(float(sum(scores)) / total, 2)
        else:
            value = 100.0
        # Same output as the original `print '*** ',value`, in a form that
        # is valid on both Python 2 and Python 3.
        print('***  %s' % value)
        missing = [
            name for name, score in zip(self.values, scores) if score == 0
        ]
        return dict(self.getNow(), **{"value": value, "missing": missing})
class Sandbox(Analysis):
    """
    Compares the modules listed in a requirements file with the modules
    reported by `pip freeze` inside a sandbox (virtual environment).
    """

    def __init__(self):
        Analysis.__init__(self)

    def init(self, conf):
        """
        :param conf: dictionary with the keys
                     "sandbox"      -- root folder of the virtual environment
                     "requirements" -- path of the requirements file
        """
        self.sandbox_path = conf['sandbox']
        self.requirements_path = conf['requirements']

    def _normalize(self, entry):
        """Strip an entry and replace dashes and underscores with spaces,
        so both spellings of a name compare equal."""
        return entry.strip().replace('-', ' ').replace('_', ' ')

    def get_requirements(self):
        """
        Return the normalized entries of the requirements file.

        Blank lines and `#` comment lines are skipped because they never
        name a module. The file is closed as soon as it has been read.
        """
        with open(self.requirements_path) as handle:
            lines = handle.read().split('\n')
        return [
            self._normalize(line) for line in lines
            if line.strip() != '' and not line.strip().startswith('#')
        ]

    def get_sandbox_requirements(self):
        """
        Return the normalized modules installed in the sandbox
        (virtual environment), as reported by `<sandbox>/bin/pip freeze`.

        :raises subprocess.CalledProcessError: if pip exits with an error
        """
        pip_vm = os.path.join(self.sandbox_path, 'bin', 'pip')
        # universal_newlines makes the output text on every Python version
        output = subprocess.check_output(
            [pip_vm, 'freeze'], universal_newlines=True
        )
        return [
            self._normalize(row) for row in output.split('\n')
            if row.strip() != ''
        ]

    def evaluate(self):
        """Not implemented for this engine; see composite()."""
        pass

    def composite(self):
        """
        Return the ratio of existing modules relative to the ones expected.

        :return: the current timestamp dictionary extended with
                 "value"   -- percentage of required modules installed
                 "missing" -- sorted list of required modules not installed
                 An empty requirements file yields 100.0 with nothing
                 missing (previously a ZeroDivisionError).
        """
        Analysis.init(self)
        required_modules = self.get_requirements()
        sandbox_modules = set(self.get_sandbox_requirements())
        # Computed once, and sorted so the report is deterministic
        missing = sorted(set(required_modules) - sandbox_modules)
        total = len(required_modules)
        if total:
            value = round(1 - float(len(missing)) / total, 2) * 100
        else:
            value = 100.0
        return dict(self.getNow(), **{"value": value, "missing": missing})
"""
This class performs the analysis of a list of processes and determines how many instances of each are running.
The class provides a quantifiable measure of how many processes it found overall.
"""
class ProcessCounter(Analysis):
    """
    Counts, for each configured name, the lines of `ps -eo comm`
    that grep matches against that name.
    """

    def __init__(self):
        Analysis.__init__(self)

    def init(self, names):
        """
        :param names: process names (grep patterns) to count
        """
        self.names = names

    def evaluate(self, name):
        """
        Count the command names listed by ps that match `name`.

        The `ps | grep | wc -l` shell pipeline is replaced by an equivalent
        `ps | grep -c` pipeline run without a shell, so the name reaches
        grep as a single argument: names containing spaces or shell
        metacharacters are matched literally by grep instead of being
        interpreted by the shell.

        :param name: grep (basic regular expression) pattern
        :return: number of matching lines; 0 for an empty name
        """
        if not name:
            # grep with an empty pattern would match every process
            return 0
        ps = subprocess.Popen(['ps', '-eo', 'comm'], stdout=subprocess.PIPE)
        grep = subprocess.Popen(
            ['grep', '-c', '--', name],
            stdin=ps.stdout,
            stdout=subprocess.PIPE,
            universal_newlines=True,
        )
        # Drop our copy of the pipe so ps is signalled if grep exits first
        ps.stdout.close()
        count = grep.communicate()[0].strip()
        ps.wait()
        return int(count or 0)

    def composite(self):
        """
        Count every configured name.

        :return: the current timestamp dictionary extended with one
                 entry per name, mapping the name to its count
        NOTE(review): a name equal to a timestamp key (e.g. "day") would
        overwrite that key in the result.
        """
        counts = {}
        for name in self.names:
            counts[name] = self.evaluate(name)
        return dict(self.getNow(), **counts)
"""
This class returns both the memory and CPU usage of an application
"""
class DetailProcess(Analysis):
    """
    Reports the pmem, pcpu and vsize columns of `ps` for every process
    whose command matches one of the configured names.
    """

    # Three numeric columns separated by whitespace. The decimal point is
    # matched explicitly: the former `.{0,1}` accepted any character, which
    # mis-split integer columns and commands starting with a digit.
    _COLUMNS = re.compile(r"(\d+\.?\d*)\s+(\d+\.?\d*)\s+(\d+\.?\d*)")

    def __init__(self):
        Analysis.__init__(self)

    def init(self, names):
        """
        :param names: process names (extended regular expressions for grep)
        """
        self.names = names

    def split(self, name, stream):
        """
        Parse one line of ps output.

        :param name: name appended as the last element of the result
        :param stream: one line of `ps -eo pmem,pcpu,vsize,comm` output
        :return: [pmem, pcpu, vsize, name] with the three values as strings,
                 or '' when the line does not start with three numbers
        """
        found = self._COLUMNS.match(stream.strip())
        if found:
            return list(found.groups()) + [name]
        return ''

    def evaluate(self, name):
        """
        Collect the usage figures of every process matching `name`.

        ps and grep are chained without a shell, so the name reaches
        `grep -E` as a single argument and is never interpreted by a shell.

        :param name: extended regular expression matched against ps lines
        :return: list of [pmem, pcpu, vsize, label] rows with the three
                 values as floats; label is the name with its `^` and `$`
                 anchors removed. A single all-zero row is returned when
                 nothing matches. Lines that cannot be parsed are skipped.
        """
        ps = subprocess.Popen(
            ['ps', '-eo', 'pmem,pcpu,vsize,comm'], stdout=subprocess.PIPE
        )
        grep = subprocess.Popen(
            ['grep', '-E', '--', name],
            stdin=ps.stdout,
            stdout=subprocess.PIPE,
            universal_newlines=True,
        )
        # Drop our copy of the pipe so ps is signalled if grep exits first
        ps.stdout.close()
        listing = grep.communicate()[0]
        ps.wait()
        # The former pattern '\$|^' left the caret in place because an
        # unescaped ^ is an anchor; both anchors are removed here.
        label = re.sub(r'[\^$]', '', name)
        rows = []
        for line in listing.split('\n'):
            fields = self.split(name, line)
            if fields:
                # The first three fields are the numeric columns. They are
                # taken by position: filtering on `name not in value` could
                # drop numbers when the name is itself made of digits.
                rows.append([float(value) for value in fields[:3]] + [label])
        if not rows:
            rows = [[0.0, 0.0, 0.0, label]]
        return rows

    def status(self, row):
        """
        Derive a status label from a formatted row.

        :return: "crash" when memory_available is zero; otherwise
                 "running" when cpu_usage is non-zero, else "idle"
        """
        if not row['memory_available']:
            return "crash"
        if row['cpu_usage']:
            return "running"
        return "idle"

    def format(self, row):
        """
        Turn a row produced by evaluate() into a dictionary.

        :param row: [pmem, pcpu, vsize, label]
        :return: dictionary with memory_usage, cpu_usage, memory_available
                 (vsize divided by 1000), label and status
        """
        r = {
            "memory_usage": row[0],
            "cpu_usage": row[1],
            # 1000.0 keeps this a true division even without the
            # file-level `from __future__ import division`.
            "memory_available": row[2] / 1000.0,
            "label": row[3],
        }
        r['status'] = self.status(r)
        return r

    def composite(self):
        """
        Evaluate every configured name.

        :return: list of dictionaries, one per matching process, each
                 combining one shared timestamp with the formatted row
        """
        ma = []
        now = self.getNow()
        for name in self.names:
            ma += [dict(now, **self.format(row)) for row in self.evaluate(name)]
        return ma
class Monitor(Thread):
    """
    Thread that periodically runs one analysis engine against each of its
    configurations and hands every result to a writer.
    """

    # Maximum number of entries kept in self.logs by prune()
    MAX_ENTRIES = 100
    # Seconds slept after each configuration has been processed
    LABEL_DELAY = 2
    # Seconds slept between two full passes. The original constant was
    # named HALF_HOUR but is 25 minutes; the value is kept unchanged.
    CYCLE_DELAY = 60 * 25

    def __init__(self, pConfig, pWriter, id='processes'):
        """
        :param pConfig: dictionary of monitor definitions; pConfig[id] must
                        hold "class" (the engine instance) and "config"
                        (a dictionary label -> engine configuration)
        :param pWriter: object exposing write(label=..., row=...)
        :param id: key of the definition to use within pConfig
        """
        Thread.__init__(self)
        self.config = pConfig[id]
        self.writer = pWriter
        self.logs = []
        self.handler = self.config['class']
        self.mconfig = self.config['config']
        # Defined up front so the attribute exists before run() is entered
        self.keep_running = True

    def stop(self):
        """Ask the monitoring loop to end; it is honoured within about a
        second while the thread is pausing between two passes."""
        self.keep_running = False

    def _pause(self, seconds):
        """Sleep up to `seconds`, in slices of at most one second, returning
        early once stop() has been called."""
        deadline = time.time() + seconds
        while self.keep_running:
            remaining = deadline - time.time()
            if remaining <= 0:
                break
            time.sleep(min(1.0, remaining))

    def run(self):
        """
        Monitoring loop: for every label, initialize the engine with that
        label's configuration, compute its composite result and write it;
        then prune the logs and pause until the next pass.
        """
        self.keep_running = True
        # NOTE(review): this lock is local to the call, so it does not
        # serialize anything across Monitor threads -- confirm the intent.
        lock = RLock()
        while self.keep_running:
            for label in self.mconfig:
                # `with` releases the lock even if the engine or writer raises
                with lock:
                    self.handler.init(self.mconfig[label])
                    r = self.handler.composite()
                    self.writer.write(label=label, row=r)
                time.sleep(self.LABEL_DELAY)
            self.prune()
            self._pause(self.CYCLE_DELAY)
        print("Stopped ...")

    def prune(self):
        """Keep only the most recent MAX_ENTRIES entries of self.logs.

        The original referenced an undefined MAX_SIZE (NameError) and its
        arithmetic would have kept one entry more than the limit.
        """
        if len(self.logs) > self.MAX_ENTRIES:
            self.logs = self.logs[-self.MAX_ENTRIES:]
class mapreducer:
    """
    Minimal map/reduce helper: mapped rows are grouped by key in `self.store`.
    """

    def __init__(self):
        # key -> list of the contents emitted under that key
        self.store = {}

    def filter(self, key, dataset):
        """Return row[key] for every row of `dataset` that contains `key`."""
        selected = []
        for row in dataset:
            if key in row:
                selected.append(row[key])
        return selected

    def run(self, dataset, mapper, reducer):
        """
        Apply `mapper(row, self.emit)` to every row of a list dataset.

        :return: None when `mapper` is None, otherwise `self.store`
        NOTE(review): `reducer` is accepted but never applied; the store is
        returned as-is whether or not a reducer is given.
        """
        if mapper is None:
            return None
        if isinstance(dataset, list):
            for row in dataset:
                mapper(row, self.emit)
        return self.store

    def mapper(self, row, emit):
        """Emit every entry of `row` under the entry's 'label'."""
        for _matrix in row:
            emit(_matrix['label'], _matrix)

    def reducer(self, values):
        """Return the trailing 101 values when more than 100 are given,
        otherwise a copy of all of them."""
        if len(values) > 100:
            return values[-101:]
        return values[0:]

    def emit(self, key, content):
        """Append `content` to the list stored under `key`, creating the
        list on first use."""
        self.store.setdefault(key, []).append(content)
# #
# # We need to generate the appropriate dataset here
# # map/reduce is a well documented technique for generating datasets
# #
# def map(self,key,id,rows):
# #r = [row[key] for row in rows if key in row]
# for row in rows:
# if key in row :
# for xr in row[key]:
# self.emit(xr['label'],xr)
# def reduce(keys,values):
# print values[0]
# return r