概述
通过中心化采集获取kafkaconsumer端的offset指标。kafka的offset指标可以从两个地方获取
- zk目录
- kafka的内置topic
Older versions of Kafka (
pre 0.9) store offsets in ZK only, while newer version of Kafka, by default store offsets in an internal Kafka topic called
__consumer_offsets
(newer version might still commit to ZK though).The advantage of committing offsets to the broker is, that the consumer does not depend on ZK and thus clients only need to talk to brokers which simplifies the overall architecture. Also, for large deployments with a lot of consumers, ZK can become a bottleneck while Kafka can handle this load easily (committing offsets is the same thing as writing to a topic and Kafka scales very well here — in fact, by default
__consumer_offsets
is created with 50 partitions IIRC).
| KAFKA 版本 / 客户端版本 | 0.9 之前的版本 | 0.9 之后(包括0.9) | | :———————– | :—————————– | :—————————– | | 0.9 之前的版本 | Offset Storage : Zookeeper | Offset Storage : Zookeeper | | 0.9 之后(包括0.9) | Offset Storage : Zookeeper | Offset Storage : KAFKA |
如何得到积压指标?
第一步:zk 目录获取kafka积压指标
kafka积压指标在zk中保存的路径:
格式:/consumers/{CONSUMER_GROUP_ID}/offsets/{TOPIC_NAME}/{PARTITION_NUMBER}
示例:/consumers/buy_send_fragment/offsets/buy_send_fragment/5
[zk: goback-4-006.m6.momo.com:2281(CONNECTED) 10] get /consumers/buy_send_fragment/offsets/buy_send_fragment/5
768371297
cZxid = 0x72d31db19
ctime = Mon Nov 27 18:10:19 CST 2017
mZxid = 0x9a42fc9d9
mtime = Sun May 05 12:05:23 CST 2019
pZxid = 0x72d31db19
cversion = 0
dataVersion = 12950345
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 9
numChildren = 0
第二步:kafka内置topic
topic名称:__consumer_offsets
消息格式:[consumer group id ,topic name, partition] :: [offset, metadata, commitTimestamp, expireTimestamp]
[momobot@hubble-kafka-001.dx.momo.com kafka_2.12-0.10.2.1]$ bin/kafka-console-consumer.sh --bootstrap-server hubble-kafka-001.dx.momo.com:9092 --topic __consumer_offsets --value-deserializer org.apache.kafka.common.serialization.ByteArrayDeserializer --key-deserializer org.apache.kafka.common.serialization.ByteArrayDeserializer --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
[kafka_metric_storm_consumer,hubble-kafka-broker-monitor,1]::[OffsetMetadata[43810024168,NO_METADATA],CommitTime 1557042759641,ExpirationTime 1557129159641]
[kafka_metric_storm_consumer,hubble-kafka-broker-monitor,3]::[OffsetMetadata[72656598502,NO_METADATA],CommitTime 1557042759677,ExpirationTime 1557129159677]
结构化数据:
{
"topic":"topic-name",
"partition":11,
"group":"console-consumer-45567",
"version":2,
"offset":15,
"metadata":"",
"commitTimestamp":1501542796444,
"expireTimestamp":1501629196444
}
第三步: 获取集群topic 的LEO
通过kafka admin接口获取topic的LEO,不同版本的api有所不同。
第四步:records-lag
Records-lag = LEO - consumerOffset
总结
积压量等于topic的LEO减去消费者最新的偏移位置offset.即:
records-lag = LEO - consumerOffset