生产者

当完成消息队列的连接之后,就可以建立消息生产者和消息消费者了。在Kombu中建立消息生产者可以使用消息队列连接实例建立或者直接使用kombu.Producer类实例化。

通过消息队列连接建立生产者比较容易,直接通过Connection类的.Producer()方法就可以获得可用的生产者实例。直接建立生产者需要先获取一个Channel实例,Channel实例可以通过Connection类的.channel()方法直接获取,例如。

with Connection("amqp://") as conn:
	with conn.channel() as channel:
		producer = Producer(channel)

Producer类在实例化时,可以使用以下常用参数来进行定制。

  • channel,要绑定到的Connection实例或者Channel实例。
  • exchange,指定默认要使用的Exchange。
  • routing_key,指定默认绑定键。
  • serializer,指定默认的串行化方法,默认采用JSON。
  • compression,指定默认的压缩方法,默认不压缩。
  • auto_declare,指定是否创建默认的Exchange,默认是True
  • on_return,当消息不能被发送出去而返回生产者时的回调。

对于生产者来说,最重要的就是使用.publish()方法发送消息。.publish()方法可以将一条消息发送到指定的Exchange。.publish()方法常用的参数主要有以下这些。

  • body,消息内容,可以是字符串、字典、列表等,无需指定参数名称。
  • routing_key,绑定键,用于决定消息转发到哪个Queue中。
  • delivery_mode,发送模式,与Exchange中的同名参数意义相同。
  • priority,消息优先级。
  • content_type,内容类型,字符串型,默认是自动检测。
  • content_encoding,内容编码方式,字符串型,默认是自动检测。
  • serializer,串行化消息要使用的串行化方法,字符串型,默认是自动检测。
  • compression,消息压缩方法,字符串型,默认是自动检测。
  • headers,消息头,字典类型。
  • declare,在发送消息之前需要完成的消息传递元件创建,列表型,可以传入Exchange实例等。
  • retry,指示是否在连接丢失时重试发送消息。
  • retry_policy,消息重发的策略,字典类型。
    • interval_start,重试开始延迟秒数。
    • interval_step,每次重试间隔时间增加的秒数。
    • interval_max,两次重试之间最大间隔时间。
    • max_retries,最大重试次数。
  • expiration,消息过期时间,浮点型,单位是秒。

Kombu也提供了生产者池,允许在程序中共用一组生产者以提高消息发送效率。与连接池类似,生产者池可以使用kombu.pools.producers[connection]来创建。从生产者池中获取生产者的方式与从连接池里获取连接相同,都是使用.acquire(block)方法。除此之外,生产者池还可以通过kombu.pools.ProducerPool()来创建。