From c268a117c291741b916cee6304c031edf0b461d6 Mon Sep 17 00:00:00 2001 From: Steve Nyemba Date: Wed, 5 Feb 2020 15:06:35 -0600 Subject: [PATCH] bug fixes with upgraded version of pika --- setup.py | 2 +- transport/queue.py | 143 ++++++++++++++++++++++++++------------------- 2 files changed, 83 insertions(+), 62 deletions(-) diff --git a/setup.py b/setup.py index 5768b66..b95c6d2 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ def read(fname): return open(os.path.join(os.path.dirname(__file__), fname)).read() args = { "name":"data-transport", - "version":"1.0.8", + "version":"1.0.9", "author":"The Phi Technology LLC","author_email":"info@the-phi.com", "license":"MIT", "packages":["transport"]} diff --git a/transport/queue.py b/transport/queue.py index eccde1e..bfb4acf 100644 --- a/transport/queue.py +++ b/transport/queue.py @@ -25,10 +25,33 @@ class MessageQueue: :qid identifier of the queue """ def __init__(self,**params): - self.host= params['host'] - self.uid = params['xid'] - self.qid = params['qid'] + self.host= 'localhost' if 'host' not in params else params['host'] #-- location of the queue server + self.port= 5672 if 'port' not in params else params['port'] + self.virtual_host = '/' if 'vhost' not in params else params['vhost'] + self.exchange = params['exchange'] if 'exchange' in params else 'amq.direct' #-- exchange + self.queue = params['queue'] + self.connection = None + self.channel = None + + self.credentials= pika.PlainCredentials('guest','guest') + if 'username' in params : + self.credentials = pika.PlainCredentials( + params['username'], + ('' if 'password' not in params else params['password']) + ) + def init(self,label=None): + properties = pika.ConnectionParameters(host=self.host,port=self.port,virtual_host=self.virtual_host,credentials=self.credentials) + self.connection = pika.BlockingConnection(properties) + self.channel = self.connection.channel() + self.info = self.channel.exchange_declare(exchange=self.exchange,exchange_type='direct',durable=True) + if label is None: + self.qhandler = self.channel.queue_declare(queue=self.queue,durable=True) + else: + self.qhandler = self.channel.queue_declare(queue=label,durable=True) + + self.channel.queue_bind(exchange=self.exchange,queue=self.qhandler.method.queue) + def isready(self): #self.init() resp = self.connection is not None and self.connection.is_open @@ -48,22 +71,13 @@ class QueueWriter(MessageQueue,Writer): """ def __init__(self,**params): #self.host= params['host'] - #self.uid = params['uid'] - #self.qid = params['queue'] + #self.exchange = params['uid'] + #self.queue = params['queue'] MessageQueue.__init__(self,**params); + self.init() - def init(self,label=None): - properties = pika.ConnectionParameters(host=self.host) - self.connection = pika.BlockingConnection(properties) - self.channel = self.connection.channel() - self.info = self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True) - if label is None: - self.qhandler = self.channel.queue_declare(queue=self.qid,durable=True) - else: - self.qhandler = self.channel.queue_declare(queue=label,durable=True) - - self.channel.queue_bind(exchange=self.uid,queue=self.qhandler.method.queue) + @@ -72,37 +86,37 @@ class QueueWriter(MessageQueue,Writer): @param object object to be written (will be converted to JSON) @TODO: make this less chatty """ - def write(self,**params): - xchar = None - if 'xchar' in params: - xchar = params['xchar'] - object = self.format(params['row'],xchar) + def write(self,data,_type='text/plain'): + # xchar = None + # if 'xchar' in params: + # xchar = params['xchar'] + # object = self.format(params['row'],xchar) - label = params['label'] - self.init(label) - _mode = 2 - if isinstance(object,str): - stream = object - _type = 'text/plain' - else: - stream = json.dumps(object) - if 'type' in params : - _type = params['type'] - else: - _type = 'application/json' - + # label = params['label'] + # self.init(label) + # _mode = 2 + # if isinstance(object,str): + # stream = object + # _type = 'text/plain' + # else: + # stream = json.dumps(object) + # if 'type' in params : + # _type = params['type'] + # else: + # _type = 'application/json' + stream = json.dumps(data) if isinstance(data,dict) else data self.channel.basic_publish( - exchange=self.uid, - routing_key=label, + exchange=self.exchange, + routing_key=self.queue, body=stream, - properties=pika.BasicProperties(content_type=_type,delivery_mode=_mode) + properties=pika.BasicProperties(content_type=_type,delivery_mode=2) ); - self.close() + # self.close() - def flush(self,label): - self.init(label) + def flush(self): + self.init() _mode = 1 #-- Non persistent - self.channel.queue_delete( queue=label); + self.channel.queue_delete( queue=self.queue); self.close() class QueueReader(MessageQueue,Reader): @@ -119,23 +133,24 @@ class QueueReader(MessageQueue,Reader): """ #self.host= params['host'] - #self.uid = params['uid'] - #self.qid = params['qid'] + #self.exchange = params['uid'] + #self.queue = params['qid'] MessageQueue.__init__(self,**params); + self.init() if 'durable' in params : self.durable = True else: self.durable = False self.size = -1 self.data = {} - def init(self,qid): + # def init(self,qid): - properties = pika.ConnectionParameters(host=self.host) - self.connection = pika.BlockingConnection(properties) - self.channel = self.connection.channel() - self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True) + # properties = pika.ConnectionParameters(host=self.host) + # self.connection = pika.BlockingConnection(properties) + # self.channel = self.connection.channel() + # self.channel.exchange_declare(exchange=self.exchange,type='direct',durable=True) - self.info = self.channel.queue_declare(queue=qid,durable=True) + # self.info = self.channel.queue_declare(queue=qid,durable=True) def callback(self,channel,method,header,stream): @@ -175,9 +190,9 @@ class QueueReader(MessageQueue,Reader): # We enabled the reader to be able to read from several queues (sequentially for now) # The qid parameter will be an array of queues the reader will be reading from # - if isinstance(self.qid,str) : - self.qid = [self.qid] - for qid in self.qid: + if isinstance(self.queue,str) : + self.queue = [self.queue] + for qid in self.queue: self.init(qid) # r[qid] = [] @@ -193,19 +208,25 @@ class QueueReader(MessageQueue,Reader): return self.data class QueueListener(QueueReader): - def init(self,qid): - properties = pika.ConnectionParameters(host=self.host) - self.connection = pika.BlockingConnection(properties) - self.channel = self.connection.channel() - self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True ) + """ + This class is designed to have an active listener (worker) against a specified Exchange/Queue + It is initialized as would any other object and will require a callback function to address the objects returned. + """ + # def init(self,qid): + # properties = pika.ConnectionParameters(host=self.host) + # self.connection = pika.BlockingConnection(properties) + # self.channel = self.connection.channel() + # self.channel.exchange_declare(exchange=self.exchange,type='direct',durable=True ) - self.info = self.channel.queue_declare(passive=True,exclusive=True,queue=qid) + # self.info = self.channel.queue_declare(passive=True,exclusive=True,queue=qid) - self.channel.queue_bind(exchange=self.uid,queue=self.info.method.queue,routing_key=qid) + # self.channel.queue_bind(exchange=self.exchange,queue=self.info.method.queue,routing_key=qid) #self.callback = callback + def callback(self,channel,method,header,stream) : + raise Exception("....") def read(self): - self.init(self.qid) - self.channel.basic_consume(self.callback,queue=self.qid,no_ack=True); + self.init(self.queue) + self.channel.basic_consume(self.queue,self.callback,auto_ack=True); self.channel.start_consuming()