From 024ee9550f57d8d2dbe48c786abdd3345106b324 Mon Sep 17 00:00:00 2001 From: "Steve L. Nyemba" Date: Mon, 4 Sep 2017 21:57:33 -0500 Subject: [PATCH] CO - Bug found and fixed --- src/utils/agents/actor.py | 187 +------------------------------------- 1 file changed, 2 insertions(+), 185 deletions(-) diff --git a/src/utils/agents/actor.py b/src/utils/agents/actor.py index ccfd59d..7e2c85e 100644 --- a/src/utils/agents/actor.py +++ b/src/utils/agents/actor.py @@ -49,10 +49,8 @@ class Actor(): def getName(self): return self.__class__.__name__.lower() - def getIdentifier(self): - return self.__class__.__name__.lower() - - + # def getIdentifier(self): + # return self.__class__.__name__.lower() def init(self,args): self.config = args @@ -254,184 +252,3 @@ class Folders(Actor): id = self.config['action'].strip() pointer = r[id] pointer (item) - - - -# class Kill(Actor): - -# def isValid(self,item): -# return (item is not None) and (item in self.config) -# def process(self,item): -# args = "".join(["-eo pid,command|grep ",item,'|grep -E "^ {0,1}[0-9]+" -o|xargs kill -9']) -# self.execute(["ps",args]) -# # -# # We need to make sure we can get assess the process on this server -# # - -# class Start(Actor): -# def __init__(self): -# Actor.__init__(self) -# self.ng = None - -# def init(self,config,item): -# Actor.init(self,config,item) -# self.config = config['apps'] -# self.ng = ng(self.config.keys()) - -# def isValid(self,name): -# items = self.ng.search(name) -# if len(items) == 0 : -# return False -# else: -# return items[0][1] > 0.1 - -# def process(self,row): -# name = row['label'] -# items = self.ng.search(name)[0] -# app = items[0] - - -# args = self.config[app] -# cmd = " ".join([app,args]) - -# self.execute([app,args]) -# """ -# This class is designed to handle applications i.e start/stopping applications -# @TODO: Assess if a reboot is required, by looking at the variance/anomaly detection -# """ -# class Apps(Actor): -# def __init__(self): -# Actor.__init__(self) -# self.crashes = [] -# self.running = [] - -# def isValid(self,rows): -# status = [row['status'] for row in rows] -# return 'crash' in status - -# def classify(self,rows): -# self.crashes = [] -# self.running = [] -# for row in rows: -# if row['status'] == 'crash' : -# self.crashes.append(row) -# else: -# self.running.append(row) -# def reboot(self): -# for row_run in self.running: -# pass -# def start(self): -# for row_crash in self.crashes: -# thread = Start() -# thread.init(self.config,row_crash) -# thread.daemon = True -# thread.start() - -# def process(self,rows): -# self.classify(rows) -# if self.crashes : -# self.start() -# if self.running: -# self.reboot() - - - -class Event(Thread): - def __init__(self,config): - pass - def run(self): - """ - The orchestrator class is designed to aggregate actions and communicate back to the caller - Mesage passing is structured as follows {from,to,content} The content is designed to be understood by the actor - - The orchestrator is implemented using a simple iterator design-pattern - @TODO: action specifications should be provided remotely - """ - pass -class Orchestrator(Actor): - - def __init__(self,config=None): - Actor.__init__(self) - # if config is None: - # f = open(PARAMS['path']) - # config = json.loads(f.read()) - # f.close() - self.config = config - Actor.__init__(self) - - self.actors = {"apps":Apps(),"folders":Folders()} - self.is_master_node = False - self.items = [] - # - # If the configuration only has id,key then this is NOT the maestro - # - host = config['api'] - qid = config['id'] - print "Initialized ***** ",self.getIdentifier(), " as ",config['id'] - - # - # This object will have to request for the configuration - # - #for id in config['actions'] : - #conf = config['actions'][id] - #item = eval("".join([id,"(",json.dumps(conf),")"])) - #self.actors[id.lower()] = item - """ - This function is designed to provide the orchestrator a configuration - @pre - """ - def init(self,config): - - for id in config: - - setup_info = config[id] - item = eval("".join([id,"(",json.dumps(setup_info),")"])) - self.actors[id.lower()] = item - - def callback(self,channel,method,header,stream): - - message = json.loads(stream) - if 'content' in message : - content = message['content'] - print self.actors - to = message['to'] - if isinstance(content,basestring) and content.lower() in ['quit'] or to=='quit': - if content.lower() == 'quit' or to == 'quit': - print '**** closing ',self.getIdentifier() - channel.close() - else: - - id = to.lower() - actor = self.actors[id] - - if actor is not None and actor.isValid(content) : - actor.init(self.config['actions']) - actor.process(content) - else: - content = {"status":"invalid","content":content} - - #self.post(to=sender,content=content) - - def run(self): - - info = {} - host = self.config['api'] - uid = self.config['key'] - qid = self.config['id'] - - qlistener = QueueListener(qid=qid,uid=uid,host=host) - qlistener.callback = self.callback - qlistener.read() - r = [self.process(item) for item in self.items] - -""" - This class is designed to send a message to a given AMQP enpoint - The AMQP endpoint is implemented by QueueWriter class -""" -# class Alert(Actor): -# def process(self,item): -# pass -conf = {"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome":"https://the-phi.com"} -a = Actor.instance('Apps',conf) -if __name__ == '__main__': - print dir(a)