消息队列
前言:
说实话,最近还是比较忙的,手上素材倒是一大把,但是大多只是初步整理了。但是博客这种东西还是要写的,果然后面还是要放低一下排版要求(扩展性的一些东西也少提一些)。
简介:
消息队列这个东西,其实网上的资料还是很多的。我就简单说一些自己的认识与源代码哈。
演变:
我是很喜欢了解技术演进的,因为演进的过程展现了前辈们的智慧。
最早的程序串行执行就不说了。
程序调用中的方法调用,往往调用方与被调用方都存在与同一内存空间(从Java角度说,都是在同一JVM中),所以方法调用的逻辑不会太复杂。简单来说,就是调用方(Java中其实就是目标对象)将被调用方压入Java虚拟机栈,从而执行(详见JVM)。或者等我什么时候,把我有关JVM的笔记贴出来(嘿嘿)。
后来呢,就是出现了对非本地JVM方法调用的需求(举个例子,我需要调用第三方的方法,如果每次都要双方都写一个专门的处理服务(在当时,也许接口更为准确),比较麻烦),那么就有了RPC与RMI的一个需要。那么在Java中就出现了一个stub的技术,定义好后,相关方法就像调用本地一样(详见《Head First Java》相关章节)。当然了,这个时候已经有了中间件的概念了,所以也就有了CORBA等框架。谈到中间件,感兴趣的,可以去查询一下当时主流的中间件分类(如RPC,RMI,MOM,TPM,ORB)。
那么到了现在呢,分布式系统的通信可以按照同步与异步分为两大支柱。之所以这么理解,是因为分布式系统往往同步通信与异步通信都是需要的。简单提一下,同步通信业务逻辑相对简单,实现快速,可以实时获得回应,但耦合度较高。异步通信耦合度低,并可以进行消息堆积,消峰,但无法实时获取回应,业务逻辑复杂,从而提高系统复杂度(尤其当一条业务线与多层异步逻辑)等。之后有机会,我会举例细述。
当然了,在本篇中,只简单谈一下异步通信的主流实现-消息队列。
选择:
选择方面,我就不多说了,目前只用过RabbitMq,RocketMq,Kafka。网上有关消息队列选择的文章很多,很细致,我就不赘述了。
代码实现:
这里贴出来的都是实际生产代码(如果内部版本也算的话,嘿嘿),所以如果有一些不是很熟悉的类,请查看import,是否是项目自身的类。或者也可以直接询问我。
初步实现:
这里的初步实现,是根据RabbitMq的原生方法进行编写(详细参考:《RabbitMQ实战指南》第一章的两个代码清单及第二章的相关解释)。
producer:
package com.renewable.gateway.rabbitmq.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import com.renewable.gateway.pojo.Terminal;
import com.renewable.gateway.util.JsonUtil;
import com.renewable.gateway.util.PropertiesUtil;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import static com.renewable.gateway.common.constant.RabbitmqConstant.*;
/**
* @Description:
* @Author: jarry
*/
@Component("TerminalProducer")
public class TerminalProducer {
private static final String IP_ADDRESS = PropertiesUtil.getProperty(RABBITMQ_HOST);
private static final int PORT = Integer.parseInt(PropertiesUtil.getProperty(RABBITMQ_PORT));
private static final String USER_NAME = PropertiesUtil.getProperty(RABBITMQ_USER_NAME);
private static final String USER_PASSWORD = PropertiesUtil.getProperty(RABBITMQ_USER_PASSWORD);
private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE = "exchange-terminal-config-terminal2centcontrol";
private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE = "queue-terminal-config-terminal2centcontrol";
private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE = "topic";
private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY = "terminal.config.terminal2centcontrol";
private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINGKEY = "terminal.config.terminal2centcontrol";
public static void sendTerminalConfig(Terminal terminal) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername(USER_NAME);
factory.setPassword(USER_PASSWORD);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE, true, false, null);
channel.queueDeclare(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, true, false, false, null);
channel.queueBind(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY);
String terminalStr = JsonUtil.obj2String(terminal);
channel.basicPublish(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINGKEY, MessageProperties.PERSISTENT_TEXT_PLAIN, terminalStr.getBytes());
channel.close();
connection.close();
}
}
consumer:
package com.renewable.gateway.rabbitmq.consumer;
import com.rabbitmq.client.*;
import com.renewable.gateway.common.GuavaCache;
import com.renewable.gateway.common.ServerResponse;
import com.renewable.gateway.pojo.Terminal;
import com.renewable.gateway.service.ITerminalService;
import com.renewable.gateway.util.JsonUtil;
import com.renewable.gateway.util.PropertiesUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import static com.renewable.gateway.common.constant.CacheConstant.TERMINAL_MAC;
import static com.renewable.gateway.common.constant.RabbitmqConstant.*;
/**
* @Description:
* @Author: jarry
*/
@Component
public class TerminalConsumer {
@Autowired
private ITerminalService iTerminalService;
private static final String TERMINAL_CONFIG_CENTCONTROL2TERMINAL_EXCHANGE = "exchange-terminal-config-centcontrol2terminal";
private static final String TERMINAL_CONFIG_CENTCONTROL2TERMINAL_QUEUE = "queue-terminal-config-centcontrol2terminal";
private static final String TERMINAL_CONFIG_CENTCONTROL2TERMINAL_ROUTINETYPE = "topic";
private static final String TERMINAL_CONFIG_CENTCONTROL2TERMINAL_BINDINGKEY = "terminal.config.centcontrol2terminal";
@PostConstruct
public void messageOnTerminal() throws IOException, TimeoutException, InterruptedException {
Address[] addresses = new Address[]{
new Address(PropertiesUtil.getProperty(RABBITMQ_HOST))
};
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(PropertiesUtil.getProperty(RABBITMQ_USER_NAME));
factory.setPassword(PropertiesUtil.getProperty(RABBITMQ_USER_PASSWORD));
Connection connection = factory.newConnection(addresses);
final Channel channel = connection.createChannel();
channel.basicQos(64); // 设置客户端最多接收未ack的消息个数,避免客户端被冲垮(常用于限流)
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
// 1.接收数据,并反序列化出对象
Terminal receiveTerminalConfig = JsonUtil.string2Obj(new String(body), Terminal.class);
// 2.验证是否是该终端的消息的消息 // 避免ACK其他终端的消息
if (receiveTerminalConfig.getMac() == GuavaCache.getKey(TERMINAL_MAC)) {
// 业务代码
ServerResponse response = iTerminalService.receiveTerminalFromRabbitmq(receiveTerminalConfig);
if (response.isSuccess()) {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
};
channel.basicConsume(TERMINAL_CONFIG_CENTCONTROL2TERMINAL_QUEUE, consumer);
// 等回调函数执行完毕后,关闭资源
// 想了想还是不关闭资源,保持一个监听的状态,从而确保配置的实时更新
// TimeUnit.SECONDS.sleep(5);
// channel.close();
// connection.close();
}
}
小结:
这是早期写的一个demo代码,是直接参照源码的。如果是学习RabbitMq的话,还是建议手写一下这种比较原始的程序,了解其中每个方法的作用,从而理解RabbitMq的思路。如果条件允许的话,还可以查看一下RabbitMq的底层通信协议-AMQP(如果不方便下载,也可以私聊我)。
当然,此处可以通过@Value直接导入相关配置(乃至到了SpringCloud后,可以通过@Refreshscope等实现配置自动更新)。
与Spring集成:
producer:
package com.renewable.terminal.rabbitmq.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import com.renewable.terminal.pojo.Terminal;
import com.renewable.terminal.util.JsonUtil;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Description:
* @Author: jarry
*/
@Component("TerminalProducer")
public class TerminalProducer {
private static String rabbitmqHost = "47.92.249.250";
private static String rabbitmqUser = "admin";
private static String rabbitmqPassword = "123456";
private static String rabbitmqPort = "5672";
private static final String IP_ADDRESS = rabbitmqHost;
private static final int PORT = Integer.parseInt(rabbitmqPort);
private static final String USER_NAME = rabbitmqUser;
private static final String USER_PASSWORD = rabbitmqPassword;
private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE = "exchange-terminal-config-terminal2centcontrol";
private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE = "queue-terminal-config-terminal2centcontrol";
private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE = "topic";
private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY = "terminal.config.terminal2centcontrol";
private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINGKEY = "terminal.config.terminal2centcontrol";
public static void sendTerminalConfig(Terminal terminal) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername(USER_NAME);
factory.setPassword(USER_PASSWORD);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE, true, false, null);
channel.queueDeclare(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, true, false, false, null);
channel.queueBind(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY);
String terminalStr = JsonUtil.obj2String(terminal);
channel.basicPublish(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINGKEY, MessageProperties.PERSISTENT_TEXT_PLAIN, terminalStr.getBytes());
channel.close();
connection.close();
}
}
consumer:
package com.renewable.terminal.rabbitmq.consumer;
import com.rabbitmq.client.*;
import com.renewable.terminal.Init.SerialSensorInit;
import com.renewable.terminal.Init.TerminalInit;
import com.renewable.terminal.common.GuavaCache;
import com.renewable.terminal.common.ServerResponse;
import com.renewable.terminal.pojo.Terminal;
import com.renewable.terminal.service.ITerminalService;
import com.renewable.terminal.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import static com.renewable.terminal.common.constant.CacheConstant.TERMINAL_ID;
import static com.renewable.terminal.common.constant.CacheConstant.TERMINAL_MAC;
/**
* @Description:
* @Author: jarry
*/
@Component
@Slf4j
public class TerminalConsumer {
@Autowired
private ITerminalService iTerminalService;
@Autowired
private SerialSensorInit serialSensorInit;
private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE = "exchange-terminal-config-centcontrol2terminal";
private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE = "queue-terminal-config-centcontrol2terminal";
private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE = "topic";
private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY = "terminal.config.centcontrol2terminal";
//TODO_FINISHED 2019.05.16 完成终端机TerminalConfig的接收与判断(ID是否为长随机数,是否需要重新分配)
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, declare = "true"),
exchange = @Exchange(value = TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, declare = "true", type = TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE),
key = TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY
))
@RabbitHandler
public void messageOnTerminal(@Payload String terminalStr, @Headers Map<String, Object> headers, Channel channel) throws IOException {
Terminal terminal = JsonUtil.string2Obj(terminalStr, Terminal.class);
if (terminal == null){
log.info("consume the null terminal config !");
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
}
if (!GuavaCache.getKey(TERMINAL_MAC).equals(terminal.getMac())){
log.info("refuse target terminal with mac({}) configure to this terminal with mac({}).",terminal.getMac(), GuavaCache.getKey(TERMINAL_MAC));
return;
}
// 2.业务逻辑
ServerResponse response = iTerminalService.receiveTerminalFromRabbitmq(terminal);
log.info("start serialSensorInit");
serialSensorInit.init();
// 3.确认
if (response.isSuccess()) {
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
}
}
}
配置:
# rabbitmq 消费端配置
spring:
rabbitmq:
listener:
simple:
concurrency: 5
max-concurrency: 10
acknowledge-mode: manual
# 限流
prefetch: 1
host: "localhost"
port: 5672
username: "admin"
password: "123456"
virtual-host: "/"
connection-timeout: 15000
小结:
这里不得不赞一下Spring,它通过提供RabbitMq地封装API-ampq,极大地简化了消息队列的代码。其实上述方法就是通过ampq的注解与yml配置来迅速实现RabbitMq的使用。
当然,这里还有很多的提升空间。比如说,通过@Bean注解(建立目标配置)与公用方法提取,可以有效提高代码复用性。
简单扩展(与SpringStream集成):
这段代码并不是线上的代码,而是慕课网学习时留下的代码。主要实际生产中并没有使用SpringStream,但这确实是认识事件驱动模型的要给很好途径。
producer:
“`java
package com.imooc.order.message;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
/**
* @Description:
* @Author: jarry
*/
public interface StreamClient {
String INPUT = "myMessage";
String INPUT2 = "myMessageACK";
@Input(StreamClient.INPUT)
SubscribableChannel input();
@Output(StreamClient.INPUT)
MessageChannel output();
@Input(StreamClient.INPUT2)
SubscribableChannel input2();
@Output(StreamClient.INPUT2)
MessageChannel output2();
}
“`
package com.imooc.order;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* @Description:
* @Author: jarry
*/
@Component
public class MqSenderTest extends OrderApplicationTests{
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void send(){
amqpTemplate.convertAndSend("myQueue", "now: " + new Date());
Assert.assertNotNull(new Date());
}
}
consumer:
package com.imooc.order.message;
import com.imooc.order.dto.OrderDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
/**
* @Description:
* @Author: jarry
*/
@Component
@EnableBinding(StreamClient.class)
@Slf4j
public class StreamReceiver {
// @StreamListener(StreamClient.INPUT)
// public void process(Object message){
// log.info("StreamReceiver: {}", message);
// }
@StreamListener(StreamClient.INPUT)
// 增加以下注解,可以在INPUT消息消费后,返回一个消息。说白了就是RabbitMq对消息消费后的确认回调函数(貌似叫这个,意思就这样,之后细查)
@SendTo(StreamClient.INPUT2)
public String process(OrderDTO message){
log.info("StreamReceiver: {}", message);
return "received.";
}
@StreamListener(StreamClient.INPUT2)
public void process2(String message){
log.info("StreamReceiver2: {}", message);
}
}
总结:
在学习技术的过程中,一方面不断地感受到自己对技术了解的不足,另一方面则是发现更重要的是系统设计中技术选型的权衡。