CO - bug fix & enhancements

community
Steve L. Nyemba 8 years ago
parent 677f037adf
commit e696cf1aac

@ -25,9 +25,10 @@ import re
import monitor
import Queue
from utils.transport import *
from utils.workers import ThreadManager, Factory
from utils.ml import ML,AnomalyDetection,AnalyzeAnomaly
#from utils.ml import ML,AnomalyDetection,AnalyzeAnomaly
import utils.params as SYS_ARGS
from utils.agents.actor import *
import pickle
from utils.agents.manager import Manager
@ -233,7 +234,10 @@ def app_status() :
@app.route('/init/collector',methods=['POST'])
def InitCollector():
"""
This endpoint is intended to initialize the collection agent
@pre registration of the client should be done against the store API
"""
global CONFIG
r = []
manager={}
@ -247,7 +251,9 @@ def InitCollector():
# @TODO : Validate the account & plan, insure preconditions are met/satisfied
#
m = {'apps':'monitor.DetailProcess','folders':'monitor.FileWatch'}
a = {'apps':'Apps','mailer':'Mailer','folder':'Folder'}
lagents = []
lactors = []
for id in m :
if id in body :
agent = eval(m[id]+"()")
@ -255,6 +261,9 @@ def InitCollector():
if args is not None :
agent.init(args)
lagents.append(agent)
#
# We should insure the user has access to actors :
#
config = dict(CONFIG)
#
# @TODO: The database name should be provided by the active plan

@ -30,6 +30,8 @@ class Analysis:
return {"month":d.month,"year":d.year, "day":d.day,"hour":d.hour,"minute":d.minute}
def getName(self):
return self.__class__.__name__
def reboot(self,row,conf) :
return False
def cleanup(self,text):
return re.sub('[^a-zA-Z0-9\s:]',' ',str(text)).strip()
@ -109,6 +111,8 @@ class Sandbox(Analysis):
return [row.replace('-',' ').replace('_',' ') for row in r if row.strip() != '']
def evaluate(self):
pass
def reboot(self,rows,limit=None) :
return sum([ len(item['missing']) for item in rows ]) > 0
"""
This function returns the ratio of existing modules relative to the ones expected
"""
@ -170,6 +174,8 @@ class DetailProcess(Analysis):
return list(g.groups())+['1']+[name]
else:
return ''
def reboot(self,rows,conf=None) :
return np.sum([int(item['label']=='crash') for item in rows]) > 0
def evaluate(self,name) :
cmd = "ps -eo pmem,pcpu,vsize,command|grep -E \":app\""
handler = subprocess.Popen(cmd.replace(":app",name),shell=True,stdout=subprocess.PIPE)
@ -286,6 +292,13 @@ class FileWatch(Analysis):
#return [self.split(stream) for stream in ostream if stream.strip() != '' and '.DS_Store' not in stream and 'total' not in stream]
return [self.split(stream) for stream in ostream if path not in stream and not set(['','total','.DS_Store']) & set(stream.split(' '))]
def toMB(self,size):
m = {'GB':1000,'TB':1000000}
v,u = size.split(' ')
return round(float(v)* m[u.upper()],2)
def reboot(self,rows,limit) :
return np.sum([ int(self.toMB(item['size']) > self.toMB(limit)) for item in rows]) > 0
def composite(self):
d = [] #-- vector of details (age,size)

@ -129,10 +129,12 @@ class Apps(Actor) :
#
class Mailer (Actor):
def __init__(self):
Actor.__init__(self)
"""
conf = {uid:<account>,host:<host>,port:<port>,password:<password>}
"""
def __init__(self,conf) :
def init(self,conf) :
self.uid = conf['uid']
@ -170,12 +172,14 @@ class Folders(Actor):
"""
This is designed to handle folders i.e cleaning/archiving the folders
if the user does NOT have any keys to cloud-view than she will not be able to archive
{threshold:value}
@params threshold in terms of size, or age. It will be applied to all folders
"""
def init(self,**args):
Actor.init(self,config,item)
self.lfolders = args['folders'] #config['folders']
self.action = args['action'] #{clear,archive} config['actions']['folders']
#self.lfolders = args['folders'] #config['folders']
#self.action = args['action'] #{clear,archive} config['actions']['folders']
self.threshold = self.get_size( args['threshold']) #self.config['threshold'])
self.item = item
@ -191,7 +195,8 @@ class Folders(Actor):
self.clean(item)
#
# @TODO: The archive can be uploaded to the cloud or else where
# - This allows the submission of data to a processing engine if there ever were one
# @param id cloud service idenfier {dropbox,box,google-drive,one-drive}
# @param key authorization key for the given service
#
pass
@ -211,6 +216,9 @@ class Folders(Actor):
#
def get_size(self,value):
"""
converts size values into MB and returns the value without units
"""
units = {'MB':1000,'GB':1000000,'TB':1000000000} # converting to kb
key = set(unites) & set(re.split('(\d+)',value.upper()))
if len(key) == 0:
@ -218,7 +226,7 @@ class Folders(Actor):
key = key.pop()
return float(value.upper().replace('MB','').strip()) * units[key]
def isvalid(self,item):
def can_clean(self,item):
"""
This function returns whether the following :
p : folder exists
@ -233,7 +241,7 @@ class Folders(Actor):
def analyze(self,logs):
r = {'clean':self.clean,'archive':self.archive}
for item in logs :
if self.isValid(item) :
if self.can_clean(item) :
id = self.config['action'].strip()
pointer = r[id]

@ -1,3 +1,9 @@
"""
Features :
- data collection
- detection, reboot (service)
- respond to commands (service)
"""
#from threading import Thread, RLock
from __future__ import division
import os
@ -11,13 +17,12 @@ class Manager() :
def version(self):
return 1.0
"""
delay : <value>
limit : <value>
scope : apps,folders,learner,sandbox
"""
def __init__(self):
#Thread.__init__(self)
#self.lock = RLock()
self.factory = DataSourceFactory()
def set(self,name,value):
setattr(name,value)
@ -26,8 +31,9 @@ class Manager() :
self.pool = args['pool']
self.config = args['config']
self.key = args['key']
print self.config['store']
self.status() #-- Initializing status information
def status(self) :
"""
This method inspect the plans for the current account and makes sure it can/should proceed
@ -48,14 +54,37 @@ class Manager() :
self.DELAY = -1
self.LIMIT = -1
scope = []
lactors= []
for item in meta :
scope = scope + item['scope'].split(',')
if 'actors' in item :
lactors= lactors + item['actors'].split(',')
self.pool = [agent for agent in self.pool if agent.getName() in scope]
self.actors = [ actor for actor in self.actors if actor.getName() in lactors]
return meta
def isvalid(self):
self.status()
return self.DELAY > -1 and self.LIMIT > -1
def post(self,row) :
"""
This function is designed to take appropriate action if a particular incident has been detected
@param label
@param row data pulled extracted
"""
message = {}
message['action'] = 'reboot'
message['node'] = label
def callback(self,channel,method,header,stream):
"""
This function enables the manager to be able to receive messages and delegate them to the appropriate actor
@channel
"""
print [channel,header]
message = json.loads(stream)
data = message['data']
def run(self):
#DELAY=35*60 #- 35 Minutes
#LIMIT=1000
@ -81,6 +110,12 @@ class Manager() :
else:
#label = id
row = data
#
#
if agent.reboot(row) :
#
self.post(row)
self.lock.acquire()
store = self.factory.instance(type=write_class,args=read_args)
store.flush(size=self.LIMIT)

Loading…
Cancel
Save