From 9509c66973298df98495a0fdf5338c431579a4d7 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Thu, 21 Dec 2017 20:32:36 -0600 Subject: [PATCH] DC: connecting to queue --- src/data-collector.py | 1 + src/utils/agents/actor.py | 97 +++++++++++++++++++++-------- src/utils/agents/manager.py | 119 ++++++++++-------------------------- 3 files changed, 105 insertions(+), 112 deletions(-) diff --git a/src/data-collector.py b/src/data-collector.py index 42dc14f..85cba33 100755 --- a/src/data-collector.py +++ b/src/data-collector.py @@ -84,6 +84,7 @@ class Collector(Thread) : # Initialiing the agents with the parameter values we know of r = [] for agent in _agents : + if agent.getName() in SYS_ARGS : values = SYS_ARGS[agent.getName()] # print (["init ",agent.getName(),values]) diff --git a/src/utils/agents/actor.py b/src/utils/agents/actor.py index d7bd444..ac22912 100755 --- a/src/utils/agents/actor.py +++ b/src/utils/agents/actor.py @@ -55,7 +55,7 @@ class Actor(): def init(self,args): self.config = args - def isValid(self,item): + def isValid(self,**item): return False def execute(self,cmd): @@ -75,47 +75,83 @@ class Apps(Actor) : """ def __init__(self): Actor.__init__(self) - self.ng = None - def init(self,config) : + # self.ng = None + + def isValid(self,**args): + """ + We insure that the provided application exists and that the payload is correct + p validate the payload + q validate the app can be restarted + + @NOTE: killing the application has no preconditions/requirements + """ + params = args['params'] + action = args['action'] + p = len(set(params.keys()) & set(['cmd','label'])) == 2 + q = False + r = action in ['reboot','kill','start'] + if p : + q = os.path.exists(params['cmd']) + return p and q and r + def init(self,action,params) : """ This function will initialize the the actor with applications and associated arguments @param args {"apps_o":"","app_x":params} """ - Actor.init(self,config) - self.ng = ng(self.config.keys()) + self.action = action + self.params = params + # self.ng = ng(self.config.keys()) - def can_start(self,name): - """ - This function is intended to determine if it is possible to boot an application + # def can_start(self,name): + # """ + # This function is intended to determine if it is possible to boot an application - """ - items = self.ng.search(name) if self.ng is not None else [] - if len(items) == 0 : - return False - else: - return items[0][1] > 0.01 + # """ + # items = self.ng.search(name) if self.ng is not None else [] + # if len(items) == 0 : + # return False + # else: + # return items[0][1] > 0.01 - def startup(self,name) : + def startup(self,cmd) : """ This function is intended to start a program given the configuration """ - items = self.ng.search(name)[0] - app = items[0] - args = self.config[app] - - cmd = " ".join([app,args,"&" ]) - self.execute([app,args]) + # items = self.ng.search(name)[0] + # app = items[0] + # args = self.config[app] + try: + os.system(cmd +" &") + except Exception, e: + print e def kill(self,name) : """ kill processes given the name, The function will not be case sensitive and partial names are accepted @NOTE: Make sure the reference to the app is not ambiguous """ - args = "".join(['ps -eo pid,command|grep -E -i "',name.lower(),'"|grep -E "^ {0,}[0-9]+" -o|xargs kill -9']) - #self.execute([args]) - subprocess.call([args],shell=True) - + try: + args = "".join(['ps -eo pid,command|grep -E -i "',name.lower(),'"|grep -E "^ {0,}[0-9]+" -o|xargs kill -9']) + #self.execute([args]) + subprocess.call([args],shell=True) + except Exception,e: + print e + def run(self): + __action = str(self.action) + __params = dict(self.params) + if self.action == 'reboot' : + def pointer(): + self.kill(__params['label']) + self.startup(__params['cmd']) + else: + def pointer() : + self.startup(__params['cmd']) + print __action,__params + print pointer + pointer() + # thread = Thread(target=pointer) + # thread.start() def analyze(self,logs) : """ This function is designed to analyze a few logs and take appropriate action @@ -186,7 +222,16 @@ class Folders(Actor): #self.action = args['action'] #{clear,archive} config['actions']['folders'] self.threshold = self.get_size( args['threshold']) #self.config['threshold']) - + def isValid(self,**args): + action = args['action'] + params = args['params'] + p = action in ['clean','archive','backup'] + q = False + r = False + if p : + q = params.keys()[0] in ['label','folder'] + r = os.path.exists(params.values[0]) + return p and q and r def archive(self,item): """ This function will archive all files in a given folder diff --git a/src/utils/agents/manager.py b/src/utils/agents/manager.py index 7ab03f8..4aa7970 100755 --- a/src/utils/agents/manager.py +++ b/src/utils/agents/manager.py @@ -38,7 +38,13 @@ class Manager() : self.DELAY = int(self.plan['metadata']['delay']) self.host = args['host'] self.update() #-- Initializing status information - + _args={"host":"dev.the-phi.com","qid":self.id,"uid":self.key} + # + # Connecting to the messaging service + self.qlistener = self.factory.instance(type="QueueListener",args=_args) + self.qlistener.callback = self.callback + self.qlistener.init(self.id) + self.qlistener.read() def update(self) : """ This method inspect the plans for the current account and makes sure it can/should proceed @@ -64,7 +70,9 @@ class Manager() : self.LIMIT = -1 #self.filter(meta) - + # + # We are removing all that is not necessary i.e making sure the features matches the plan user has paid for + # self.agents = self.filter('agents',meta,self.agents) self.actors = self.filter('actors',meta,self.actors) self.setup(meta) @@ -89,60 +97,14 @@ class Manager() : def filter(self,id,meta,objects): values = meta[id].replace(' ','').split(',') return [item for item in objects if item.getName() in values] - def __filter(self,meta) : - scope = [] - lactors= [] - for item in meta : - scope = scope + item['scope'].split(',') - if 'actors' in item : - lactors= lactors + item['actors'].split(',') - self.agents = [agent for agent in self.agents if agent.getName() in scope] - if len(lactors) == 0 : - self.actors = [] - self.actors = [ actor for actor in self.actors if actor.getIdentifier() in lactors] - if len(self.actors) > 0 : - # - # We should configure the actors accordingly and make sure they are operational - # - - conf = {"apps":None} - # - # We need to get the configuration for the apps remotely - # - read_class = self.config['store']['class']['read'] - read_args = self.config['store']['args'] - couchdb = self.factory.instance(type=read_class,args=read_args) - uinfo = couchdb.view('config/apps',key=self.key) - if 'apps' in uinfo : - conf['apps'] = uinfo['apps'] - # - # Threshold will always give a default value - # - info = couchdb.view('config/folders',key=self.key) - threshold = info - conf['folder_threshold'] = threshold - - mailer = None - for actor in self.actors : - id = actor.getIdentifier() - if id == "mailer" : - mailer = actor.Mailer() - if conf[id] is None : - continue - args = conf[id] - actor.init(args) - # - # Initializing the mailer - if mailer is not None and mailer in self.config: - mailer.init(self.config['mailer']) - - return meta + def setup(self,meta) : conf = {"folders":None,"apps":None} read_class = self.config['store']['class']['read'] read_args = self.config['store']['args'] + args = None couchdb = self.factory.instance(type=read_class,args=read_args) args = couchdb.view('config/apps',key=self.key) if len(args.keys()) > 0 : @@ -157,41 +119,7 @@ class Manager() : for actor in self.actors : if args is not None and actor.getName() == name and len(args.keys()) > 0: actor.init(args) - - - def __setup(self,meta): - # - # We should configure the actors accordingly and make sure they are operational - # - conf = {"folders":meta['folder_threshold'],"apps":None} - # - # We need to get the configuration for the apps remotely - # - read_class = self.config['store']['class']['read'] - read_args = self.config['store']['args'] - - couchdb = self.factory.instance(type=read_class,args=read_args) - uinfo = couchdb.view('config/apps',key=self.key) - if 'apps' in uinfo : - conf['apps'] = uinfo['apps'] - for agent in self.agents : - agent.init(conf['apps']) - - mailer = None - for actor in self.actors : - id = actor.getIdentifier() - if id == "mailer" : - mailer = actor.Mailer() - if conf[id] is None : - continue - args = conf[id] - actor.init(args) - # - # Initializing the mailer - if mailer is not None and mailer in self.config: - mailer.init(self.config['mailer']) - def isvalid(self): self.update() return self.DELAY > -1 and self.LIMIT > -1 @@ -210,10 +138,25 @@ class Manager() : This function enables the manager to be able to receive messages and delegate them to the appropriate actor @channel """ - print [channel,header] message = json.loads(stream) - data = message['data'] + # + # we should inspect the message and insure it has reached the appropriate recepient + # + if 'node' in message and message['node'] == self.id : + action = message['action'] + params = message['params'] + self.delegate(action,params) + + def delegate(self,action,params): + for actor in self.actors : + + if actor.isValid(action=action,params=params) : + + actor.init(action,params) + actor.run() + break + pass def run(self): #DELAY=35*60 #- 35 Minutes #LIMIT=1000 @@ -240,6 +183,10 @@ class Manager() : else: #label = id row = data + # + # @TODO: + # This data should be marked if it has been flagged for reboot + # if type(row)==list and len(row) == 0 : continue