From e696cf1aac44b0ca81ef601824d32d5d6f94f4b0 Mon Sep 17 00:00:00 2001 From: "Steve L. Nyemba" Date: Mon, 28 Aug 2017 08:57:11 -0500 Subject: [PATCH] CO - bug fix & enhancements --- src/api/index.py | 15 +++++++++++--- src/monitor.py | 13 ++++++++++++ src/utils/agents/actor.py | 20 ++++++++++++------ src/utils/agents/manager.py | 41 ++++++++++++++++++++++++++++++++++--- 4 files changed, 77 insertions(+), 12 deletions(-) diff --git a/src/api/index.py b/src/api/index.py index 1cf1304..d8b48dc 100644 --- a/src/api/index.py +++ b/src/api/index.py @@ -25,9 +25,10 @@ import re import monitor import Queue from utils.transport import * -from utils.workers import ThreadManager, Factory -from utils.ml import ML,AnomalyDetection,AnalyzeAnomaly + +#from utils.ml import ML,AnomalyDetection,AnalyzeAnomaly import utils.params as SYS_ARGS +from utils.agents.actor import * import pickle from utils.agents.manager import Manager @@ -233,7 +234,10 @@ def app_status() : @app.route('/init/collector',methods=['POST']) def InitCollector(): - +""" + This endpoint is intended to initialize the collection agent + @pre registration of the client should be done against the store API +""" global CONFIG r = [] manager={} @@ -247,7 +251,9 @@ def InitCollector(): # @TODO : Validate the account & plan, insure preconditions are met/satisfied # m = {'apps':'monitor.DetailProcess','folders':'monitor.FileWatch'} + a = {'apps':'Apps','mailer':'Mailer','folder':'Folder'} lagents = [] + lactors = [] for id in m : if id in body : agent = eval(m[id]+"()") @@ -255,6 +261,9 @@ def InitCollector(): if args is not None : agent.init(args) lagents.append(agent) + # + # We should insure the user has access to actors : + # config = dict(CONFIG) # # @TODO: The database name should be provided by the active plan diff --git a/src/monitor.py b/src/monitor.py index 9945a95..c973d2d 100755 --- a/src/monitor.py +++ b/src/monitor.py @@ -30,6 +30,8 @@ class Analysis: return {"month":d.month,"year":d.year, "day":d.day,"hour":d.hour,"minute":d.minute} def getName(self): return self.__class__.__name__ + def reboot(self,row,conf) : + return False def cleanup(self,text): return re.sub('[^a-zA-Z0-9\s:]',' ',str(text)).strip() @@ -109,6 +111,8 @@ class Sandbox(Analysis): return [row.replace('-',' ').replace('_',' ') for row in r if row.strip() != ''] def evaluate(self): pass + def reboot(self,rows,limit=None) : + return sum([ len(item['missing']) for item in rows ]) > 0 """ This function returns the ratio of existing modules relative to the ones expected """ @@ -170,6 +174,8 @@ class DetailProcess(Analysis): return list(g.groups())+['1']+[name] else: return '' + def reboot(self,rows,conf=None) : + return np.sum([int(item['label']=='crash') for item in rows]) > 0 def evaluate(self,name) : cmd = "ps -eo pmem,pcpu,vsize,command|grep -E \":app\"" handler = subprocess.Popen(cmd.replace(":app",name),shell=True,stdout=subprocess.PIPE) @@ -286,6 +292,13 @@ class FileWatch(Analysis): #return [self.split(stream) for stream in ostream if stream.strip() != '' and '.DS_Store' not in stream and 'total' not in stream] return [self.split(stream) for stream in ostream if path not in stream and not set(['','total','.DS_Store']) & set(stream.split(' '))] + def toMB(self,size): + m = {'GB':1000,'TB':1000000} + v,u = size.split(' ') + return round(float(v)* m[u.upper()],2) + + def reboot(self,rows,limit) : + return np.sum([ int(self.toMB(item['size']) > self.toMB(limit)) for item in rows]) > 0 def composite(self): d = [] #-- vector of details (age,size) diff --git a/src/utils/agents/actor.py b/src/utils/agents/actor.py index 3e7c58f..52e09ec 100644 --- a/src/utils/agents/actor.py +++ b/src/utils/agents/actor.py @@ -129,10 +129,12 @@ class Apps(Actor) : # class Mailer (Actor): + def __init__(self): + Actor.__init__(self) """ conf = {uid:,host:,port:,password:} """ - def __init__(self,conf) : + def init(self,conf) : self.uid = conf['uid'] @@ -170,12 +172,14 @@ class Folders(Actor): """ This is designed to handle folders i.e cleaning/archiving the folders if the user does NOT have any keys to cloud-view than she will not be able to archive + {threshold:value} + @params threshold in terms of size, or age. It will be applied to all folders """ def init(self,**args): Actor.init(self,config,item) - self.lfolders = args['folders'] #config['folders'] - self.action = args['action'] #{clear,archive} config['actions']['folders'] + #self.lfolders = args['folders'] #config['folders'] + #self.action = args['action'] #{clear,archive} config['actions']['folders'] self.threshold = self.get_size( args['threshold']) #self.config['threshold']) self.item = item @@ -191,7 +195,8 @@ class Folders(Actor): self.clean(item) # # @TODO: The archive can be uploaded to the cloud or else where - # - This allows the submission of data to a processing engine if there ever were one + # @param id cloud service idenfier {dropbox,box,google-drive,one-drive} + # @param key authorization key for the given service # pass @@ -211,6 +216,9 @@ class Folders(Actor): # def get_size(self,value): + """ + converts size values into MB and returns the value without units + """ units = {'MB':1000,'GB':1000000,'TB':1000000000} # converting to kb key = set(unites) & set(re.split('(\d+)',value.upper())) if len(key) == 0: @@ -218,7 +226,7 @@ class Folders(Actor): key = key.pop() return float(value.upper().replace('MB','').strip()) * units[key] - def isvalid(self,item): + def can_clean(self,item): """ This function returns whether the following : p : folder exists @@ -233,7 +241,7 @@ class Folders(Actor): def analyze(self,logs): r = {'clean':self.clean,'archive':self.archive} for item in logs : - if self.isValid(item) : + if self.can_clean(item) : id = self.config['action'].strip() pointer = r[id] diff --git a/src/utils/agents/manager.py b/src/utils/agents/manager.py index 07ffb74..aeedc92 100644 --- a/src/utils/agents/manager.py +++ b/src/utils/agents/manager.py @@ -1,3 +1,9 @@ +""" + Features : + - data collection + - detection, reboot (service) + - respond to commands (service) +""" #from threading import Thread, RLock from __future__ import division import os @@ -11,13 +17,12 @@ class Manager() : def version(self): return 1.0 """ + delay : limit : scope : apps,folders,learner,sandbox """ def __init__(self): - #Thread.__init__(self) - #self.lock = RLock() self.factory = DataSourceFactory() def set(self,name,value): setattr(name,value) @@ -26,8 +31,9 @@ class Manager() : self.pool = args['pool'] self.config = args['config'] self.key = args['key'] - print self.config['store'] + self.status() #-- Initializing status information + def status(self) : """ This method inspect the plans for the current account and makes sure it can/should proceed @@ -48,14 +54,37 @@ class Manager() : self.DELAY = -1 self.LIMIT = -1 scope = [] + lactors= [] for item in meta : scope = scope + item['scope'].split(',') + if 'actors' in item : + lactors= lactors + item['actors'].split(',') self.pool = [agent for agent in self.pool if agent.getName() in scope] + self.actors = [ actor for actor in self.actors if actor.getName() in lactors] + return meta def isvalid(self): self.status() return self.DELAY > -1 and self.LIMIT > -1 + def post(self,row) : + """ + This function is designed to take appropriate action if a particular incident has been detected + @param label + @param row data pulled extracted + """ + message = {} + message['action'] = 'reboot' + message['node'] = label + + def callback(self,channel,method,header,stream): + """ + This function enables the manager to be able to receive messages and delegate them to the appropriate actor + @channel + """ + print [channel,header] + message = json.loads(stream) + data = message['data'] def run(self): #DELAY=35*60 #- 35 Minutes #LIMIT=1000 @@ -81,6 +110,12 @@ class Manager() : else: #label = id row = data + + # + # + if agent.reboot(row) : + # + self.post(row) self.lock.acquire() store = self.factory.instance(type=write_class,args=read_args) store.flush(size=self.LIMIT)