事件系统

Nameko的事件系统是一个异步消息系统,可以在服务之间使用消息来触发一些处理功能。一个Nameko事件可以被一个或多个服务消费,也可以不被任何服务响应。以下是一个简单的特定事件的触发和响应的示例。

from nameko.events import EventDispatcher, event_handler
from nameko.rpc import rpc


class ServiceA:
	name = 'service_a'
	
	dispatch = EventDispatcher()
	
	@rpc
	def dispatch_message(self, payload):
		self.dispatch('event_name', payload)


class ServiceB:
	name = 'service_b'
	
	@event_handler('service_a', 'event_name')
	def handle_event(self, payload):
		# 处理payload中携带的信息
		pass

EventDispatcher类是Nameko的事件分发来源,EventDispatcher类实例提供了一个.dispatch()方法,其接受两个参数:event_type,作为消息转发的Routing_key;event_data,作为消息主体。

用于响应事件的事件处理方法需要使用@event_handler修饰器进行修饰。@event_handler修饰器可以接受以下几个参数。

  • source_service,指定事件的来源服务名称,可以用来指定处理方法特定响应指定服务发来的事件。
  • event_type,对应EventDispatcher发送事件时使用的event_type参数。
  • handler_type,处理方法类型,主要有以下三类。
    • events.SERVICE_POOL,自动匹配型,所有的处理方法都放在一个池中并根据其关注的服务来源和event_type进行分组,每个事件在一个处理方法组中只会有一个方法响应。这是handler_type的默认值。
    • events.SINGLETON,单一型,每个事件仅有一个注册的处理方法来响应,只有这个处理方法抛出错误并且设定requeue_on_error时,才会由其他的处理方法来响应。
    • events.BROADCAST,群发型,每个事件都会被所有的处理方法响应。
  • requeue_on_error,设定当处理方法抛出错误时,是否将事件重新排回队列。
  • reliable_delivery,设定为True时,事件会一直保存在队列中,直到有事件处理器来处理。

以下给出一个群发型事件处理器的示例。

from nameko.events import BROADCAST, event_handler


class ListenerService:
	name = 'listener'
	
	@event_handler(
		'monitor', 'ping', handler_type=BROADCAST, reliable_delvery=True
	)
	def ping(self, payload):
		# 所有正在运行的服务都会进行响应
		print(f'reponse from {payload}')