生产者
当完成消息队列的连接之后,就可以建立消息生产者和消息消费者了。在Kombu中建立消息生产者可以使用消息队列连接实例建立或者直接使用kombu.Producer类实例化。
通过消息队列连接建立生产者比较容易,直接通过Connection类的.Producer()方法就可以获得可用的生产者实例。直接建立生产者需要先获取一个Channel实例,Channel实例可以通过Connection类的.channel()方法直接获取,例如。
with Connection("amqp://") as conn:
with conn.channel() as channel:
producer = Producer(channel)
Producer类在实例化时,可以使用以下常用参数来进行定制。
channel,要绑定到的Connection实例或者Channel实例。exchange,指定默认要使用的Exchange。routing_key,指定默认绑定键。serializer,指定默认的串行化方法,默认采用JSON。compression,指定默认的压缩方法,默认不压缩。auto_declare,指定是否创建默认的Exchange,默认是True。on_return,当消息不能被发送出去而返回生产者时的回调。
对于生产者来说,最重要的就是使用.publish()方法发送消息。.publish()方法可以将一条消息发送到指定的Exchange。.publish()方法常用的参数主要有以下这些。
body,消息内容,可以是字符串、字典、列表等,无需指定参数名称。routing_key,绑定键,用于决定消息转发到哪个Queue中。delivery_mode,发送模式,与Exchange中的同名参数意义相同。priority,消息优先级。content_type,内容类型,字符串型,默认是自动检测。content_encoding,内容编码方式,字符串型,默认是自动检测。serializer,串行化消息要使用的串行化方法,字符串型,默认是自动检测。compression,消息压缩方法,字符串型,默认是自动检测。headers,消息头,字典类型。declare,在发送消息之前需要完成的消息传递元件创建,列表型,可以传入Exchange实例等。retry,指示是否在连接丢失时重试发送消息。retry_policy,消息重发的策略,字典类型。interval_start,重试开始延迟秒数。interval_step,每次重试间隔时间增加的秒数。interval_max,两次重试之间最大间隔时间。max_retries,最大重试次数。
expiration,消息过期时间,浮点型,单位是秒。
Kombu也提供了生产者池,允许在程序中共用一组生产者以提高消息发送效率。与连接池类似,生产者池可以使用kombu.pools.producers[connection]来创建。从生产者池中获取生产者的方式与从连接池里获取连接相同,都是使用.acquire(block)方法。除此之外,生产者池还可以通过kombu.pools.ProducerPool()来创建。