* 使用kafka, 消费生产的数据是必不可少的, 为不影响业务的正常处理, 对消费过程的积压lag的监控和报警就显得特别重要
* Kafka的lag监控工具有若干个:
1. [KafkaOffsetMonitor](quantifind/KafkaOffsetMonitor): 应试是很久没更新了;
2. [KafkaManager](yahoo/kafka-manager): 用于管理还可以, 监控报警的话需要自己添加少量接口实现;
3. [Burrow](linkedin/Burrow): 个人觉得是目前为止最好用的lag监控报警工具; 目前这个已经被作者重构了一次,下面描述的还是之前的老版本,仅供参考
###### Burrow 功能简介
* Burrow的[github主页]((linkedin/Burrow))已经对其从编译到配置使用作了很好的说明, 这里不累述;
* Burrow用Golang写成, 代码不多, 很容易读, 扩展性也很多;
* 使用Burrow作监控, 不需要预先设置lag的阈值, 他完全是基于消费过程的动态评估;
* 可以监控offset提交到broker,zk两种方式,还可以作storm的消费监控, 这部分扩展起来也很容易;
* 报警支持http, email什么的, 想要扩展个自己的短信报警什么的也是超简单, 好用的不要不要的~~~
* Burrow还贴心的提供了http接口,来获取整个集群的生产,消费等情况, 可参见[wiki](linkedin/Burrow)
* 哎呀, 我去, 又一次贴心提供了Docker镜像, 开箱即用啊~~~
###### Burrow 实现简介:
* 对lag情况进行报警, 当然首先需要获取各group的消费的topic的各个partition的broker offset,就是实际生产的msg的条数, 通过[sarama](Shopify/sarama)可以轻松获取, 当然这个需要周期性不间断获取;
* 有了broker的offset, 还需要消费的commited offset, 针对kafka 0.9及以后的版本, 提交的offset可以选择保存在broker上的[__consumer_offsets的内部topic上](Kafka的消息是如何被消费的?), Burrow还是通过[sarama](Shopify/sarama)来消费__consumer_offsets这个topic来获取;
* 还有些offset是提交到zk上, Burrow也支持从zk上来获取这个 committed offsets;
* Broker offset有了, Committed offset也有了, 剩下的就是应用各种策略来评估各个group的消费情况啦, 每个group可以消费多个topic, 每个topic也有多个paritition, 针对每个partition都有一个凭估周期的概念, 一个凭估周期包括若干个凭估窗口, 每个凭估窗口都是对broker offset和committed offset的一次采样, 然后将策略应用到这个凭估周期内, 最后作出凭估, 这个策略兼顾了broker offset的变化, committed offset的变化, 具体参考[wiki上的策略](linkedin/Burrow);
* 策略不是完美的, 我们也可以根据自身的需要增加,修改策略.
###### Burrow 使用中遇到的问题:
* Burrow只能监控在Burrow运行后提交过offset的group, 因为在通过[sarama](Shopify/sarama)消费__consumer_offsets这个topic来获取committed offset时,设置了OffsetNewest,每次都是从最新开始消费, 我也尝试过改成从最旧开始消费 ,但[sarama](Shopify/sarama)会run很多的thread起来, 撑爆了系统, 不知道是不是sarama的bug;
* 不支持topic扩展的新的partition的监控, 后来我发现最新版的Burrow里已经修了这个问题,[看这里](New topic information isn’t picked up until restart · Issue #250 · linkedin/Burrow), 但是这个修复只支持了新增的partition的broker offset的获取, 并没有支持committed offset的获取,可以在`addConsumerOffset`里加上下面这段:
“`
if int(offset.Partition) >= len(consumerTopicMap) {
// The partition count must have increased. Append enough extra partitions to our slice
for i := len(consumerTopicMap); i < partitionCount; i++ {
consumerTopicMap = append(consumerTopicMap, nil)
}
}
“`
* 偶尔遇到过针对map的读写导致竞争问题:`fatal error: concurrent map read and map write`, 查了下是在`evaluateGroup`中对clusterMap.broker的读操作和在`addBrokerOffset`中对其写操作引发, 加锁吧~,可以拷贝一份`clusterMap.broker`来读.Kafka的消费积压监控-Burrow