rabbitmq 消息应用的解耦论述及如何保证消息的正确处理

1.为什么说消息中间件可以进行消息解耦?

举个例子:

比如我们在京东商城买东西,我们买了一双鞋子,那么库存系统则要进行减少一双的动作操作(这里是一个复杂的过程,我们这里只是简单的描述一下应用场景,复杂的场景不仔细讲,比如下单支付,支付扣款,客户接受扣款成功然后减库存),如果传统的做法我们订单成功之后,那么我们库存进行减少,此操作我们必须保证在一个事物里,这是一个原子操作,这样做在当今高并发的场景中并不使用,效率极其的低,并且应用系统之间耦合度较高。那么消息中间件可以解决我们这类的问题,我们首先将订单消息发送给mq的服务队列中,那么库存系统可以接受对应队列中进行消息处理,那么此时我们的应用系统之间并没有什么耦合联系,而都是通过相对应的队列进行消息的接收和处理,这样我们就可以将我们的系统之间的耦合度进行拆分,并且消息机制的异步处理机制可以提高我们的效率,并且集群的使用可以进行并发访问的处理等。

2.消息处理过程中我们会遇到一下几个问题?

1)怎么保证mq消息接收成功(事物或发送确认模式)

2)怎么在消息处理期间进行消息保持(持久化)

3)怎么确认消费者进行消息的消费问题(消息确认模式)

下面就举一个简单的例子:

首先我们配置相关配置文件:

 <!– 配置扫描路径 –>
     <context:component-scan base-package=”com.dongnaoedu”>
      <context:exclude-filter type=”annotation” expression=”org.springframework.stereotype.Controller”/>
     </context:component-scan>
<!– rabbitMQ配置 –>
<bean id=”rabbitConnectionFactory”
  class=”org.springframework.amqp.rabbit.connection.CachingConnectionFactory”>
<constructor-arg value=”127.0.0.1″/>
<property name=”username” value=”guest”/>
<property name=”password” value=”guest”/>
<property name=”channelCacheSize” value=”8″/>
<property name=”port” value=”5672″></property>
<!– 发布确认必须配置在CachingConnectionFactory上 –>
<property name=”publisherConfirms” value=”true”/>
</bean>

<rabbit:admin connection-factory=”rabbitConnectionFactory”/>

       <!–解决保持持久化问题–>

<rabbit:queue name=”depot_queue” durable=”true”/>

<rabbit:direct-exchange name=”depot-amount-exchange”

          xmlns=”http://www.springframework.org/schema/rabbit” durable=”true”>

<rabbit:bindings>

<rabbit:binding queue=”depot_queue” key=”amount.depot” ></rabbit:binding>

</rabbit:bindings>

</rabbit:direct-exchange>

<!– 创建rabbitTemplate 消息模板类 –>

<bean id=”rabbitTemplate” class=”org.springframework.amqp.rabbit.core.RabbitTemplate”>

<constructor-arg ref=”rabbitConnectionFactory”></constructor-arg>

<!–消息确认回调 –>

<property name=”confirmCallback” ref=”confirmCallback”/>

<property name=”returnCallback” ref=”sendReturnCallback”/><!–这种情况是存在路由键,但是没有对应队列时会调用此方法,如果没有路由键则不会调用此方法–>

</bean>

下面主要实现生产者产生消息也就是我们下订单:

@Service
@Qualifier(“mq”)
public class MqMode  implements IProDepot {
    private final static String DEPOT_RK = “amount.depot”;
    private final static String DEPOT_EXCHANGE = “depot-amount-exchange”;
    @Autowired
    RabbitTemplate rabbitTemplate;
    private static Gson gson = new Gson();
    public void processDepot(String goodsId, int amount) {
        GoodTransferVo goodTransferVo = new GoodTransferVo();
        goodTransferVo.setGoodsId(goodsId);
        goodTransferVo.setChangeAmount(amount);
        goodTransferVo.setInOrOut(false);
        String goods = gson.toJson(goodTransferVo);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//设置消息的持久化
        rabbitTemplate.send(DEPOT_EXCHANGE, DEPOT_RK,
                new Message(goods.getBytes(), messageProperties));
    }

}

//使用消息确认模式:

@Service
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {
    private Logger logger = LoggerFactory.getLogger(ConfirmCallback.class);

    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            logger.info(“消息确认发送给mq成功”);
        } else {
            //处理失败的消息
            logger.info(“消息发送给mq失败,考虑重发:”+cause);//在这里我是通过消息确认失败的消息存入db,然后写一个任务进行任务处理
        }
    }

}

@Service
public class SendReturnCallback implements RabbitTemplate.ReturnCallback {

    private Logger logger = LoggerFactory.getLogger(SendReturnCallback.class);

    public void returnedMessage(Message message, int replyCode,
                                String replyText, String exchange,
                                String routingKey) {
        logger.info(“Returned replyText:”+replyText);
        logger.info(“Returned exchange:”+exchange);
        logger.info(“Returned routingKey:”+routingKey);
        String msgJson  = new String(message.getBody());
        logger.info(“Returned Message:”+msgJson);
    }

}

消费者进行消费主要逻辑:

@Service
public class ProcessDepot  implements ChannelAwareMessageListener {

    private static Logger logger = LoggerFactory.getLogger(ProcessDepot.class);

    @Autowired
    private DepotManager depotManager;

    private static Gson gson = new Gson();

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            String msg = new String(message.getBody());
            logger.info(“>>>>>>>>>>>>>>接收到消息:”+msg);
            GoodTransferVo goodTransferVo = gson.fromJson(msg,GoodTransferVo.class);
            try {
                depotManager.operDepot(goodTransferVo);
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),
                        false);
                logger.info(“>>>>>>>>>>>>>>库存处理完成,应答Mq服务”);
            } catch (Exception e) {
                logger.error(e.getMessage());
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),
                        false,true);
                logger.info(“>>>>>>>>>>>>>>库存处理失败,拒绝消息,要求Mq重新派发”);
                throw e;
            }

        } catch (Exception e) {
            logger.error(e.getMessage());
        }
    }

}

    原文作者:博大的Java世界
    原文地址: https://blog.csdn.net/xiaocai9999/article/details/80870401
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞