1. Kafka consumer和其它队列中间件的使用上有很大区别嘛?
有,有很多概念上的不同,比如offset。
2.为什么有consumer group的概念?
因为一个consumer不够用啊,当consumer有瓶颈的时候就需要开多个consumer,这时候这一组consumer就叫consumer group。
3.partition rebalance的过程?
如果开始有2个consumer,4个partition。
增加consumer到4个,此时进行rebalance。
如果再加1个consumer,那么这个consumer就没活干了,因为一个partition只能由一个consumer来消费。
4.如何设置partition的数量?
要大于你的consumer group中consumer的个数。
5.为什么需要多个consumer group?
因为可能有不同的系统需要消费这个topic。
6.什么时候会partition rebalance?
partition rebalalnce就是partition重新分配给consumer的过程。
a.有新的consumer加入的时候
b.有consumer挂了的时候
c.当添加了新的partition的时候
7.rebalance为系统的什么特性保驾护航?
高可用性和可扩展性。
8.consumer之间的member关系(consumerGroup)是如何维护的?
依靠GroupCoordinator来维护。consumer定时给group coordinator发心跳。
实际上在api层,poll()就是发心跳的时机。
9.如果一段时间不发心跳会怎么样?
如果长时间不发心跳,那么session timeout后,group coordinator就认为这个consumer挂了,然后进行partition rebalance。
10.consumer主动close和被动挂掉,对于rebalance有什么不同的影响?
主动close,那么group coordinator会马上rebalance,这个partition的message会马上被其它consumer处理。
而被动挂掉,需要session timeout,那么可能有一段时间这个partition的数据没有consumer处理,引起较大latency。
11.group leader是干嘛的?
group leader相当于班长,group coordinator相当于班主任。
group coordinator负责维护consumer group的列表,然后把这个列表给到group leader,group leader进行partition balance的任务。
consumer的心跳是打到group coordinator的。
12.consumer.poll()除了可以拉到message,还有什么用处?
必须不断的poll()才能保持心跳。所以message的处理一定要快!
13.poll的timeout参数有什么作用?
这个参数应该是个latency-throughout tradeoff。设的小一些可以减少latency,快速响应,增大则可以提高吞吐,但latency增大。
参数基本上都是tradeoff,要不然就不需要参数了,代码里直接写死最优参数就好了。
14.什么是commit?
We call the action of updating the current position in the partition a commit.
15.consumer是如何commit的?
发送一条message给topic __consumer_offsets,message包含每个partition上的offset。
16.如果consumer crash可能会有什么影响?
如果consumer已经消费的message没有commit,那么就会重复消费这部分message。
如果consumer没有消费的message却commit了,那么这部分message就不会被真实消费了。
17.consumer有哪些管理offset的方式?
a.automatic commit
一个控制commit interval的参数,叫auto.commit.interval.ms。
那一定是每隔interval就会commit一次嘛?
不是,commit是在poll()的时候,判断距离上次commit是否超过了inteval,超过则同时进行commit,没超过则不commit。
b.commit sync
同步commit,即调用commitSync(),该函数是阻塞的,等待commit完成后返回。
这种方式就是latency小,但是无法达到高吞吐。
c.commit async
异步commit,除了不阻塞意外,和同步commit的区别是同步commit会重试,比如网络有问题,那么同步commit会重试到成功或失败。而异步不会重试。
原因是如果重试的话,可能会出现offset 3000先commit了,然后重试的offset 2000才到,这时候就会重复消费了。(即commit order问题)
所以是否sync/async,是一种reliable-throughout的权衡,一般大数据还是高吞吐更重要。reliable可以同步业务系统的幂等性达到。
18.那是用sync好还是async好呢?
一般来说,用async的方式没有太大问题,因为即便中间的offset commit丢了,只要后边有成功的commit就行了。
但是如果这是最后一次commit,之后就close了,或者rebalance了,那么这次commit就很重,不能丢。所以最后这次commit是要同步commit的。所以一个经典的pattern是同时使用async和sync。
19.上面提到的commitAsync()和commitSync(),都是提交之前poll的所有message,那么如果我们想commit指定的offset可以嘛?
可以。
比如下面是每一千条commit一次。
20.可以消费指定offset的数据嘛?
可以seek。一般用于把offset存到外部系统的时候。
21.如何保证consume exactly once呢?
先来看这个有问题的代码,有可能consumer在store record之后挂掉,那么就重复消费两次这个消息:
如何保证只消费一次呢?就让process store commit是一个原子操作就可以了。
然后在rebalance的时候,接收rebalance event,seek到db中保存的offset就可以了。
22.如果想保证exactly consume once,那么只能把offset保存在db中嘛?commit给kafka的方式一定不行嘛?
是的,commit的方式不行,因为exactly once是利用了db的transaction实现的,目前
kafka没有transaction。
23.consumer有哪些parameter?
fetch.min.bytes : latency-throughout tradeoff
fetch.max.wait.ms : latency-throughout tradeoff
max.partition.fetch.bytes: 如果太大的话处理时间过长,可能导致session timeout
session.timeout.ms : 默认3s