Rabbitmq发送消息确认机制

一、场景
消息发送的时候,有可能消息无法正常发送到rabbitmq情况。例如电商项目中,用户下了订单,订单系统处理完成后,将订单消息发送到消息中间件,待库存系统消费,结果这个消息没有发送到消息中间件,这时候就会造成订单系统处理了消息,而库存系统没有处理的情况。
二、问题
《Rabbitmq发送消息确认机制》
针对消息发送,主要有两类问题
1.消息未能发送到交换机
2.消息未能路由到队列
三、解决方案
1.消息未能发送到交换机,主要通过发送方确认回调
2.消息未能路由到队列,主要是通过返回失败回调,并且可以设置死信交换机,消息重新路由到绑定到死信交换机的队列上。
四、实现
1.首先需要在连接参数上设置确认机制

		//开启消息确认机制
        connectionFactory.setPublisherConfirms(true);

yml形式

spring:
	rabbitmq:
		publisher-confirms: true

2.发送方确认回调
在RabbitTemplate定义时,设置确认回调参数

		//发送方确认回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                //实现发送方确认,比如发送交换机失败,则将发送的消息入库
				//这个布尔值就是消息是否发送到交换机成功标识,可以根据这个标识对错误信息,进行处理
                System.out.println(b);
                //发送失败信息
                System.out.println(s);
				//CorrelationData是业务id的封装类,可以在客户端发送消息时,携带这个信息(可以在发送时,增加这个参数,参加RabbitmqMessageSend),当这里出现问题时,找到这笔消息
                System.out.println(correlationData);
            }
        });

3.失败回调
也是在RabbitTemplate定义时设置

//开启失败回调
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                //message:发送的消息 “hello”+ 消息的配置(消息过期时间、排序权重 rpc contentType contentEncoding)
                System.out.println(message);
                //返回码
                System.out.println(replyCode);
                //返回信息
                System.out.println(replyText);
                //交换机
                System.out.println(exchange);
                //路由键
                System.out.println(routingKey);
            }
        });

4.死信交换机
在设置交换机时,定义死信交换机,可以将失败回调的消息重新路由到死信交换机相应的队列上。如果最后没法路由到队列,也会再执行失败回调代码块。
1)设置死信交换机

//创建交换机directExchage3,并设置它的死信交换机exchange
    @Bean
    public DirectExchange directExchange(){
        //增加死信交换机
        Map<String, Object> map = new HashMap<>();
        rabbitAdmin(connectionFactory()).deleteExchange("directExchage2");
		//这个类型交换机,一般是fanout类型交换机
        map.put("alternate-exchange","exchange");
        return new DirectExchange("directExchage",false,false,map);
    }

2)执行过程
无死信交换机
正常路由
《Rabbitmq发送消息确认机制》
无法正常路由
《Rabbitmq发送消息确认机制》
有死信交换机
《Rabbitmq发送消息确认机制》
消息如果通过directExchange交换机,无法路由到队列,则消息会通过死信交换机exchange重新路由,直到路由到相应队列,返回会触发失败回调。
五、总结
confirm机制是确认我们的消息是否投递到了 RabbitMq(Broker)上面,
而mandatory是在我们的消息进入队列失败时候不会被遗弃(让我们自己进行处理)

    原文作者:DURAN-IS-LEGEND
    原文地址: https://blog.csdn.net/li1987by/article/details/90417041
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞