消息队列

消息队列

前言:

说实话,最近还是比较忙的,手上素材倒是一大把,但是大多只是初步整理了。但是博客这种东西还是要写的,果然后面还是要放低一下排版要求(扩展性的一些东西也少提一些)。

简介:

消息队列这个东西,其实网上的资料还是很多的。我就简单说一些自己的认识与源代码哈。

演变:

我是很喜欢了解技术演进的,因为演进的过程展现了前辈们的智慧。

最早的程序串行执行就不说了。

程序调用中的方法调用,往往调用方与被调用方都存在与同一内存空间(从Java角度说,都是在同一JVM中),所以方法调用的逻辑不会太复杂。简单来说,就是调用方(Java中其实就是目标对象)将被调用方压入Java虚拟机栈,从而执行(详见JVM)。或者等我什么时候,把我有关JVM的笔记贴出来(嘿嘿)。

后来呢,就是出现了对非本地JVM方法调用的需求(举个例子,我需要调用第三方的方法,如果每次都要双方都写一个专门的处理服务(在当时,也许接口更为准确),比较麻烦),那么就有了RPC与RMI的一个需要。那么在Java中就出现了一个stub的技术,定义好后,相关方法就像调用本地一样(详见《Head First Java》相关章节)。当然了,这个时候已经有了中间件的概念了,所以也就有了CORBA等框架。谈到中间件,感兴趣的,可以去查询一下当时主流的中间件分类(如RPC,RMI,MOM,TPM,ORB)。

那么到了现在呢,分布式系统的通信可以按照同步与异步分为两大支柱。之所以这么理解,是因为分布式系统往往同步通信与异步通信都是需要的。简单提一下,同步通信业务逻辑相对简单,实现快速,可以实时获得回应,但耦合度较高。异步通信耦合度低,并可以进行消息堆积,消峰,但无法实时获取回应,业务逻辑复杂,从而提高系统复杂度(尤其当一条业务线与多层异步逻辑)等。之后有机会,我会举例细述。

当然了,在本篇中,只简单谈一下异步通信的主流实现-消息队列。

选择:

选择方面,我就不多说了,目前只用过RabbitMq,RocketMq,Kafka。网上有关消息队列选择的文章很多,很细致,我就不赘述了。

代码实现:

这里贴出来的都是实际生产代码(如果内部版本也算的话,嘿嘿),所以如果有一些不是很熟悉的类,请查看import,是否是项目自身的类。或者也可以直接询问我。

初步实现:

这里的初步实现,是根据RabbitMq的原生方法进行编写(详细参考:《RabbitMQ实战指南》第一章的两个代码清单及第二章的相关解释)。

producer:​​


    package com.renewable.gateway.rabbitmq.producer;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    import com.renewable.gateway.pojo.Terminal;
    import com.renewable.gateway.util.JsonUtil;
    import com.renewable.gateway.util.PropertiesUtil;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import static com.renewable.gateway.common.constant.RabbitmqConstant.*;
    
    /**
     * @Description:
     * @Author: jarry
     */
    @Component("TerminalProducer")
    public class TerminalProducer {
    
        private static final String IP_ADDRESS = PropertiesUtil.getProperty(RABBITMQ_HOST);
        private static final int PORT = Integer.parseInt(PropertiesUtil.getProperty(RABBITMQ_PORT));
        private static final String USER_NAME = PropertiesUtil.getProperty(RABBITMQ_USER_NAME);
        private static final String USER_PASSWORD = PropertiesUtil.getProperty(RABBITMQ_USER_PASSWORD);
    
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE = "exchange-terminal-config-terminal2centcontrol";
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE = "queue-terminal-config-terminal2centcontrol";
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE = "topic";
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY = "terminal.config.terminal2centcontrol";
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINGKEY = "terminal.config.terminal2centcontrol";
    
        public static void sendTerminalConfig(Terminal terminal) throws IOException, TimeoutException, InterruptedException {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(IP_ADDRESS);
            factory.setPort(PORT);
            factory.setUsername(USER_NAME);
            factory.setPassword(USER_PASSWORD);
    
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE, true, false, null);
            channel.queueDeclare(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, true, false, false, null);
            channel.queueBind(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY);
    
            String terminalStr = JsonUtil.obj2String(terminal);
            channel.basicPublish(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINGKEY, MessageProperties.PERSISTENT_TEXT_PLAIN, terminalStr.getBytes());
    
            channel.close();
            connection.close();
        }
    }

consumer:


    package com.renewable.gateway.rabbitmq.consumer;

    import com.rabbitmq.client.*;
    import com.renewable.gateway.common.GuavaCache;
    import com.renewable.gateway.common.ServerResponse;
    import com.renewable.gateway.pojo.Terminal;
    import com.renewable.gateway.service.ITerminalService;
    import com.renewable.gateway.util.JsonUtil;
    import com.renewable.gateway.util.PropertiesUtil;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    import javax.annotation.PostConstruct;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    import static com.renewable.gateway.common.constant.CacheConstant.TERMINAL_MAC;
    import static com.renewable.gateway.common.constant.RabbitmqConstant.*;

    /**
     * @Description:
     * @Author: jarry
     */
    @Component
    public class TerminalConsumer {
    
        @Autowired
        private ITerminalService iTerminalService;
    
        private static final String TERMINAL_CONFIG_CENTCONTROL2TERMINAL_EXCHANGE = "exchange-terminal-config-centcontrol2terminal";
        private static final String TERMINAL_CONFIG_CENTCONTROL2TERMINAL_QUEUE = "queue-terminal-config-centcontrol2terminal";
        private static final String TERMINAL_CONFIG_CENTCONTROL2TERMINAL_ROUTINETYPE = "topic";
        private static final String TERMINAL_CONFIG_CENTCONTROL2TERMINAL_BINDINGKEY = "terminal.config.centcontrol2terminal";
    
        @PostConstruct
        public void messageOnTerminal() throws IOException, TimeoutException, InterruptedException {
            Address[] addresses = new Address[]{
                    new Address(PropertiesUtil.getProperty(RABBITMQ_HOST))
            };
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername(PropertiesUtil.getProperty(RABBITMQ_USER_NAME));
            factory.setPassword(PropertiesUtil.getProperty(RABBITMQ_USER_PASSWORD));
    
            Connection connection = factory.newConnection(addresses);
            final Channel channel = connection.createChannel();
            channel.basicQos(64);   // 设置客户端最多接收未ack的消息个数,避免客户端被冲垮(常用于限流)
            Consumer consumer = new DefaultConsumer(channel) {
    
    
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    // 1.接收数据,并反序列化出对象
                    Terminal receiveTerminalConfig = JsonUtil.string2Obj(new String(body), Terminal.class);
    
                    // 2.验证是否是该终端的消息的消息     // 避免ACK其他终端的消息
                    if (receiveTerminalConfig.getMac() == GuavaCache.getKey(TERMINAL_MAC)) {
                        // 业务代码
                        ServerResponse response = iTerminalService.receiveTerminalFromRabbitmq(receiveTerminalConfig);
                        if (response.isSuccess()) {
                            channel.basicAck(envelope.getDeliveryTag(), false);
                        }
                    }
                }
            };
            channel.basicConsume(TERMINAL_CONFIG_CENTCONTROL2TERMINAL_QUEUE, consumer);
            // 等回调函数执行完毕后,关闭资源
            // 想了想还是不关闭资源,保持一个监听的状态,从而确保配置的实时更新
            //        TimeUnit.SECONDS.sleep(5);
            //        channel.close();
            //        connection.close();
        }
    }

小结:

这是早期写的一个demo代码,是直接参照源码的。如果是学习RabbitMq的话,还是建议手写一下这种比较原始的程序,了解其中每个方法的作用,从而理解RabbitMq的思路。如果条件允许的话,还可以查看一下RabbitMq的底层通信协议-AMQP(如果不方便下载,也可以私聊我)。

当然,此处可以通过@Value直接导入相关配置(乃至到了SpringCloud后,可以通过@Refreshscope等实现配置自动更新)。

与Spring集成:

producer:


    package com.renewable.terminal.rabbitmq.producer;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    import com.renewable.terminal.pojo.Terminal;
    import com.renewable.terminal.util.JsonUtil;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @Description:
     * @Author: jarry
     */
    @Component("TerminalProducer")
    public class TerminalProducer {
    
        private static String rabbitmqHost = "47.92.249.250";
        private static String rabbitmqUser = "admin";
        private static String rabbitmqPassword = "123456";
        private static String rabbitmqPort = "5672";
    
        private static final String IP_ADDRESS = rabbitmqHost;
        private static final int PORT = Integer.parseInt(rabbitmqPort);
        private static final String USER_NAME = rabbitmqUser;
        private static final String USER_PASSWORD = rabbitmqPassword;
    
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE = "exchange-terminal-config-terminal2centcontrol";
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE = "queue-terminal-config-terminal2centcontrol";
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE = "topic";
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY = "terminal.config.terminal2centcontrol";
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINGKEY = "terminal.config.terminal2centcontrol";
    
        public static void sendTerminalConfig(Terminal terminal) throws IOException, TimeoutException, InterruptedException {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(IP_ADDRESS);
            factory.setPort(PORT);
            factory.setUsername(USER_NAME);
            factory.setPassword(USER_PASSWORD);
    
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE, true, false, null);
            channel.queueDeclare(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, true, false, false, null);
            channel.queueBind(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY);
    
            String terminalStr = JsonUtil.obj2String(terminal);
            channel.basicPublish(TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINGKEY, MessageProperties.PERSISTENT_TEXT_PLAIN, terminalStr.getBytes());
    
            channel.close();
            connection.close();
        }
    }

consumer:


    package com.renewable.terminal.rabbitmq.consumer;
    
    import com.rabbitmq.client.*;
    import com.renewable.terminal.Init.SerialSensorInit;
    import com.renewable.terminal.Init.TerminalInit;
    import com.renewable.terminal.common.GuavaCache;
    import com.renewable.terminal.common.ServerResponse;
    import com.renewable.terminal.pojo.Terminal;
    import com.renewable.terminal.service.ITerminalService;
    import com.renewable.terminal.util.JsonUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import java.io.IOException;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    import static com.renewable.terminal.common.constant.CacheConstant.TERMINAL_ID;
    import static com.renewable.terminal.common.constant.CacheConstant.TERMINAL_MAC;
    
    /**
     * @Description:
     * @Author: jarry
     */
    @Component
    @Slf4j
    public class TerminalConsumer {
    
        @Autowired
        private ITerminalService iTerminalService;
    
        @Autowired
        private SerialSensorInit serialSensorInit;
    
    
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE = "exchange-terminal-config-centcontrol2terminal";
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE = "queue-terminal-config-centcontrol2terminal";
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE = "topic";
        private static final String TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY = "terminal.config.centcontrol2terminal";
    
        //TODO_FINISHED 2019.05.16 完成终端机TerminalConfig的接收与判断(ID是否为长随机数,是否需要重新分配)
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = TERMINAL_CONFIG_TERMINAL2CENTCONTROL_QUEUE, declare = "true"),
                exchange = @Exchange(value = TERMINAL_CONFIG_TERMINAL2CENTCONTROL_EXCHANGE, declare = "true", type = TERMINAL_CONFIG_TERMINAL2CENTCONTROL_ROUTINETYPE),
                key = TERMINAL_CONFIG_TERMINAL2CENTCONTROL_BINDINGKEY
        ))
        @RabbitHandler
        public void messageOnTerminal(@Payload String terminalStr, @Headers Map<String, Object> headers, Channel channel) throws IOException {
    
            Terminal terminal = JsonUtil.string2Obj(terminalStr, Terminal.class);
            if (terminal == null){
                log.info("consume the null terminal config !");
                Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
                channel.basicAck(deliveryTag, false);
            }
            if (!GuavaCache.getKey(TERMINAL_MAC).equals(terminal.getMac())){
                log.info("refuse target terminal with mac({}) configure to this terminal with mac({}).",terminal.getMac(), GuavaCache.getKey(TERMINAL_MAC));
                return;
            }
    
            // 2.业务逻辑
            ServerResponse response = iTerminalService.receiveTerminalFromRabbitmq(terminal);
            log.info("start serialSensorInit");
            serialSensorInit.init();
    
            // 3.确认
            if (response.isSuccess()) {
                Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
                channel.basicAck(deliveryTag, false);
            }
        }
    }

配置:


    # rabbitmq 消费端配置
    spring:
      rabbitmq:
        listener:
          simple:
            concurrency: 5
            max-concurrency: 10
            acknowledge-mode: manual
            # 限流
            prefetch: 1
        host: "localhost"
        port: 5672
        username: "admin"
        password: "123456"
        virtual-host: "/"
        connection-timeout: 15000

小结:

这里不得不赞一下Spring,它通过提供RabbitMq地封装API-ampq,极大地简化了消息队列的代码。其实上述方法就是通过ampq的注解与yml配置来迅速实现RabbitMq的使用。

当然,这里还有很多的提升空间。比如说,通过@Bean注解(建立目标配置)与公用方法提取,可以有效提高代码复用性。

简单扩展(与SpringStream集成):

这段代码并不是线上的代码,而是慕课网学习时留下的代码。主要实际生产中并没有使用SpringStream,但这确实是认识事件驱动模型的要给很好途径。

producer:

“`java

package com.imooc.order.message;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

/**
 * @Description:
 * @Author: jarry
 */
public interface StreamClient {

    String INPUT = "myMessage";
    String INPUT2 = "myMessageACK";


    @Input(StreamClient.INPUT)
    SubscribableChannel input();

    @Output(StreamClient.INPUT)
    MessageChannel output();

    @Input(StreamClient.INPUT2)
    SubscribableChannel input2();

    @Output(StreamClient.INPUT2)
    MessageChannel output2();
}

​“`


    package com.imooc.order;
    
    import org.junit.Assert;
    import org.junit.Test;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    /**
     * @Description:
     * @Author: jarry
     */
    @Component
    public class MqSenderTest extends OrderApplicationTests{
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        @Test
        public void send(){
            amqpTemplate.convertAndSend("myQueue", "now: " + new Date());
            Assert.assertNotNull(new Date());
        }
    }

consumer:


    package com.imooc.order.message;
    
    import com.imooc.order.dto.OrderDTO;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.messaging.handler.annotation.SendTo;
    import org.springframework.stereotype.Component;
    
    /**
     * @Description:
     * @Author: jarry
     */
    @Component
    @EnableBinding(StreamClient.class)
    @Slf4j
    public class StreamReceiver {
    
    //  @StreamListener(StreamClient.INPUT)
    //  public void process(Object message){
    //      log.info("StreamReceiver: {}", message);
    //  }
    
        @StreamListener(StreamClient.INPUT)
        // 增加以下注解,可以在INPUT消息消费后,返回一个消息。说白了就是RabbitMq对消息消费后的确认回调函数(貌似叫这个,意思就这样,之后细查)
        @SendTo(StreamClient.INPUT2)
        public String process(OrderDTO message){
            log.info("StreamReceiver: {}", message);
            return "received.";
        }
    
        @StreamListener(StreamClient.INPUT2)
        public void process2(String message){
            log.info("StreamReceiver2: {}", message);
        }
    }

总结:

在学习技术的过程中,一方面不断地感受到自己对技术了解的不足,另一方面则是发现更重要的是系统设计中技术选型的权衡。

点赞