困扰许久的Kafka Rebalance问题

此文已由作者丁伟伟授权网易云社区发布。

欢迎访问网易云社区,了解更多网易技术产品运营经验。

前提

本文的分析基于kafka 0.9版本的client, 0.10.1.0中已经修改心跳线程为后台线程,并支持设置max.poll.records,参见ChangeLog

使用场景

Kafka是一个高吞吐量的分布式消息系统,在APM的移动端请求数据的处理中,使用了Kafka。Kafka数据使用多线程阻塞的方式进行消费,即每个线程通过poll()的形式消费一个或者多个partition, 每次得到的消息集处理完成之后才会继续进行下一次poll()操作,同时使用了自动提交offset的模式。Rebalance发生的原因有可能是集群的问题,但大部分都在客户端,一旦服务端在设定的超时时间内没有收到消费者发起的心跳,则认为这个消费者已经死掉,就会执行Rebalance动作。

从源码上,我们一路从KafkaConsumer.poll(timeout)跟进来可以看到

  /**
     * Do one round of polling. In addition to checking for new data, this does any needed
     * heart-beating, auto-commits, and offset updates.
     * @param timeout The maximum time to block in the underlying poll
     * @return The fetched records (may be empty)
     */
    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        ...
        // 上面是一些检查动作
        fetcher.initFetches(cluster);
        client.poll(timeout);
        return fetcher.fetchedRecords();
    }

从注释中,我们可以看出poll动作会发出一些列的心跳、自动offset提交和更新的动作。这是我们设定了自动提交的时候,我们的消费者发出心跳和offset的地方。

再进client.poll(timeout)方法中可以看到

   //ConsumerNetworkClient.java
    private void poll(long timeout, long now, boolean executeDelayedTasks) {
        ...
        //一些前置的判断

        // execute scheduled tasks
        if (executeDelayedTasks)
            delayedTasks.poll(now);

        ...
        //其他动作
    }

从源码里面可以看到会吧delayedTask里面的所有任务执行掉,其中就有我们的心跳任务。 那么,很明显,如果我们在两次poll()调用的间隔做了太多的事情,也就是消费拉取下来的数据花了过长的时间,而没有及时发出心跳,则我们会被判定为死掉的节点,这个时候集群就会发起Rebalance。

Rebalance有什么影响

Rebalance本身是Kafka集群的一个保护设定,用于剔除掉无法消费或者过慢的消费者,然后由于我们的数据量较大,同时后续消费后的数据写入需要走网络IO,很有可能存在依赖的第三方服务存在慢的情况而导致我们超时。
Rebalance对我们数据的影响主要有以下几点:

  1. 数据重复消费: 消费过的数据由于提交offset任务也会失败,在partition被分配给其他消费者的时候,会造成重复消费,数据重复且增加集群压力
  2. Rebalance扩散到整个ConsumerGroup的所有消费者,因为一个消费者的退出,导致整个Group进行了Rebalance,并在一个比较慢的时间内达到稳定状态,影响面较大
  3. 频繁的Rebalance反而降低了消息的消费速度,大部分时间都在重复消费和Rebalance
  4. 数据不能及时消费,会累积lag,在Kafka的TTL之后会丢弃数据

上面的影响对于我们系统来说,都是致命的。

我们遇到Rebalance的场景

首先为了看下我们的rebalance有多么严重,我们增加了ConsumerRebalanceListener,并计算Rebalance发生的频率,同时将Rebalance的信息上报到监控平台上。
我们可以看到,Rebalance出现的非常频繁,一旦开始Rebalance则通常是多个机器多个消费线程同时开始Rebalance,并在一定时间后达到稳定。

《困扰许久的Kafka Rebalance问题》
《困扰许久的Kafka Rebalance问题》

同时加了一些日志看看每个partition rebalance需要多长的时间,每个partition rebalance完成都需要20秒左右(当然有些partition会被rebalance到其他消费者去,因为没有响应partition的成对的开始和结束日志),可想而知很频繁的rebalance会有很严重的问题。

2017-05-15 18:00:20,343-consumer:consumer-pool-3 ,topic:foreground.-_-.apm.online,partition:37 rebalance end, rebalance time:14818...
2017-05-15 18:00:20,343-consumer:consumer-pool-3 ,topic:foreground.-_-.apm.online,partition:36 rebalance end, rebalance time:14818...
2017-05-15 18:00:20,344-consumer:consumer-pool-9 ,topic:foreground.-_-.apm.interaction.online,partition:7 rebalance end, rebalance time:20035...
2017-05-15 18:00:20,344-consumer:consumer-pool-9 ,topic:foreground.-_-.apm.interaction.online,partition:6 rebalance end, rebalance time:20035...
2017-05-15 18:00:20,344-consumer:consumer-pool-8 ,topic:foreground.-_-.apm.interaction.online,partition:11 rebalance end, rebalance time:20322...
2017-05-15 18:00:20,344-consumer:consumer-pool-8 ,topic:foreground.-_-.apm.interaction.online,partition:10 rebalance end, rebalance time:20322...
2017-05-15 18:00:20,344-consumer:consumer-pool-1 ,topic:foreground.-_-.apm.web.online,partition:8 rebalance end, rebalance time:15162...
2017-05-15 18:00:20,344-consumer:consumer-pool-1 ,topic:foreground.-_-.apm.web.online,partition:9 rebalance end, rebalance time:15162...
2017-05-15 18:00:20,344-consumer:consumer-pool-6 ,topic:foreground.-_-.apm.diagnose.online,partition:22 rebalance end, rebalance time:15162...
2017-05-15 18:00:20,344-consumer:consumer-pool-6 ,topic:foreground.-_-.apm.diagnose.online,partition:23 rebalance end, rebalance time:15162...

常见且简单的Rebalance场景

我们的业务数据会写入Hbase,最经典的场景就是Hbase集群服务抖动或者我们写入数据造成Hbase RegionServer过热会造成消费到的消息过慢触发心跳超时。这种场景下,我们可以在日志里面明显看到Hbase写入抛出的异常。例如:

  1. 由于集群的抖动,导致我们无法正常写入数据,会造成Rebalance
 2017-03-08 14:31:45,593 411615 [htable-pool1-t99] (AsyncProcess.java:713) INFO org.apache.hadoop.hbase.client.AsyncProcess - #5, table=mam:MobileNetData, attempt=2/10 failed 56 ops, last exception: org.apache.hadoop.hbase.RegionTooBusyException: org.apache.hadoop.hbase.RegionTooBusyException: Above memstore limit, regionName=mam:MobileNetData,5852818ca18dc5fdf63bec8eded1db3c_9223370556190336483,1483673115059.19745fc99b3370aab016af1c8cc70d69., server=hbase8.photo.163.org,60020,1488738168459, memstoreSize=1077613864, blockingMemStoreSize=1073741824
 at org.apache.hadoop.hbase.regionserver.HRegion.checkResources(HRegion.java:2937)
 at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2249)
 at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2216)
 at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2220)
 at org.apache.hadoop.hbase.regionserver.HRegionServer.doBatchOp(HRegionServer.java:4478)
 at org.apache.hadoop.hbase.regionserver.HRegionServer.doNonAtomicRegionMutation(HRegionServer.java:3661)
 at org.apache.hadoop.hbase.regionserver.HRegionServer.multi(HRegionServer.java:3550)
 at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:29949)
 at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2027)
 at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:108)
 at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:110)
 at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:90)
 at java.lang.Thread.run(Thread.java:662)

2.由于写入热点的问题导致的Rebalance

 2017-04-13 13:18:09,809 35209 [htable-pool10-t16] INFO org.apache.hadoop.hbase.client.AsyncProcess - #9, table=mam:MobileDiagnoseData, attempt=3/10 failed 86 ops, last exception: org.apache.hadoop.hbase.NotServingRegionException: org.apache.hadoop.hbase.NotServingRegionException: mam:MobileDiagnoseData,7d56fba948c044a0c7b95709a1d9084e_9223370546420477958_487c7012778ee696320ac91264d33fd1,1491464751441.e2d1589b4b227a56789cf4a5e0d6ec21. is closing
 at org.apache.hadoop.hbase.regionserver.HRegion.startRegionOperation(HRegion.java:5906)
 at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2254)
 at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2216)
 at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2220)
 at org.apache.hadoop.hbase.regionserver.HRegionServer.doBatchOp(HRegionServer.java:4478)
 at org.apache.hadoop.hbase.regionserver.HRegionServer.doNonAtomicRegionMutation(HRegionServer.java:3661)
 at org.apache.hadoop.hbase.regionserver.HRegionServer.multi(HRegionServer.java:3550)
 at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:29949)
 at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2027)
 at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:108)
 at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:110)
 at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:90)
 at java.lang.Thread.run(Thread.java:745)
 on hbase12.xs.163.org,60020,1491464637625, tracking started Thu Apr 13 13:18:06 CST 2017, retrying after 3004 ms, replay 86 ops.

上面两种都是很容易解决的Rebalance,这种处理起来也简单,从发生的源头,要么查看是否集群出现问题,要么解决一下写入热点问题就可以了。

然而难解决的是没有出错日志,但是依旧会频繁的Rebalance。

需要进一步分析的异常

为了减少消费Kafka后写入到Hbase的数据不会产生明显的峰谷,我们采取了限流的策略,在写入Hbase的时候,使用RateLimiter获取令牌后写入到Hbase中,这加剧了Rebalance问题的产生,因为每次消费的时间会加上等待令牌产生的时间。 从统计上来看,左边是增加了限流后的Rebalance情况,右侧这部分红框中,则是去掉了限流之后的情况,21点左右Rebalance情况减少。

《困扰许久的Kafka Rebalance问题》
《困扰许久的Kafka Rebalance问题》

前面提到了Rebalance的原因就是同步消费poll()操作得到的数据的时间过长导致的,我们解决了这些简单的Case之后,发现还是很经常发生Rebalance,同时为什么限流会加剧Rebalance,只能增加日志来看poll()动作得到的数据的消费时间到底是多长。
这个日志中,poll record size是每次渠道的数据的条数,consumer time是消费这部分数据的时间,poll interval是消费线程两次poll的间隔。
从日志中,我们可以看到每次poll,在数据较多时会poll到1w多条数据,消费时间是3秒多,两次poll之间的间隔是12秒左右。而我们的参数中heartbeat.interval.ms设置的是10秒,而session.timeout.ms设置的是30秒,两次poll间隔12秒显然已经超出了心跳的10秒的间隔。从左边的日志也可以看出,集群发生了rebalance。

《困扰许久的Kafka Rebalance问题》
《困扰许久的Kafka Rebalance问题》

上述的日志中,我们有两个疑问:

  1. 能否调整每次拉取到的数据的条数,条数少一些,每次消费也会比较快一些
  2. 消费的时间只有3秒多,总的一次poll加上消费的时间竟然达到了12秒,poll(timeout)我们指定了超时时间为1秒,中间也没有其他操作了,所以只能怀疑poll动作有问题。

对于第1个问题,kafka0.9版本没有设置每次获取的数据条数的参数,在0.10版本中新增了,但是因为集群是0.9,所以这个暂时也没有办法,对于第2个问题,我们先加一些日志看看poll动作

《困扰许久的Kafka Rebalance问题》
《困扰许久的Kafka Rebalance问题》

显然poll()的超时时间已经超过1秒。
我们再看一下poll()和超时相关的代码

 @Override
    public ConsumerRecords<K, V> poll(long timeout) {
        acquire();
        try {
            if (timeout < 0)
                throw new IllegalArgumentException("Timeout must not be negative");

            // poll for new data until the timeout expires
            long start = time.milliseconds();
            long remaining = timeout;
            //循环退出条件是超过了超时时间
            do {
                //传入的也是剩余的超时时间
                Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
                if (!records.isEmpty()) {
                    fetcher.initFetches(metadata.fetch());
                    client.quickPoll();
                    return new ConsumerRecords<>(records);
                }
                //减去这次poll花掉的时间
                long elapsed = time.milliseconds() - start;
                remaining = timeout - elapsed;
            } while (remaining > 0);

            return ConsumerRecords.empty();
        } finally {
            release();
        }
    }

poll()会进行多次的pollOnce(),直到时间用尽。从这个代码片段来看,并不可能出现大于两倍的超时时间的情况,所以我们设定超时时间1秒,但是实际的poll()调用在2秒以内是正常的,可能是做了两次pollOnce()动作,但是日志中的7秒+和8秒+的调用时间是比较奇怪的。我们继续跟踪了代码。最终跟踪到了org.apache.kafka.common.network.Selector,最终poll()的超时逻辑会走到

   private int select(long ms) throws IOException {
        if (ms < 0L)
            throw new IllegalArgumentException("timeout should be >= 0");

        if (ms == 0L)
            return this.nioSelector.selectNow();
        else
            return this.nioSelector.select(ms);
    }

可以看到这个超时时间只是nio中select的超时时间,并不包含读取数据的时间,所以从kafka集群读取数据的时间过长会导致单次poll的时间超长,感觉与超时的语义不符,应该保证在设定的timeout时间内返回设计才比较合理。这个受集群负载影响,不是我们所能控制的。

回到上面的问题,为什么正常消费也会产生Rebalance以及为什么限流会加剧Rebalance也就有了解释,因为poll()的时间会受集群影响,导致单次poll的时间超长,限流则因为限流获取令牌的等待时间会导致单次消费时间较长,加剧了Rebalance。

解决方案

那么我们如何解决这种问题呢,我们期待的结果是可以Rebalance,但是不应该因为一个消费者消费较慢或者突然的波动,而影响整个集群,导致整个集群Rebalance。很容易想到的方案就是单独控制心跳任务,让Kafka集群知道消费者还活着,但是如果依旧采用目前的消费模式,是做不到的。不过还是有解决方案的,那就是关闭offset的自动提交,由我们手动的管理offset的提交和心跳。方案网上有现成的,可以使用Spring-Kafka参考简书上的文章, 当然也可以自己编码实现,因为项目的紧迫性,我们使用了现成的方案。
我们在另一个业务里面先进行了验证性的部署,修改后的效果可以如图中所示:
上线前晚上的rebalance情况,高峰时期,数据量也较多,Kafka集群负载也较大

《困扰许久的Kafka Rebalance问题》
《困扰许久的Kafka Rebalance问题》

然后我们在第二天下午15点上线了新的Kafka消费者逻辑,可以看到晚上高峰时期也没有Rebalance了

《困扰许久的Kafka Rebalance问题》
《困扰许久的Kafka Rebalance问题》

优缺点

陆陆续续经过好久,终于基本解决了Kafka消费者不断的Rebalance的问题,好处很明显,可以解决上述的会导致我们数据的各种问题,集群也不会因为某一个消费线程比较慢而影响整个集群,这是一个恶性循环的过程。但是也有缺点,比如出现过消费线程出现问题,心跳却在继续,数据不会被消费,也不会进行Rebalance。原因还在分析中,但是这种现象毕竟非常少数,可以通过报警和手动操作的方式进行重启等操作。

网易云免费体验馆,0成本体验20+款云产品!

更多网易技术、产品、运营经验分享请点击

    原文作者:网易云
    原文地址: https://zhuanlan.zhihu.com/p/46963810
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞