通道

通道(Channel)是Pika收发消息的根本途径。Channel是通过消息队列连接(Connection)的.channel()方法来获取的,格式为.channel(channel_number=None, on_open_callback=None)。其中channel_number表示申请使用的通道编号,如果不指定通道编号,则由消息队列自动分配。on_open_calback则是当通道建立时调用的回调函数。

通道中常用的操作消息的方法主要有以下这些。

  • .basic_ack(delivery_tag=0, multiple=False),应答收到一条或者多条消息。
    • delivery_tag,消息服务对消息设定的编号。
    • multiple,如果设为True,则表示确认收到包含delivery_tag及以前的全部消息,否则仅确认delivery_tag代表的这一条消息。
  • .basic_cancel(consumer_tag='', callback=None),取消一个消息消费者。这个操作不会影响已经发送的消息,但并不意味着消息队列不会再向消费者发送更多的消息。
  • .basic_consume(queue, on_message_callback=None, auto_ack=False, exclusive=False, consumer_tag=None, arguments=None, callback=None),向消息队列发送AMQP 0-9-1 Basic.Consume命令建立对指定队列的消费响应。
    • queue,指定响应的消息队列。
    • on_message_callback,收到消息的时候调用的回调函数,函数签名为on_message_callback(channel, method, properties, body)
    • auto_ack,设定是否自动确认收到消息。
    • exclusive,设定是否在同一队列上允许存在其他消费者。
    • consumer_tag,设定消费者编号,如果不设定将由消息队列自动分配。
    • arguments,用于配置消费者的参数。
    • callback,用于响应Basic.ConsumeOk指令的回调函数。
  • .basic_get(queue, callback, auto_ack=False),从指定消息队列中获取一条消息。
  • .basic_nack(delivery_tag=None, multiple=False, requeue=True),拒收一条或者多条消息。
    • requeue,设定被拒收的消息是否重新排回消息队列。
  • .basic_publish(exchange, routing_key, body, properties=None, mandatory=False),向指定Channel发送一条消息。
    • exchange,消息要经由的交换网关。
    • routing_key,消息的路由键值。
    • body,字节数组类型,消息主体。
    • properties,消息的属性,pika.spec.BasicProperties类型。
    • mandatory,强制标志。
  • .basic_reject(delivery_tag, requeue=True),拒收一条收到的消息。
  • .basic_recover(requeue=False, callback=None),通知消息队列重新发送所有未确认的消息。
  • .close(reply_code=0, reply_text='Normal shutdown'),关闭Channel。
  • .confirm_delivery(ack_nack_callback, callback=None),转到Channel的确认模式,用来通知消息队列消息的确认状态。
  • .exchange_bind(destination, source, routing_key='', arguments=None, callback=None),绑定一个消息交换网关到另一个网关。
    • destination,要绑定到的目标网关。
    • source,消息来源网关。
    • routing_key,要进行路由的消息路由键值。
    • arguments,设定绑定的属性。
  • .exchange_declare(exchange, exchange_type='direct', passive=False, durable=False, auto_delete=True, internal=False, arguments=None, callback=None),声明一个新的消息交换网关。
    • exchange,消息交换网关的名称。
    • exchange_type,消息交换网关的类型,默认为'direct'直接发送型网关,还可以选择'topic'主题转发型网关,'fanout'群发型网关。
    • passive,设定只在网关不存在时新建。
    • durable,设定当RabbitMQ重启后是否重建。
    • auto_delete,设定当没有队列绑定到网关时,自动删除网关。
    • internal,设定只能接收从其他网关发来的消息。
  • .exchange_delete(exchange=None, if_unused=False, callback=None),删除消息交换网关。
    • exchange,指定要删除的网关。
    • if_unused,设定只删除未使用的网关。
  • .exchange_unbind(destination=None, source=None, routing_key='', arguments=None, callback=None),解除两个消息交换网关间的绑定。
  • .queue_bind(queue, exchange, routing_key=None, arguments=None, callback=None),将指定队列绑定到交换网关。
  • .queue_declare(queue, passive=False, exclusive=False, auto_delete=False, arguments=None, callback=None),声明一个用于消费消息的队列。
  • .queue_delete(queue, if_unused=False, if_empty=False, callback=None),删除一个队列。
    • if_empty,只在队列是空的时候删除队列。
  • .queue_purge(queue, callback=None),清空队列中的消息。
  • .queue_unbind(queue, exchange=None, routing_key=None, arguments=None, callback=None),将消费队列与交换网关解绑。

Channel同时也提供了一些专用的回调函数附加方法用于将回调函数附加到一些需要关注和操作的状态转换点上。回调函数附加方法主要有以下这些。

  • .add_callback(callback, replies, one_shot=True),附加用于处理消息队列返回的回复信息的回调函数。
    • replies,设定要响应的回复。
    • one_shot,只处理第一个类型的回调。
  • .add_on_cancel_callback(callback),附加当服务器调用.basic_cancel()时的回调。
  • .add_on_close_callback(callback),附加当Channel关闭后的回调。
  • .add_on_return_callback(callback),附加当调用.basic_publish()后消息被拒收并从服务器返回时的回调。