消费者

消费者是消息队列中相对于生产者的另一端。Kombu中的消费者可以从ConnectionChannel或者一系列Queue中获取消息。消费者的创建要比生产者复杂一些,并且Kombu还提供了一系列Mixins来辅助消费者的快速创建。Kombu是一个异步事件型的消息处理库,主要通过回调函数来处理传入的消息。

最简单的消费者创建方法是使用kombu.Consumer类,在创建Consumer实例的时候,最少要指定Consumer实例所使用的ConnectionQueue,以及可接受的消息类型(Content-Type)。Kombu自3.0之后,默认只能接受JSON/Binary和纯文本消息,如果需要其他类型的消息,必须显式声明,例如Pickle或者YAML类型。Consumer常用的构造参数主要有以下这些。

  • channel,指定Consumer使用的Connection或者Channel
  • queues,指定Consumer使用的Queue,列表类型。
  • no_ack,是否由库自动确认消息。
  • auto_declare,自动创建相关消息队列元件。
  • callbacks,指定当消息收到时需要按次序调用的回调函数,列表类型。
  • on_message,当收到消息时调用的回调函数,回调函数接受Message实例作为唯一参数,设定后callbacks将会失效。
  • accept,指定可接受的Content Type。
  • on_decode_error,当消息解码失败后的回调函数。

在消费者建立后,可以调用Connection类实例的.drain_events(timeout)方法来等待消息的传入,timeout参数用来设定等待超时时间。或者还可以使用Consumer实例中的.consume()来启动消费者的消息接收。Consumer类实例中常用的方法主要有以下这些。

  • .consume(no_ack),开始Consumer的消息接收。
  • .cancel(),结束Consumer的消息接收。
  • .purge(),删除所有Queue中的消息。
  • .recover(requeue),重新发送所有未确认的消息。

然而在实际项目中,使用Mixins来建立Consumer实例是一个更加方便的选择。Kombu在kombu.mixins包中提供了两个常用的Mixins类供使用:ConsumerMixinConsumerProducerMixin。其中ConsumerMixin主要用于单向接收和处理消息,ConsumerProducerMixin可以用于双向处理消息。先从ConsumerMixin开始,以下是一个最简单的应用ConsumerMixin的消费者。

from kombu import Queue, Exchange
from kombu.mixins import ConsumerMixin


class Worker(ConsumerMixin):
	task_queue = Queue('tasks', Exchange('tasks'), 'tasks')
	
	def __init__(self, connection):
		self.connection = connection
	
	def get_consumers(self, Consumer, channel):
		return [Consumer(
            queues=self.task_queue,
			on_message=self.on_request
        )]
	
	def on_request(self, message):
		# 处理消息
		message.ack()


Worker(connection).run()

ConsumerMixin中提供了许多预置的方法和属性,用于方便处理消息,在继承ConsumerMixin时,可以根据需要进行重载。常用的方法和属性主要有以下这些。

  • .on_connection_error(exception, interval),当连接失败或丢失时的回调。
  • .on_connection_revived(),当连接恢复时的回调。
  • .on_consume_ready(connection, channel, consumers),当消费者准备好处理消息时的回调。
  • .on_consume_end(connection, channel),当消费者调用了.cancel()方法之后的回调。
  • .on_iteration(),当取得事件时每次调用处理函数时的回调。
  • .on_decode_error(message, exception),当消息无法解码时的回调。
  • .get_consumers(Consumer, channel),设定消息来源消费者的列表,是必须重载的方法。
  • .run(),启动消费者。

ConsumerMixin中,处理消息的方法是通过.get_consumers()返回的消费者列表中不同的消费者的回调方法来设定的,具体可参考示例中的书写。

ConsumerProducerMixin则提供了双向通信的功能,允许在一个类中,既可以接收和处理消息,又可以发出消息。与ConsumerMixin提供的预置方法和属性基本一致,但额外提供了.producer属性来提供生产者功能。利用ConsumerProducerMixin可以方便的实现RPC功能。以下给出一个近似RPC功能的示例。

class Worker(ConsumerProducerMixin):
	
	def __init__(self, connection):
		self.connection = connection
	
	def get_consumers(self, Consumer, channel):
		return [Consumer(
            queues=[Queue('foo')],
			on_message=self.handle_message,
			accept='application/json',
		    prefetch_count=10
        )]
	
	def handle_message(self, message):
		self.producer.publish(
			{'message': 'something need returned'},
			exchange='rpc_exchange',
			routing_key=message.properties['reply_to'],
			correlation_id=messsage.properties['correlation_id'],
			retry=True
        )