diff --git a/src/utils/transport.py b/src/utils/transport.py index 3846adc..13d08ae 100644 --- a/src/utils/transport.py +++ b/src/utils/transport.py @@ -345,6 +345,10 @@ class QueueReader(MessageQueue,Reader): #self.uid = params['uid'] #self.qid = params['qid'] MessageQueue.__init__(self,**params); + if 'durable' in params : + self.durable = True + else: + self.durable = False self.size = -1 self.data = {} @@ -406,11 +410,26 @@ class QueueReader(MessageQueue,Reader): pass #self.close() - # r[qid].append( self.data) return self.data - +class QueueListener(QueueReader): + def init(self,qid): + properties = pika.ConnectionParameters(host=self.host) + self.connection = pika.BlockingConnection(properties) + self.channel = self.connection.channel() + self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True ) + + self.info = self.channel.queue_declare(exclusive=True,queue=qid) + print self.info.method.queue + self.channel.queue_bind(exchange=self.uid,queue=self.info.method.queue,routing_key=qid) + #self.callback = callback + def read(self): + + self.init(self.qid) + self.channel.basic_consume(self.callback,queue=self.qid,no_ack=True); + self.channel.start_consuming() + """ This class is designed to write output as sql insert statements The class will inherit from DiskWriter with minor adjustments