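Source Code Analysis of PullConsumer Startup in RocketMQ

DefaultMQPullConsumer is the default implementation. Its startup process closely resembles the Producer's, but is somewhat more involved.

【Source Code Analysis of Producer Startup in RocketMQ】

The DefaultMQPullConsumer constructor: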

public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) {
    this.consumerGroup = consumerGroup;
    defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook);
}

This wraps a DefaultMQPullConsumerImpl, analogous to DefaultMQProducerImpl on the Producer side.

DefaultMQPullConsumerImpl:

public class DefaultMQPullConsumerImpl implements MQConsumerInner {
    private final InternalLogger log = ClientLogger.getLog();
    private final DefaultMQPullConsumer defaultMQPullConsumer;
    private final long consumerStartTimestamp = System.currentTimeMillis();
    private final RPCHook rpcHook;
    private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
    private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
    private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
    private MQClientInstance mQClientFactory;
    private PullAPIWrapper pullAPIWrapper;
    private OffsetStore offsetStore;
    private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);

    public DefaultMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) {
        this.defaultMQPullConsumer = defaultMQPullConsumer;
        this.rpcHook = rpcHook;
    }
    ......
}

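These are the fields it holds; each is described in detail when it comes up later.

The start method of DefaultMQPullConsumer simply delegates to the start method of DefaultMQPullConsumerImpl.

The start method of DefaultMQPullConsumerImpl: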

public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            this.copySubscription();

            if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPullConsumer.changeInstanceNameToPID();
            }

            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);

            this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

            if (this.defaultMQPullConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPullConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);
            }

            this.offsetStore.load();

            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;

                throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            mQClientFactory.start();
            log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

}

First, the checkConfig method validates the configuration.

Next comes the copySubscription method:

private void copySubscription() throws MQClientException {
    try {
        Set<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics();
        if (registerTopics != null) {
            for (final String topic : registerTopics) {
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
                    topic, SubscriptionData.SUB_ALL);
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            }
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}

Here registerTopics is the set of Topics the user registered by calling setRegisterTopics.
Each Topic in the set is wrapped in a SubscriptionData and stored in rebalanceImpl.

SubscriptionData:

public class SubscriptionData implements Comparable<SubscriptionData> {
    public final static String SUB_ALL = "*";
    private boolean classFilterMode = false;
    private String topic;
    private String subString;
    private Set<String> tagsSet = new HashSet<String>();
    private Set<Integer> codeSet = new HashSet<Integer>();
    private long subVersion = System.currentTimeMillis();
    private String expressionType = ExpressionType.TAG;
    ......
}

RebalanceImpl:

public abstract class RebalanceImpl {
    protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
    protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
        new ConcurrentHashMap<String, Set<MessageQueue>>();
    protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
        new ConcurrentHashMap<String, SubscriptionData>();
    protected String consumerGroup;
    protected MessageModel messageModel;
    protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
    protected MQClientInstance mQClientFactory;
    ......
}

 

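Back in the start method: as with the Producer, an MQClientInstance is obtained through MQClientManager,
and then the properties of rebalanceImpl are populated.

Next a PullAPIWrapper is instantiated and the message filter hooks are registered with it; this object will be covered in detail in a later analysis of message pulling.

Then, depending on the message model, a different kind of OffsetStore is chosen: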

public enum MessageModel {
    /**
     * broadcast
     */
    BROADCASTING("BROADCASTING"),
    /**
     * clustering
     */
    CLUSTERING("CLUSTERING");
    ......
}

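These are the broadcasting model and the clustering model.
Broadcasting (BROADCASTING): every Consumer in the same ConsumerGroup receives all messages of the subscribed Topic, so one message is delivered multiple times and consumed by multiple Consumers.
Clustering (CLUSTERING): each Consumer in the same ConsumerGroup consumes only part of the subscribed messages; only the union of what all Consumers in the group consume makes up the full content of the subscribed Topic.

In broadcasting mode, the consumer's progress (offset) is stored locally; in clustering mode, it is stored remotely on the Broker.
Hence broadcasting mode uses LocalFileOffsetStore and clustering mode uses RemoteBrokerOffsetStore.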
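
The selection rule can be condensed into a small standalone sketch. It does not use the RocketMQ classes: SketchMessageModel and OffsetStoreChoice are hypothetical names, stores are represented by their class names as strings, and throwing on an unknown model is a choice made for this sketch (the original default branch simply breaks).

```java
// Hypothetical, simplified stand-in for RocketMQ's MessageModel.
enum SketchMessageModel { BROADCASTING, CLUSTERING }

final class OffsetStoreChoice {
    // Returns the name of the store that start() would end up using.
    static String choose(String userProvidedStore, SketchMessageModel model) {
        if (userProvidedStore != null) {
            return userProvidedStore; // a store set by the user always wins
        }
        switch (model) {
            case BROADCASTING:
                return "LocalFileOffsetStore";    // progress kept on the consumer's disk
            case CLUSTERING:
                return "RemoteBrokerOffsetStore"; // progress kept on the Broker
            default:
                // Assumption of this sketch only; the real code leaves the store unset.
                throw new IllegalStateException("unknown model: " + model);
        }
    }

    public static void main(String[] args) {
        System.out.println(choose(null, SketchMessageModel.CLUSTERING));
    }
}
```

Keeping the user-supplied store as the first check mirrors the order in start(): the message model only matters when no OffsetStore was set explicitly.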

In broadcasting mode (LocalFileOffsetStore), calling load reads its file offsets.json, whereas for RemoteBrokerOffsetStore load does nothing, since the offsets live on the Broker.
The load method of LocalFileOffsetStore:

public void load() throws MQClientException {
    OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
    if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
        offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());

        for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
            AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
            log.info("load consumer's offset, {} {} {}",
                this.groupName,
                mq,
                offset.get());
        }
    }
}

The readLocalOffset method converts the JSON string in offsets.json into an OffsetSerializeWrapper object:

public class OffsetSerializeWrapper extends RemotingSerializable {
    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>();

    public ConcurrentMap<MessageQueue, AtomicLong> getOffsetTable() {
        return offsetTable;
    }

    public void setOffsetTable(ConcurrentMap<MessageQueue, AtomicLong> offsetTable) {
        this.offsetTable = offsetTable;
    }
}

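From this you can roughly infer the contents of the JSON file: each AtomicLong is the Offset of the corresponding MessageQueue.
The load method then copies this map into the offsetTable of LocalFileOffsetStore.

Next, the start method of mQClientFactory is called; it was analyzed in 【Source Code Analysis of Producer Startup in RocketMQ】: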

 

public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

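First, if no NameServer address has been set, fetchNameServerAddr is called to discover one automatically; see the Producer startup analysis.

Then the start method of mQClientAPIImpl sets up the Netty client; see the Producer startup analysis.

The startScheduledTask method sets up five scheduled tasks:
① If the name server address namesrvAddr is absent, periodically call fetchNameServerAddr (above) to refresh the name server address
② Periodically update the route information of each Topic
③ Periodically remove offline Brokers and send heartbeats to the Brokers currently online
(For the above, see the Producer startup analysis)

④ Periodically persist the consumption progress of the consumer's queues
The implementation in DefaultMQPullConsumerImpl: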

public void persistConsumerOffset() {
    try {
        this.makeSureStateOK();
        Set<MessageQueue> mqs = new HashSet<MessageQueue>();
        Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
        mqs.addAll(allocateMq);
        this.offsetStore.persistAll(mqs);
    } catch (Exception e) {
        log.error("group: " + this.defaultMQPullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
    }
}

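First, the set of MessageQueues currently being processed is taken from rebalanceImpl.
Then offsetStore's persistAll method is called to handle that set.

Because there are two message models, there are two implementations.
The persistAll method of LocalFileOffsetStore (broadcasting mode):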

public void persistAll(Set<MessageQueue> mqs) {
    if (null == mqs || mqs.isEmpty())
        return;

    OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
    for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
        if (mqs.contains(entry.getKey())) {
            AtomicLong offset = entry.getValue();
            offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
        }
    }

    String jsonString = offsetSerializeWrapper.toJson(true);
    if (jsonString != null) {
        try {
            MixAll.string2File(jsonString, this.storePath);
        } catch (IOException e) {
            log.error("persistAll consumer offset Exception, " + this.storePath, e);
        }
    }
}

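This is the reverse of load: the offsets of the given MessageQueues are serialized and overwrite the contents of the JSON file.
That completes the periodic persistence of consumption progress in broadcasting mode.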
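
The load / persistAll round trip can be illustrated with a minimal in-memory sketch. It makes several simplifying assumptions: LocalOffsetSketch is a hypothetical class, queues are identified by plain strings instead of MessageQueue, and the JSON file is replaced by ordinary maps passed in and returned.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

final class LocalOffsetSketch {
    // Same shape as offsetTable in the article, with String keys for brevity.
    private final ConcurrentMap<String, AtomicLong> offsetTable = new ConcurrentHashMap<>();

    // Counterpart of load(): copy previously stored offsets into memory.
    void load(Map<String, Long> stored) {
        stored.forEach((mq, offset) -> offsetTable.put(mq, new AtomicLong(offset)));
    }

    // Record progress for one queue.
    void updateOffset(String mq, long offset) {
        offsetTable.computeIfAbsent(mq, k -> new AtomicLong()).set(offset);
    }

    // Counterpart of persistAll(): returns what would be written to the file,
    // i.e. only the offsets of queues this consumer currently handles.
    Map<String, Long> persistAll(Set<String> mqs) {
        Map<String, Long> snapshot = new TreeMap<>();
        if (mqs == null || mqs.isEmpty()) {
            return snapshot;
        }
        offsetTable.forEach((mq, offset) -> {
            if (mqs.contains(mq)) {
                snapshot.put(mq, offset.get());
            }
        });
        return snapshot;
    }
}
```

For example, after loading offsets for q0 and q1 and advancing q0, calling persistAll with only q0 yields a snapshot that contains q0 alone, which is why the file always reflects the currently processed queues.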

The persistAll method of RemoteBrokerOffsetStore (clustering mode):

public void persistAll(Set<MessageQueue> mqs) {
    if (null == mqs || mqs.isEmpty())
        return;

    final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
    if (!mqs.isEmpty()) {
        for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            AtomicLong offset = entry.getValue();
            if (offset != null) {
                if (mqs.contains(mq)) {
                    try {
                        this.updateConsumeOffsetToBroker(mq, offset.get());
                        log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
                            this.groupName,
                            this.mQClientFactory.getClientId(),
                            mq,
                            offset.get());
                    } catch (Exception e) {
                        log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
                    }
                } else {
                    unusedMQ.add(mq);
                }
            }
        }
    }

    if (!unusedMQ.isEmpty()) {
        for (MessageQueue mq : unusedMQ) {
            this.offsetTable.remove(mq);
            log.info("remove unused mq, {}, {}", mq, this.groupName);
        }
    }
}

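As above, this iterates over offsetTable, but instead of saving locally it sends each offset to the Broker through updateConsumeOffsetToBroker; entries for queues no longer in mqs are removed from offsetTable.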
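
The "report what is still assigned, prune the rest" behaviour can be sketched without any networking. RemoteOffsetSketch is a hypothetical class; queues are plain strings, the returned map stands in for the requests that would be sent to the Broker, and offsets are assumed to be non-null.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

final class RemoteOffsetSketch {
    // Returns the offsets that would be reported to the Broker and removes
    // entries for queues that are no longer assigned to this consumer.
    static Map<String, Long> persistAll(Map<String, AtomicLong> offsetTable, Set<String> mqs) {
        Map<String, Long> sent = new TreeMap<>();
        if (mqs == null || mqs.isEmpty()) {
            return sent; // nothing assigned: leave the table untouched
        }
        Iterator<Map.Entry<String, AtomicLong>> it = offsetTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, AtomicLong> entry = it.next();
            if (mqs.contains(entry.getKey())) {
                sent.put(entry.getKey(), entry.getValue().get()); // stands in for the Broker update
            } else {
                it.remove(); // unused queue: drop its offset
            }
        }
        return sent;
    }
}
```

The original collects unused queues in a separate set and removes them afterwards; removing through the iterator, as done here, is an equivalent simplification for a single-threaded illustration.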
The updateConsumeOffsetToBroker method:

private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,
    MQBrokerException, InterruptedException, MQClientException {
    updateConsumeOffsetToBroker(mq, offset, true);
}

public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
    MQBrokerException, InterruptedException, MQClientException {
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    if (null == findBrokerResult) {

        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    }

    if (findBrokerResult != null) {
        UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setConsumerGroup(this.groupName);
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setCommitOffset(offset);

        if (isOneway) {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        } else {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        }
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}

First, the Broker's address is looked up by BrokerName:

public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {
    String brokerAddr = null;
    boolean slave = false;
    boolean found = false;

    HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
    if (map != null && !map.isEmpty()) {
        for (Map.Entry<Long, String> entry : map.entrySet()) {
            Long id = entry.getKey();
            brokerAddr = entry.getValue();
            if (brokerAddr != null) {
                found = true;
                if (MixAll.MASTER_ID == id) {
                    slave = false;
                } else {
                    slave = true;
                }
                break;

            }
        } // end of for
    }

    if (found) {
        return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
    }

    return null;
}

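The broker route information in brokerAddrTable is kept up to date by scheduled task ② (periodically updating Topic route information). As soon as one Broker address is found in brokerAddrTable, it is wrapped in a FindBrokerResult and returned.

If nothing is found, updateTopicRouteInfoFromNameServer is executed, i.e. the body of that scheduled task is run once immediately, and then findBrokerAddressInAdmin is called again.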
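
This "look up, refresh on a miss, look up once more" pattern is sketched below in isolation. Everything in it is hypothetical: BrokerLookupSketch, the nameServerView map that plays the role of the NameServer, and the single address per broker name (the real table maps each broker name to a brokerId-to-address map).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class BrokerLookupSketch {
    private final Map<String, String> brokerAddrTable = new ConcurrentHashMap<>();
    private final Map<String, String> nameServerView; // stand-in for the NameServer
    int refreshCount = 0;                             // exposed only to observe the behaviour

    BrokerLookupSketch(Map<String, String> nameServerView) {
        this.nameServerView = nameServerView;
    }

    // Local cache lookup, like findBrokerAddressInAdmin.
    String find(String brokerName) {
        return brokerAddrTable.get(brokerName);
    }

    // Stand-in for updateTopicRouteInfoFromNameServer.
    void refreshFromNameServer() {
        refreshCount++;
        brokerAddrTable.putAll(nameServerView);
    }

    String findWithRetry(String brokerName) {
        String addr = find(brokerName);
        if (addr == null) {
            refreshFromNameServer(); // run the refresh immediately instead of waiting for the timer
            addr = find(brokerName);
        }
        if (addr == null) {
            throw new IllegalStateException("The broker[" + brokerName + "] not exist");
        }
        return addr;
    }
}
```

Only the first miss triggers a refresh; later lookups for the same broker are served from the local table until the scheduled task updates it again.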

Once found, an UpdateConsumerOffsetRequestHeader is instantiated and filled with the relevant information. Because Oneway mode is used here, updateConsumerOffsetOneway sends it to the Broker over Netty:

public void updateConsumerOffsetOneway(
    final String addr,
    final UpdateConsumerOffsetRequestHeader requestHeader,
    final long timeoutMillis
) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException,
    InterruptedException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);

    this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
}

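This simply calls invokeOneway to send the request to the Broker one-way, without waiting for a response.

【Source Code Analysis of Producer Message Sending in RocketMQ】

The non-Oneway path sends synchronously instead.
In this way, in clustering mode the consumption progress is managed by the Broker, and later load balancing builds on it.

⑤ Periodically adjust the size of the consumer-side thread pool
This applies to PushConsumer and will be covered in a later post.

For PullConsumer, starting the rebalanceService is the most important part.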

RebalanceService:

public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }

    log.info(this.getServiceName() + " service end");
}

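waitForRunning here is similar to the Broker's disk flushing and master-slave replication services: it blocks with a timeout (20s by default) and can also be woken early when the Broker sends a NOTIFY_CONSUMER_IDS_CHANGED request. After that, doRebalance is called.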
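
The idea of a timed wait that can be cut short by a notification can be sketched with plain wait/notify. This is not RocketMQ's ServiceThread implementation; WakeableWaiter and its boolean return value are assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class WakeableWaiter {
    private final AtomicBoolean hasNotified = new AtomicBoolean(false);
    private final Object lock = new Object();

    // Called by another thread (e.g. when a "consumer ids changed" notice arrives).
    void wakeup() {
        if (hasNotified.compareAndSet(false, true)) {
            synchronized (lock) {
                lock.notifyAll();
            }
        }
    }

    // Blocks for at most timeoutMillis (must be > 0).
    // Returns true if a wakeup was consumed, false otherwise (timeout or interrupt).
    boolean waitForRunning(long timeoutMillis) {
        if (hasNotified.compareAndSet(true, false)) {
            return true; // a wakeup arrived before we started waiting
        }
        synchronized (lock) {
            if (!hasNotified.get()) {
                try {
                    lock.wait(timeoutMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                }
            }
        }
        return hasNotified.compareAndSet(true, false);
    }
}
```

A rebalance loop built on this would call waitForRunning(20_000) and then rebalance in either case; the flag ensures a wakeup issued just before the wait is not lost.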

The call eventually reaches the doRebalance method of RebalanceImpl:

public void doRebalance(final boolean isOrder) {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }

    this.truncateMessageQueueNotMyTopic();
}

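This retrieves the subscribed Topics set up in copySubscription; the message queues belonging to these Topics (topicSubscribeInfoTable) are refreshed from the NameServer by scheduled task ②.

Each subscribed Topic is handled by rebalanceByTopic: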

private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        case BROADCASTING: {
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            if (mqSet != null) {
                boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                if (changed) {
                    this.messageQueueChanged(topic, mqSet, mqSet);
                    log.info("messageQueueChanged {} {} {} {}",
                        consumerGroup,
                        topic,
                        mqSet,
                        mqSet);
                }
            } else {
                log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
            }
            break;
        }
        case CLUSTERING: {
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
            if (null == mqSet) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
            }

            if (null == cidAll) {
                log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
            }

            if (mqSet != null && cidAll != null) {
                List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                mqAll.addAll(mqSet);

                Collections.sort(mqAll);
                Collections.sort(cidAll);

                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                List<MessageQueue> allocateResult = null;
                try {
                    allocateResult = strategy.allocate(
                        this.consumerGroup,
                        this.mQClientFactory.getClientId(),
                        mqAll,
                        cidAll);
                } catch (Throwable e) {
                    log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                        e);
                    return;
                }

                Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                if (allocateResult != null) {
                    allocateResultSet.addAll(allocateResult);
                }

                boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                if (changed) {
                    log.info(
                        "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                        strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                        allocateResultSet.size(), allocateResultSet);
                    this.messageQueueChanged(topic, mqSet, allocateResultSet);
                }
            }
            break;
        }
        default:
            break;
    }
}

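This handles broadcasting mode and clustering mode differently.

Broadcasting mode:
First, the set of all message queues for the Topic is obtained.

Then it is passed to updateProcessQueueTableInRebalance: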

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
    final boolean isOrder) {
    boolean changed = false;

    Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<MessageQueue, ProcessQueue> next = it.next();
        MessageQueue mq = next.getKey();
        ProcessQueue pq = next.getValue();

        if (mq.getTopic().equals(topic)) {
            if (!mqSet.contains(mq)) {
                pq.setDropped(true);
                if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                    it.remove();
                    changed = true;
                    log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                }
            } else if (pq.isPullExpired()) {
                switch (this.consumeType()) {
                    case CONSUME_ACTIVELY:
                        break;
                    case CONSUME_PASSIVELY:
                        pq.setDropped(true);
                        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                            it.remove();
                            changed = true;
                            log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                consumerGroup, mq);
                        }
                        break;
                    default:
                        break;
                }
            }
        }
    }

    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
        if (!this.processQueueTable.containsKey(mq)) {
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;
            }

            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();
            long nextOffset = this.computePullFromWhere(mq);
            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }

    this.dispatchPullRequest(pullRequestList);

    return changed;
}

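If the message queues have changed, the while loop first removes obsolete entries from the process queue table.
The for loop then adds new entries to processQueueTable. For PullConsumer, computePullFromWhere returns 0 by default, which becomes nextOffset; it is set on the PullRequest as the position for the next pull, while the new ProcessQueue is stored in processQueueTable as the record of the processing task.

The subsequent dispatchPullRequest method is meant for PushConsumer and does nothing here.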
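
Stripped of locking, expiry checks and pull requests, the method is a set difference in two passes. The sketch below shows only that core; ProcessQueueDiffSketch is a hypothetical class, queues are plain strings for a single topic, and the map value is a starting offset rather than a ProcessQueue.

```java
import java.util.Map;
import java.util.Set;

final class ProcessQueueDiffSketch {
    // Makes processQueueTable contain exactly the assigned queues.
    // Returns true if anything was removed or added.
    static boolean update(Map<String, Long> processQueueTable, Set<String> assigned, long initialOffset) {
        // Pass 1: drop queues that are no longer assigned to this consumer.
        boolean changed = processQueueTable.keySet().retainAll(assigned);

        // Pass 2: register newly assigned queues with their starting offset.
        for (String mq : assigned) {
            if (processQueueTable.putIfAbsent(mq, initialOffset) == null) {
                changed = true;
            }
        }
        return changed;
    }
}
```

Starting from a table holding q0 and q1, an assignment of q1 and q2 removes q0, adds q2 and reports a change; repeating the same assignment reports none, which is what keeps messageQueueChanged from firing on every rebalance round.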

Back in rebalanceByTopic: if there was a change, messageQueueChanged is called:

public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
    MessageQueueListener messageQueueListener = this.defaultMQPullConsumerImpl.getDefaultMQPullConsumer().getMessageQueueListener();
    if (messageQueueListener != null) {
        try {
            messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
        } catch (Throwable e) {
            log.error("messageQueueChanged exception", e);
        }
    }
}

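This simply hands off to the MessageQueueListener's messageQueueChanged callback.

Clustering mode:
Again, the set of message queues for the Topic is obtained first.
Because this is clustering mode, each consumer receives different messages, so findConsumerIdList is used to obtain the list of consumer IDs: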

public List<String> findConsumerIdList(final String topic, final String group) {
    String brokerAddr = this.findBrokerAddrByTopic(topic);
    if (null == brokerAddr) {
        this.updateTopicRouteInfoFromNameServer(topic);
        brokerAddr = this.findBrokerAddrByTopic(topic);
    }

    if (null != brokerAddr) {
        try {
            return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
        } catch (Exception e) {
            log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
        }
    }

    return null;
}

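findBrokerAddrByTopic picks the address of one Broker in the cluster serving the Topic (kept up to date by scheduled task ② via the NameServer): the master if it exists, otherwise a randomly chosen slave.

If none is found, an update is requested from the NameServer and the lookup is retried.

Once the Broker address is known, getConsumerIdListByGroup sends a request to the Broker: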

public List<String> getConsumerIdListByGroup(
    final String addr,
    final String consumerGroup,
    final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
    MQBrokerException, InterruptedException {
    GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();
    requestHeader.setConsumerGroup(consumerGroup);
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
        request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            if (response.getBody() != null) {
                GetConsumerListByGroupResponseBody body =
                    GetConsumerListByGroupResponseBody.decode(response.getBody(), GetConsumerListByGroupResponseBody.class);
                return body.getConsumerIdList();
            }
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}

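This sends a GET_CONSUMER_LIST_BY_GROUP request to the Broker synchronously; when the response arrives, the data in it, namely the List of consumer IDs, is returned.

Back in rebalanceByTopic: once the list of consumer IDs is available,
queues are assigned according to the allocation strategy, which defaults to AllocateMessageQueueAveragely,
by calling its allocate method: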

public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
    List<String> cidAll) {
    if (currentCID == null || currentCID.length() < 1) {
        throw new IllegalArgumentException("currentCID is empty");
    }
    if (mqAll == null || mqAll.isEmpty()) {
        throw new IllegalArgumentException("mqAll is null or mqAll empty");
    }
    if (cidAll == null || cidAll.isEmpty()) {
        throw new IllegalArgumentException("cidAll is null or cidAll empty");
    }

    List<MessageQueue> result = new ArrayList<MessageQueue>();
    if (!cidAll.contains(currentCID)) {
        log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
            consumerGroup,
            currentCID,
            cidAll);
        return result;
    }

    int index = cidAll.indexOf(currentCID);
    int mod = mqAll.size() % cidAll.size();
    int averageSize =
        mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
            + 1 : mqAll.size() / cidAll.size());
    int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
    int range = Math.min(averageSize, mqAll.size() - startIndex);
    for (int i = 0; i < range; i++) {
        result.add(mqAll.get((startIndex + i) % mqAll.size()));
    }
    return result;
}

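(This ID was introduced in the Producer startup analysis; it is created in MQClientManager's getAndCreateMQClientInstance method and is unique to the client.)

Because this is clustering mode, the current Consumer should itself be a member of the group, so the method checks that currentCID is contained in the list.

It then assigns message queues based on the number of consumers and the number of message queues, achieving load balancing on the consumer side.
The strategy here is even distribution: from the number of queues and the number of consumers, it computes which queues the current consumer should consume.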
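
To see the arithmetic at work, the index computation can be lifted out into a standalone sketch that operates on queue indexes only. AverageAllocationSketch is a hypothetical class; the formula itself is copied from the allocate method above, with the consumer's position in the sorted ID list passed in as index.

```java
import java.util.ArrayList;
import java.util.List;

final class AverageAllocationSketch {
    // Returns the indexes (into the sorted queue list) assigned to the consumer
    // at position `index` in the sorted consumer ID list.
    static List<Integer> allocate(int index, int queueCount, int consumerCount) {
        List<Integer> result = new ArrayList<>();
        int mod = queueCount % consumerCount;
        int averageSize = queueCount <= consumerCount ? 1
            : (mod > 0 && index < mod ? queueCount / consumerCount + 1 : queueCount / consumerCount);
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, queueCount - startIndex);
        for (int i = 0; i < range; i++) {
            result.add((startIndex + i) % queueCount);
        }
        return result;
    }

    public static void main(String[] args) {
        for (int consumer = 0; consumer < 3; consumer++) {
            System.out.println("consumer " + consumer + " -> " + allocate(consumer, 8, 3));
        }
    }
}
```

With 8 queues and 3 consumers, the first two consumers take three queues each (0 to 2 and 3 to 5) and the third takes the remaining two (6 and 7). When there are more consumers than queues, for example 2 queues and 3 consumers, range becomes negative for the last consumer and it receives nothing.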

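Besides this one, RocketMQ provides several other allocation strategies that can be chosen as needed.

Back in rebalanceByTopic: once the queues have been assigned,
updateProcessQueueTableInRebalance is called to update the message queues and process queues; if anything changed, messageQueueChanged invokes the callback interface to announce the change.

At this point, PullConsumer startup is complete.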
