建立连接
消息队列的连接是通过kombu.Connection
类控制的。Connection类一般采用一个消息队列连接串作为参数完成实例化。例如:connection = kombu.Connection("amqp://guest:guest@localhost")
。
Kombu采用的消息队列连接串中声明了Kombu要使用的消息队列传递媒介和对应的驱动,以及消息队列的位置和登录信息等。消息队列连接串的格式为驱动库名称://用户名:密码@主机:端口/虚拟主机
。
除了使用连接串实例化Connection类以外,Connection类还接收以下命名参数来完成实例化。
hostname
,消息队列主机。userid
,登录用户名。password
,登录密码。virtual_host
,虚拟主机。port
,端口。transport
,消息队列传输媒介及驱动名称,字符串类型值。ssl
,是否使用SSL进行连接,布尔类型值。connect_timeout
,连接超时的秒数。transport_options
,其他配置项,字典类型值。
仅仅创建Connection类实例并不能真正的建立连接,还需要使用Connection类提供的方法和属性来控制。
.connect()
,显式建立连接。.connected
,查看连接状态。.close()
,显式关闭连接。.release()
,根据是否使用连接池来进行连接的关闭和资源释放。
Connection类还实现了__enter__
和__exit__
方法,来使用with ... as
语句。例如:with Connection() as connection:
。
Kombu针对消息队列连接复用提供了一个全局的连接池。连接池位于kombu.pools
包中。连接池的使用方法可参考以下示例。
from kombu import Connection
from kombu.pools import connections
connection = Connection("amqp://guest:guest@localhost/")
pool = connections[connection]
with pool.acquire(block=True) as conn:
# 进行消息队列的操作
连接池采用.acquire()
方法来获取可用的连接,指定block=True
表示在连接池中连接不足的情况下等待可用连接。如果不指定block=True
,则在连接池中连接不足的时候,就会抛出kombu.exceptions.ConnectionLimitExceeded
异常。如果需要设置连接池的大小,可在建立连接池前使用kombu.pools.set_limit(n)
来指定连接池的大小。
除了使用kombu.pools.connections
来建立连接池以外,还可以使用kombu.connection.ConnectionPool
类和Connection类中的.Pool(limit)
方法来创建。其中ConnectionPool类接收Connection类实例和表示连接池大小的limit
两个参数来完成实例化。使用这两种方法创建的连接池,在使用是同样需要使用.acquire(block, timeout)
方法来获取连接,并且在完成使用后需要使用Connection类实例的.release()
方法将连接交回连接池。