From 7fc6210ac21d9b39ba09b121e147d7213e1281ad Mon Sep 17 00:00:00 2001 From: steve Date: Mon, 3 Apr 2017 15:07:59 -0500 Subject: [PATCH] Bug fix with actors and remote initialization --- src/utils/agents/actor.py | 110 ++++++++++++++++++++++++++++---------- src/utils/transport.py | 10 ++-- 2 files changed, 90 insertions(+), 30 deletions(-) diff --git a/src/utils/agents/actor.py b/src/utils/agents/actor.py index 04677bb..b9a5183 100644 --- a/src/utils/agents/actor.py +++ b/src/utils/agents/actor.py @@ -15,7 +15,7 @@ from threading import Thread import os import subprocess from monitor import ProcessCounter -from utils.transport import QueueListener, QueueWriter +from utils.transport import QueueListener, QueueWriter, QueueReader class Actor: def __init__(self,config): self.config = config @@ -56,6 +56,7 @@ class Actor: class Kill(Actor): def isValid(self,item): + print self.config return (item is not None) and (item in self.config) def process(self,item): cmd = "".join(["ps -eo pid,command|grep ",item,'|grep -E "^ {0,1}[0-9]+" -o|xargs kill -9']) @@ -67,7 +68,7 @@ class Start(Actor): def __init__(self,config): Actor.__init__(self,config) def isValid(self,name): - return name in self.config: + return name in self.config def process(self,name): item = self.config[name] @@ -87,43 +88,87 @@ class Orchestrator(Actor,Thread): Thread.__init__(self) Actor.__init__(self,config) self.actors = {} - for id in config['actions'] : - conf = config['actions'][id] - item = eval("".join([id,"(",json.dumps(conf),")"])) + self.is_master_node = False + self.items = [] + # + # If the configuration only has id,key then this is NOT the maestro + # + host = config['api'] + qid = config['id'] + if 'actions' in config : + # + # We are to assume this is the maestro/main node, and it will make the configuration available to other nodes + # NOTE: This is has a predefined master thus is subject to known short comings (zombies) + # + + q = QueueWriter(host=host,uid=config['key'],qid=qid) + q.flush(config['id']) + q.write(label=qid,row=json.dumps(config['actions'])) + q.close() + self.is_master_node = True + + + else: + qid = config['master'] + q = QueueReader(host=host,uid=config['key'],qid=qid) + r = q.read() + q.close() + info = r[qid][0] + self.init(info) + print "Initialized ***** ",self.getIdentifier(), " as ",config['id'] + + # + # This object will have to request for the configuration + # + #for id in config['actions'] : + #conf = config['actions'][id] + #item = eval("".join([id,"(",json.dumps(conf),")"])) + #self.actors[id.lower()] = item + """ + This function is designed to provide the orchestrator a configuration + @pre + """ + def init(self,config): + + for id in config: + + setup_info = config[id] + item = eval("".join([id,"(",json.dumps(setup_info),")"])) self.actors[id.lower()] = item - def callback(self,channel,method,header,stream): message = json.loads(stream) - content = message['content'] - sender = message['from'] - to = message['to'] - if content.lower() == 'quit' or to == 'quit': - channel.close() - print " *** ",self.getIdentifier() - elif content.lower() == 'ping' or to == 'ping': - self.post(to=sender,content="1") - else: - id = to.lower() - actor = self.actors[id] - if actor.isValid(content) : - actor.process(content) + if 'content' in message : + content = message['content'] + sender = message['from'] + to = message['to'] + if content.lower() == 'quit' or to == 'quit': + channel.close() + + elif content.lower() == 'ping' or to == 'ping': + self.post(to=sender,content="1") else: - content = {"status":"invalid","content":content} - - self.post(to=sender,content=content) + id = to.lower() + actor = self.actors[id] + print "\tPerforming ",actor.getIdentifier()," on ",content," status ",actor.isValid(content) + if actor.isValid(content) : + actor.process(content) + else: + content = {"status":"invalid","content":content} + + self.post(to=sender,content=content) def run(self): info = {} host = self.config['api'] uid = self.config['key'] qid = self.config['id'] - - qlistener = QueueListener(qid=qid,uid=uid,host=host) - qlistener.callback = self.callback - qlistener.read() - #r = [self.process(item) for item in self.items] + if not self.is_master_node : + qlistener = QueueListener(qid=qid,uid=uid,host=host) + qlistener.callback = self.callback + qlistener.read() + r = [self.process(item) for item in self.items] """ This class is designed to send a message to a given AMQP enpoint @@ -144,8 +189,19 @@ config = { } +#thread = Orchestrator(config) +#thread.start() + +config = { + "master":"demo-000", + "id":"demo-001", + "key":"[0v8]-247&7!v3","api":"localhost" + + + } thread = Orchestrator(config) thread.start() + #config = {"id":"demo","key":"[0v8]-247&7!v3","api":"localhost"} #actor = Kill(config) #actor.start() diff --git a/src/utils/transport.py b/src/utils/transport.py index 13d08ae..13b6406 100644 --- a/src/utils/transport.py +++ b/src/utils/transport.py @@ -261,6 +261,7 @@ class MessageQueue: self.close() return resp def close(self): + if self.connection.is_closed == False : self.channel.close() self.connection.close() """ @@ -351,7 +352,6 @@ class QueueReader(MessageQueue,Reader): self.durable = False self.size = -1 self.data = {} - def init(self,qid): properties = pika.ConnectionParameters(host=self.host) @@ -368,6 +368,7 @@ class QueueReader(MessageQueue,Reader): """ def callback(self,channel,method,header,stream): + r = [] if re.match("^\{|\[",stream) is not None: r = json.loads(stream) @@ -399,9 +400,12 @@ class QueueReader(MessageQueue,Reader): # We enabled the reader to be able to read from several queues (sequentially for now) # The qid parameter will be an array of queues the reader will be reading from # + if isinstance(self.qid,basestring) : + self.qid = [self.qid] for qid in self.qid: self.init(qid) # r[qid] = [] + if self.info.method.message_count > 0: self.channel.basic_consume(self.callback,queue=qid,no_ack=False); @@ -420,8 +424,8 @@ class QueueListener(QueueReader): self.channel = self.connection.channel() self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True ) - self.info = self.channel.queue_declare(exclusive=True,queue=qid) - print self.info.method.queue + self.info = self.channel.queue_declare(passive=True,exclusive=True,queue=qid) + self.channel.queue_bind(exchange=self.uid,queue=self.info.method.queue,routing_key=qid) #self.callback = callback def read(self):