You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			272 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Python
		
	
			
		
		
	
	
			272 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Python
		
	
"""
 | 
						|
Data Transport - 1.0
 | 
						|
Steve L. Nyemba, The Phi Technology LLC
 | 
						|
 | 
						|
This file is a wrapper around rabbitmq server for reading and writing content to a queue (exchange)
 | 
						|
 | 
						|
"""
 | 
						|
import pika
 | 
						|
from datetime import datetime
 | 
						|
import re
 | 
						|
import json
 | 
						|
import os
 | 
						|
import sys
 | 
						|
# if sys.version_info[0] > 2 :
 | 
						|
# 	from transport.common import Reader, Writer
 | 
						|
# else:
 | 
						|
# 	from common import Reader, Writer
 | 
						|
import json
 | 
						|
from multiprocessing import RLock
 | 
						|
class MessageQueue:
 | 
						|
	"""
 | 
						|
		This class hierarchy is designed to handle interactions with a queue server using pika framework (our tests are based on rabbitmq)
 | 
						|
		:host	
 | 
						|
		:xid	identifier of the exchange
 | 
						|
		:qid	identifier of the queue
 | 
						|
	"""
 | 
						|
	def __init__(self,**params):
 | 
						|
		self.host= 'localhost' if 'host' not in params else params['host']	#-- location of the queue server
 | 
						|
		self.port= 5672 if 'port' not in params else params['port']
 | 
						|
		self.virtual_host = '/' if 'vhost' not in params else params['vhost']
 | 
						|
		self.exchange = params['exchange']	if 'exchange' in params else 'amq.direct' #-- exchange
 | 
						|
		self.queue = params['queue'] if 'queue' in params else 'demo'	
 | 
						|
		self.connection = None
 | 
						|
		self.channel 	= None
 | 
						|
		
 | 
						|
		self.name 	= self.__class__.__name__.lower() if 'name' not in params else params['name']
 | 
						|
		
 | 
						|
		username = password = None
 | 
						|
		if 'username' in params :
 | 
						|
			username = params['username']
 | 
						|
			password = params['password']
 | 
						|
		if 'auth_file' in params :
 | 
						|
			_info = json.loads((open(params['auth_file'])).read())
 | 
						|
			username=_info['username']
 | 
						|
			password=_info['password']
 | 
						|
			self.virtual_host = _info['virtual_host'] if 'virtual_host' in _info else self.virtual_host
 | 
						|
			self.exchange = _info['exchange'] if 'exchange' in _info else self.exchange
 | 
						|
			self.queue = _info['queue'] if 'queue' in _info else self.queue
 | 
						|
 | 
						|
		self.credentials= pika.PlainCredentials('guest','guest')
 | 
						|
		if 'username' in params :
 | 
						|
			self.credentials = pika.PlainCredentials(
 | 
						|
				params['username'],
 | 
						|
				('' if 'password' not in params else params['password'])
 | 
						|
			)
 | 
						|
	
 | 
						|
	def init(self,label=None):
 | 
						|
		properties = pika.ConnectionParameters(host=self.host,port=self.port,virtual_host=self.virtual_host,
 | 
						|
		client_properties={'connection_name':self.name},
 | 
						|
		credentials=self.credentials)
 | 
						|
		self.connection = pika.BlockingConnection(properties)
 | 
						|
		self.channel	= self.connection.channel()
 | 
						|
		self.info = self.channel.exchange_declare(exchange=self.exchange,exchange_type='direct',durable=True)
 | 
						|
		if label is None:
 | 
						|
			self.qhandler = self.channel.queue_declare(queue=self.queue,durable=True)	
 | 
						|
		else:
 | 
						|
			self.qhandler = self.channel.queue_declare(queue=label,durable=True)
 | 
						|
		
 | 
						|
		self.channel.queue_bind(exchange=self.exchange,queue=self.qhandler.method.queue) 
 | 
						|
 | 
						|
	def isready(self):
 | 
						|
		#self.init()
 | 
						|
		resp =  self.connection is not None and self.connection.is_open
 | 
						|
		# self.close()
 | 
						|
		return resp
 | 
						|
	def finalize(self):
 | 
						|
		pass
 | 
						|
	def close(self):
 | 
						|
		if self.connection.is_closed == False :
 | 
						|
			self.channel.close()
 | 
						|
			self.connection.close()
 | 
						|
 | 
						|
class Writer(MessageQueue):
 | 
						|
	"""
 | 
						|
		This class is designed to publish content to an AMQP (Rabbitmq)
 | 
						|
		The class will rely on pika to implement this functionality
 | 
						|
 | 
						|
		We will publish information to a given queue for a given exchange
 | 
						|
	"""
 | 
						|
	def __init__(self,**params):
 | 
						|
		#self.host= params['host']
 | 
						|
		#self.exchange = params['uid']
 | 
						|
		#self.queue = params['queue']
 | 
						|
		MessageQueue.__init__(self,**params);
 | 
						|
		self.init()
 | 
						|
	def write(self,data,_type='text/plain'):
 | 
						|
		"""
 | 
						|
		This function writes a stream of data to the a given queue
 | 
						|
		@param object	object to be written (will be converted to JSON)
 | 
						|
		@TODO: make this less chatty
 | 
						|
		"""
 | 
						|
	
 | 
						|
		stream = json.dumps(data) if isinstance(data,dict) else data
 | 
						|
		self.channel.basic_publish(
 | 
						|
			exchange=self.exchange,
 | 
						|
			routing_key=self.queue,
 | 
						|
			body=stream,
 | 
						|
			properties=pika.BasicProperties(content_type=_type,delivery_mode=2)
 | 
						|
		);
 | 
						|
		# self.close()
 | 
						|
 | 
						|
	def flush(self):
 | 
						|
		self.init()
 | 
						|
		_mode = 1  #-- Non persistent
 | 
						|
		self.channel.queue_delete( queue=self.queue);
 | 
						|
		self.close()
 | 
						|
		
 | 
						|
class Reader(MessageQueue):
 | 
						|
	"""
 | 
						|
	This class will read from a queue provided an exchange, queue and host
 | 
						|
	@TODO: Account for security and virtualhosts
 | 
						|
	"""
 | 
						|
 | 
						|
	def __init__(self,**params):
 | 
						|
		"""
 | 
						|
			@param	host	host
 | 
						|
			@param	uid	exchange identifier
 | 
						|
			@param	qid	queue identifier
 | 
						|
		"""
 | 
						|
 | 
						|
		#self.host= params['host']
 | 
						|
		#self.exchange = params['uid']
 | 
						|
		#self.queue = params['qid']
 | 
						|
		MessageQueue.__init__(self,**params);
 | 
						|
		# self.init()
 | 
						|
		self.durable = False if 'durable' not in params else params['durable']
 | 
						|
		# if 'durable' in params :
 | 
						|
		# 	self.durable = True
 | 
						|
		# else:
 | 
						|
		# 	self.durable = False
 | 
						|
		self.size = -1
 | 
						|
		self.data = {}
 | 
						|
	# def init(self,qid):
 | 
						|
		
 | 
						|
	# 	properties = pika.ConnectionParameters(host=self.host)
 | 
						|
	# 	self.connection = pika.BlockingConnection(properties)
 | 
						|
	# 	self.channel	= self.connection.channel()
 | 
						|
	# 	self.channel.exchange_declare(exchange=self.exchange,type='direct',durable=True)
 | 
						|
 | 
						|
	# 	self.info = self.channel.queue_declare(queue=qid,durable=True)
 | 
						|
	
 | 
						|
 | 
						|
	def callback(self,channel,method,header,stream):
 | 
						|
		"""
 | 
						|
			This is the callback function designed to process the data stream from the queue
 | 
						|
 | 
						|
		"""
 | 
						|
				
 | 
						|
		r = []
 | 
						|
		# if re.match("^\{|\[",stream) is not None:
 | 
						|
		if stream.startswith(b'{') or stream.startswith(b'['): 
 | 
						|
			r = json.loads(stream)
 | 
						|
		else:
 | 
						|
			
 | 
						|
			r = stream
 | 
						|
		
 | 
						|
		qid = self.qhandler.method.queue
 | 
						|
		if qid not in self.data :
 | 
						|
			self.data[qid] = []
 | 
						|
		
 | 
						|
		self.data[qid].append(r)
 | 
						|
		#
 | 
						|
		# We stop reading when the all the messages of the queue are staked
 | 
						|
		#
 | 
						|
		if self.size == len(self.data[qid]) or len(self.data[qid]) == self.info.method.message_count:		
 | 
						|
			self.close()
 | 
						|
 | 
						|
	def read(self,**args):
 | 
						|
		"""
 | 
						|
		This function will read, the first message from a queue
 | 
						|
		@TODO: 
 | 
						|
			Implement channel.basic_get in order to retrieve a single message at a time
 | 
						|
			Have the number of messages retrieved be specified by size (parameter)
 | 
						|
		"""
 | 
						|
		r = {}
 | 
						|
		self.size = -1 if 'size' in args else int(args['size'])
 | 
						|
		#
 | 
						|
		# We enabled the reader to be able to read from several queues (sequentially for now)
 | 
						|
		# The qid parameter will be an array of queues the reader will be reading from
 | 
						|
		#
 | 
						|
		if isinstance(self.queue,str) :
 | 
						|
					self.queue = [self.queue]
 | 
						|
		
 | 
						|
		for qid in self.queue:
 | 
						|
			self.init(qid)
 | 
						|
			# r[qid] = []
 | 
						|
			
 | 
						|
			if self.qhandler.method.message_count > 0:
 | 
						|
				
 | 
						|
				self.channel.basic_consume(queue=qid,on_message_callback=self.callback,auto_ack=False);
 | 
						|
				self.channel.start_consuming()
 | 
						|
			else:
 | 
						|
				
 | 
						|
				pass
 | 
						|
				#self.close()
 | 
						|
			# r[qid].append( self.data)
 | 
						|
 | 
						|
		return self.data
 | 
						|
class QueueListener(MessageQueue):
 | 
						|
	lock = RLock()
 | 
						|
	"""
 | 
						|
	This class is designed to have an active listener (worker) against a specified Exchange/Queue
 | 
						|
	It is initialized as would any other object and will require a callback function to address the objects returned.
 | 
						|
	"""
 | 
						|
	def __init__(self,**args):
 | 
						|
		MessageQueue.__init__(self,**args)
 | 
						|
		self.listen = self.read
 | 
						|
		self.apply = args['apply'] if 'apply' in args else print
 | 
						|
		self.lock = False if 'lock' not in args else args['lock']
 | 
						|
	
 | 
						|
	def finalize(self,channel,ExceptionReason):
 | 
						|
		pass
 | 
						|
	
 | 
						|
	def callback(self,channel,method,header,stream) :
 | 
						|
		_info= {}
 | 
						|
		# if re.match("^\{|\[",stream) is not None:
 | 
						|
		
 | 
						|
 | 
						|
		if stream.startswith(b"[") or stream.startswith(b"{"):
 | 
						|
			_info = json.loads(stream)
 | 
						|
		else:
 | 
						|
			
 | 
						|
			_info = stream
 | 
						|
		#
 | 
						|
		# At this point we should invoke the apply function  with a lock if need be
 | 
						|
		# @TODO: Establish a vocabulary 
 | 
						|
		
 | 
						|
		if stream == b'QUIT' :
 | 
						|
			# channel.exit()
 | 
						|
			self.close()
 | 
						|
		if self.lock == True :
 | 
						|
			QueueListener.lock.acquire()
 | 
						|
		try:
 | 
						|
			#
 | 
						|
			# In case the user has not specified a function to apply the data against, it will simply be printed
 | 
						|
			#
 | 
						|
			self.apply(_info)
 | 
						|
		except Exception as e:
 | 
						|
			pass
 | 
						|
		if self.lock == True :
 | 
						|
			QueueListener.lock.release()
 | 
						|
	def read(self):
 | 
						|
    	
 | 
						|
		self.init(self.queue)
 | 
						|
		
 | 
						|
		self.channel.basic_consume(self.queue,self.callback,auto_ack=True);
 | 
						|
		self.channel.start_consuming()
 | 
						|
		
 | 
						|
		
 | 
						|
 
 | 
						|
class Factory :
 | 
						|
	@staticmethod
 | 
						|
	def instance(**_args):
 | 
						|
		"""
 | 
						|
		:param count	number of workers
 | 
						|
		:param apply	function workers		
 | 
						|
		"""
 | 
						|
		_apply = _args['apply']
 | 
						|
		_count = _args['count']
 | 
						|
		for i in np.arange(_count) :
 | 
						|
			_name = _args['name'] if 'name' in _args else 'worker_'+str(i)
 | 
						|
		transport.factory.instance(provider="rabbit",context="listener",apply=_apply,auth_file=_args['auth_file']) |