From cbed6dbb70760f321713a1236d2cf0a6568369fc Mon Sep 17 00:00:00 2001 From: "Steve L. Nyemba" Date: Sun, 22 Jan 2017 23:07:54 -0600 Subject: [PATCH] refactoring learning and enabling learning api --- src/api/index.py | 61 ++++++++++++++++++++++++++---------------- src/utils/ml.py | 2 +- src/utils/transport.py | 19 ++++++++----- 3 files changed, 52 insertions(+), 30 deletions(-) diff --git a/src/api/index.py b/src/api/index.py index 216c3af..0846092 100644 --- a/src/api/index.py +++ b/src/api/index.py @@ -43,7 +43,7 @@ p = CONFIG['store']['args'] class_read = CONFIG['store']['class']['read'] class_write= CONFIG['store']['class']['write'] factory = DataSourceFactory() -gReader = factory.instance(type=class_read,args=p) +# gReader = factory.instance(type=class_read,args=p) atexit.register(ThreadManager.stop) @app.route('/get/') @@ -99,8 +99,7 @@ def trends (): class_read = CONFIG['store']['class']['read'] - gReader = factory.instance(type=class_read,args=p) - + gReader = factory.instance(type=class_read,args=p) r = gReader.read() if id in r: r = r[id] #--matrix @@ -131,15 +130,17 @@ def dashboard(): This function is designed to trigger learning for anomaly detection @TODO: forward this to a socket i.e non-blocking socket """ -@app.route('/learn') +@app.route('/anomalies/get') def learn(): global CONFIG p = CONFIG['store']['args'] class_read = CONFIG['store']['class']['read'] gReader = factory.instance(type=class_read,args=p) d = gReader.read() + if 'learn' in d : info = d['learn'] + del d['learn'] else : info = [] @@ -147,27 +148,45 @@ def learn(): if 'id' in request.args: id = request.args['id'] d = d[id] - apps = CONFIG['monitor']['processes']['config'][id] - #print (apps) - params = {} for item in info: - id = item['label'] - params[id] = item + + label = item['label'] + params[label] = item + #apps = list(set(ML.Extract(['label'],d))) - p = AnomalyDetection() - for name in apps : - xo = ML.Filter('label',name,d) - _info = params[name] - #info = ML.Filter('label',app,logs) - value = p.predict(xo,_info) - print [row[1] for row in value] - break + r = [] + if params : + # + # If we have parameters available + p = AnomalyDetection() + apps = params.keys() + for name in apps : + if name not in params: + continue + _info = params[name] + try: + xo = ML.Filter('label',name,d) + except Exception,e: + xo = [] + #print name,e + if len(xo) == 0: + continue + xo = [xo[ len(xo) -1]] + + value = p.predict(xo,_info)[0] + + if len(value): + report = dict(_info,**{'predicton':value}) + r.append(report) + + + #print app,value #if value is not None: # r.append(value) - print r - return json.dumps([]) + + return json.dumps(r) @@ -175,10 +194,6 @@ def learn(): def anomalies_status(): pass -@app.route('/anomalies/get') -def anomalies_get(): - pass - if __name__== '__main__': #ThreadManager.start(CONFIG) diff --git a/src/utils/ml.py b/src/utils/ml.py index b06a5b5..3428533 100644 --- a/src/utils/ml.py +++ b/src/utils/ml.py @@ -116,7 +116,7 @@ class AnomalyDetection: @param info stored information about this """ def predict(self,xo,info): - + xo = ML.Extract(info['features'],xo) if not xo : diff --git a/src/utils/transport.py b/src/utils/transport.py index 6b990e9..487a7d1 100644 --- a/src/utils/transport.py +++ b/src/utils/transport.py @@ -246,12 +246,12 @@ class MessageQueue: self.qid = params['qid'] def isready(self): - self.init() + #self.init() resp = self.connection is not None and self.connection.is_open self.close() return resp def close(self): - + print "closing ..." self.channel.close() self.connection.close() """ @@ -463,7 +463,10 @@ class CouchdbReader(Couchdb,Reader): # # setting the basic parameters for Couchdb.__init__(self,**args) - self.filename = args['filename'] + if 'filename' in args : + self.filename = args['filename'] + else: + self.filename = None def isready(self): # @@ -487,8 +490,7 @@ class CouchdbReader(Couchdb,Reader): r = False return r - - def read(self,size=-1): + def stream(self): content = self.dbase.fetch_attachment(self.uid,self.filename).split('\n') ; i = 1 for row in content: @@ -496,7 +498,12 @@ class CouchdbReader(Couchdb,Reader): if size > 0 and i == size: break i = i + 1 - + + def read(self,size=-1): + if self.filename is not None: + self.stream() + else: + return self.basic_read() def basic_read(self): document = self.dbase.get(self.uid) del document['_id'], document['_rev']