You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

307 lines
8.3 KiB
Python

"""
This file is intended to perfom certain machine learning tasks based on numpy
We are trying to keep it lean that's why no sklearn involved yet
@TODO:
Create factory method for the learners implemented here
Improve preconditions (size of the dataset, labels)
"""
from __future__ import division
import numpy as np
class ML:
@staticmethod
def Filter (attr,value,data) :
#
# @TODO: Make sure this approach works across all transport classes
# We may have a potential issue of how the data is stored ... it may not scale
#
value = ML.CleanupName(value)
#return [item[0] for item in data if item and attr in item[0] and item[0][attr] == value]
#return [[item for item in row if item[attr] == value][0] for row in data]
#
# We are making the filtering more rescillient, i.e if an item doesn't exist we don't have to throw an exception
# This is why we expanded the loops ... fully expressive but rescilient
#
r = []
for row in data :
if isinstance(row,list) :
for item in row :
if attr in item and item[attr] == value:
r.append(item)
else:
#
# We are dealing with a vector of objects
#
if attr in row and row[attr] == value:
r.append(row)
return r
@staticmethod
def Extract(lattr,data):
if isinstance(lattr,basestring):
lattr = [lattr]
return [[row[id] for id in lattr] for row in data]
@staticmethod
def CleanupName(value) :
return value.replace('$','').replace('.+','')
@staticmethod
def distribution(xo,lock,scale=False) :
d = []
m = {}
if scale :
xu = np.mean(xo)
sd = np.sqrt(np.var(xo))
for xi in xo :
value = round(xi,2)
if scale :
value = round((value - xu)/sd,2)
id = str(value)
lock.acquire()
if id in m :
index = m[id]
d[index][1] += 1
else:
m[id] = len(d)
d.append([value,1])
lock.release()
del m
return d
"""
Implements a multivariate anomaly detection
@TODO: determine computationally determine epsilon
"""
class AnomalyDetection:
def __init__(self):
pass
def split(self,data,index=-1,threshold=0.65) :
N = len(data)
# if N < LIMIT:
# return None
end = int(N*threshold)
train = data[:end]
test = data[end:]
return {"train":train,"test":test}
"""
@param key field name by which the data will be filtered
@param value field value for the filter
@param features features to be used in the analysis
@param labels used to assess performance
@TODO: Map/Reduce does a good job at filtering
"""
def learn(self,data,key,value,features,label):
if len(data) < 10:
return None
xo = ML.Filter(key,value,data)
if len(xo) < 10 :
return None
# attr = conf['features']
# label= conf['label']
yo= ML.Extract([label['name']],xo)
xo = ML.Extract(features,xo)
yo = self.getLabel(yo,label)
#
# @TODO: Insure this can be finetuned, training size matters for learning. It's not obvious to define upfront
#
xo = self.split(xo)
yo = self.split(yo)
p = self.gParameters(xo['train'])
has_cov = np.linalg.det(p['cov']) if p else False #-- making sure the matrix is invertible
if xo['train'] and has_cov :
E = 0.001
ACCEPTABLE_FSCORE = 0.6
fscore = 0
#
# We need to find an appropriate epsilon for the predictions
# The appropriate epsilon is one that yields an f-score [0.5,1[
#
__operf__ = None
perf = None
for i in range(0,10):
Epsilon = E + (2*E*i)
if p is None :
return None
#
# At this point we've got enough data for the parameters
# We should try to fine tune epsilon for better results
#
px = self.gPx(p['mean'],p['cov'],xo['test'],Epsilon)
__operf__ = self.gPerformance(px,yo['test'])
if __operf__['fscore'] == 1 :
continue
if perf is None :
perf = __operf__
elif perf['fscore'] < __operf__['fscore'] and __operf__['fscore'] > ACCEPTABLE_FSCORE :
perf = __operf__
perf['epsilon'] = Epsilon
#
# At this point we are assuming we came out of the whole thing with an acceptable performance
# The understanding is that error drives performance thus we reject fscore==1
#
if perf and perf['fscore'] > ACCEPTABLE_FSCORE :
return {"label":value,"parameters":p,"performance":perf}
else:
return None
return None
"""
This function determines if the preconditions for learning are met
For that parameters are passed to the function
p
"""
def canLearn(self,p) :
pass
def getLabel(self,yo,label_conf):
return [ int(len(set(item) & set(label_conf["1"]))>0) for item in yo ]
"""
This function will compute the probability density function given a particular event/set of events
The return value is [px,yo]
@pre xu.shape[0] == sigma[0] == sigma[1]
"""
def gPx(self,xu,sigma,data,EPSILON=0.01):
n = len(data[0])
r = []
a = (2*(np.pi)**(n/2))*np.linalg.det(sigma)**0.5
# EPSILON = np.float64(EPSILON)
test = np.array(data)
for row in test:
row = np.array(row)
d = np.matrix(row - xu)
d.shape = (n,1)
b = np.exp((-0.5*np.transpose(d)) * (np.linalg.inv(sigma)*d))
px = float(b/a)
r.append([px,int(px < EPSILON)])
return r
"""
This function uses stored learnt information to predict on raw data
In this case it will determin if we have an anomaly or not
@param xo raw observations (matrix)
@param info stored information about this
"""
def predict(self,xo,info):
xo = ML.Extract(info['features'],xo)
if not xo :
return None
sigma = info['parameters']['cov']
xu = info['parameters']['mean']
epsilon = info['performance']['epsilon']
return self.gPx(xu,sigma,xo,epsilon)
"""
This function computes performance metrics i.e precision, recall and f-score
for details visit https://en.wikipedia.org/wiki/Precision_and_recall
"""
def gPerformance(self,test,labels) :
N = len(test)
tp = 0 # true positive
fp = 0 # false positive
fn = 0 # false negative
tn = 0 # true negative
for i in range(0,N):
tp += 1 if (test[i][1]==labels[i] and test[i][1] == 1) else 0
fp += 1 if (test[i][1] != labels[i] and test[i][1] == 1) else 0
fn += 1 if (test[i][1] != labels[i] and test[i][1] == 0) else 0
tn += 1 if (test[i][1] == labels[i] and test[i][1] == 0) else 0
precision = tp / (tp + fp) if tp + fp > 0 else 1
recall = tp / (tp + fn) if tp + fp > 0 else 1
fscore = (2 * precision * recall)/ ((precision + recall) if (precision + recall) > 0 else 1)
return {"precision":precision,"recall":recall,"fscore":fscore}
"""
This function returns gaussian parameters i.e means and covariance
The information will be used to compute probabilities
"""
def gParameters(self,train) :
n = len(train[0])
m = np.transpose(np.array(train))
u = np.array([ np.mean(m[i][:]) for i in range(0,n)])
if np.sum(u) == 0:
return None
r = np.array([ np.sqrt(np.var(m[i,:])) for i in range(0,n)])
#
# Before we normalize the data we must insure there's is some level of movement in this application
# A lack of movement suggests we may not bave enough information to do anything
#
if 0 in r :
return None
#
#-- Normalizing the matrix then we will compute covariance matrix
#
m = np.array([ (m[i,:] - u[i])/r[i] for i in range(0,n)])
sigma = np.cov(m)
sigma = [ list(row) for row in sigma]
return {"cov":sigma,"mean":list(u)}
class AnalyzeAnomaly(AnomalyDetection):
def __init__(self):
AnomalyDetection.__init__(self)
"""
This analysis function will include a predicted status because an anomaly can either be
- A downtime i.e end of day
- A spike and thus a potential imminent crash
@param xo matrix of variables
@param info information about what was learnt
"""
def predict(self,xo,info):
x = xo[len(xo)-1]
r = AnomalyDetection.predict(self,[x],info)
#
# In order to determine what the anomaly is we compute the slope (idle or crash)
# The slope is computed using the covariance / variance of features
#
if r is not None:
N = len(info['features'])
xy = ML.Extract(info['features'],xo)
xy = np.array(xy)
vxy= np.array([ np.var(xy[:,i]) for i in range(0,N)])
cxy=np.array(info['parameters']['cov'])
#cxy=np.cov(np.transpose(xy))
if np.sum(vxy) == 0:
vxy = cxy
alpha = cxy/vxy
r = {"anomaly":r[0][1],"slope":list(alpha[:,0])}
return r
class Regression:
parameters = {}
@staticmethod
def predict(xo):
pass
def __init__(self,config):
pass