|
|
@ -301,7 +301,10 @@ class QueueWriter(MessageQueue,Writer):
|
|
|
|
_type = 'text/plain'
|
|
|
|
_type = 'text/plain'
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
stream = json.dumps(object)
|
|
|
|
stream = json.dumps(object)
|
|
|
|
|
|
|
|
if 'type' in params :
|
|
|
|
_type = params['type']
|
|
|
|
_type = params['type']
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
_type = 'application/json'
|
|
|
|
|
|
|
|
|
|
|
|
self.channel.basic_publish(
|
|
|
|
self.channel.basic_publish(
|
|
|
|
exchange=self.uid,
|
|
|
|
exchange=self.uid,
|
|
|
@ -329,15 +332,16 @@ class QueueReader(MessageQueue,Reader):
|
|
|
|
#self.qid = params['qid']
|
|
|
|
#self.qid = params['qid']
|
|
|
|
MessageQueue.__init__(self,**params);
|
|
|
|
MessageQueue.__init__(self,**params);
|
|
|
|
self.size = -1
|
|
|
|
self.size = -1
|
|
|
|
self.data = []
|
|
|
|
self.data = {}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def init(self,qid):
|
|
|
|
|
|
|
|
|
|
|
|
def init(self):
|
|
|
|
|
|
|
|
properties = pika.ConnectionParameters(host=self.host)
|
|
|
|
properties = pika.ConnectionParameters(host=self.host)
|
|
|
|
self.connection = pika.BlockingConnection(properties)
|
|
|
|
self.connection = pika.BlockingConnection(properties)
|
|
|
|
self.channel = self.connection.channel()
|
|
|
|
self.channel = self.connection.channel()
|
|
|
|
self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True)
|
|
|
|
self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True)
|
|
|
|
|
|
|
|
|
|
|
|
self.info = self.channel.queue_declare(queue=self.qid,durable=True)
|
|
|
|
self.info = self.channel.queue_declare(queue=qid,durable=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -346,13 +350,19 @@ class QueueReader(MessageQueue,Reader):
|
|
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
def callback(self,channel,method,header,stream):
|
|
|
|
def callback(self,channel,method,header,stream):
|
|
|
|
|
|
|
|
r = []
|
|
|
|
if re.match("^\{|\[",stream) is not None:
|
|
|
|
if re.match("^\{|\[",stream) is not None:
|
|
|
|
self.data = json.loads(stream)
|
|
|
|
r = json.loads(stream)
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
|
|
|
|
|
|
|
|
self.data.append(stream)
|
|
|
|
r = stream
|
|
|
|
|
|
|
|
|
|
|
|
if self.size == len(self.data) or len(self.data) == self.info.method.message_count:
|
|
|
|
qid = self.info.method.queue
|
|
|
|
|
|
|
|
if qid not in self.data :
|
|
|
|
|
|
|
|
self.data[qid] = []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.data[qid].append(r)
|
|
|
|
|
|
|
|
if self.size == len(self.data[qid]) or len(self.data[qid]) == self.info.method.message_count:
|
|
|
|
self.close()
|
|
|
|
self.close()
|
|
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
"""
|
|
|
@ -362,18 +372,22 @@ class QueueReader(MessageQueue,Reader):
|
|
|
|
Have the number of messages retrieved be specified by size (parameter)
|
|
|
|
Have the number of messages retrieved be specified by size (parameter)
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
def read(self,size=-1):
|
|
|
|
def read(self,size=-1):
|
|
|
|
|
|
|
|
r = {}
|
|
|
|
self.size = size
|
|
|
|
self.size = size
|
|
|
|
self.init()
|
|
|
|
for qid in self.qid:
|
|
|
|
self.data = []
|
|
|
|
self.init(qid)
|
|
|
|
|
|
|
|
# r[qid] = []
|
|
|
|
if self.info.method.message_count > 0:
|
|
|
|
if self.info.method.message_count > 0:
|
|
|
|
|
|
|
|
|
|
|
|
self.channel.basic_consume(self.callback,queue=self.qid,no_ack=False);
|
|
|
|
self.channel.basic_consume(self.callback,queue=qid,no_ack=False);
|
|
|
|
self.channel.start_consuming()
|
|
|
|
self.channel.start_consuming()
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
self.data = []
|
|
|
|
|
|
|
|
self.close()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
#self.close()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# r[qid].append( self.data)
|
|
|
|
|
|
|
|
print self.data
|
|
|
|
return self.data
|
|
|
|
return self.data
|
|
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|