【转】Spring AMQP 源码分析 05 - 异常处理

转自 首夜盲毒预言家

### 准备

## 目标

了解 Spring AMQP Message Listener 如何处理异常

## 前置知识

《Spring AMQP 源码分析 04 – MessageListener》

## 相关资源

Offical doc:<http://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/html/_reference.html#exception-handling>

Sample code:<https://github.com/gordonklg/study>,rabbitmq module

源码版本:Spring AMQP 1.7.3.RELEASE

## 测试代码

gordon.study.rabbitmq.springamqp.AsyncConsumerWithErrorHandler.java

《【转】Spring AMQP 源码分析 05 - 异常处理》

 

### 分析

## 消息消费异常处理流程

根据上一遍文章的分析,示例代码 AsyncConsumerWithErrorHandler 第26行开始消费消息,方法运行在 AsyncMessageProcessingConsumer 线程实例中,调用栈如下:

《【转】Spring AMQP 源码分析 05 - 异常处理》

 

由 AbstractMessageListenerContainer 的 doInvokeListener 方法直接发起对 onMessage 方法的调用,代码如下:

《【转】Spring AMQP 源码分析 05 - 异常处理》

对于 onMessage 消息处理过程中抛出的异常, wrapToListenerExecutionFailedExceptionIfNeeded 方法将所有非 ListenerExecutionFailedException 异常包装为 ListenerExecutionFailedException 异常(通过 instanceof 判断,因此 ListenerExecutionFailedException 子类不会被包装),并重新抛出。

 

调用栈一直退出,直到 SimpleMessageListenerContainer(AbstractMessageListenerContainer).executeListener(Channel, Message) line: 729   

《【转】Spring AMQP 源码分析 05 - 异常处理》

 

handleListenerException 方法最终调用 invokeErrorHandler 方法,通过属性 ErrorHandler errorHandler 的 handleError 方法正式处理异常。

《【转】Spring AMQP 源码分析 05 - 异常处理》

 

默认的 errorHandle 就是 ConditionalRejectingErrorHandler,其 handleError 逻辑很简单:如果抛出的异常,其原因链中不包含 AmqpRejectAndDontRequeueException,同时 ConditionalRejectingErrorHandler 内部属性 FatalExceptionStrategy exceptionStrategy 的 isFatal 方法返回 true(可以看成是无法恢复的严重异常),则将异常包装为 AmqpRejectAndDontRequeueException 重新抛出。

《【转】Spring AMQP 源码分析 05 - 异常处理》

 

这是判断原因链中是否包含 AmqpRejectAndDontRequeueException 的代码:

 《【转】Spring AMQP 源码分析 05 - 异常处理》

 

默认情况下, exceptionStrategy 是 DefaultExceptionStrategy 的实例:

《【转】Spring AMQP 源码分析 05 - 异常处理》

只有 ListenerExecutionFailedException 异常及其子类才可能是 fatal,但是对于本次调用栈,wrapToListenerExecutionFailedExceptionIfNeeded 方法保证了抛出的异常是 ListenerExecutionFailedException。

isCauseFatal 方法定义了一些严重异常,很显然,这些异常都是些无论重试多少次都会出错的异常,因此应该被包装为 AmqpRejectAndDontRequeueException  异常。

DefaultExceptionStrategy 预留了 isUserCauseFatal 方法给用户扩展。

 

异常继续往外抛,到 SimpleMessageListenerContainer.doReceiveAndExecute(BlockingQueueConsumer) line: 1260,会调用 BlockingQueueConsumer 的 rollbackOnExceptionIfNecessary 方法。该方法先判断是否要向 RabbitMQ 确认消息,如果需要确认(意味着要调用 channel 的 basicReject 方法),再根据异常的原因链中是否存在 AmqpRejectAndDontRequeueException 异常决定如何设置  basicReject 方法的 requeue 参数。

 

## 示例代码分析

从流程分析中可知,onMessage 方法如果抛出异常,一般情况下会导致消息被 reject,同时重新入队。这个默认设置比较安全。

 

如果我们不想要消息重新入队呢?最简单的方法就是抛出 AmqpRejectAndDontRequeueException,就像示例代码中被注释掉的第33行代码那样。但是这导致业务消费逻辑与框架实现绑定过深,因此,我们采用重载 DefaultExceptionStrategy 的 isUserCauseFatal 方法来决定不同的业务异常要不要让 reject 的消息重新入队,正如示例代码第42行所示。当异常类型是 UserDefineException 时,消息被 reject 同时不会重入队列。

    原文作者:Spring MVC
    原文地址: https://blog.csdn.net/AnY11/article/details/82930421
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞