建立连接

消息队列的连接是通过kombu.Connection类控制的。Connection类一般采用一个消息队列连接串作为参数完成实例化。例如:connection = kombu.Connection("amqp://guest:guest@localhost")

Kombu采用的消息队列连接串中声明了Kombu要使用的消息队列传递媒介和对应的驱动,以及消息队列的位置和登录信息等。消息队列连接串的格式为驱动库名称://用户名:密码@主机:端口/虚拟主机

除了使用连接串实例化Connection类以外,Connection类还接收以下命名参数来完成实例化。

  • hostname,消息队列主机。
  • userid,登录用户名。
  • password,登录密码。
  • virtual_host,虚拟主机。
  • port,端口。
  • transport,消息队列传输媒介及驱动名称,字符串类型值。
  • ssl,是否使用SSL进行连接,布尔类型值。
  • connect_timeout,连接超时的秒数。
  • transport_options,其他配置项,字典类型值。

仅仅创建Connection类实例并不能真正的建立连接,还需要使用Connection类提供的方法和属性来控制。

  • .connect(),显式建立连接。
  • .connected,查看连接状态。
  • .close(),显式关闭连接。
  • .release(),根据是否使用连接池来进行连接的关闭和资源释放。

Connection类还实现了__enter____exit__方法,来使用with ... as语句。例如:with Connection() as connection:

Kombu针对消息队列连接复用提供了一个全局的连接池。连接池位于kombu.pools包中。连接池的使用方法可参考以下示例。

from kombu import Connection
from kombu.pools import connections


connection = Connection("amqp://guest:guest@localhost/")
pool = connections[connection]

with pool.acquire(block=True) as conn:
	# 进行消息队列的操作

连接池采用.acquire()方法来获取可用的连接,指定block=True表示在连接池中连接不足的情况下等待可用连接。如果不指定block=True,则在连接池中连接不足的时候,就会抛出kombu.exceptions.ConnectionLimitExceeded异常。如果需要设置连接池的大小,可在建立连接池前使用kombu.pools.set_limit(n)来指定连接池的大小。

除了使用kombu.pools.connections来建立连接池以外,还可以使用kombu.connection.ConnectionPool类和Connection类中的.Pool(limit)方法来创建。其中ConnectionPool类接收Connection类实例和表示连接池大小的limit两个参数来完成实例化。使用这两种方法创建的连接池,在使用是同样需要使用.acquire(block, timeout)方法来获取连接,并且在完成使用后需要使用Connection类实例的.release()方法将连接交回连接池。