diff --git a/src/utils/agents/actor.py b/src/utils/agents/actor.py index 736629f..5d005f2 100644 --- a/src/utils/agents/actor.py +++ b/src/utils/agents/actor.py @@ -5,7 +5,8 @@ - Alert : Sends an email or Webhook - Apps : Kill, Start - Folder: Archive, Delete (all, age, size) - + By design we are to understand that a message is structured as follows: + {to,from,content} with content either being an arbitrary stream (or JSON) @TODO: - upgrade to python 3.x """ @@ -14,7 +15,7 @@ from threading import Thread import os import subprocess from monitor import ProcessCounter -from utils.transport import QueueListener +from utils.transport import QueueListener, QueueWriter class Actor(Thread): def __init__(self,config): Thread.__init__(self) @@ -38,6 +39,10 @@ class Actor(Thread): return stream def callback(self,channel,method,header,stream): print [self.getIdentifier(),stream] + message = json.loads(stream) + content = message['content'] + sender = message['from'] + self.post(to=sender,content=content) #message = None #try: #message = json.loads(stream) @@ -47,8 +52,22 @@ class Actor(Thread): #if 'id' in message : #if 'payload' in message: #self.execute(message['payload'] - - + """ + Sending a message to a queue with parameters to,from,content + """ + def post(self,**args): + to = args['to'] + content = args['content'] + message = {"from":self.getIdentifier(),"to":to,"content":content} + host = self.config['api'] + uid = self.config['key'] + qid = to#self.config['id'] + print [host,uid,qid] + qwriter = QueueWriter(host=host,uid=uid,qid=qid) + qwriter.init(qid) + qwriter.write(label=qid,row="got it") + #qwriter.close() + pass def run(self): info = {} host = self.config['api'] diff --git a/src/utils/transport.py b/src/utils/transport.py index 5cd110c..13d08ae 100644 --- a/src/utils/transport.py +++ b/src/utils/transport.py @@ -418,7 +418,7 @@ class QueueListener(QueueReader): properties = pika.ConnectionParameters(host=self.host) self.connection = pika.BlockingConnection(properties) self.channel = self.connection.channel() - self.channel.exchange_declare(exchange=self.uid,type='direct' ) + self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True ) self.info = self.channel.queue_declare(exclusive=True,queue=qid) print self.info.method.queue