1.为什么说消息中间件可以进行消息解耦?
举个例子:
比如我们在京东商城买东西,我们买了一双鞋子,那么库存系统则要进行减少一双的动作操作(这里是一个复杂的过程,我们这里只是简单的描述一下应用场景,复杂的场景不仔细讲,比如下单支付,支付扣款,客户接受扣款成功然后减库存),如果传统的做法我们订单成功之后,那么我们库存进行减少,此操作我们必须保证在一个事物里,这是一个原子操作,这样做在当今高并发的场景中并不使用,效率极其的低,并且应用系统之间耦合度较高。那么消息中间件可以解决我们这类的问题,我们首先将订单消息发送给mq的服务队列中,那么库存系统可以接受对应队列中进行消息处理,那么此时我们的应用系统之间并没有什么耦合联系,而都是通过相对应的队列进行消息的接收和处理,这样我们就可以将我们的系统之间的耦合度进行拆分,并且消息机制的异步处理机制可以提高我们的效率,并且集群的使用可以进行并发访问的处理等。
2.消息处理过程中我们会遇到一下几个问题?
1)怎么保证mq消息接收成功(事物或发送确认模式)
2)怎么在消息处理期间进行消息保持(持久化)
3)怎么确认消费者进行消息的消费问题(消息确认模式)
下面就举一个简单的例子:
首先我们配置相关配置文件:
<!– 配置扫描路径 –>
<context:component-scan base-package=”com.dongnaoedu”>
<context:exclude-filter type=”annotation” expression=”org.springframework.stereotype.Controller”/>
</context:component-scan>
<!– rabbitMQ配置 –>
<bean id=”rabbitConnectionFactory”
class=”org.springframework.amqp.rabbit.connection.CachingConnectionFactory”>
<constructor-arg value=”127.0.0.1″/>
<property name=”username” value=”guest”/>
<property name=”password” value=”guest”/>
<property name=”channelCacheSize” value=”8″/>
<property name=”port” value=”5672″></property>
<!– 发布确认必须配置在CachingConnectionFactory上 –>
<property name=”publisherConfirms” value=”true”/>
</bean>
<rabbit:admin connection-factory=”rabbitConnectionFactory”/>
<!–解决保持持久化问题–>
<rabbit:queue name=”depot_queue” durable=”true”/>
<rabbit:direct-exchange name=”depot-amount-exchange”
xmlns=”http://www.springframework.org/schema/rabbit” durable=”true”>
<rabbit:bindings>
<rabbit:binding queue=”depot_queue” key=”amount.depot” ></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!– 创建rabbitTemplate 消息模板类 –>
<bean id=”rabbitTemplate” class=”org.springframework.amqp.rabbit.core.RabbitTemplate”>
<constructor-arg ref=”rabbitConnectionFactory”></constructor-arg>
<!–消息确认回调 –>
<property name=”confirmCallback” ref=”confirmCallback”/>
<property name=”returnCallback” ref=”sendReturnCallback”/><!–这种情况是存在路由键,但是没有对应队列时会调用此方法,如果没有路由键则不会调用此方法–>
</bean>
下面主要实现生产者产生消息也就是我们下订单:
@Service
@Qualifier(“mq”)
public class MqMode implements IProDepot {
private final static String DEPOT_RK = “amount.depot”;
private final static String DEPOT_EXCHANGE = “depot-amount-exchange”;
@Autowired
RabbitTemplate rabbitTemplate;
private static Gson gson = new Gson();
public void processDepot(String goodsId, int amount) {
GoodTransferVo goodTransferVo = new GoodTransferVo();
goodTransferVo.setGoodsId(goodsId);
goodTransferVo.setChangeAmount(amount);
goodTransferVo.setInOrOut(false);
String goods = gson.toJson(goodTransferVo);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//设置消息的持久化
rabbitTemplate.send(DEPOT_EXCHANGE, DEPOT_RK,
new Message(goods.getBytes(), messageProperties));
}
}
//使用消息确认模式:
@Service
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {
private Logger logger = LoggerFactory.getLogger(ConfirmCallback.class);
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
logger.info(“消息确认发送给mq成功”);
} else {
//处理失败的消息
logger.info(“消息发送给mq失败,考虑重发:”+cause);//在这里我是通过消息确认失败的消息存入db,然后写一个任务进行任务处理
}
}
}
@Service
public class SendReturnCallback implements RabbitTemplate.ReturnCallback {
private Logger logger = LoggerFactory.getLogger(SendReturnCallback.class);
public void returnedMessage(Message message, int replyCode,
String replyText, String exchange,
String routingKey) {
logger.info(“Returned replyText:”+replyText);
logger.info(“Returned exchange:”+exchange);
logger.info(“Returned routingKey:”+routingKey);
String msgJson = new String(message.getBody());
logger.info(“Returned Message:”+msgJson);
}
}
消费者进行消费主要逻辑:
@Service
public class ProcessDepot implements ChannelAwareMessageListener {
private static Logger logger = LoggerFactory.getLogger(ProcessDepot.class);
@Autowired
private DepotManager depotManager;
private static Gson gson = new Gson();
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
String msg = new String(message.getBody());
logger.info(“>>>>>>>>>>>>>>接收到消息:”+msg);
GoodTransferVo goodTransferVo = gson.fromJson(msg,GoodTransferVo.class);
try {
depotManager.operDepot(goodTransferVo);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),
false);
logger.info(“>>>>>>>>>>>>>>库存处理完成,应答Mq服务”);
} catch (Exception e) {
logger.error(e.getMessage());
channel.basicNack(message.getMessageProperties().getDeliveryTag(),
false,true);
logger.info(“>>>>>>>>>>>>>>库存处理失败,拒绝消息,要求Mq重新派发”);
throw e;
}
} catch (Exception e) {
logger.error(e.getMessage());
}
}
}