CO - Enabled configuration of actors and agents tied to plan

community
Steve L. Nyemba 8 years ago
parent e696cf1aac
commit 7711472513

Binary file not shown.

@ -23,8 +23,10 @@ import os
import json
import re
import monitor
import Queue
from utils.transport import *
from utils.agents import actor
#from utils.ml import ML,AnomalyDetection,AnalyzeAnomaly
import utils.params as SYS_ARGS
@ -251,27 +253,31 @@ def InitCollector():
# @TODO : Validate the account & plan, insure preconditions are met/satisfied
#
m = {'apps':'monitor.DetailProcess','folders':'monitor.FileWatch'}
a = {'apps':'Apps','mailer':'Mailer','folder':'Folder'}
a = {'apps':'Apps','mailer':'Mailer','folders':'Folder'}
lagents = []
lactors = []
for id in m :
if id in body :
agent = eval(m[id]+"()")
actor = eval(m[id]+"()")
args = body[id] if id in body else None
if args is not None :
agent.init(args)
lagents.append(agent)
lactors.append(actor)
#
# We should insure the user has access to actors :
#
config = dict(CONFIG)
#
# @TODO: The database name should be provided by the active plan
# The database name will be overriden by the user's current plan
#
config['store']['args']['dbname'] = 'monitor-logs'
config['store']['args']['dbname'] = None #'monitor-logs'
config['store']['args']['uid'] = key
manager = Manager()
manager.init(pool=lagents,config=config,key=key,node=node) ;
manager.init(actors = actors,agents=lagents,config=config,key=key,node=node) ;
r = [pickle.dumps(manager)]
except Exception,e:
print '***** ',e

@ -126,9 +126,12 @@ class Apps(Actor) :
if self.can_start(name) :
self.startup(name)
#
#
class Mailer (Actor):
"""
This class is a mailer agent
"""
def __init__(self):
Actor.__init__(self)
"""

@ -28,9 +28,10 @@ class Manager() :
setattr(name,value)
def init(self,**args) :
self.id = args['node']
self.pool = args['pool']
self.agents = args['agents']
self.config = args['config']
self.key = args['key']
self.actors = args['actors']
self.status() #-- Initializing status information
@ -53,17 +54,77 @@ class Manager() :
else:
self.DELAY = -1
self.LIMIT = -1
def filter(meta) :
scope = []
lactors= []
for item in meta :
scope = scope + item['scope'].split(',')
if 'actors' in item :
lactors= lactors + item['actors'].split(',')
self.pool = [agent for agent in self.pool if agent.getName() in scope]
self.agents = [agent for agent in self.agents if agent.getName() in scope]
self.actors = [ actor for actor in self.actors if actor.getName() in lactors]
if len(self.actors) > 0 :
#
# We should configure the actors accordingly and make sure they are operational
#
conf = {"folders":meta['folder_threshold'],"apps":None}
#
# We need to get the configuration for the apps remotely
#
read_class = self.config['store']['class']['read']
read_args = self.config['store']['args']
couchdb = self.factory.instance(type=read_class,args=read_args)
uinfo = couchdb.view('config/apps',key=self.key)
if 'apps' in uinfo :
conf['apps'] = uinfo['apps']
mailer = None
for actor in self.actors :
id = actor.getIdentifier()
if id == "mailer" :
mailer = actor.Mailer()
if conf[id] is None :
continue
args = conf[id]
actor.init(args)
#
# Initializing the mailer
if mailer is not None and mailer in self.config:
mailer.init(self.config['mailer'])
return meta
def setup(self,meta):
#
# We should configure the actors accordingly and make sure they are operational
#
conf = {"folders":meta['folder_threshold'],"apps":None}
#
# We need to get the configuration for the apps remotely
#
read_class = self.config['store']['class']['read']
read_args = self.config['store']['args']
couchdb = self.factory.instance(type=read_class,args=read_args)
uinfo = couchdb.view('config/apps',key=self.key)
if 'apps' in uinfo :
conf['apps'] = uinfo['apps']
mailer = None
for actor in self.actors :
id = actor.getIdentifier()
if id == "mailer" :
mailer = actor.Mailer()
if conf[id] is None :
continue
args = conf[id]
actor.init(args)
#
# Initializing the mailer
if mailer is not None and mailer in self.config:
mailer.init(self.config['mailer'])
def isvalid(self):
self.status()
return self.DELAY > -1 and self.LIMIT > -1
@ -85,6 +146,7 @@ class Manager() :
print [channel,header]
message = json.loads(stream)
data = message['data']
def run(self):
#DELAY=35*60 #- 35 Minutes
#LIMIT=1000
@ -99,7 +161,7 @@ class Manager() :
COUNT = 0
else:
break
for agent in self.pool :
for agent in self.agents :
data = agent.composite()
label = agent.getName()
node = '@'.join([label,self.id])

Loading…
Cancel
Save