refactoring workers for simpler use & minor ui bug fixes

master
Steve L. Nyemba 8 years ago
parent fb6d0df6f0
commit 99f43359ed

@@ -99,7 +99,7 @@ def procs(id):
def sandbox():
    global CONFIG
-   if 'sandbox' in CONFIG['monitor']:
+   if 'sandbox' in CONFIG: #CONFIG['monitor']:
        #handler = HANDLERS['sandbox']['class']
        #conf = HANDLERS['sandbox']['config']
        r = []

@@ -85,6 +85,7 @@
</div>
<div id="sandbox_pager"></div>
</div>
</div>
<div style="margin-top:10px">
<div id="folder_summary">
<div class="">
@@ -145,7 +146,7 @@
<div id="chartfolder" ></div>
</div>
</div>
</div>

@@ -28,6 +28,8 @@ class Analysis:
    def getNow(self):
        d = datetime.datetime.now()
        return {"month":d.month,"year":d.year, "day":d.day,"hour":d.hour,"minute":d.minute}
    def getName(self):
        return self.__class__.__name__
"""
    This class is designed to analyze environment variables. Environment variables can either be folders, files or simple values
@@ -155,7 +157,8 @@ class DetailProcess(Analysis):
    def init (self,names):
        #Analysis.init(self)
        self.names = names;
    def getName(self):
        return "apps"
    def split(self,name,stream):
        pattern = "(\d+.{0,1}\d*)\x20*(\d+.{0,1}\d*)\x20*(\d+.{0,1}\d*)".replace(":name",name).strip()
@@ -235,6 +238,8 @@ class FileWatch(Analysis):
        pass
    def init(self,folders):
        self.folders = folders;
    def getName(self):
        return "folders"
    def split(self,row):
        x = row.split(' ')
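Note: the getName() overrides added above are what the new collector (added below) uses to label the records it writes: DetailProcess rows go out under "apps" and FileWatch rows under "folders", each joined with the node id from the configuration. A minimal sketch of that labeling, with "demo-node" standing in for config['id'] (a made-up value):

    # sketch only: mirrors how ICollector.run() builds its write labels
    node_id = "demo-node"                 # hypothetical value of config['id']
    for name in ["apps", "folders"]:      # DetailProcess.getName(), FileWatch.getName()
        print "@".join([name, node_id])   # prints apps@demo-node, then folders@demo-node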

@@ -0,0 +1,102 @@
"""
    This is the implementation of a data collection agent
    The agent's role is intended to :
        - collect data associated with folder and processes
        - The agent will also perform various learning tasks
    Usage:
        python --path <config> --delay xxx --procs p1,p2,p3 --folders path1,path2
"""
from threading import Thread, RLock
from utils.params import PARAMS
import os
import json
import time
from datetime import datetime
from utils.transport import *
import monitor

class ICollector(Thread) :
    def __init__(self) :
        Thread.__init__(self)
        self.folders = None
        self.procs = None
        self.config = None
        self.pool = []
        self.lock = RLock()
        self.factory = DataSourceFactory()
        self.init()
    def init(self):
        #
        # data store configuration (needs to be in a file)
        #
        path = PARAMS['path']
        if os.path.exists(path) :
            f = open(path)
            self.config = json.loads(f.read())
            #if 'store' in self.config :
            #   self.config = self.config['store']
            f.close()
        self.id = self.config['id'] #PARAMS['id']
        if 'folders' in self.config : #PARAMS :
            folders = self.config['folders'] #PARAMS['folders'].split(',')
            self.register('monitor.FileWatch',folders)
        if 'procs' in self.config : #PARAMS :
            procs = self.config['procs'] #PARAMS['procs'].split(',')
            self.register('monitor.DetailProcess',procs)
        self.quit = False
        #self.DELAY = PARAMS['delay']*60
        self.DELAY = self.config['delay']
    """
        This function returns an instance of a data collector class :
        ProcessDetails, FileWatch, ... provided the class name
    """
    def register(self,className,params) :
        try:
            agent = eval(className+"()")
            agent.init(params)
            self.pool.append( agent )
        except Exception,e:
            print e
    def stop(self):
        self.quit = True
    def run(self):
        write_class = self.config['store']['class']['write']
        read_args = self.config['store']['args']
        while self.quit == False:
            for thread in self.pool :
                id = "@".join([thread.getName(),self.id])
                data = thread.composite()
                label = thread.getName()
                row = {}
                if label == 'folders':
                    row[id] = data
                else:
                    label = id
                    row = data
                self.lock.acquire()
                store = self.factory.instance(type=write_class,args=read_args)
                store.write(label=label,row=row)
                self.lock.release()
            if 'MONITOR_CONFIG_PATH' in os.environ :
                break
            time.sleep(self.DELAY)
        print ' *** Exiting ',self.name
        # read_class=self.config['class']['read']
        # store = self.factory.instance(type=write_class,args=read_args)
        # store.flush()

if __name__ == '__main__':
    thread = ICollector()
    # thread.daemon = True
    thread.start()
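The collector takes all of its settings from the JSON file passed with --path. A rough sketch of the shape that file appears to have, based only on the keys init() and run() read; the store class names and connection arguments below are placeholders, not values taken from the repository:

    import json

    # hypothetical example of the configuration ICollector expects
    sample = {
        "id": "demo-node",                              # node identifier, reused in write labels
        "delay": 30,                                    # pause between collection cycles
        "folders": ["/tmp", "/var/log"],                # handed to monitor.FileWatch
        "procs": ["httpd", "mongod"],                   # handed to monitor.DetailProcess
        "store": {
            "class": {"read": "CouchdbReader", "write": "CouchdbWriter"},   # placeholder class names
            "args": {"uri": "http://localhost:5984", "dbname": "monitor"}   # placeholder arguments
        }
    }
    with open("config.json", "w") as f:
        json.dump(sample, f, indent=2)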

@@ -0,0 +1,83 @@
"""
    This file encapsulates a class that is intended to perform learning
"""
from __future__ import division
import numpy as np
from threading import Thread,RLock
from utils.transport import *
from utils.ml import AnomalyDetection,ML
from utils.params import PARAMS
import time
import os
import json
from datetime import datetime
"""
    This class is intended to apply anomaly detection to various areas of learning
    The areas of learning that will be skipped are :
        ['_id','_rev','learn'] ...
    @TODO:
        - Find a way to perform dimensionality reduction if need be
"""
class Anomalies(Thread) :
    def __init__(self,lock):
        Thread.__init__(self)
        path = PARAMS['path']
        self.name = self.__class__.__name__.lower()
        if os.path.exists(path) :
            f = open(path)
            self.config = json.loads(f.read())
            f.close()
        #
        # Initializing data store & factory class
        #
        self.id = self.config['id']
        self.apps = self.config['procs'] if 'procs' in self.config else []
        self.rclass = self.config['store']['class']['read']
        self.wclass = self.config['store']['class']['write']
        self.rw_args = self.config['store']['args']
        self.factory = DataSourceFactory()
        self.quit = False
        self.lock = lock
    def format(self,stream):
        pass
    def stop(self):
        self.quit = True
    def run(self):
        DELAY = self.config['delay'] * 60
        reader = self.factory.instance(type=self.rclass,args=self.rw_args)
        data = reader.read()
        key = 'apps'
        rdata = data[key]
        features = ['memory_usage','cpu_usage']
        yo = {"1":["running"],"name":"status"}
        while self.quit == False :
            print ' *** ',self.name, ' ' , str(datetime.today())
            for app in self.apps:
                print '\t',str(datetime.today()),' ** ',app
                logs = ML.Filter('label',app,rdata)
                if logs :
                    handler = AnomalyDetection()
                    value = handler.learn(logs,'label',app,features,yo)
                    print value
                    if value is not None:
                        value = dict(value,**{"features":features})
                        r[id][app] = value
                        self.lock.acquire()
                        writer = self.factory.instance(type=self.wclass,args=self.rw_args)
                        writer.write(label='learn',row=value)
                        self.lock.release()
            #
            if 'MONITOR_CONFIG_PATH' in os.environ :
                break
            time.sleep(DELAY)
        print ' *** Exiting ',self.name.replace('a','A')

class Regression(Thread):
    def __init__(self,params):
        pass

if __name__ == '__main__' :
    lock = RLock()
    thread = Anomalies(lock)
    thread.start()
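The learner assumes the reader returns a document whose 'apps' entry holds the rows the collector wrote, and it narrows those rows to one application at a time before training on the memory_usage and cpu_usage features. A self-contained sketch of that filtering step, assuming ML.Filter simply keeps the rows whose field matches the given value (the sample rows are invented):

    # invented sample of what data['apps'] might look like after reader.read()
    rdata = [
        {"label": "httpd",  "memory_usage": 1.2, "cpu_usage": 0.4, "status": "running"},
        {"label": "mongod", "memory_usage": 3.5, "cpu_usage": 1.1, "status": "running"},
    ]

    def filter_rows(field, value, rows):
        # assumption about ML.Filter: keep rows whose field equals the given value
        return [row for row in rows if row.get(field) == value]

    logs = filter_rows("label", "httpd", rdata)
    print logs    # the httpd rows, ready to be handed to AnomalyDetection().learn(...)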

@@ -10,6 +10,7 @@ import numpy as np
from couchdbkit import Server
import re
from csv import reader
from datetime import datetime
"""
    @TODO: Write a process by which the class automatically handles reading and creating a preliminary sample and discovers the meta data
"""
@@ -556,6 +557,23 @@ class CouchdbWriter(Couchdb,Writer):
            document[label].append(row)
        self.dbase.save_doc(document)
    def flush(self,params=None):
        document = self.dbase.get(self.uid)
        content = {}
        _doc = {}
        for id in document:
            if id in ['_id','_rev','_attachments'] :
                _doc[id] = document[id]
            else:
                content[id] = document[id]
        content = json.dumps(content)
        document= _doc
        now = str(datetime.today())
        name = '-'.join([document['_id'] , now,'.json'])
        self.dbase.save_doc(document)
        self.dbase.put_attachment(document,content,name,'application/json')
"""
    This class acts as a factory to be able to generate an instance of a Reader/Writer
    Against a Queue,Disk,Cloud,Couchdb
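The new flush() empties the monitor document and archives what it held: every non-metadata field is serialized to JSON and re-attached to the document under a timestamped name, so only _id, _rev and _attachments remain in the document body. A standalone sketch of that split, using a plain dict in place of the couchdb document (the sample values are invented):

    import json
    from datetime import datetime

    document = {"_id": "demo-node", "_rev": "3-abc", "apps@demo-node": [{"cpu_usage": 0.4}]}
    _doc, content = {}, {}
    for key in document:
        if key in ['_id','_rev','_attachments']:
            _doc[key] = document[key]          # metadata stays on the document
        else:
            content[key] = document[key]       # everything else is archived
    name = '-'.join([_doc['_id'], str(datetime.today()), '.json'])
    print name                  # same naming scheme flush() uses for the attachment
    print json.dumps(content)   # payload that flush() stores via put_attachment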
