kafka Coordinator 源码分析-1

原文发表在

kafka Coordinator 源码分析-1jefffrey.farbox.com

Coordinator 负责管理组的状态以及 offset 的提交和持久化. 本文主要分析了 kafka 1.1.0 版本的 broker 代码。kafka Coordinator 源码分析-1Coordinator 负责管理组的状态以及 offset 的提交和持久化. 本文主要分析了 kafka 1.1.0 版本的 broker 代码。

代码目录

Broker 相关的代码在 core/src/main/scala/kafka/coordinator 目录下, Consumer 相关的代码在 clients/src/main/java/org.apache.kafka/clients/consumer/internals 里,主要为 AbstractCoordinator, ConsumerCoordinator 两个类。

消费组管理

Kafka 的消费操作是以消费组为单位的而不是以消费者为单位的,消费者只负责消费指定 partition 上的消息,而整个消费组则完整消费了 topic 上的所有消息。所以如何给消费者分配 partition 是一个重要的问题,kafka 将这个问题交给 coordinator 来完成。

源码结构

KafkaApis 是 broker 所有请求的入口,和组相关的请求都会由 GroupCoordinator 来处理, GroupCoordinator 是 Coordinator 模块在 broker 端的主要代码实现,主要处理组成员变更的请求,并执行 rebalance 操作。 GroupMetadata 是一个组的实例,包含了组的状态,内部的成员信息。 MemberMetadata 是一个成员的实例,包含成员的信息,是否是 leader 等等。 GroupMetadataManager 则是组的管理员,负责管理所有的组并持有组对象。

Rebalance

给消费者分配 partition 的行为叫做 rebalance,rebalance 只有在消费组状态发生改变时才会被触发,包括 成员加入组,离开组,成员metadata 修改等。 而 Coordinator 本质上是要处理成员变动的请求,并在合适的时机触发 rebalance 操作,并将 rebalance 的结果分发给组员(消费者). 需要注意的是,rebalance 的计算过程并不发生在 broker 上,而是发生在被选为 leader 的consumer 上。下面会详细介绍整个过程。

加入组的简单流程 (joinGroup, syncGroup)

一次典型的加入组过程包括以下几个步骤 1. consumer 向 broker 发送 joinGroup 请求, (joinGroup send) 2. broker 收到 joinGroup 请求, 判断该 consumer 是否为 leader[1], 若为 leader 则返回当前所有成员的信息,若为 follower,则返回空。(joinGroup respond), group 进入 PreparingRebalance 状态。[2] 3. consumer 收到请求后,若发现自己为 leader,则根据返回的信息,执行 rebalance 的计算[3],计算完成后将 rebalance 结果通过 syncGroup 请求发送给 broker。若自己为 follower,则直接发送 syncGroup 请求。(joinGroup recv, syncGroup send) 4. broker 收到 syncGroup 请求,判断请求方是否为 leader,如果不是 leader,则等待直到 leader 将 rebalance 结果送达,如果请求方是 leader,则给所有的 syncGroup 请求返回 rebalance 的结果。 (syncGroup respond) 5. consumer 收到 syncGroup 结果,则调用相应的回调方法,(onPartitionRebalance) 按照最新的 rebalance 结果进行消费。(syncGroup recv)

note: [1] leader 选举的逻辑比较简单,如果当前没有 leader 则选该 consumer 为 leader [2] 读完上面的过程,读者肯定会有一个问题,如果有新成员加入,broker 怎么通知其他的 consumer?其实除了上述的主线,还有一条旁路,就是 heartbeat [3] 需要注意的是,并不是每个 joinGroup 请求都会触发一次 rebalance 操作,在一定的时间范围内,coordiantor 会把所有 joinGroup 请求合并为一次 rebalance. 一次 rebalance 的行为可能包含多次 joinGroup 请求,但只会有一次 rebalance 的计算。

不可不提的 heartbeat

在 consumer 和 coordinator 之间,除了正常的 joinGroup, syncGroup 请求外,还有heartbeat 机制来保证两者的联系,heartbeat 机制的作用有二: 1. 让 consumer 收到 coordinator 的最新状态,如果发现 coordinator 的状态为正在执行 rebalance 操作,consumer 需要赶紧发送 joinGroup 请求 (rejoin),否则他就收不到最新 rebalance 结果,消费组里可能就没他的位置了。 2. 让 coordinator 了解 consumer 的状态,如果 coordinator 发现某个 consumer 在超时时间内没有发送心跳,则 coordinator 认为该 consumer 已下线,将该 consumer 无情排除出去,并触发 rebalance 操作。

组内部的状态转移

coordinator 内的每个消费组都有自己都状态,在 GroupMetadata 里有定义,一共有以下五个状态:Empty, PreparingRebalance, CompletingRebalance, Stable, Dead. 每个状态表达的语义如下: Empty: 此时组内没有任何成员 PreparingRebalance: 正在准备 rebalance,此时可以接收 joinGroup 请求. CompletingRebalance: 正在等待 leader 的分配结果. Stable: coordinator 已收到 leader 发送的 rebalance 结果, 处于稳定状态. Dead: 此时组里没有任何成员并且 metadata 都被删除了.

组的状态转移表格如下:

| table |Empty | PreparingRebalance | CompletingRebalance | Stable | Dead | | —– | —- | ——————- | —— | —- | —–| | Empty | | 新成员加入 | | | 组被迁移走 | | PreparingRebalance | | | 在超时时间内,某些成员已加入 | | 组被迁移走 | | CompletingRebalance | | 有成员变动或者某些成员未通过心跳 | |从 leader 处收到 syncGroup 的请求,即 coordinator 拿到了 rebalance 的结果 | 组被迁移走 | | Stable | | 有成员变动或某些成员未通过心跳 | | | 组被迁移走 |

错误处理

broker 宕机怎么办?

GroupMetadataManager 提供了将组的元数据持久化的机制,分别在 PreparingRebalance->CompletingRebalance 和 CompletingRebalance->Stable 两个状态转移处将元数据持久化。保证磁盘中有最新的元数据状态。所以一旦 broker 宕机,磁盘中将保存当前组的最新数据。 当 broker 恢复后,coordinator 并没有直接把信息从磁盘中读取出来,而是通过 leader_and_isr 请求,在 partition leader 选举的过程中将组的元数据读取出来. 这里需要再详细了解 kafka 的高可用机制。

在 rebalance 过程中有成员”失联”

若是 leader 在 rebalance 过程中失联,会被 cooridnator 检测到,其效果等效于从组内移除一个成员。

offset 管理

TODO

    原文作者:cloud9
    原文地址: https://zhuanlan.zhihu.com/p/45526398
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞