Spring Events 实现 ApplicationContext 通过 ApplicationEvent 类和 ApplicationListener 接口进行事件处理。 如果将实现 ApplicationListener 接口的 bean 注入到上下文中,则每次使用 ApplicationContext 发布 ApplicationEvent 时,都会通知该 bean。 本质上,这是标准的观察者设计模式,是事件驱动模型 在设计层面的体现。
Spring Events 虽然在应用代码中不是很常用,但其框架本身大量使用了事件机制,例如 ApplicationStartedEvent、ApplicationEnvironmentPreparedEvent 等。同时 Spring 允许自定义事件,下面就来演示一下如何实现自定义事件。
Spring 的事件机制和事件发布是 ApplicationContext
本身提供的功能,要实现 Spring Events 需要遵循以下几点:
自定义事件必须继承 ApplicationEvent
事件发布者需要注入并使用ApplicationEventPublisher发布事件,也可以直接使用 ApplicationContext
发布事件,因为它继承了 ApplicationEventPublisher
接口
事件监听器需要实现ApplicationListener接口,也可以使用注解 @EventListener
(推荐)
Spring事件通知原理 首先我们跟踪publishEvent
方法,这个方法在AbstractApplicationContext
类中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 protected void publishEvent(Object event, @Nullable ResolvableType eventType) { Assert.notNull(event, "Event must not be null" ); ApplicationEvent applicationEvent; if (event instanceof ApplicationEvent) { applicationEvent = (ApplicationEvent) event; } else { applicationEvent = new PayloadApplicationEvent<>(this , event); if (eventType == null ) { eventType = ((PayloadApplicationEvent) applicationEvent).getResolvableType(); } } if (this .earlyApplicationEvents != null ) { this .earlyApplicationEvents.add(applicationEvent); } else { getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType); } if (this .parent != null ) { if (this .parent instanceof AbstractApplicationContext) { ((AbstractApplicationContext) this .parent).publishEvent(event, eventType); } else { this .parent.publishEvent(event); } } } ApplicationEventMulticaster getApplicationEventMulticaster() throws IllegalStateException { if (this .applicationEventMulticaster == null ) { throw new IllegalStateException("ApplicationEventMulticaster not initialized - " + "call 'refresh' before multicasting events via the context: " + this ); } return this .applicationEventMulticaster; }
经过上面的分析,我们看到事件是通过applicationEventMulticaster
来广播出去的。
applicationEventMulticaster
在Spring的启动过程中被建立,我们在之前的文章Spring启动过程分析1(overview) 中分析过Spring的启动过程,在核心方法refresh
中建立applicationEventMulticaster
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 initMessageSource ();initApplicationEventMulticaster ();onRefresh ();registerListeners ();finishBeanFactoryInitialization (beanFactory);finishRefresh ();
关注initApplicationEventMulticaster
和registerListeners
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 protected void initApplicationEventMulticaster ( ) { ConfigurableListableBeanFactory beanFactory = getBeanFactory (); if (beanFactory.containsLocalBean (APPLICATION_EVENT_MULTICASTER_BEAN_NAME )) { this .applicationEventMulticaster = beanFactory.getBean (APPLICATION_EVENT_MULTICASTER_BEAN_NAME , ApplicationEventMulticaster .class ); if (logger.isTraceEnabled ()) { logger.trace ("Using ApplicationEventMulticaster [" + this .applicationEventMulticaster + "]" ); } } else { this .applicationEventMulticaster = new SimpleApplicationEventMulticaster (beanFactory); beanFactory.registerSingleton (APPLICATION_EVENT_MULTICASTER_BEAN_NAME , this .applicationEventMulticaster ); if (logger.isTraceEnabled ()) { logger.trace ("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " + "[" + this .applicationEventMulticaster .getClass ().getSimpleName () + "]" ); } } }protected void registerListeners ( ) { for (ApplicationListener <?> listener : getApplicationListeners ()) { getApplicationEventMulticaster ().addApplicationListener (listener); } String [] listenerBeanNames = getBeanNamesForType (ApplicationListener .class , true , false ); for (String listenerBeanName : listenerBeanNames) { getApplicationEventMulticaster ().addApplicationListenerBean (listenerBeanName); } Set <ApplicationEvent > earlyEventsToProcess = this .earlyApplicationEvents ; this .earlyApplicationEvents = null ; if (earlyEventsToProcess != null ) { for (ApplicationEvent earlyEvent : earlyEventsToProcess) { getApplicationEventMulticaster ().multicastEvent (earlyEvent); } } }
经过前面的分析,我们知道了事件广播器applicationEventMulticaster
如何被构建,下面我们分析事件的广播过程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Override public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) { ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event)); // 根据event类型获取适合的监听器 for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) { // 获取SimpleApplicationEventMulticaster中的线程执行器,如果存在线程执行器则在新线程中异步执行,否则直接同步执行监听器中的方法 Executor executor = getTaskExecutor(); if (executor != null ) { executor.execute(() -> invokeListener(listener, event)) ; } else { invokeListener (listener, event) ; } } } protected void invokeListener (ApplicationListener<?> listener, ApplicationEvent event) { // 如果存在ErrorHandler ,调用监听器方法如果抛出异常则调用ErrorHandler 来处理异常。否则直接调用监听器方法 ErrorHandler errorHandler = getErrorHandler () ; if (errorHandler != null ) { try { doInvokeListener (listener, event) ; } catch (Throwable err) { errorHandler .handleError (err) ; } } else { doInvokeListener (listener, event) ; } }
经过上面的分析,我们知道了Spring如何发送并响应事件。下面我们来分析如何使Spring能够异步响应事件。
异步响应Event 默认情况下,Spring是同步执行Event的响应方法的。如果响应方法的执行时间很长会阻塞发送事件的方法,因此很多场景下,我们需要让事件的响应异步化。
为了更直观地说明Event的响应默认是同步的,我们修改一下EventDemoListener
并增加一个EventDemoListener2
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Componentpublic class EventDemoListener implements ApplicationListener <EventDemo > { Logger logger = LoggerFactory.getLogger(EventDemoListener.class ); @Override public void onApplicationEvent (EventDemo event ) { logger.info("receiver " + event .getMessage()); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } } } @Componentpublic class EventDemoListener2 implements ApplicationListener <EventDemo > { Logger logger = LoggerFactory.getLogger(EventDemoListener2.class ); @Override public void onApplicationEvent (EventDemo event ) { logger.info("receiver 2 " + event .getMessage()); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } } }
执行结果如下:
执行结果显示:EventDemoListener2
和EventDemoListener
的执行间隔1秒,EventDemoListener2
的执行和程序的结束也间隔1秒。结果表示我们的响应程序是同步执行的,一个响应程序的执行会阻塞下一个响应程序的执行。
自定义SimpleApplicationEventMulticaster 通过前面的代码分析,我们发现如果SimpleApplicationEventMulticaster
中的taskExecutor
如果不为null,将在taskExecutor
中异步执行响应程序。applicationEventMulticaster
的新建在initApplicationEventMulticaster
方法中,默认情况下它会新建一个SimpleApplicationEventMulticaster
,其中的taskExecutor
为null。因此想要taskExecutor
不为null,我们可以自己手动创建一个SimpleApplicationEventMulticaster
然后设置一个taskExecutor
。
修改Config
类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Configuration @ComponentScan ("love.wangqi" )public class Config { @Bean public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor ( ) { return new SimpleAsyncTaskExecutor (); } @Bean public SimpleApplicationEventMulticaster applicationEventMulticaster ( ) { SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster (); simpleApplicationEventMulticaster.setTaskExecutor (simpleAsyncTaskExecutor ()); return simpleApplicationEventMulticaster; } }
此时再次执行程序,执行结果如下:
可以看到,EventDemoListener
和EventDemoListener2
是同时执行的,同时它们的执行没有阻塞主程序的执行。事件的响应做到了异步化。
@Async 前面我们看到,通过手动新建SimpleApplicationEventMulticaster
并设置TaskExecutor
可以使所有的事件响应程序都在另外的线程中执行,不阻塞主程序的执行。不过这样也带来一个问题,那就是所有的事件响应程序都异步化了,某些场景下我们希望某些关系密切的响应程序可以同步执行另外一些响应程序异步执行。这种场景下,我们就不能简单地新建SimpleApplicationEventMulticaster
并设置TaskExecutor
。
Spring中提供了一个@Async
注解,可以将加上这个注解的方法在另外的线程中执行。通过这个注解我们可以将指定的事件响应程序异步化。
我们修改EventDemoListener
,在onApplicationEvent
中加上@Async
注解;同时修改Config
类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Component public class EventDemoListener implements ApplicationListener <EventDemo > { Logger logger = LoggerFactory .getLogger (EventDemoListener .class ); @Async @Override public void onApplicationEvent (EventDemo event ) { logger.info ("receiver " + event.getMessage ()); try { Thread .sleep (1000 ); } catch (InterruptedException e) { e.printStackTrace (); } } }@Configuration @ComponentScan ("love.wangqi" )@EnableAsync public class Config { @Bean public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor ( ) { return new SimpleAsyncTaskExecutor (); } }
注意Config
类中需要加上@EnableAsync
注释,并定义TaskExecutor
。
执行结果如下:
我们看到,EventDemoListener
是在另外的线程中执行的,但是EventDemoListener2
仍然在主线程中执行,因此EventDemoListener2
阻塞了主线程的执行。
@Async原理 @Async
注解可以将方法异步化,下面我们来看看它的原理是什么。
我们在Config
类中添加了@EnableAsync
注释。@EnableAsync
注释引入AsyncConfigurationSelector
类,AsyncConfigurationSelector
类导入ProxyAsyncConfiguration
类,ProxyAsyncConfiguration
类新建过程中会新建AsyncAnnotationBeanPostProcessor
。
AsyncAnnotationBeanPostProcessor
类继承了BeanPostProcessor
,当每个Bean新建完成后会调用AsyncAnnotationBeanPostProcessor
的postProcessAfterInitialization
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @Override public Object postProcessAfterInitialization(Object bean, String beanName) { if (this .advisor == null || bean instanceof AopInfrastructureBean) { return bean; } if (bean instanceof Advised) { Advised advised = (Advised) bean; if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) { if (this .beforeExistingAdvisors) { advised.addAdvisor(0 , this .advisor); } else { advised.addAdvisor(this .advisor); } return bean; } } if (isEligible(bean, beanName)) { ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName); if (!proxyFactory.isProxyTargetClass()) { evaluateProxyInterfaces(bean.getClass(), proxyFactory); } proxyFactory.addAdvisor(this .advisor); customizeProxyFactory(proxyFactory); return proxyFactory.getProxy(getProxyClassLoader()); } return bean; }
postProcessAfterInitialization
方法判断bean是否符合要求(方法上是否加了@Async
注释),如果符合要求则对bean加上代理,代理类为AnnotationAsyncExecutionInterceptor
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @Override @Nullable public Object invoke (final MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null ); Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass); final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); if (executor == null ) { throw new IllegalStateException ( "No executor specified and no default executor set on AsyncExecutionInterceptor either" ); } Callable<Object> task = () -> { try { Object result = invocation.proceed(); if (result instanceof Future) { return ((Future<?>) result).get(); } } catch (ExecutionException ex) { handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments()); } catch (Throwable ex) { handleError(ex, userDeclaredMethod, invocation.getArguments()); } return null ; }; return doSubmit(task, executor, invocation.getMethod().getReturnType()); }
调用我们的方法时首先调用AnnotationAsyncExecutionInterceptor
的invoke
方法,invoke
方法将我们真正的方法包装成一个Callable
任务,将这个任务提交到executor
中执行。由此达到了将我们的方法异步化的目的。
注意点
Spring Events 只能监听同一个 JVM 中的事件,如果需要实现跨应用的事件发布和监听则需要引入消息中间件(RabbitMQ,JMQ等)
异步事件需要关注线程资源,在大并发的情况下需要自定义线程池
方法只要注解了@Async
并且开启异步即可实现异步,异步是多线程中的知识点,而event是一种设计模式(观察者模式或发布订阅模式)