Troubleshooting a Problem with Spring's Event Mechanism

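Preface

I previously used Spring's event mechanism to refactor a system and decouple some of its modules. In practice, however, the following problem appeared:

When ApplicationEventPublisher publishes a batch of ApplicationEvent instances and an ApplicationListener throws an exception while handling one of them, the remaining events are never published.

Note: the Spring version is 5.1.5.RELEASE.

A minimal example that reproduces the problem follows.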

Reproduction Example

Custom Event

import org.springframework.context.ApplicationEvent;

/**
 * Custom event
 * @author RJH
 * create at 2018/10/29
 */
public class SimpleEvent extends ApplicationEvent {

    private int i;
    /**
     * Create a new ApplicationEvent.
     *
     * @param source the object on which the event initially occurred (never {@code null})
     */
    public SimpleEvent(Object source) {
        super(source);
        i=Integer.parseInt(source.toString());
    }

    public int getI() {
        return i;
    }

}

Event Listener

import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

/**
 * Custom event listener
 * @author RJH
 * create at 2018/10/29
 */
@Component
public class SimpleEventListener implements ApplicationListener<SimpleEvent> {

    @Override
    public void onApplicationEvent(SimpleEvent event) {
        if(event.getI()%10==0){
            throw new RuntimeException();
        }
        System.out.println("Time:"+event.getTimestamp()+" event:"+event.getSource());
    }

}

Publishing Events

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

/**
 * Event publisher
 * @author RJH
 * create at 2018/10/29
 */
public class EventApplication {

    public static void main(String[] args) {
        // Scan the given package for components
        ApplicationContext context=new AnnotationConfigApplicationContext("com.rjh.event");
        for(int i=1;i<=100;i++){// publish a batch of events
            context.publishEvent(new SimpleEvent(i));
        }
    }
}

Output

Time:1553607971143 event:1
Time:1553607971145 event:2
Time:1553607971145 event:3
Time:1553607971145 event:4
Time:1553607971145 event:5
Time:1553607971145 event:6
Time:1553607971146 event:7
Time:1553607971146 event:8
Time:1553607971146 event:9
Exception in thread "main" java.lang.RuntimeException
    at com.rjh.event.SimpleEventListener.onApplicationEvent(SimpleEventListener.java:17)
    at com.rjh.event.SimpleEventListener.onApplicationEvent(SimpleEventListener.java:11)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139)
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:393)
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:347)
    at com.rjh.event.EventApplication.main(EventApplication.java:17)

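Analysis

The expected result was that an exception thrown by SimpleEventListener would not affect the publication of subsequent events in EventApplication. In reality, the exception interrupts it. Events are published and handled synchronously on the same thread rather than asynchronously, so the exception propagates out of publishEvent() into the for loop in main and ends it; the stack trace above shows the listener running on the main thread. The reference documentation describes this behavior: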

Notice that ApplicationListener is generically parameterized with the type of your custom event (BlackListEvent in the preceding example). This means that the onApplicationEvent() method can remain type-safe, avoiding any need for downcasting. You can register as many event listeners as you wish, but note that, by default, event listeners receive events synchronously. This means that the publishEvent() method blocks until all listeners have finished processing the event. One advantage of this synchronous and single-threaded approach is that, when a listener receives an event, it operates inside the transaction context of the publisher if a transaction context is available. If another strategy for event publication becomes necessary, See the javadoc for Spring’s ApplicationEventMulticaster interface.
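
The quoted passage points to ApplicationEventMulticaster as the place to change the publication strategy. As an alternative to the async approach used in the rest of this article, the following is a minimal sketch that keeps delivery synchronous but routes listener exceptions to an ErrorHandler. It is not part of the original solution: the class name MulticasterConfig is hypothetical, and the sketch assumes the class lives in the scanned package com.rjh.event and that the bean is named applicationEventMulticaster, the name the application context looks up for a user-supplied multicaster.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ApplicationEventMulticaster;
import org.springframework.context.event.SimpleApplicationEventMulticaster;

/**
 * Hypothetical configuration class (not from the original article).
 * Replaces the default multicaster with one that has an ErrorHandler.
 */
@Configuration
public class MulticasterConfig {

    // The context only picks up a custom multicaster under this exact bean name.
    @Bean(name = "applicationEventMulticaster")
    public ApplicationEventMulticaster applicationEventMulticaster() {
        SimpleApplicationEventMulticaster multicaster = new SimpleApplicationEventMulticaster();
        // With an ErrorHandler set, a listener failure is handed to the handler
        // instead of propagating to the caller of publishEvent().
        multicaster.setErrorHandler(t -> System.err.println("Listener failed: " + t));
        return multicaster;
    }
}
```

With this configuration the for loop in EventApplication should run through all 100 events, with the failures for multiples of 10 reported by the handler. Listeners still run on the publishing thread, so a slow listener continues to block publishEvent().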

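Solution

Make the event listener asynchronous. The approach shown here uses JavaConfig, that is, annotation-based configuration.

Enabling Async Support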

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;

/**
 * Configuration class that enables async execution
 * @author RJH
 * create at 2019-03-26
 */
@EnableAsync
@Configuration
public class AsyncConfig {

}

Asynchronous Event Listener

import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * Asynchronous event listener
 * @author RJH
 * create at 2019-03-26
 */
@Component
public class AsyncSimpleEventListener {

    @EventListener
    @Async
    public void handleEvent(SimpleEvent event){
        if(event.getI()%10==0){
            throw new RuntimeException();
        }
        System.out.println("Time:"+event.getTimestamp()+" event:"+event.getSource());
    }
}

Output

Time:1553614469990 event:1
Time:1553614470007 event:72
Time:1553614470006 event:64
Time:1553614470006 event:67
Time:1553614470007 event:73
Time:1553614470007 event:71
Time:1553614470007 event:75
Time:1553614470006 event:68
Time:1553614470007 event:69
Time:1553614470006 event:62
Time:1553614470005 event:61
Time:1553614470006 event:63
Time:1553614470006 event:65
Time:1553614470007 event:74
Time:1553614470006 event:66
Time:1553614470005 event:59
Time:1553614470005 event:57
Time:1553614470005 event:55
Time:1553614470005 event:58
Time:1553614470004 event:51
Time:1553614470004 event:52
Time:1553614470002 event:43
Time:1553614470004 event:53
Time:1553614470002 event:38
Time:1553614470001 event:36
Time:1553614470004 event:54
Time:1553614470001 event:33
Time:1553614470000 event:29
Time:1553614470000 event:27
Time:1553614470005 event:56
Time:1553614469999 event:23
Time:1553614469999 event:22
Time:1553614469999 event:21
Mar 26, 2019 11:34:30 PM org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler handleUncaughtException
SEVERE: Unexpected error occurred invoking async method: public void com.rjh.event.AsyncSimpleEventListener.handleEvent(com.rjh.event.SimpleEvent)
Time:1553614470000 event:24java.lang.RuntimeException
    at com.rjh.event.AsyncSimpleEventListener.handleEvent(AsyncSimpleEventListener.java:19)

    at com.rjh.event.AsyncSimpleEventListener$$FastClassBySpringCGLIB$$61742dbf.invoke(<generated>)
Time:1553614469998 event:15    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:736)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)

    at org.springframework.aop.interceptor.AsyncExecutionInterceptor$1.call(AsyncExecutionInterceptor.java:115)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
... (remaining output omitted for length)
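
The output shows events being handled out of order, with failures logged but later events still processed. The same effect can be reproduced without Spring. The sketch below is a plain-JDK illustration only, not how Spring implements @Async: the class AsyncDispatchSketch, the pool size of 4, and the failure counter are all assumptions made for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Plain-JDK illustration (hypothetical class, no Spring involved). */
class AsyncDispatchSketch {

    /** Submits events 1..count to a pool and returns {handled, failed}. */
    static int[] publishAll(int count) {
        AtomicInteger handled = new AtomicInteger();
        AtomicInteger failed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 1; i <= count; i++) {
            final int event = i;
            // The publishing loop only hands the event over; it never sees the failure.
            pool.execute(() -> {
                try {
                    if (event % 10 == 0) {
                        throw new RuntimeException("listener failed on event " + event);
                    }
                    handled.incrementAndGet();
                } catch (RuntimeException e) {
                    // Stand-in for an uncaught-exception handler on the worker side.
                    failed.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {handled.get(), failed.get()};
    }

    public static void main(String[] args) {
        int[] result = publishAll(100);
        System.out.println("handled=" + result[0] + " failed=" + result[1]);
    }
}
```

Running main prints handled=90 failed=10: the ten failing events are counted on the worker threads, and the loop still submits all 100 events.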

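Analysis

After the change, each listener invocation runs on a thread supplied by Spring's async executor instead of the publishing thread, so a failing listener no longer interrupts the publishing loop. A custom thread pool can also be configured, together with an exception handler for uncaught exceptions.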
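
One way to supply a custom thread pool and exception handler is to implement AsyncConfigurer. The following is a sketch under assumptions rather than the original author's configuration: the class name CustomAsyncConfig, the pool sizes, the queue capacity, and the thread name prefix are illustrative values, and the class is meant to replace AsyncConfig above rather than sit alongside it.

```java
import java.util.concurrent.Executor;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * Hypothetical replacement for AsyncConfig (not from the original article).
 * Supplies a bounded thread pool and a handler for uncaught async exceptions.
 */
@EnableAsync
@Configuration
public class CustomAsyncConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);          // illustrative value
        executor.setMaxPoolSize(8);           // illustrative value
        executor.setQueueCapacity(200);       // illustrative value
        executor.setThreadNamePrefix("event-");
        // The executor is not a managed bean here, so it must be initialized manually.
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        // Called for exceptions thrown by @Async methods that return void.
        return (ex, method, params) ->
                System.err.println("Async listener " + method.getName() + " failed: " + ex);
    }
}
```

Because the pool's worker threads are not daemon threads, a standalone program such as EventApplication may keep running after main returns until the executor is shut down.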

References

  1. https://docs.spring.io/spring…
  2. https://docs.spring.io/spring…
Original author: Null
Original URL: https://segmentfault.com/a/1190000018667561