【RocketMQ】RocketMQ快速入门

RocketMQ的介绍

RocketMQ版本发展

Metaq1.x是RocketMQ前身的第一个版本,本质上把Kafka做了一次java版本的重写(Kafka是scala语言开发)。

Meta2.x,主要是对存储部分进行了优化,因为kafka的数据存储,它的partition是一个全量的复制,在阿里、在淘宝的这种海量交易。Kafka这种机制的横向拓展是非常不好的。

2012年阿里同时把Meta2.0从阿里内部开源出来,取名RocketMQ,同时为了命名上的规范(版本上延续),所以这个就是RocketMQ3.0。

现在RocketMQ主要维护的是4.x的版本,也是大家使用得最多的版本,2017年从Apache顶级项目毕业。

阿里内部项目的使用

那么在阿里公司内部,原则上遵守开源共建原则。RocketMQ项目只维护核心功能,且去除了所有其他运行时依赖,核心功能最简化。每个BU(Business Unit业务单元)的个性化需求都在RocketMQ项目之上进行深度定制。RocketMQ向其他BU提供的仅仅是Jar包,例如要定制一个Broker,那么只需要依赖rocketmq-broker这jar包即可,可通过API进行交互, 如果定制client,则依赖rocketmq-client这个jar包,对其提供的api进行再封装。

在RocketMQ项目基础上几个常用的项目如下:

  • com.taobao.metaq v3.0 = RocketMQ + 淘宝个性化需求:为淘宝应用提供消息服务
  • com.alipay.zpullmsg v1.0 = RocketMQ + 支付宝个性化需求:为支付宝应用提供消息服务
  • com.alibaba.commonmq v1.0 = Notify + RocketMQ + B2B个性化需求:为B2B应用提供消息服务

展望未来

从阿里负责RocketMQ的架构核心人员的信息来看,阿里内部一直全力拓展RocketMQ。

2017年10月份,OpenMessaging项目由阿里巴巴发起,与雅虎、滴滴出行、Streamlio公司共同参与创立, 项目意在创立厂商无关、平台无关的分布式消息及流处理领域的应用开发标准。同时OpenMessaging入驻Linux基金会。

OpenMessaging项目已经开始在Apache RocketMQ中率先落地,并推广至整个阿里云平台.

另外RocketMQ5的版本也在内部推进,主要的方向是Cloud Native(云原生)。

另外还要讲一下Apache RocketMQ的商业版本,Aliware MQ在微服务、流计算、IoT、异步解耦、数据同步等场景有非常广泛的运用。

《【RocketMQ】RocketMQ快速入门》

RocketMQ的物理架构

消息队列RocketMQ是阿里巴巴集团基于高可用分布式集群技术,自主研发的云正式商用的专业消息中间件,既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性,是阿里巴巴双11使用的核心产品。

RocketMQ的设计基于主题的发布与订阅模式,其核心功能包括消息发送、消息存储(Broker)、消息消费,整体设计追求简单与性能第一。

《【RocketMQ】RocketMQ快速入门》

NameServer

NameServer是整个RocketMQ的“大脑”,它是RocketMQ的服务注册中心,所以RocketMQ需要先启动NameServer再启动Broker。

Broker在启动时向所有NameServer注册(主要是服务器地址等),生产者在发送消息之前先从NameServer获取Broker服务器地址列表(消费者一样),然后根据负载均衡算法从列表中选择一台服务器进行消息发送。

NameServer与每台Broker服务保持长连接,并间隔30S检查Broker是否存活,如果检测到Broker宕机,则从路由注册表中将其移除。这样就可以实现RocketMQ的高可用。

生产者(Producer)

生产者:也称为消息发布者,负责生产并发送消息至RocketMQ。

消费者(Consumer)

消费者:也称为消息订阅者,负责从RocketMQ接收并消费消息。

消息(Message)

消息:生产或消费的数据,对于RocketMQ来说,消息就是字节数组。

主机(Broker)

RocketMQ的核心,用于暂存和传输消息。

物理架构中的整体运转

  1. NameServer先启动
  2. Broker启动时向NameServer注册
  3. 生产者在发送某个主题的消息之前先从NamerServer获取Broker服务器地址列表(有可能是集群),然后根据负载均衡算法从列表中选择一台Broker进行消息发送。
  4. NameServer与每台Broker服务器保持长连接,并间隔30S检测Broker是否存活,如果检测到Broker宕机(使用心跳机制,如果检测超过120S),则从路由注册表中将其移除。
  5. 消费者在订阅某个主题的消息之前从NamerServer获取Broker服务器地址列表(有可能是集群),但是消费者选择从Broker中订阅消息,订阅规则由Broker配置决定。

RocketMQ的概念模型

分组(Group)

生产者:标识发送同一类消息的Producer,通常发送逻辑一致。发送普通消息的时候,仅标识使用,并无特别用处。主要作用用于事务消息,事务消息中如果某条发送某条消息的producer-A宕机,使得事务消息一直处于PREPARED状态并超时,则broker会回查同一个group的其它producer,确认这条消息应该commit还是rollback。

消费者:标识一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。同一个Consumer Group下的各个实例将共同消费topic的消息,起到负载均衡的作用。

消费进度以Consumer Group为粒度管理,不同Consumer Group之间消费进度彼此不受影响,即消息A被Consumer Group1消费过,也会再给Consumer Group2消费。

主题(Topic)

标识一类消息的逻辑名字,消息的逻辑管理单位。无论消息生产还是消费,都需要指定Topic。

区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息。

标签(Tag)

RocketMQ支持给在发送的时候给topic打tag,同一个topic的消息虽然逻辑管理是一样的。但是消费topic的时候,如果你消费订阅的时候指定的是tagA,那么tagB的消息将不会投递。

消息队列(Message Queue)

简称Queue或Q。消息物理管理单位。一个Topic将有若干个Q。若一个Topic创建在不同的Broker,则不同的broker上都有若干Q,消息将物理地存储落在不同Broker结点上,具有水平扩展的能力。

无论生产者还是消费者,实际的生产和消费都是针对Q级别。例如Producer发送消息的时候,会预先选择(默认轮询)好该Topic下面的某一条Q发送;Consumer消费的时候也会负载均衡地分配若干个Q,只拉取对应Q的消息。

每一条message queue均对应一个文件,这个文件存储了实际消息的索引信息。并且即使文件被删除,也能通过实际纯粹的消息文件(commit log)恢复回来。

偏移量(Offset)

RocketMQ中,有很多offset的概念。一般我们只关心暴露到客户端的offset。不指定的话,就是指Message Queue下面的offset。

Message queue是无限长的数组。一条消息进来下标就会涨1,而这个数组的下标就是offset,Message queue中的max offset表示消息的最大offset。

Consumer offset可以理解为标记Consumer Group在一条逻辑Message Queue上,消息消费到哪里即消费进度。但从源码上看,这个数值是消费过的最新消费的消息offset+1,即实际上表示的是下次拉取的offset位置。

普通消息的发送与消费

消息发送

发送同步消息

同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求。

《【RocketMQ】RocketMQ快速入门》

这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

package com.morris.rocketmq.simple;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;
import static com.morris.rocketmq.util.Contant.PRODUCER_GROUP;

/** * 同步发送消息 */
public class SynProducer { 

    public static void main(String[] args) throws Exception { 
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
                DefaultMQProducer(PRODUCER_GROUP);
        // Specify name server addresses.
        producer.setNamesrvAddr(NAME_SERVER_ADDRESS);
        producer.setSendMsgTimeout(6000);
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) { 
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

运行结果如下:

SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2DFEB0000, offsetMsgId=AC12FA2E00002A9F0000000000012E3A, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=1], queueOffset=100]
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0030001, offsetMsgId=AC12FA2E00002A9F0000000000012EF8, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=2], queueOffset=101]
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0060002, offsetMsgId=AC12FA2E00002A9F0000000000012FB6, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=3], queueOffset=100]
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0090003, offsetMsgId=AC12FA2E00002A9F0000000000013074, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=0], queueOffset=99]
... ...

Message ID:消息的全局唯一标识(内部机制的ID生成是使用机器IP和消息偏移量的组成,所以有可能重复,如果是幂等性还是最好考虑Key),由消息队列MQ系统自动生成,唯一标识某条消息。

SendStatus:发送的标识。成功,失败等

Queue:相当于是Topic的分区;用于并行发送和接收消息

发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

《【RocketMQ】RocketMQ快速入门》

消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。

package com.morris.rocketmq.simple;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;
import static com.morris.rocketmq.util.Contant.PRODUCER_GROUP;

/** * 异步发送消息 */
public class AsyncProducer { 
    public static void main(String[] args) throws Exception { 
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
        // Specify name server addresses.
        producer.setNamesrvAddr(NAME_SERVER_ADDRESS);
        //Launch the instance.
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
        
        int messageCount = 100;
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) { 
            try { 
                final int index = i;
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() { 
                    @Override
                    public void onSuccess(SendResult sendResult) { 
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }

                    @Override
                    public void onException(Throwable e) { 
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) { 
                e.printStackTrace();
            }
        }
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }
}

单向发送消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送。

《【RocketMQ】RocketMQ快速入门》

单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。

package com.morris.rocketmq.simple;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;
import static com.morris.rocketmq.util.Contant.PRODUCER_GROUP;

/** * 单向发送消息 */
public class OnewayProducer { 
    public static void main(String[] args) throws Exception{ 
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
        // Specify name server addresses.
        producer.setNamesrvAddr(NAME_SERVER_ADDRESS);
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) { 
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            producer.sendOneway(msg);
        }
        //Wait for sending to complete
        Thread.sleep(5000);        
        producer.shutdown();
    }
}

消息发送时的权衡

发送方式发送TPS发送结果反馈可靠性使用场景
同步发送可靠邮件、短信、推送
异步发送可靠视频转码
单向发送最快可能丢失日志收集

消息消费

集群消费

《【RocketMQ】RocketMQ快速入门》

消费者的一种消费模式。一个Consumer Group中的各个Consumer实例分摊去消费消息,即一条消息只会投递到一个Consumer Group下面的一个实例。

实际上,每个Consumer是平均分摊Message Queue的去做拉取消费。例如某个Topic有3条Q,其中一个Consumer Group有3个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的1条Q。

而由Producer发送消息的时候是轮询所有的Q,所以消息会平均散落在不同的Q上,可以认为Q上的消息是平均的。那么实例也就平均地消费消息了。

这种模式下,消费进度(Consumer Offset)的存储会持久化到Broker。

package com.morris.rocketmq.simple;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import static com.morris.rocketmq.util.Contant.CONSUMER_GROUP;
import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;

/** * 集群消费消息(默认) */
public class ClusterConsumer { 

    public static void main(String[] args) throws InterruptedException, MQClientException { 

        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
         
        // Specify name server addresses.
        consumer.setNamesrvAddr(NAME_SERVER_ADDRESS);
        consumer.setMessageModel(MessageModel.CLUSTERING); // default
        
        // Subscribe one more more topics to consume.
        consumer.subscribe("TopicTest", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { 
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        //Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

广播消费

《【RocketMQ】RocketMQ快速入门》

消费者的一种消费模式。消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer都消费一次。

实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。

这种模式下,消费进度(Consumer Offset)会存储持久化到实例本地。

package com.morris.rocketmq.simple;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import static com.morris.rocketmq.util.Contant.CONSUMER_GROUP;
import static com.morris.rocketmq.util.Contant.NAME_SERVER_ADDRESS;

/** * 广播消费消息(默认) */
public class BroadcastingConsumer { 

    public static void main(String[] args) throws InterruptedException, MQClientException { 

        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
         
        // Specify name server addresses.
        consumer.setNamesrvAddr(NAME_SERVER_ADDRESS);
        consumer.setMessageModel(MessageModel.BROADCASTING); // default
        
        // Subscribe one more more topics to consume.
        consumer.subscribe("TopicTest", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { 
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        //Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

这种模式下消费者只能收到启动后发送MQ中的消息。

消息消费时的权衡

集群模式:

  • 消费端集群化部署,每条消息只需要被处理一次。
  • 由于消费进度在服务端维护,可靠性更高。
  • 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
  • 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。

广播模式:

  • 广播消费模式下不支持顺序消息。
  • 广播消费模式下不支持重置消费位点。
  • 每条消息都需要被相同逻辑的多台机器处理。
  • 消费进度在客户端维护,出现重复的概率稍大于集群模式。

广播模式下,消息队列RocketMQ保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。

广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。

广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。

目前仅Java客户端支持广播模式。广播模式下服务端不维护消费进度,所以消息队列RocketMQ控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。

    原文作者:morris131
    原文地址: https://blog.csdn.net/u022812849/article/details/125600649
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞