生产者
当完成消息队列的连接之后,就可以建立消息生产者和消息消费者了。在Kombu中建立消息生产者可以使用消息队列连接实例建立或者直接使用kombu.Producer
类实例化。
通过消息队列连接建立生产者比较容易,直接通过Connection
类的.Producer()
方法就可以获得可用的生产者实例。直接建立生产者需要先获取一个Channel
实例,Channel
实例可以通过Connection
类的.channel()
方法直接获取,例如。
with Connection("amqp://") as conn:
with conn.channel() as channel:
producer = Producer(channel)
Producer
类在实例化时,可以使用以下常用参数来进行定制。
channel
,要绑定到的Connection实例或者Channel实例。exchange
,指定默认要使用的Exchange。routing_key
,指定默认绑定键。serializer
,指定默认的串行化方法,默认采用JSON。compression
,指定默认的压缩方法,默认不压缩。auto_declare
,指定是否创建默认的Exchange,默认是True
。on_return
,当消息不能被发送出去而返回生产者时的回调。
对于生产者来说,最重要的就是使用.publish()
方法发送消息。.publish()
方法可以将一条消息发送到指定的Exchange。.publish()
方法常用的参数主要有以下这些。
body
,消息内容,可以是字符串、字典、列表等,无需指定参数名称。routing_key
,绑定键,用于决定消息转发到哪个Queue中。delivery_mode
,发送模式,与Exchange中的同名参数意义相同。priority
,消息优先级。content_type
,内容类型,字符串型,默认是自动检测。content_encoding
,内容编码方式,字符串型,默认是自动检测。serializer
,串行化消息要使用的串行化方法,字符串型,默认是自动检测。compression
,消息压缩方法,字符串型,默认是自动检测。headers
,消息头,字典类型。declare
,在发送消息之前需要完成的消息传递元件创建,列表型,可以传入Exchange实例等。retry
,指示是否在连接丢失时重试发送消息。retry_policy
,消息重发的策略,字典类型。interval_start
,重试开始延迟秒数。interval_step
,每次重试间隔时间增加的秒数。interval_max
,两次重试之间最大间隔时间。max_retries
,最大重试次数。
expiration
,消息过期时间,浮点型,单位是秒。
Kombu也提供了生产者池,允许在程序中共用一组生产者以提高消息发送效率。与连接池类似,生产者池可以使用kombu.pools.producers[connection]
来创建。从生产者池中获取生产者的方式与从连接池里获取连接相同,都是使用.acquire(block)
方法。除此之外,生产者池还可以通过kombu.pools.ProducerPool()
来创建。