From cbef913877ecd9bd3d1d1762f8c847badcdd5237 Mon Sep 17 00:00:00 2001 From: steve Date: Thu, 19 Jan 2017 16:48:39 -0600 Subject: [PATCH] fixing learning bugs on integration --- src/api/index.py | 89 +++++++++++++++++++++----------------------- src/utils/ml.py | 51 ++++++++++++++++++++++--- src/utils/workers.py | 56 +++++++++++++++++++++------- 3 files changed, 129 insertions(+), 67 deletions(-) diff --git a/src/api/index.py b/src/api/index.py index 4e32b65..7a07f7a 100644 --- a/src/api/index.py +++ b/src/api/index.py @@ -23,40 +23,17 @@ import monitor import Queue from utils.transport import * from utils.workers import ThreadManager, Factory +from utils.ml import ML,AnomalyDetection +import utils.params as SYS_ARGS import atexit -PARAMS = {'context':''} -if len(sys.argv) > 1: - - N = len(sys.argv) - for i in range(1,N): - value = None - if sys.argv[i].startswith('--'): - key = sys.argv[i].replace('-','') - - if i + 1 < N: - value = sys.argv[i + 1] = sys.argv[i+1].strip() - if key and value: - PARAMS[key] = value - - i += 2 - app = Flask(__name__) app.config['SECRET_KEY'] = '!h8-[0v8]247-4-360' #app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX=?RT' - +PARAMS = SYS_ARGS.PARAMS f = open(PARAMS['path']) CONFIG = json.loads(f.read()) -#HANDLERS= {} - -#for key in CONFIG['monitor'] : - - #className = CONFIG['monitor'][key]['class'] - #ref = "".join(["monitor.",className,"()"]) - #ref = eval(ref) - #HANDLERS[key] = {"class":ref,"config":CONFIG['monitor'][key]["config"]} - f.close() # @@ -66,12 +43,7 @@ p = CONFIG['store']['args'] class_read = CONFIG['store']['class']['read'] class_write= CONFIG['store']['class']['write'] factory = DataSourceFactory() -#gWriter = factory.instance(type='CouchdbWritera',args=p) -#gReader = factory.instance(type='CouchdbReader',args=p) -#p['qid'] = HANDLERS['processes']['config'].keys() gReader = factory.instance(type=class_read,args=p) -#gWriter = factory.instance(type=class_write,args=p) -#mthread = monitor.Monitor(HANDLERS,gWriter,'processes',) atexit.register(ThreadManager.stop) @app.route('/get/') @@ -82,9 +54,13 @@ def procs(id): r = {} for label in d : - index = len(d[label]) - 1 - r[label] = d[label][index] - + if label not in ['learn'] : + index = len(d[label]) - 1 + r[label] = d[label][index] + #for row in r[label] : + #yo = ML.Extract(['status'],row) + #xo = ML.Extract(['cpu_usage','memory_usage'],row) + except Exception, e: print e r = [] @@ -122,13 +98,14 @@ def trends (): p = CONFIG['store']['args'] class_read = CONFIG['store']['class']['read'] - p['qid'] =[id] #HANDLERS['processes']['config'].keys() + gReader = factory.instance(type=class_read,args=p) r = gReader.read() if id in r: r = r[id] #--matrix series = [] + for row in r: series += [item for item in row if str(item['label'])== app] @@ -156,19 +133,37 @@ def dashboard(): """ @app.route('/learn') def learn(): - app = request.args.get('app') - id = request.args.get('id') + global CONFIG p = CONFIG['store']['args'] - class_read = CONFIG['store']['class']['read'] - - p['qid'] =[id] #HANDLERS['processes']['config'].keys() + class_read = CONFIG['store']['class']['read'] gReader = factory.instance(type=class_read,args=p) - - r = gReader.read() - r = r[id] - r = ML.Filter('label',app,r) - label = ML.Extract(['status'],r) - r = ML.Extract(['cpu_usage','memory_usage'],r) + d = gReader.read() + if 'learn' in d : + logs = d['learn'] + del d['learn'] + else : + logs = [] + r = [] + if 'id' in request.args: + id = request.args['id'] + d = d[id] + print CONFIG['monitor']['processes']['config'][id] + print (apps) + + + #apps = list(set(ML.Extract(['label'],d))) + p = AnomalyDetection() + #for row in d : + #xo = ML.Filter('label',app,d) + #info = ML.Filter('label',app,logs) + #value = p.predict(xo,info) + #print app,value + #if value is not None: + # r.append(value) + print r + return json.dumps("[]") + + @app.route('/anomalies/status') def anomalies_status(): @@ -180,7 +175,7 @@ def anomalies_get(): if __name__== '__main__': - ThreadManager.start(CONFIG) + #ThreadManager.start(CONFIG) app.run(host='0.0.0.0',debug=True,threaded=True) diff --git a/src/utils/ml.py b/src/utils/ml.py index 8c722bd..041dad1 100644 --- a/src/utils/ml.py +++ b/src/utils/ml.py @@ -16,9 +16,12 @@ class ML: # @TODO: Make sure this approach works across all transport classes # We may have a potential issue of how the data is stored ... it may not scale # + return [item[0] for item in data if item and attr in item[0] and item[0][attr] == value] @staticmethod def Extract(lattr,data): + if isinstance(lattr,basestring): + lattr = [lattr] return [[row[id] for id in lattr] for row in data] @@ -27,6 +30,7 @@ class ML: @TODO: determine computationally determine epsilon """ class AnomalyDetection: + def split(self,data,index=-1,threshold=0.8) : N = len(data) # if N < LIMIT: @@ -64,12 +68,21 @@ class AnomalyDetection: yo = self.split(yo) if xo['train'] : - p = self.gParameters(xo['train']) - - px = self.gPx(p['mean'],p['cov'],xo['test']) + E = 0.01 + for i in range(0,10): + Epsilon = E + (2*E*i) + p = self.gParameters(xo['train']) + + px = self.gPx(p['mean'],p['cov'],xo['test'],Epsilon) + + perf = self.gPerformance(px,yo['test']) + if perf['fscore'] > 0 : + + perf['epsilon'] = Epsilon + + break - perf = self.gPerformance(px,yo['test']) - return {"parameters":p,"performance":perf} + return {"label":value,"parameters":p,"performance":perf} return None def getLabel(self,yo,label_conf): return [ int(len(set(item) & set(label_conf["1"]))>0) for item in yo ] @@ -77,9 +90,10 @@ class AnomalyDetection: """ This function will compute the probability density function given a particular event/set of events + The return value is [px,yo] @pre xu.shape[0] == sigma[0] == sigma[1] """ - def gPx(self,xu,sigma,data,EPSILON=0.25): + def gPx(self,xu,sigma,data,EPSILON=0.01): n = len(data[0]) r = [] @@ -95,6 +109,21 @@ class AnomalyDetection: px = float(b/a) r.append([px,int(px < EPSILON)]) return r + """ + This function uses stored learnt information to predict on raw data + In this case it will determin if we have an anomaly or not + @param xo raw observations (matrix) + @param info stored information about this + """ + def predict(self,xo,info): + xo = ML.Filter(info['extract'],xo) + if not xo : + return None + + sigma = info['parameters']['cov'] + xu = info['parameters']['mean'] + epsilon = info['performance']['epsilon'] + return self.getPx(xu,sigma,xo,epsilon) """ This function computes performance metrics i.e precision, recall and f-score for details visit https://en.wikipedia.org/wiki/Precision_and_recall @@ -134,3 +163,13 @@ class AnomalyDetection: sigma = np.cov(m) sigma = [ list(row) for row in sigma] return {"cov":sigma,"mean":list(u)} + + +class Regression: + parameters = {} + @staticmethod + def predict(xo): + pass + + def __init__(self,config): + pass \ No newline at end of file diff --git a/src/utils/workers.py b/src/utils/workers.py index 706ee83..3ad0cac 100644 --- a/src/utils/workers.py +++ b/src/utils/workers.py @@ -1,7 +1,7 @@ #import multiprocessing from threading import Thread, RLock from utils import transport -from utils.ml import AnomalyDetection +from utils.ml import AnomalyDetection,ML import time import monitor import sys @@ -76,16 +76,38 @@ class Learner(Thread) : def run(self): reader = self.factory.instance(type=self.reader_class,args=self.rw_args) data = reader.read() + + + # + # Let's make sure we extract that which has aleady been learnt + # + + if 'learn' in data: + r = data['learn'] + del data['learn'] + + r = ML.Extract('label',r) + logs = [row[0] for row in r] + logs = list(set(logs)) + + + else: + logs = [] # - # This is the motherload of innefficiency ... + # In order to address the inefficiencies below, we chose to adopt the following policy + # We don't learn that which is already learnt, This measure consists in filtering out the list of the apps that already have learning data # + self.apps = list(set(self.apps) - set(logs)) while self.quit == False: r = {} + lapps = list(self.apps) for key in data : logs = data[key] - - for app in self.apps: - + # + # There poor design at this point, we need to make sure things tested don't get tested again + # This creates innefficiencies (cartesian product) + # + for app in lapps: handler = AnomalyDetection() value = handler.learn(logs,'label',app,self.features,self.yo) @@ -94,17 +116,23 @@ class Learner(Thread) : if key not in r: r[key] = {} r[key][app] = value + i = lapps.index(app) + del lapps[i] + # + # This offers a clean write to the data store upon value retrieved + # The removal of the application enables us to improve efficiency (among other things) + # + value = dict(value,**{"features":self.features}) + self.lock.acquire() + writer = self.factory.instance(type=self.write_class,args=self.rw_args) + writer.write(label='learn',row=value) + self.lock.release() + + # - # At this point we've already learnt every thing we need to learn + # Usually this is used for development + # @TODO : Remove this and find a healthy way to stop the server # - - if r.keys() : - - self.lock.acquire() - writer = self.factory.instance(type=self.write_class,args=self.rw_args) - writer.write(label='learn',row=r) - self.lock.release() - if 'MONITOR_CONFIG_PATH' in os.environ: # # This suggests we are in development mode