From 8e7cad9a11a5282f20700646b770205bf002ddd5 Mon Sep 17 00:00:00 2001
From: "Steve L. Nyemba"
Date: Tue, 29 Aug 2017 02:15:07 -0500
Subject: [PATCH] DC - House keeping work, removing unused files

---
 src/utils/mailer.py  |  38 ------
 src/utils/ml.py      | 312 -------------------------------------
 src/utils/workers.py | 266 ------------------------------------
 3 files changed, 616 deletions(-)
 delete mode 100644 src/utils/mailer.py
 delete mode 100644 src/utils/ml.py
 delete mode 100644 src/utils/workers.py

diff --git a/src/utils/mailer.py b/src/utils/mailer.py
deleted file mode 100644
index c63f2bb..0000000
--- a/src/utils/mailer.py
+++ /dev/null
@@ -1,38 +0,0 @@
-import smtplib
-from email.mime.multipart import MIMEMultipart
-from email.mime.text import MIMEText
-
-class MailAgent :
-    def __init__(self,conf) :
-        self.uid = conf['uid']
-
-
-        try:
-
-            self.handler = smtplib.SMTP_SSL(conf['host'],conf['port'])
-            r = self.handler.login(self.uid,conf['password'])
-            #
-            # @TODO: Check the status of the authentication
-            # If not authenticated the preconditions have failed
-            #
-        except Exception,e:
-            print e
-            self.handler = None
-            pass
-
-
-    def send(self,**args) :
-        subject = args['subject']
-        message = args['message']
-        to = args['to']
-        if '<' in message and '>' in message :
-            message = MIMEText(message,'html')
-        else:
-            message = MIMEText(message,'plain')
-        message['From'] = self.uid
-        message['To'] = to
-        message['Subject'] = subject
-        return self.handler.sendmail(self.uid,to,message.as_string())
-    def close(self):
-        self.handler.quit()
-
diff --git a/src/utils/ml.py b/src/utils/ml.py
deleted file mode 100644
index 4c56910..0000000
--- a/src/utils/ml.py
+++ /dev/null
@@ -1,312 +0,0 @@
-"""
-    This file is intended to perfom certain machine learning tasks based on numpy
-    We are trying to keep it lean that's why no sklearn involved yet
-
-    @TODO:
-    Create factory method for the learners implemented here
-    Improve preconditions (size of the dataset, labels)
-"""
-from __future__ import division
-import numpy as np
-
-class ML:
-    @staticmethod
-    def Filter (attr,value,data) :
-        #
-        # @TODO: Make sure this approach works across all transport classes
-        # We may have a potential issue of how the data is stored ... it may not scale
-        #
-        value = ML.CleanupName(value)
-        #return [item[0] for item in data if item and attr in item[0] and item[0][attr] == value]
-        #return [[item for item in row if item[attr] == value][0] for row in data]
-        #
-        # We are making the filtering more rescillient, i.e if an item doesn't exist we don't have to throw an exception
-        # This is why we expanded the loops ... fully expressive but rescilient
-        #
-        r = []
-        for row in data :
-            if isinstance(row,list) :
-                for item in row :
-
-                    if attr in item and item[attr] == value:
-                        r.append(item)
-            else:
-                #
-                # We are dealing with a vector of objects
-                #
-                if attr in row and row[attr] == value:
-                    r.append(row)
-
-        return r
-    @staticmethod
-    def Extract(lattr,data):
-        if isinstance(lattr,basestring):
-            lattr = [lattr]
-        # return [[row[id] for id in lattr] for row in data]
-        r = [[row[id] for id in lattr] for row in data]
-        if len(lattr) == 1 :
-            return [x[0] for x in r]
-        else:
-            return r
-    @staticmethod
-    def CleanupName(value) :
-        return value.replace('$','').replace('.+','')
-    @staticmethod
-    def distribution(xo,lock,scale=False) :
-
-        d = []
-        m = {}
-        if scale :
-            xu = np.mean(xo)
-            sd = np.sqrt(np.var(xo))
-        for xi in xo :
-            value = round(xi,2)
-            if scale :
-                value = round((value - xu)/sd,2)
-            id = str(value)
-            lock.acquire()
-            if id in m :
-                index = m[id]
-                d[index][1] += 1
-            else:
-                m[id] = len(d)
-                d.append([value,1])
-            lock.release()
-        del m
-        return d
-
-"""
-    Implements a multivariate anomaly detection
-    @TODO: determine computationally determine epsilon
-"""
-class AnomalyDetection:
-    def __init__(self):
-        pass
-    def split(self,data,index=-1,threshold=0.65) :
-        N = len(data)
-        # if N < LIMIT:
-        #     return None
-
-        end = int(N*threshold)
-        train = data[:end]
-        test = data[end:]
-
-        return {"train":train,"test":test}
-
-    """
-
-        @param key field name by which the data will be filtered
-        @param value field value for the filter
-        @param features features to be used in the analysis
-        @param labels used to assess performance
-        @TODO: Map/Reduce does a good job at filtering
-    """
-    def learn(self,data,key,value,features,label):
-
-
-        if len(data) < 10:
-            return None
-        xo = ML.Filter(key,value,data)
-        if len(xo) < 10 :
-            return None
-        # attr = conf['features']
-        # label= conf['label']
-
-        yo= ML.Extract([label['name']],xo)
-        xo = ML.Extract(features,xo)
-        yo = self.getLabel(yo,label)
-        #
-        # @TODO: Insure this can be finetuned, training size matters for learning. It's not obvious to define upfront
-        #
-        xo = self.split(xo)
-        yo = self.split(yo)
-        p = self.gParameters(xo['train'])
-        has_cov = np.linalg.det(p['cov']) if p else False #-- making sure the matrix is invertible
-
-        if xo['train'] and has_cov :
-            E = 0.001
-            ACCEPTABLE_FSCORE = 0.6
-            fscore = 0
-            #
-            # We need to find an appropriate epsilon for the predictions
-            # The appropriate epsilon is one that yields an f-score [0.5,1[
-            #
-
-            __operf__ = None
-            perf = None
-            for i in range(0,10):
-                Epsilon = E + (2*E*i)
-
-                if p is None :
-                    return None
-                #
-                # At this point we've got enough data for the parameters
-                # We should try to fine tune epsilon for better results
-                #
-
-                px = self.gPx(p['mean'],p['cov'],xo['test'],Epsilon)
-
-
-                __operf__ = self.gPerformance(px,yo['test'])
-                print value,__operf__
-                if __operf__['fscore'] == 1 :
-                    continue
-                if perf is None :
-                    perf = __operf__
-                elif perf['fscore'] < __operf__['fscore'] and __operf__['fscore'] > ACCEPTABLE_FSCORE :
-                    perf = __operf__
-                    perf['epsilon'] = Epsilon
-            #
-            # At this point we are assuming we came out of the whole thing with an acceptable performance
-            # The understanding is that error drives performance thus we reject fscore==1
-            #
-
-            if perf and perf['fscore'] > ACCEPTABLE_FSCORE :
-                return {"label":value,"parameters":p,"performance":perf}
-            else:
-                return None
-        return None
-    """
-        This function determines if the preconditions for learning are met
-        For that parameters are passed to the function
-        p
-    """
-    def canLearn(self,p) :
-        pass
-    def getLabel(self,yo,label_conf):
-        return [ int(len(set(item) & set(label_conf["1"]))>0) for item in yo ]
-
-
-    """
-        This function will compute the probability density function given a particular event/set of events
-        The return value is [px,yo]
-        @pre xu.shape[0] == sigma[0] == sigma[1]
-    """
-    def gPx(self,xu,sigma,data,EPSILON=0.01):
-        n = len(data[0])
-
-        r = []
-        a = (2*(np.pi)**(n/2))*np.linalg.det(sigma)**0.5
-        # EPSILON = np.float64(EPSILON)
-        test = np.array(data)
-        for row in test:
-            row = np.array(row)
-            d = np.matrix(row - xu)
-            d.shape = (n,1)
-
-            b = np.exp((-0.5*np.transpose(d)) * (np.linalg.inv(sigma)*d))
-
-            px = float(b/a)
-            r.append([px,int(px < EPSILON)])
-        return r
-    """
-        This function uses stored learnt information to predict on raw data
-        In this case it will determin if we have an anomaly or not
-        @param xo raw observations (matrix)
-        @param info stored information about this
-    """
-    def predict(self,xo,info):
-
-        xo = ML.Extract(info['features'],xo)
-
-        if not xo :
-            return None
-
-        sigma = info['parameters']['cov']
-        xu = info['parameters']['mean']
-        epsilon = info['performance']['epsilon']
-
-        return self.gPx(xu,sigma,xo,epsilon)
-    """
-        This function computes performance metrics i.e precision, recall and f-score
-        for details visit https://en.wikipedia.org/wiki/Precision_and_recall
-
-    """
-    def gPerformance(self,test,labels) :
-        N = len(test)
-        tp = 0 # true positive
-        fp = 0 # false positive
-        fn = 0 # false negative
-        tn = 0 # true negative
-        for i in range(0,N):
-            tp += 1 if (test[i][1]==labels[i] and test[i][1] == 1) else 0
-            fp += 1 if (test[i][1] != labels[i] and test[i][1] == 1) else 0
-            fn += 1 if (test[i][1] != labels[i] and test[i][1] == 0) else 0
-            tn += 1 if (test[i][1] == labels[i] and test[i][1] == 0) else 0
-        precision = tp /( (tp + fp) if tp + fp > 0 else 1)
-        recall = tp / ((tp + fn) if tp + fn > 0 else 1)
-
-        fscore = (2 * precision * recall)/ ((precision + recall) if (precision + recall) > 0 else 1)
-        return {"precision":precision,"recall":recall,"fscore":fscore}
-
-    """
-        This function returns gaussian parameters i.e means and covariance
-        The information will be used to compute probabilities
-    """
-    def gParameters(self,train) :
-
-        n = len(train[0])
-        m = np.transpose(np.array(train))
-
-        u = np.array([ np.mean(m[i][:]) for i in range(0,n)])
-        if np.sum(u) == 0:
-            return None
-        r = np.array([ np.sqrt(np.var(m[i,:])) for i in range(0,n)])
-        #
-        # Before we normalize the data we must insure there's is some level of movement in this application
-        # A lack of movement suggests we may not bave enough information to do anything
-        #
-        if 0 in r :
-            return None
-        #
-        #-- Normalizing the matrix then we will compute covariance matrix
-        #
-
-        m = np.array([ (m[i,:] - u[i])/r[i] for i in range(0,n)])
-        sigma = np.cov(m)
-        sigma = [ list(row) for row in sigma]
-        return {"cov":sigma,"mean":list(u)}
-
-class AnalyzeAnomaly(AnomalyDetection):
-    def __init__(self):
-        AnomalyDetection.__init__(self)
-    """
-        This analysis function will include a predicted status because an anomaly can either be
-        - A downtime i.e end of day
-        - A spike and thus a potential imminent crash
-        @param xo matrix of variables
-        @param info information about what was learnt
-    """
-    def predict(self,xo,info):
-        x = xo[len(xo)-1]
-        r = AnomalyDetection.predict(self,[x],info)
-        #
-        # In order to determine what the anomaly is we compute the slope (idle or crash)
-        # The slope is computed using the covariance / variance of features
-        #
-        if r is not None:
-            N = len(info['features'])
-            xy = ML.Extract(info['features'],xo)
-            xy = np.array(xy)
-
-            vxy= np.array([ np.var(xy[:,i]) for i in range(0,N)])
-            cxy=np.array(info['parameters']['cov'])
-            #cxy=np.cov(np.transpose(xy))
-            if np.sum(vxy) == 0:
-                vxy = cxy
-
-            alpha = cxy/vxy
-
-
-
-            r = {"anomaly":r[0][1],"slope":list(alpha[:,0])}
-
-        return r
-class Regression:
-    parameters = {}
-    @staticmethod
-    def predict(xo):
-        pass
-
-    def __init__(self,config):
-        pass
diff --git a/src/utils/workers.py b/src/utils/workers.py
deleted file mode 100644
index 332d361..0000000
--- a/src/utils/workers.py
+++ /dev/null
@@ -1,266 +0,0 @@
-#import multiprocessing
-from threading import Thread, RLock
-#from utils import transport
-from utils.transport import *
-from utils.ml import AnomalyDetection,ML
-import time
-import monitor
-import sys
-import os
-import datetime
-class BasicWorker(Thread):
-    def __init__(self,config,lock):
-        Thread.__init__(self)
-        self.reader_class = config['store']['class']['read']
-        self.write_class = config['store']['class']['write']
-        self.rw_args = config['store']['args']
-        self.factory = DataSourceFactory()
-        self.lock = lock
-
-"""
-    This class is intended to collect data given a configuration
-
-"""
-class Top(Thread):
-    def __init__(self,_config,lock):
-        Thread.__init__(self)
-        self.lock = lock
-        self.reader_class = _config['store']['class']['read']
-        self.write_class = _config['store']['class']['write']
-        self.rw_args = _config['store']['args']
-        self.factory = DataSourceFactory()
-
-        self.name = 'Zulu-Top'
-        self.quit = False
-
-
-        className = ''.join(['monitor.',_config['monitor']['processes']['class'],'()'])
-        self.handler = eval(className)
-        self.config = _config['monitor']['processes']['config']
-    def stop(self):
-        self.quit = True
-    def run(self):
-        while self.quit == False:
-            print ' ** ',self.name,datetime.datetime.today()
-            for label in self.config :
-                self.lock.acquire()
-                gwriter = self.factory.instance(type=self.write_class,args=self.rw_args)
-                apps = self.config[label]
-                self.handler.init(apps)
-                r = self.handler.composite()
-
-                gwriter.write(label=label,row=r)
-                time.sleep(5)
-                self.lock.release()
-            if 'MONITOR_CONFIG_PATH' in os.environ:
-                #
-                # This suggests we are in development mode
-                #
-                break
-            ELLAPSED_TIME = 60*20
-            time.sleep(ELLAPSED_TIME)
-        print "Exiting ",self.name
-
-class Learner(Thread) :
-
-    """
-        This function expects paltform config (store,learner)
-        It will leverage store and learner in order to operate
-    """
-    def __init__(self,config,lock):
-        Thread.__init__(self)
-        self.name = 'Zulu-Learner'
-        self.lock = lock
-        self.reader_class = config['store']['class']['read']
-        self.write_class = config['store']['class']['write']
-        self.rw_args = config['store']['args']
-        self.features = config['learner']['anomalies']['features']
-        self.yo = config['learner']['anomalies']['label']
-        self.apps = config['learner']['anomalies']['apps']
-        self.factory = DataSourceFactory()
-        self.quit = False
-
-    def stop(self):
-        self.quit = True
-    """
-        This function will initiate learning every (x-hour)
-        If there is nothing to learn the app will simply go to sleep
-    """
-    def run(self):
-        reader = self.factory.instance(type=self.reader_class,args=self.rw_args)
-        data = reader.read()
-
-
-        #
-        # Let's make sure we extract that which has aleady been learnt
-        #
-
-        if 'learn' in data:
-            r = data['learn']
-            del data['learn']
-
-            r = ML.Extract('label',r)
-            logs = [row[0] for row in r]
-            logs = list(set(logs))
-
-
-        else:
-            logs = []
-        #
-        # In order to address the inefficiencies below, we chose to adopt the following policy
-        # We don't learn that which is already learnt, This measure consists in filtering out the list of the apps that already have learning data
-        #
-        self.apps = list(set(self.apps) - set(logs))
-        while self.quit == False:
-            r = {}
-            lapps = list(self.apps)
-            print ' ** ',self.name,datetime.datetime.today()
-            for key in data :
-                logs = data[key]
-                #
-                # There poor design at this point, we need to make sure things tested don't get tested again
-                # This creates innefficiencies (cartesian product)
-                #
-                for app in lapps:
-                    handler = AnomalyDetection()
-                    value = handler.learn(logs,'label',app,self.features,self.yo)
-
-                    if value is not None:
-
-                        if key not in r:
-                            r[key] = {}
-                        r[key][app] = value
-                        i = lapps.index(app)
-                        del lapps[i]
-                        #
-                        # This offers a clean write to the data store upon value retrieved
-                        # The removal of the application enables us to improve efficiency (among other things)
-                        #
-                        value = dict(value,**{"features":self.features})
-                        self.lock.acquire()
-                        writer = self.factory.instance(type=self.write_class,args=self.rw_args)
-                        writer.write(label='learn',row=value)
-                        self.lock.release()


-            #
-            # Usually this is used for development
-            # @TODO : Remove this and find a healthy way to stop the server
-            #
-            if 'MONITOR_CONFIG_PATH' in os.environ:
-                #
-                # This suggests we are in development mode
-                #
-                break
-
-            TIME_ELLAPSED = 60*120 #-- Every 2 hours
-            time.sleep(TIME_ELLAPSED)
-        print "Exiting ",self.name
-
-class FileWatchWorker(BasicWorker):
-    def __init__(self,config,lock):
-        BasicWorker.__init__(self,config,lock)
-        self.name = "Zulu-FileWatch"
-        self.config = config ;
-        self.folder_config = config['monitor']['folders']['config']
-        self.quit = False
-    def stop(self):
-        self.quit = True
-    def run(self):
-        TIME_ELAPSED = 60 * 10
-        handler = monitor.FileWatch()
-        ml_handler = ML()
-        while self.quit == False :
-            r = []
-            print ' ** ',self.name,datetime.datetime.today()
-            for id in self.folder_config :
-                folders = self.folder_config [id]
-                handler.init(folders)
-                xo = handler.composite()
-                #
-                # We should perform a distribution analysis of the details in order to have usable data
-                #
-                xrow = {}
-                xrow[id] = []
-                for xo_row in xo:
-                    xo_age = [row['age'] for row in xo_row['details']]
-                    xo_size= [row['size'] for row in xo_row['details']]
-                    xo_row['details'] = {"age":ML.distribution(xo_age,self.lock),"size":ML.distribution(xo_size,self.lock)}
-
-                    xo_row['id'] = id
-                    xrow[id].append(xo_row)
-                #
-                # Now we can save the file
-                #
-                self.lock.acquire()
-                writer = self.factory.instance(type=self.write_class,args=self.rw_args)
-                writer.write(label='folders',row=xrow)
-                self.lock.release()
-            if 'MONITOR_CONFIG_PATH' in os.environ:
-                #
-                # This suggests we are in development mode
-                #
-                break
-            time.sleep(TIME_ELAPSED)
-        print 'Exiting ',self.name
-
-"""
-    This class is a singleton designed to start quit dependent threads
-    * monitor is designed to act as a data collection agent
-    * learner is designed to be a learner i.e machine learning model(s)
-    @TODO:
-        - How to move them to processes that can be read by the os (that would allow us to eat our own dog-food)
-        - Additionally we also need to have a pruning thread, to control the volume of data we have to deal with.This instills the "will to live" in the application
-"""
-class ThreadManager:
-
-    Pool = {}
-    @staticmethod
-    def start(config):
-        lock = RLock()
-        ThreadManager.Pool['monitor'] = Top(config,lock)
-        ThreadManager.Pool['learner'] = Learner(config,lock)
-        if 'folders' not in config :
-            ThreadManager.Pool['file-watch'] = FileWatchWorker(config,lock)
-        for id in ThreadManager.Pool :
-            thread = ThreadManager.Pool[id]
-            thread.start()
-    @staticmethod
-    def stop():
-        for id in ThreadManager.Pool :
-            thread = ThreadManager.Pool[id]
-            thread.stop()
-    @staticmethod
-    def status():
-        r = {}
-        for id in ThreadManager.Pool :
-            thread = ThreadManager.Pool[id]
-            r[id] = thread.isAlive()
-
-
-
-
-class Factory :
-    """
-        This function will return an instance of an object in the specified in the configuration file
-    """
-    @staticmethod
-    def instance(id,config):
-        if id in config['monitor'] :
-            className = config['monitor'][id]['class']
-            ref = "".join(["monitor.",className,"()"])
-            ref = eval(ref)
-            return {"class":ref,"config":config['monitor'][id]["config"]}
-        else:
-            return None
-
-
-if __name__ =='__main__' :
-    import utils.params as SYS_ARGS
-    import json
-    PARAMS = SYS_ARGS.PARAMS
-    f = open(PARAMS['path'])
-    CONFIG = json.loads(f.read())
-    f.close()
-    ThreadManager.start(CONFIG)
-
\ No newline at end of file
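
The removed ml.py implemented multivariate Gaussian anomaly detection: estimate a mean vector and covariance matrix from a training split, score held-out observations with the multivariate normal density, flag an observation as anomalous when its density falls below a threshold epsilon, and pick epsilon by F-score. The sketch below is a minimal illustration of that technique using only numpy; it is not the deleted code, and the names fit_gaussian, density and tune_epsilon are invented for this example.

    # Minimal sketch of Gaussian anomaly detection; illustrative names, not the removed implementation.
    import numpy as np

    def fit_gaussian(train):
        # Estimate the mean vector and covariance matrix from rows of features.
        X = np.array(train, dtype=float)
        return X.mean(axis=0), np.cov(X, rowvar=False)

    def density(X, mu, sigma):
        # Multivariate normal density p(x) for each row of X.
        X = np.array(X, dtype=float)
        n = X.shape[1]
        inv = np.linalg.inv(sigma)
        norm = np.sqrt(((2 * np.pi) ** n) * np.linalg.det(sigma))
        d = X - mu
        exponent = -0.5 * np.einsum('ij,jk,ik->i', d, inv, d)
        return np.exp(exponent) / norm

    def tune_epsilon(p_val, y_val, candidates):
        # Pick the epsilon whose predictions (p < epsilon means anomaly) maximize F1 on a labeled split.
        best_eps, best_f1 = None, 0.0
        for eps in candidates:
            pred = (p_val < eps).astype(int)
            tp = np.sum((pred == 1) & (y_val == 1))
            fp = np.sum((pred == 1) & (y_val == 0))
            fn = np.sum((pred == 0) & (y_val == 1))
            precision = tp / (tp + fp) if (tp + fp) else 0.0
            recall = tp / (tp + fn) if (tp + fn) else 0.0
            f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
            if f1 > best_f1:
                best_eps, best_f1 = eps, f1
        return best_eps, best_f1

A caller would fit on a training slice, score a labelled held-out slice, and sweep a handful of candidate epsilons, which mirrors the epsilon-tuning loop the removed learn method performed.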