You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
data-transport/transport/queue.py

200 lines
5.3 KiB
Python

import pika
from datetime import datetime
import re
import json
import os
from common import Reader, Writer
import json
class MessageQueue:
"""
This class hierarchy is designed to handle interactions with a queue server using pika framework (our tests are based on rabbitmq)
:host
def __init__(self,**params):
"""
@param host host
@param uid exchange identifier
@param qid queue identifier
"""
#self.host= params['host']
#self.uid = params['uid']
#self.qid = params['qid']
MessageQueue.__init__(self,**params);
if 'durable' in params :
self.durable = True
else:
self.durable = False
self.size = -1
self.data = {}
def init(self,qid):
properties = pika.ConnectionParameters(host=self.host)
self.connection = pika.BlockingConnection(properties)
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True)
self.info = self.channel.queue_declare(queue=qid,durable=True)
def callback(self,channel,method,header,stream):
"""
This is the callback function designed to process the data stream from the queue
"""
r = []
if re.match("^\{|\[",stream) is not None:
r = json.loads(stream)
else:
r = stream
qid = self.info.method.queue
if qid not in self.data :
self.data[qid] = []
self.data[qid].append(r)
#
# We stop reading when the all the messages of the queue are staked
#
if self.size == len(self.data[qid]) or len(self.data[qid]) == self.info.method.message_count:
self.close()
def read(self,size=-1):
"""
This function will read, the first message from a queue
@TODO:
Implement channel.basic_get in order to retrieve a single message at a time
Have the number of messages retrieved be specified by size (parameter)
"""
r = {}
self.size = size
#
# We enabled the reader to be able to read from several queues (sequentially for now)
# The qid parameter will be an array of queues the reader will be reading from
#
if isinstance(self.qid,basestring) :
self.qid = [self.qid]
for qid in self.qid:
self.init(qid)
# r[qid] = []
if self.info.method.message_count > 0:
self.channel.basic_consume(self.callback,queue=qid,no_ack=False);
self.channel.start_consuming()
else:
pass
#self.close()
# r[qid].append( self.data)
return self.data
class QueueListener(QueueReader):
def init(self,qid):
properties = pika.ConnectionParameters(host=self.host)
self.connection = pika.BlockingConnection(properties)
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True )
self.info = self.channel.queue_declare(passive=True,exclusive=True,queue=qid)
self.channel.queue_bind(exchange=self.uid,queue=self.info.method.queue,routing_key=qid)
#self.callback = callback
def read(self):
self.init(self.qid)
self.channel.basic_consume(self.callback,queue=self.qid,no_ack=True);
self.channel.start_consuming()