消费者
消费者是消息队列中相对于生产者的另一端。Kombu中的消费者可以从Connection
、Channel
或者一系列Queue中获取消息。消费者的创建要比生产者复杂一些,并且Kombu还提供了一系列Mixins来辅助消费者的快速创建。Kombu是一个异步事件型的消息处理库,主要通过回调函数来处理传入的消息。
最简单的消费者创建方法是使用kombu.Consumer
类,在创建Consumer
实例的时候,最少要指定Consumer
实例所使用的Connection
和Queue
,以及可接受的消息类型(Content-Type)。Kombu自3.0之后,默认只能接受JSON/Binary和纯文本消息,如果需要其他类型的消息,必须显式声明,例如Pickle或者YAML类型。Consumer
常用的构造参数主要有以下这些。
channel
,指定Consumer
使用的Connection
或者Channel
。queues
,指定Consumer
使用的Queue
,列表类型。no_ack
,是否由库自动确认消息。auto_declare
,自动创建相关消息队列元件。callbacks
,指定当消息收到时需要按次序调用的回调函数,列表类型。on_message
,当收到消息时调用的回调函数,回调函数接受Message实例作为唯一参数,设定后callbacks
将会失效。accept
,指定可接受的Content Type。on_decode_error
,当消息解码失败后的回调函数。
在消费者建立后,可以调用Connection
类实例的.drain_events(timeout)
方法来等待消息的传入,timeout
参数用来设定等待超时时间。或者还可以使用Consumer
实例中的.consume()
来启动消费者的消息接收。Consumer
类实例中常用的方法主要有以下这些。
.consume(no_ack)
,开始Consumer的消息接收。.cancel()
,结束Consumer的消息接收。.purge()
,删除所有Queue中的消息。.recover(requeue)
,重新发送所有未确认的消息。
然而在实际项目中,使用Mixins来建立Consumer
实例是一个更加方便的选择。Kombu在kombu.mixins
包中提供了两个常用的Mixins类供使用:ConsumerMixin
和ConsumerProducerMixin
。其中ConsumerMixin
主要用于单向接收和处理消息,ConsumerProducerMixin
可以用于双向处理消息。先从ConsumerMixin
开始,以下是一个最简单的应用ConsumerMixin
的消费者。
from kombu import Queue, Exchange
from kombu.mixins import ConsumerMixin
class Worker(ConsumerMixin):
task_queue = Queue('tasks', Exchange('tasks'), 'tasks')
def __init__(self, connection):
self.connection = connection
def get_consumers(self, Consumer, channel):
return [Consumer(
queues=self.task_queue,
on_message=self.on_request
)]
def on_request(self, message):
# 处理消息
message.ack()
Worker(connection).run()
ConsumerMixin
中提供了许多预置的方法和属性,用于方便处理消息,在继承ConsumerMixin
时,可以根据需要进行重载。常用的方法和属性主要有以下这些。
.on_connection_error(exception, interval)
,当连接失败或丢失时的回调。.on_connection_revived()
,当连接恢复时的回调。.on_consume_ready(connection, channel, consumers)
,当消费者准备好处理消息时的回调。.on_consume_end(connection, channel)
,当消费者调用了.cancel()
方法之后的回调。.on_iteration()
,当取得事件时每次调用处理函数时的回调。.on_decode_error(message, exception)
,当消息无法解码时的回调。.get_consumers(Consumer, channel)
,设定消息来源消费者的列表,是必须重载的方法。.run()
,启动消费者。
在ConsumerMixin
中,处理消息的方法是通过.get_consumers()
返回的消费者列表中不同的消费者的回调方法来设定的,具体可参考示例中的书写。
ConsumerProducerMixin
则提供了双向通信的功能,允许在一个类中,既可以接收和处理消息,又可以发出消息。与ConsumerMixin
提供的预置方法和属性基本一致,但额外提供了.producer
属性来提供生产者功能。利用ConsumerProducerMixin
可以方便的实现RPC功能。以下给出一个近似RPC功能的示例。
class Worker(ConsumerProducerMixin):
def __init__(self, connection):
self.connection = connection
def get_consumers(self, Consumer, channel):
return [Consumer(
queues=[Queue('foo')],
on_message=self.handle_message,
accept='application/json',
prefetch_count=10
)]
def handle_message(self, message):
self.producer.publish(
{'message': 'something need returned'},
exchange='rpc_exchange',
routing_key=message.properties['reply_to'],
correlation_id=messsage.properties['correlation_id'],
retry=True
)