消费者
消费者是消息队列中相对于生产者的另一端。Kombu中的消费者可以从Connection、Channel或者一系列Queue中获取消息。消费者的创建要比生产者复杂一些,并且Kombu还提供了一系列Mixins来辅助消费者的快速创建。Kombu是一个异步事件型的消息处理库,主要通过回调函数来处理传入的消息。
最简单的消费者创建方法是使用kombu.Consumer类,在创建Consumer实例的时候,最少要指定Consumer实例所使用的Connection和Queue,以及可接受的消息类型(Content-Type)。Kombu自3.0之后,默认只能接受JSON/Binary和纯文本消息,如果需要其他类型的消息,必须显式声明,例如Pickle或者YAML类型。Consumer常用的构造参数主要有以下这些。
channel,指定Consumer使用的Connection或者Channel。queues,指定Consumer使用的Queue,列表类型。no_ack,是否由库自动确认消息。auto_declare,自动创建相关消息队列元件。callbacks,指定当消息收到时需要按次序调用的回调函数,列表类型。on_message,当收到消息时调用的回调函数,回调函数接受Message实例作为唯一参数,设定后callbacks将会失效。accept,指定可接受的Content Type。on_decode_error,当消息解码失败后的回调函数。
在消费者建立后,可以调用Connection类实例的.drain_events(timeout)方法来等待消息的传入,timeout参数用来设定等待超时时间。或者还可以使用Consumer实例中的.consume()来启动消费者的消息接收。Consumer类实例中常用的方法主要有以下这些。
.consume(no_ack),开始Consumer的消息接收。.cancel(),结束Consumer的消息接收。.purge(),删除所有Queue中的消息。.recover(requeue),重新发送所有未确认的消息。
然而在实际项目中,使用Mixins来建立Consumer实例是一个更加方便的选择。Kombu在kombu.mixins包中提供了两个常用的Mixins类供使用:ConsumerMixin和ConsumerProducerMixin。其中ConsumerMixin主要用于单向接收和处理消息,ConsumerProducerMixin可以用于双向处理消息。先从ConsumerMixin开始,以下是一个最简单的应用ConsumerMixin的消费者。
from kombu import Queue, Exchange
from kombu.mixins import ConsumerMixin
class Worker(ConsumerMixin):
task_queue = Queue('tasks', Exchange('tasks'), 'tasks')
def __init__(self, connection):
self.connection = connection
def get_consumers(self, Consumer, channel):
return [Consumer(
queues=self.task_queue,
on_message=self.on_request
)]
def on_request(self, message):
# 处理消息
message.ack()
Worker(connection).run()
ConsumerMixin中提供了许多预置的方法和属性,用于方便处理消息,在继承ConsumerMixin时,可以根据需要进行重载。常用的方法和属性主要有以下这些。
.on_connection_error(exception, interval),当连接失败或丢失时的回调。.on_connection_revived(),当连接恢复时的回调。.on_consume_ready(connection, channel, consumers),当消费者准备好处理消息时的回调。.on_consume_end(connection, channel),当消费者调用了.cancel()方法之后的回调。.on_iteration(),当取得事件时每次调用处理函数时的回调。.on_decode_error(message, exception),当消息无法解码时的回调。.get_consumers(Consumer, channel),设定消息来源消费者的列表,是必须重载的方法。.run(),启动消费者。
在ConsumerMixin中,处理消息的方法是通过.get_consumers()返回的消费者列表中不同的消费者的回调方法来设定的,具体可参考示例中的书写。
ConsumerProducerMixin则提供了双向通信的功能,允许在一个类中,既可以接收和处理消息,又可以发出消息。与ConsumerMixin提供的预置方法和属性基本一致,但额外提供了.producer属性来提供生产者功能。利用ConsumerProducerMixin可以方便的实现RPC功能。以下给出一个近似RPC功能的示例。
class Worker(ConsumerProducerMixin):
def __init__(self, connection):
self.connection = connection
def get_consumers(self, Consumer, channel):
return [Consumer(
queues=[Queue('foo')],
on_message=self.handle_message,
accept='application/json',
prefetch_count=10
)]
def handle_message(self, message):
self.producer.publish(
{'message': 'something need returned'},
exchange='rpc_exchange',
routing_key=message.properties['reply_to'],
correlation_id=messsage.properties['correlation_id'],
retry=True
)