建立连接
消息队列的连接是通过kombu.Connection类控制的。Connection类一般采用一个消息队列连接串作为参数完成实例化。例如:connection = kombu.Connection("amqp://guest:guest@localhost")。
Kombu采用的消息队列连接串中声明了Kombu要使用的消息队列传递媒介和对应的驱动,以及消息队列的位置和登录信息等。消息队列连接串的格式为驱动库名称://用户名:密码@主机:端口/虚拟主机。
除了使用连接串实例化Connection类以外,Connection类还接收以下命名参数来完成实例化。
hostname,消息队列主机。userid,登录用户名。password,登录密码。virtual_host,虚拟主机。port,端口。transport,消息队列传输媒介及驱动名称,字符串类型值。ssl,是否使用SSL进行连接,布尔类型值。connect_timeout,连接超时的秒数。transport_options,其他配置项,字典类型值。
仅仅创建Connection类实例并不能真正的建立连接,还需要使用Connection类提供的方法和属性来控制。
.connect(),显式建立连接。.connected,查看连接状态。.close(),显式关闭连接。.release(),根据是否使用连接池来进行连接的关闭和资源释放。
Connection类还实现了__enter__和__exit__方法,来使用with ... as语句。例如:with Connection() as connection:。
Kombu针对消息队列连接复用提供了一个全局的连接池。连接池位于kombu.pools包中。连接池的使用方法可参考以下示例。
from kombu import Connection
from kombu.pools import connections
connection = Connection("amqp://guest:guest@localhost/")
pool = connections[connection]
with pool.acquire(block=True) as conn:
# 进行消息队列的操作
连接池采用.acquire()方法来获取可用的连接,指定block=True表示在连接池中连接不足的情况下等待可用连接。如果不指定block=True,则在连接池中连接不足的时候,就会抛出kombu.exceptions.ConnectionLimitExceeded异常。如果需要设置连接池的大小,可在建立连接池前使用kombu.pools.set_limit(n)来指定连接池的大小。
除了使用kombu.pools.connections来建立连接池以外,还可以使用kombu.connection.ConnectionPool类和Connection类中的.Pool(limit)方法来创建。其中ConnectionPool类接收Connection类实例和表示连接池大小的limit两个参数来完成实例化。使用这两种方法创建的连接池,在使用是同样需要使用.acquire(block, timeout)方法来获取连接,并且在完成使用后需要使用Connection类实例的.release()方法将连接交回连接池。