import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
class MailAgent:
    """Minimal wrapper around an authenticated SMTP-over-SSL session.

    The constructor connects and logs in; ``send`` delivers one message and
    ``close`` ends the session.  When the connection or the login fails the
    error is printed and ``self.handler`` is left as ``None``.
    """

    def __init__(self, conf):
        """Connect to the SMTP server and authenticate.

        @param conf mapping with the keys 'uid', 'host', 'port' and 'password'
        """
        self.uid = conf['uid']
        self.handler = None
        try:
            handler = smtplib.SMTP_SSL(conf['host'], conf['port'])
        except Exception as e:
            print(e)
            return
        try:
            handler.login(self.uid, conf['password'])
            # @TODO: Check the status of the authentication
            # If not authenticated the preconditions have failed
        except Exception as e:
            print(e)
            # Do not leak the socket of a session we could not authenticate.
            handler.close()
            return
        self.handler = handler

    def send(self, **args):
        """Send a single message from ``self.uid``.

        @param subject  subject line
        @param message  body text; sent as HTML when it contains both '<' and
                        '>', otherwise as plain text
        @param to       recipient address(es), passed through to ``sendmail``
        @return whatever ``smtplib.SMTP.sendmail`` returns

        Raises AttributeError when the constructor could not establish a
        session (``self.handler`` is None).
        """
        subject = args['subject']
        message = args['message']
        to = args['to']
        # Exactly one MIME part is built; the markup test picks its subtype.
        subtype = 'html' if ('<' in message and '>' in message) else 'plain'
        mail = MIMEText(message, subtype)
        mail['From'] = self.uid
        mail['To'] = to
        mail['Subject'] = subject
        return self.handler.sendmail(self.uid, to, mail.as_string())

    def close(self):
        """End the SMTP session, if one was established.

        NOTE(review): the body of this method was not present in the source;
        quitting the session is the assumed intent -- confirm.
        """
        if self.handler is not None:
            self.handler.quit()
            self.handler = None

# This file is intended to perform certain machine learning tasks based on numpy.
# We are trying to keep it lean, which is why sklearn is not involved yet.
# Create a factory method for the learners implemented here
# Improve preconditions (size of the dataset, labels)
from __future__ import division
import numpy as np
class ML:
    """Namespace of small data helpers (filter, extract, histogram).

    All helpers are static: they are invoked as ``ML.Filter(...)`` etc. and
    never need an instance.
    """

    @staticmethod
    def Filter(attr, value, data):
        """Return every object in ``data`` whose ``attr`` equals ``value``.

        @param attr   key to look up in each object
        @param value  expected value (normalised with ``ML.CleanupName``)
        @param data   list whose rows are either objects (dict-like) or lists
                      of objects
        @return flat list of the matching objects, in encounter order
        """
        # @TODO: Make sure this approach works across all transport classes
        # We may have a potential issue of how the data is stored ... it may not scale
        value = ML.CleanupName(value)
        # The loops are expanded on purpose so the filter is resilient:
        # an object that lacks ``attr`` is skipped instead of raising.
        r = []
        for row in data:
            # A row is either a list of objects or a single object.
            candidates = row if isinstance(row, list) else [row]
            for item in candidates:
                if attr in item and item[attr] == value:
                    r.append(item)
        return r

    @staticmethod
    def Extract(lattr, data):
        """Project each row of ``data`` onto the attributes in ``lattr``.

        @param lattr  attribute name, or list of attribute names
        @param data   iterable of dict-like rows
        @return a flat list of values when a single attribute is requested,
                otherwise one list of values per row
        """
        try:
            string_types = basestring  # Python 2
        except NameError:
            string_types = str  # Python 3
        if isinstance(lattr, string_types):
            lattr = [lattr]
        if len(lattr) == 1:
            key = lattr[0]
            return [row[key] for row in data]
        return [[row[key] for key in lattr] for row in data]

    @staticmethod
    def CleanupName(value):
        """Strip the literal substrings '$' and '.+' from ``value``."""
        return value.replace('$', '').replace('.+', '')

    @staticmethod
    def distribution(xo, lock, scale=False):
        """Build a frequency table of the values in ``xo``.

        @param xo     iterable of numbers
        @param lock   accepted for interface compatibility; only local state
                      is touched here so it is not used
        @param scale  when true, values are standardised ((x - mean) / std)
                      before being counted
        @return list of ``[value, count]`` pairs in first-seen order, values
                rounded to 2 decimals
        """
        d = []
        positions = {}  # str(value) -> index of its [value, count] pair in d
        if scale:
            xu = np.mean(xo)
            sd = np.sqrt(np.var(xo))
        for xi in xo:
            value = round(xi, 2)
            if scale:
                value = round((value - xu) / sd, 2)
            key = str(value)
            if key in positions:
                d[positions[key]][1] += 1
            else:
                # First time this value is seen: remember where it lives.
                positions[key] = len(d)
                d.append([value, 1])
        return d
# Implements multivariate anomaly detection
# @TODO: determine epsilon computationally
# Anomaly detector: the methods that follow split data, estimate mean/covariance
# parameters, score observations against an epsilon threshold and measure
# precision/recall/f-score.
class AnomalyDetection:
# NOTE(review): no constructor body is visible in this source -- confirm whether
# the constructor is meant to be a no-op.
def __init__(self):
def split(self, data, index=-1, threshold=0.65):
    """Cut ``data`` into a leading training part and a trailing test part.

    @param data       sliceable sequence of observations
    @param index      not used by this method
    @param threshold  fraction of ``data`` that goes to the training part
    @return {"train": first int(len(data)*threshold) items, "test": the rest}
    """
    cut = int(len(data) * threshold)
    return {"train": data[:cut], "test": data[cut:]}
@param key field name by which the data will be filtered
@param value field value for the filter
@param features features to be used in the analysis
@param labels used to assess performance
@TODO: Map/Reduce does a good job at filtering
# Learns anomaly-detection parameters for the rows of `data` whose `key` equals `value`.
# Steps visible below: filter the rows, extract the label column and the feature columns,
# turn labels into 0/1 with getLabel, split both into train/test, estimate the gaussian
# parameters on the training part, then try 10 epsilon values and score each with gPerformance.
# Returns None when there are fewer than 10 rows (before or after filtering) or when no
# parameters could be estimated; otherwise see the NOTE(review) items below.
# NOTE(review): the original indentation (and possibly some `else:` lines) is not visible in
# this source, so the exact nesting of the statements below has to be confirmed.
# NOTE(review): ACCEPTABLE_FSCORE is not defined anywhere in the visible source.
def learn(self,data,key,value,features,label):
if len(data) < 10:
return None
xo = ML.Filter(key,value,data)
if len(xo) < 10 :
return None
# attr = conf['features']
# label= conf['label']
# yo holds the raw label column (label['name']), xo the feature columns
yo= ML.Extract([label['name']],xo)
xo = ML.Extract(features,xo)
yo = self.getLabel(yo,label)
# @TODO: Insure this can be finetuned, training size matters for learning. It's not obvious to define upfront
# from here on xo and yo are {"train":..., "test":...} dictionaries
xo = self.split(xo)
yo = self.split(yo)
p = self.gParameters(xo['train'])
has_cov = np.linalg.det(p['cov']) if p else False #-- making sure the matrix is invertible
if xo['train'] and has_cov :
E = 0.001
fscore = 0
# We need to find an appropriate epsilon for the predictions
# The appropriate epsilon is one that yields an f-score [0.5,1[
__operf__ = None
perf = None
# Epsilon takes the values E, 3E, 5E, ... 19E (0.001 to 0.019)
for i in range(0,10):
Epsilon = E + (2*E*i)
if p is None :
return None
# At this point we've got enough data for the parameters
# We should try to fine tune epsilon for better results
px = self.gPx(p['mean'],p['cov'],xo['test'],Epsilon)
__operf__ = self.gPerformance(px,yo['test'])
# Python 2 print statement
print value,__operf__
# NOTE(review): nothing visible follows this test; a `continue` was presumably here
# (the comment further down says fscore==1 is rejected) -- confirm.
if __operf__['fscore'] == 1 :
if perf is None :
perf = __operf__
elif perf['fscore'] < __operf__['fscore'] and __operf__['fscore'] > ACCEPTABLE_FSCORE :
perf = __operf__
perf['epsilon'] = Epsilon
# At this point we are assuming we came out of the whole thing with an acceptable performance
# The understanding is that error drives performance thus we reject fscore==1
if perf and perf['fscore'] > ACCEPTABLE_FSCORE :
return {"label":value,"parameters":p,"performance":perf}
return None
return None
This function determines if the preconditions for learning are met
For that parameters are passed to the function
# NOTE(review): no body is visible for this method in this source; the text above it says
# it should decide whether the preconditions for learning are met -- confirm the implementation.
def canLearn(self,p) :
def getLabel(self, yo, label_conf):
    """Convert raw label entries into 0/1 flags.

    @param yo          iterable whose items are themselves iterables of labels
    @param label_conf  mapping whose "1" entry lists the labels counted as positive
    @return list with 1 where an item shares at least one element with
            label_conf["1"], 0 otherwise
    """
    def is_positive(item):
        shared = set(item) & set(label_conf["1"])
        return int(bool(shared))

    return [is_positive(item) for item in yo]
This function will compute the probability density function given a particular event/set of events
The return value is [px,yo]
@pre xu.shape[0] == sigma[0] == sigma[1]
def gPx(self, xu, sigma, data, EPSILON=0.01):
    """Evaluate the multivariate gaussian density of every row of ``data``.

    p(x) = exp(-0.5 * (x-xu)^T sigma^-1 (x-xu)) / ((2*pi)^(n/2) * |sigma|^0.5)

    @param xu       mean vector (length n)
    @param sigma    n x n covariance matrix (must be invertible)
    @param data     list of observations, each of length n
    @param EPSILON  density threshold below which a row is flagged
    @return list of ``[px, flag]`` pairs, flag being 1 when px < EPSILON;
            an empty list when ``data`` is empty
    @pre xu.shape[0] == sigma.shape[0] == sigma.shape[1]

    The normalising constant is (2*pi)^(n/2); the earlier 2*pi^(n/2) only
    agreed with it for n == 2, so densities (and therefore suitable epsilon
    values) differ for any other number of features.
    NOTE(review): gParameters computes the covariance on standardised
    features while the deviations used here are raw -- confirm that this is
    intended.
    """
    if len(data) == 0:
        return []
    n = len(data[0])
    mean = np.asarray(xu, dtype=float)
    cov = np.asarray(sigma, dtype=float)
    norm = ((2 * np.pi) ** (n / 2.0)) * np.linalg.det(cov) ** 0.5
    # The inverse does not depend on the row, compute it once.
    inv_cov = np.linalg.inv(cov)
    r = []
    for row in np.asarray(data, dtype=float):
        d = row - mean
        px = float(np.exp(-0.5 * np.dot(d, np.dot(inv_cov, d))) / norm)
        r.append([px, int(px < EPSILON)])
    return r
# This function uses stored learnt information to predict on raw data
# In this case it will determine whether we have an anomaly or not
# @param xo raw observations (matrix)
# @param info stored information about this
def predict(self, xo, info):
    """Score raw observations with previously learnt parameters.

    @param xo    raw observations (rows that carry the learnt features)
    @param info  learnt record; reads info['features'],
                 info['parameters']['cov'], info['parameters']['mean'] and
                 info['performance']['epsilon']
    @return the result of gPx for the extracted features, or None when
            nothing could be extracted
    """
    observations = ML.Extract(info['features'], xo)
    if observations:
        learnt = info['parameters']
        sigma = learnt['cov']
        xu = learnt['mean']
        epsilon = info['performance']['epsilon']
        return self.gPx(xu, sigma, observations, epsilon)
    return None
This function computes performance metrics i.e precision, recall and f-score
for details visit https://en.wikipedia.org/wiki/Precision_and_recall
def gPerformance(self, test, labels):
    """Compute precision, recall and f-score of a list of predictions.

    @param test    list of ``[px, flag]`` pairs; only the flag (index 1) is read
    @param labels  expected 0/1 flag for every entry of ``test``
    @return {"precision": .., "recall": .., "fscore": ..}; a ratio whose
            denominator would be zero is reported as 0
    @raise IndexError when ``labels`` has fewer entries than ``test``

    The ratios are computed with explicit float division so the result does
    not depend on ``from __future__ import division`` being in effect.
    """
    if len(labels) < len(test):
        raise IndexError("labels has fewer entries than test")
    tp = 0  # true positive
    fp = 0  # false positive
    fn = 0  # false negative
    for row, expected in zip(test, labels):
        predicted = row[1]
        if predicted == 1:
            if predicted == expected:
                tp += 1
            else:
                fp += 1
        elif predicted == 0 and predicted != expected:
            fn += 1
    precision = tp / float(tp + fp) if tp + fp > 0 else 0.0
    recall = tp / float(tp + fn) if tp + fn > 0 else 0.0
    total = precision + recall
    fscore = (2 * precision * recall) / total if total > 0 else 0.0
    return {"precision": precision, "recall": recall, "fscore": fscore}
This function returns gaussian parameters i.e means and covariance
The information will be used to compute probabilities
def gParameters(self, train):
    """Estimate the gaussian parameters (mean, covariance) of ``train``.

    @param train  list of observations, one list of n feature values per row
    @return {"cov": n x n covariance (list of lists) of the standardised
            features, "mean": list of the n raw feature means}, or None when
            ``train`` is empty, the means sum to zero, or any feature has
            zero variance
    """
    if len(train) == 0:
        return None
    m = np.transpose(np.array(train))  # one row per feature
    u = m.mean(axis=1)
    # NOTE(review): this also rejects non-zero means that cancel each other
    # out (e.g. 1 and -1) -- confirm that is intended.
    if np.sum(u) == 0:
        return None
    r = m.std(axis=1)
    # Before we normalize the data we must ensure there is some level of
    # movement; a constant feature carries no information (and would divide
    # by zero below).
    if np.any(r == 0):
        return None
    #-- Normalizing the matrix then we will compute covariance matrix
    m = (m - u[:, np.newaxis]) / r[:, np.newaxis]
    # np.cov returns a 0-d array for a single feature; force a matrix so the
    # result is always a list of rows.
    sigma = np.atleast_2d(np.cov(m))
    return {"cov": [list(row) for row in sigma], "mean": list(u)}
# Specialisation of AnomalyDetection whose predict() scores only the last observation
# and adds a per-feature "slope" to the result.
class AnalyzeAnomaly(AnomalyDetection):
# NOTE(review): no constructor body is visible in this source.
def __init__(self):
This analysis function will include a predicted status because an anomaly can either be
- A downtime i.e end of day
- A spike and thus a potential imminent crash
@param xo matrix of variables
@param info information about what was learnt
def predict(self,xo,info):
# only the most recent observation is scored by the parent class
x = xo[len(xo)-1]
r = AnomalyDetection.predict(self,[x],info)
# In order to determine what the anomaly is we compute the slope (idle or crash)
# The slope is computed using the covariance / variance of features
if r is not None:
N = len(info['features'])
xy = ML.Extract(info['features'],xo)
xy = np.array(xy)
# variance of every feature column
vxy= np.array([ np.var(xy[:,i]) for i in range(0,N)])
# NOTE(review): cxy is never assigned in the visible source (the covariance computation
# appears to be missing), so the lines below would raise NameError as shown -- confirm.
if np.sum(vxy) == 0:
vxy = cxy
alpha = cxy/vxy
# r[0][1] is the 0/1 flag that gPx produced for the scored observation
r = {"anomaly":r[0][1],"slope":list(alpha[:,0])}
return r
# Placeholder for a regression learner.
# NOTE(review): neither method has a visible body in this source, and predict is declared
# without `self` -- confirm whether it is meant to be static.
class Regression:
# class-level dictionary, shared by all instances unless rebound
parameters = {}
def predict(xo):
def __init__(self,config):

#import multiprocessing
from threading import Thread, RLock
#from utils import transport
from utils.transport import *
from utils.ml import AnomalyDetection,ML
import time
import monitor
import sys
import os
import datetime
class BasicWorker(Thread):
    """Base thread that holds the data-store settings shared by workers.

    Attributes set from ``config['store']``:
      reader_class / write_class -- values of ['class']['read'] / ['class']['write']
      rw_args                    -- value of ['args']
    plus a ``DataSourceFactory`` instance and the lock handed in by the caller.
    """

    def __init__(self, config, lock):
        """
        @param config  platform configuration; must provide
                       config['store']['class']['read'|'write'] and
                       config['store']['args']
        @param lock    lock object kept on the instance as ``self.lock``
        """
        # A Thread subclass that overrides the constructor has to run the
        # base constructor first, otherwise the thread cannot be started.
        Thread.__init__(self)
        store = config['store']
        self.reader_class = store['class']['read']
        self.write_class = store['class']['write']
        self.rw_args = store['args']
        self.factory = DataSourceFactory()
        self.lock = lock
This class is intended to collect data given a configuration
# Thread that, until stop() is called, walks the labels of the 'processes' monitor
# configuration and calls the monitor handler's composite() for each of them.
# NOTE(review): Thread.__init__ is not called in the visible constructor -- confirm.
class Top(Thread):
def __init__(self,_config,lock):
self.lock = lock
# data-store settings (reader/writer types and their arguments)
self.reader_class = _config['store']['class']['read']
self.write_class = _config['store']['class']['write']
self.rw_args = _config['store']['args']
self.factory = DataSourceFactory()
self.name = 'Zulu-Top'
self.quit = False
# NOTE(review): the handler is created by eval() on a class name taken from the
# configuration; this executes arbitrary code if the configuration is not trusted.
className = ''.join(['monitor.',_config['monitor']['processes']['class'],'()'])
self.handler = eval(className)
self.config = _config['monitor']['processes']['config']
# Asks run() to leave its loop at the next check of self.quit
def stop(self):
self.quit = True
def run(self):
while self.quit == False:
print ' ** ',self.name,datetime.datetime.today()
for label in self.config :
# a writer is created per label
gwriter = self.factory.instance(type=self.write_class,args=self.rw_args)
apps = self.config[label]
# NOTE(review): nothing visible passes `apps` to the handler or writes `r` through
# `gwriter`; those statements appear to be missing from this source.
r = self.handler.composite()
if 'MONITOR_CONFIG_PATH' in os.environ:
# This suggests we are in development mode
# NOTE(review): no statement is visible under this test (a break and/or a sleep
# between iterations is presumably missing) -- confirm.
print "Exiting ",self.name
# Thread that runs AnomalyDetection.learn for every configured application against the
# data read from the store.
# NOTE(review): Thread.__init__ is not called in the visible constructor -- confirm.
class Learner(Thread) :
This function expects paltform config (store,learner)
It will leverage store and learner in order to operate
def __init__(self,config,lock):
self.name = 'Zulu-Learner'
self.lock = lock
# data-store settings (reader/writer types and their arguments)
self.reader_class = config['store']['class']['read']
self.write_class = config['store']['class']['write']
self.rw_args = config['store']['args']
# learner settings: feature names, label configuration and the applications to learn
self.features = config['learner']['anomalies']['features']
self.yo = config['learner']['anomalies']['label']
self.apps = config['learner']['anomalies']['apps']
self.factory = DataSourceFactory()
self.quit = False
# Asks run() to leave its loop at the next check of self.quit
def stop(self):
self.quit = True
This function will initiate learning every (x-hour)
If there is nothing to learn the app will simply go to sleep
def run(self):
reader = self.factory.instance(type=self.reader_class,args=self.rw_args)
data = reader.read()
# Let's make sure we extract that which has aleady been learnt
if 'learn' in data:
r = data['learn']
del data['learn']
# NOTE(review): ML.Extract with a single attribute name returns a flat list of values,
# so row[0] below takes the first element of each value -- confirm this is intended.
r = ML.Extract('label',r)
logs = [row[0] for row in r]
logs = list(set(logs))
# NOTE(review): as shown, this unconditionally discards the list built above; an
# `else:` presumably preceded this line in the original -- confirm.
logs = []
# In order to address the inefficiencies below, we chose to adopt the following policy
# We don't learn that which is already learnt, This measure consists in filtering out the list of the apps that already have learning data
self.apps = list(set(self.apps) - set(logs))
while self.quit == False:
r = {}
# work on a copy so learnt applications can be removed during this pass
lapps = list(self.apps)
print ' ** ',self.name,datetime.datetime.today()
for key in data :
logs = data[key]
# There poor design at this point, we need to make sure things tested don't get tested again
# This creates innefficiencies (cartesian product)
for app in lapps:
handler = AnomalyDetection()
value = handler.learn(logs,'label',app,self.features,self.yo)
if value is not None:
if key not in r:
r[key] = {}
r[key][app] = value
# NOTE(review): lapps is modified while it is being iterated, which makes the loop
# skip the element that follows the removed one.
i = lapps.index(app)
del lapps[i]
# This offers a clean write to the data store upon value retrieved
# The removal of the application enables us to improve efficiency (among other things)
value = dict(value,**{"features":self.features})
# NOTE(review): the writer is created but no write call is visible in this source.
writer = self.factory.instance(type=self.write_class,args=self.rw_args)
# Usually this is used for development
# @TODO : Remove this and find a healthy way to stop the server
if 'MONITOR_CONFIG_PATH' in os.environ:
# This suggests we are in development mode
# NOTE(review): no statement is visible under this test, and TIME_ELLAPSED below is
# assigned but not used by any visible statement (a sleep is presumably missing).
TIME_ELLAPSED = 60*120 #-- Every 2 hours
print "Exiting ",self.name
# Worker that, until stop() is called, collects folder information through
# monitor.FileWatch and replaces the per-file details with age/size distributions.
# NOTE(review): the visible constructor does not call BasicWorker.__init__, although run()
# reads self.lock, self.factory, self.write_class and self.rw_args, which are only set
# there -- confirm.
class FileWatchWorker(BasicWorker):
def __init__(self,config,lock):
self.name = "Zulu-FileWatch"
self.config = config ;
self.folder_config = config['monitor']['folders']['config']
self.quit = False
# Asks run() to leave its loop at the next check of self.quit
def stop(self):
self.quit = True
def run(self):
# NOTE(review): TIME_ELAPSED and ml_handler are assigned but not used by any visible statement.
TIME_ELAPSED = 60 * 10
handler = monitor.FileWatch()
ml_handler = ML()
while self.quit == False :
r = []
print ' ** ',self.name,datetime.datetime.today()
for id in self.folder_config :
folders = self.folder_config [id]
# NOTE(review): `folders` is not passed to the handler in the visible code.
xo = handler.composite()
# We should perform a distribution analysis of the details in order to have usable data
xrow = {}
xrow[id] = []
for xo_row in xo:
# the raw per-file ages and sizes are replaced by their frequency tables
xo_age = [row['age'] for row in xo_row['details']]
xo_size= [row['size'] for row in xo_row['details']]
xo_row['details'] = {"age":ML.distribution(xo_age,self.lock),"size":ML.distribution(xo_size,self.lock)}
xo_row['id'] = id
# Now we can save the file
# NOTE(review): the writer is created but no write call is visible in this source.
writer = self.factory.instance(type=self.write_class,args=self.rw_args)
if 'MONITOR_CONFIG_PATH' in os.environ:
# This suggests we are in development mode
# NOTE(review): no statement is visible under this test -- confirm.
print 'Exiting ',self.name
# This class is a singleton designed to start/quit dependent threads
# * monitor is designed to act as a data collection agent
# * learner is designed to be a learner i.e machine learning model(s)
# - How to move them to processes that can be read by the os (that would allow us to eat our own dog-food)
# - Additionally we also need to have a pruning thread, to control the volume of data we have to deal with. This instills the "will to live" in the application
# Keeps the worker threads in the class-level dictionary Pool, keyed by a short name.
# NOTE(review): the methods below take no `self`; they appear to be meant as static
# methods, but no decorator is visible in this source.
class ThreadManager:
# name -> thread instance
Pool = {}
# Creates the 'monitor' and 'learner' threads (and conditionally 'file-watch') with a shared RLock.
def start(config):
lock = RLock()
ThreadManager.Pool['monitor'] = Top(config,lock)
ThreadManager.Pool['learner'] = Learner(config,lock)
# NOTE(review): the worker is created when 'folders' is NOT a key of config, yet
# FileWatchWorker reads config['monitor']['folders'] -- confirm the condition.
if 'folders' not in config :
ThreadManager.Pool['file-watch'] = FileWatchWorker(config,lock)
for id in ThreadManager.Pool :
# NOTE(review): nothing visible starts the thread after this lookup.
thread = ThreadManager.Pool[id]
def stop():
for id in ThreadManager.Pool :
# NOTE(review): nothing visible stops the thread after this lookup.
thread = ThreadManager.Pool[id]
# Builds a name -> liveness dictionary for every thread in Pool.
def status():
r = {}
for id in ThreadManager.Pool :
thread = ThreadManager.Pool[id]
# NOTE(review): Thread.isAlive() was removed in Python 3.9 (is_alive() replaces it),
# and no `return r` is visible after the loop.
r[id] = thread.isAlive()
class Factory:
    """Builds monitor handlers from the 'monitor' section of the configuration."""

    @staticmethod
    def instance(id, config):
        """Instantiate the monitor class configured under ``config['monitor'][id]``.

        @param id      key of the monitor section (e.g. the name of a monitor)
        @param config  configuration with a 'monitor' mapping whose entries
                       provide 'class' (name of a class in the ``monitor``
                       module) and 'config'
        @return {"class": <new handler instance>, "config": <entry config>},
                or None when ``id`` is not configured
        """
        if id not in config['monitor']:
            return None
        entry = config['monitor'][id]
        # Resolve the class by attribute lookup instead of eval(): same result
        # for (possibly dotted) class names, without executing text that comes
        # from the configuration file.
        target = monitor
        for part in entry['class'].split('.'):
            target = getattr(target, part)
        ref = target()
        return {"class": ref, "config": entry["config"]}
if __name__ =='__main__' :
import utils.params as SYS_ARGS
import json
f = open(PARAMS['path'])
CONFIG = json.loads(f.read())