From a16b969b699f402eb32636a25bbb94c0e96cf19f Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Wed, 5 Feb 2020 18:14:30 -0600 Subject: [PATCH] bug fix & aliasing --- transport/queue.py | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/transport/queue.py b/transport/queue.py index bfb4acf..e16159a 100644 --- a/transport/queue.py +++ b/transport/queue.py @@ -33,6 +33,9 @@ class MessageQueue: self.connection = None self.channel = None + self.name = self.__class__.__name__.lower() if 'name' not in params else 'wtf' + + self.credentials= pika.PlainCredentials('guest','guest') if 'username' in params : self.credentials = pika.PlainCredentials( @@ -57,6 +60,8 @@ class MessageQueue: resp = self.connection is not None and self.connection.is_open self.close() return resp + def finalize(self): + pass def close(self): if self.connection.is_closed == False : self.channel.close() @@ -81,12 +86,13 @@ class QueueWriter(MessageQueue,Writer): - """ + + def write(self,data,_type='text/plain'): + """ This function writes a stream of data to the a given queue @param object object to be written (will be converted to JSON) @TODO: make this less chatty - """ - def write(self,data,_type='text/plain'): + """ # xchar = None # if 'xchar' in params: # xchar = params['xchar'] @@ -207,11 +213,14 @@ class QueueReader(MessageQueue,Reader): # r[qid].append( self.data) return self.data -class QueueListener(QueueReader): +class QueueListener(MessageQueue): """ This class is designed to have an active listener (worker) against a specified Exchange/Queue It is initialized as would any other object and will require a callback function to address the objects returned. """ + def __init__(self,**args): + MessageQueue.__init__(self,**args) + self.listen = self.read # def init(self,qid): # properties = pika.ConnectionParameters(host=self.host) # self.connection = pika.BlockingConnection(properties) @@ -222,11 +231,18 @@ class QueueListener(QueueReader): # self.channel.queue_bind(exchange=self.exchange,queue=self.info.method.queue,routing_key=qid) #self.callback = callback + + def finalize(self,channel,ExceptionReason): + pass + def callback(self,channel,method,header,stream) : raise Exception("....") def read(self): self.init(self.queue) + self.channel.basic_consume(self.queue,self.callback,auto_ack=True); self.channel.start_consuming() + +