《Kafka:The Definitive Guide》第四章Kafka Consumer问题集

1. Kafka consumer和其它队列中间件的使用上有很大区别嘛?

有,有很多概念上的不同,比如offset。

2.为什么有consumer group的概念?

因为一个consumer不够用啊,当consumer有瓶颈的时候就需要开多个consumer,这时候这一组consumer就叫consumer group。

3.partition rebalance的过程?

如果开始有2个consumer,4个partition。

《《Kafka:The Definitive Guide》第四章Kafka Consumer问题集》
《《Kafka:The Definitive Guide》第四章Kafka Consumer问题集》

增加consumer到4个,此时进行rebalance。

《《Kafka:The Definitive Guide》第四章Kafka Consumer问题集》
《《Kafka:The Definitive Guide》第四章Kafka Consumer问题集》 如果再加1个consumer,那么这个consumer就没活干了,因为一个partition只能由一个consumer来消费。

《《Kafka:The Definitive Guide》第四章Kafka Consumer问题集》
《《Kafka:The Definitive Guide》第四章Kafka Consumer问题集》
4.如何设置partition的数量?

要大于你的consumer group中consumer的个数。

5.为什么需要多个consumer group?

因为可能有不同的系统需要消费这个topic。

《《Kafka:The Definitive Guide》第四章Kafka Consumer问题集》
《《Kafka:The Definitive Guide》第四章Kafka Consumer问题集》
6.什么时候会partition rebalance?

partition rebalalnce就是partition重新分配给consumer的过程。

a.有新的consumer加入的时候

b.有consumer挂了的时候

c.当添加了新的partition的时候

7.rebalance为系统的什么特性保驾护航?

高可用性和可扩展性。

8.consumer之间的member关系(consumerGroup)是如何维护的?

依靠GroupCoordinator来维护。consumer定时给group coordinator发心跳。

实际上在api层,poll()就是发心跳的时机。

9.如果一段时间不发心跳会怎么样?

如果长时间不发心跳,那么session timeout后,group coordinator就认为这个consumer挂了,然后进行partition rebalance。

10.consumer主动close和被动挂掉,对于rebalance有什么不同的影响?

主动close,那么group coordinator会马上rebalance,这个partition的message会马上被其它consumer处理。

而被动挂掉,需要session timeout,那么可能有一段时间这个partition的数据没有consumer处理,引起较大latency。

11.group leader是干嘛的?

group leader相当于班长,group coordinator相当于班主任。

group coordinator负责维护consumer group的列表,然后把这个列表给到group leader,group leader进行partition balance的任务。

consumer的心跳是打到group coordinator的。

12.consumer.poll()除了可以拉到message,还有什么用处?

必须不断的poll()才能保持心跳。所以message的处理一定要快!

13.poll的timeout参数有什么作用?

这个参数应该是个latency-throughout tradeoff。设的小一些可以减少latency,快速响应,增大则可以提高吞吐,但latency增大。

参数基本上都是tradeoff,要不然就不需要参数了,代码里直接写死最优参数就好了。

14.什么是commit?

We call the action of updating the current position in the partition a commit.

15.consumer是如何commit的?

发送一条message给topic __consumer_offsets,message包含每个partition上的offset。

16.如果consumer crash可能会有什么影响?

如果consumer已经消费的message没有commit,那么就会重复消费这部分message。

《《Kafka:The Definitive Guide》第四章Kafka Consumer问题集》
《《Kafka:The Definitive Guide》第四章Kafka Consumer问题集》

如果consumer没有消费的message却commit了,那么这部分message就不会被真实消费了。

《《Kafka:The Definitive Guide》第四章Kafka Consumer问题集》
《《Kafka:The Definitive Guide》第四章Kafka Consumer问题集》

17.consumer有哪些管理offset的方式?

a.automatic commit

一个控制commit interval的参数,叫auto.commit.interval.ms。

那一定是每隔interval就会commit一次嘛?

不是,commit是在poll()的时候,判断距离上次commit是否超过了inteval,超过则同时进行commit,没超过则不commit。

b.commit sync

同步commit,即调用commitSync(),该函数是阻塞的,等待commit完成后返回。

这种方式就是latency小,但是无法达到高吞吐。

c.commit async

异步commit,除了不阻塞意外,和同步commit的区别是同步commit会重试,比如网络有问题,那么同步commit会重试到成功或失败。而异步不会重试。

原因是如果重试的话,可能会出现offset 3000先commit了,然后重试的offset 2000才到,这时候就会重复消费了。(即commit order问题)

所以是否sync/async,是一种reliable-throughout的权衡,一般大数据还是高吞吐更重要。reliable可以同步业务系统的幂等性达到。

18.那是用sync好还是async好呢?

一般来说,用async的方式没有太大问题,因为即便中间的offset commit丢了,只要后边有成功的commit就行了。

但是如果这是最后一次commit,之后就close了,或者rebalance了,那么这次commit就很重,不能丢。所以最后这次commit是要同步commit的。所以一个经典的pattern是同时使用async和sync。

《《Kafka:The Definitive Guide》第四章Kafka Consumer问题集》
《《Kafka:The Definitive Guide》第四章Kafka Consumer问题集》
19.上面提到的commitAsync()和commitSync(),都是提交之前poll的所有message,那么如果我们想commit指定的offset可以嘛?

可以。

比如下面是每一千条commit一次。

《《Kafka:The Definitive Guide》第四章Kafka Consumer问题集》
《《Kafka:The Definitive Guide》第四章Kafka Consumer问题集》
20.可以消费指定offset的数据嘛?

可以seek。一般用于把offset存到外部系统的时候。

21.如何保证consume exactly once呢?

先来看这个有问题的代码,有可能consumer在store record之后挂掉,那么就重复消费两次这个消息:

《《Kafka:The Definitive Guide》第四章Kafka Consumer问题集》
《《Kafka:The Definitive Guide》第四章Kafka Consumer问题集》

如何保证只消费一次呢?就让process store commit是一个原子操作就可以了。

然后在rebalance的时候,接收rebalance event,seek到db中保存的offset就可以了。

《《Kafka:The Definitive Guide》第四章Kafka Consumer问题集》
《《Kafka:The Definitive Guide》第四章Kafka Consumer问题集》
22.如果想保证exactly consume once,那么只能把offset保存在db中嘛?commit给kafka的方式一定不行嘛?
是的,commit的方式不行,因为exactly once是利用了db的transaction实现的,目前
kafka没有transaction

但是kafka在做这项功能了。

23.consumer有哪些parameter?

fetch.min.bytes : latency-throughout tradeoff

fetch.max.wait.ms : latency-throughout tradeoff

max.partition.fetch.bytes: 如果太大的话处理时间过长,可能导致session timeout

session.timeout.ms : 默认3s

    原文作者:衫秋南
    原文地址: https://zhuanlan.zhihu.com/p/27408881
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞