From 7423f6a9ec18b97235ed034f4326c888d20c9f70 Mon Sep 17 00:00:00 2001 From: steve Date: Tue, 21 Mar 2017 17:52:18 -0600 Subject: [PATCH] Bug fix, with router in transport class and actor hierarchy --- src/utils/agents/actor.py | 24 +++++++++++++++++++++--- src/utils/transport.py | 8 ++++---- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/src/utils/agents/actor.py b/src/utils/agents/actor.py index 2a4ea6f..736629f 100644 --- a/src/utils/agents/actor.py +++ b/src/utils/agents/actor.py @@ -20,6 +20,10 @@ class Actor(Thread): Thread.__init__(self) self.config = config self.items = [] + self.__id = config['id'] + def getIdentifier(self): + return self.__id + def init(self,litems): self.items = litems def process(self,item): @@ -33,8 +37,18 @@ class Actor(Thread): pass return stream def callback(self,channel,method,header,stream): - print stream - + print [self.getIdentifier(),stream] + #message = None + #try: + #message = json.loads(stream) + #except Exception, e: + #pass + #if message is not None: + #if 'id' in message : + #if 'payload' in message: + #self.execute(message['payload'] + + def run(self): info = {} host = self.config['api'] @@ -68,4 +82,8 @@ class Alert(Actor): config = {"id":"demo","key":"[0v8]-247&7!v3","api":"localhost"} actor = Kill(config) -actor.start() \ No newline at end of file +actor.start() + +config = {"id":"demo-100","key":"[0v8]-247&7!v3","api":"localhost"} +actor_1 = Kill(config) +actor_1.start() \ No newline at end of file diff --git a/src/utils/transport.py b/src/utils/transport.py index 9d7e50d..5cd110c 100644 --- a/src/utils/transport.py +++ b/src/utils/transport.py @@ -410,7 +410,6 @@ class QueueReader(MessageQueue,Reader): pass #self.close() - # r[qid].append( self.data) return self.data @@ -419,10 +418,11 @@ class QueueListener(QueueReader): properties = pika.ConnectionParameters(host=self.host) self.connection = pika.BlockingConnection(properties) self.channel = self.connection.channel() - self.channel.exchange_declare(exchange=self.uid,type='fanout') + self.channel.exchange_declare(exchange=self.uid,type='direct' ) - self.info = self.channel.queue_declare(queue=qid,exclusive=True) - self.channel.queue_bind(exchange=self.uid,queue=self.info.method.queue) + self.info = self.channel.queue_declare(exclusive=True,queue=qid) + print self.info.method.queue + self.channel.queue_bind(exchange=self.uid,queue=self.info.method.queue,routing_key=qid) #self.callback = callback def read(self):