事件系统
Nameko的事件系统是一个异步消息系统,可以在服务之间使用消息来触发一些处理功能。一个Nameko事件可以被一个或多个服务消费,也可以不被任何服务响应。以下是一个简单的特定事件的触发和响应的示例。
from nameko.events import EventDispatcher, event_handler
from nameko.rpc import rpc
class ServiceA:
name = 'service_a'
dispatch = EventDispatcher()
@rpc
def dispatch_message(self, payload):
self.dispatch('event_name', payload)
class ServiceB:
name = 'service_b'
@event_handler('service_a', 'event_name')
def handle_event(self, payload):
# 处理payload中携带的信息
pass
EventDispatcher
类是Nameko的事件分发来源,EventDispatcher
类实例提供了一个.dispatch()
方法,其接受两个参数:event_type
,作为消息转发的Routing_key;event_data
,作为消息主体。
用于响应事件的事件处理方法需要使用@event_handler
修饰器进行修饰。@event_handler
修饰器可以接受以下几个参数。
source_service
,指定事件的来源服务名称,可以用来指定处理方法特定响应指定服务发来的事件。event_type
,对应EventDispatcher
发送事件时使用的event_type
参数。handler_type
,处理方法类型,主要有以下三类。events.SERVICE_POOL
,自动匹配型,所有的处理方法都放在一个池中并根据其关注的服务来源和event_type
进行分组,每个事件在一个处理方法组中只会有一个方法响应。这是handler_type
的默认值。events.SINGLETON
,单一型,每个事件仅有一个注册的处理方法来响应,只有这个处理方法抛出错误并且设定requeue_on_error
时,才会由其他的处理方法来响应。events.BROADCAST
,群发型,每个事件都会被所有的处理方法响应。
requeue_on_error
,设定当处理方法抛出错误时,是否将事件重新排回队列。reliable_delivery
,设定为True
时,事件会一直保存在队列中,直到有事件处理器来处理。
以下给出一个群发型事件处理器的示例。
from nameko.events import BROADCAST, event_handler
class ListenerService:
name = 'listener'
@event_handler(
'monitor', 'ping', handler_type=BROADCAST, reliable_delvery=True
)
def ping(self, payload):
# 所有正在运行的服务都会进行响应
print(f'reponse from {payload}')