Kafka 读书-1:《The Kafka definitive guide》

消费者

  • Kafka最大的特点是:Kafka集群本身不去记录消费者的确认回函(我猜是性能考虑),而是允许消费者使用offset来追踪消息的提交。这个设计是出于以下的考虑:如果一个消费者down掉,或者新的消费者加入,那么都会触发rebalance,kafka会重新分配消息的发送,那么一个消费者很有可能在rebalance之后,读取的消息来自新的分区partition。
    • 这就涉及到消息队列一个常见的问题:at-most-once, at-least-once, exactly-once
    • at-most-once:消息保证不被重复处理,但是可能会丢失
    • at-least-once:消息保证不被丢失,但是可能被处理多次
    • exactly-once:消息仅被处理一次
  • 因此,在rebalance发生后,消费者需要比较的就是当前partition的offset,和最近一次提交的offset。

消费者 commit offset

  • Automatic Commit:自动提交offset,每隔5秒(可配置),Kafka消费者可以自动的提交最后一次poll轮询时返回的offset。方便,但是无法避免重复处理消息。
  • commitSync():手动同步提交,在每一次poll循环中,一定要处理完所有的批消息,然后执行commitSync,这样就可以提交本次轮询中的最大offset。commitSync在遇到异常时会重新执行直到错误消失或此错误无法恢复。缺点是消费者需要一直等待broker的回复。
  • commitAsync():手动异步提交,不会阻塞poll循环,但是没有retry机制。如果是异步提交commitAsync中使用Callback,一般这个回调是被用来log error或者发送metric。但是如果是用来实现retry机制,那么得注意提交顺序。

两者的结合使用:一般在poll循环中使用异步提交,在结束时,使用同步提交来确保所有的消息都被处理。注意,以上两个手动提交的方法,只会默认提交本次poll轮询返回的最大offset。有时候需要精确的控制提交的offset,比如每隔1000个消息,提交一次,就需要传入参数Map<TopicPartition, OffsetAndMetadata>。

Map<TopicPartition, OffsetAndMetadata> currentOffsets;
int count = 0;
....
while (true) {
   ....
   ConsumerRecord r = ...
   currentOffsets.put(new TopicPartition(r.topic(), r.partition()), new OffsetAndMetadata(r.offset()+1, ""));
   if (count % 1000 == 0) {
      consumer.commitAsync(currentOffsets);
   }
   count++;
}

但是,以上几种方法,都无法避免重复处理或者漏掉消息,因为无论哪种方法,都无法精确的预知在哪个时间点kafka会重新划分数据或者新的消费者加入。单纯的提高提交的频率,是不可靠的

消费者 ConsumerRebalanceListener

针对上述的情况,Kafka提供了ConsumerRebalanceListener接口来告诉消费者,当数据rebalance发生前后,应该如何处理,如此以来,就可以摆脱单纯的依赖提交offset,更可靠更精确的提交offset。

到这里,仍无法保证exactly-once的保证,因为上面的方法基于假设:消费者进程不会挂掉。Kafka队列本身可以做到高冗余和高可靠,但是消费者这边,却需要开发者自己来保证可靠性。即使是在k8s上将消费者作为一个pod,当消费者挂掉后,pod会重启,但是新的问题来了:重启后的消费者,从哪里开始读取消息?

内部机制

集群成员:kafka内部使用zookeeper来注册broker,zk内部会有一个id来表示broker,当zookeeper无法联系broker时,这个broker会被移除,但是这个id仍然存在。

controller:众多broker中被用来选举leader,使用zk。

replication:数据有topic标签,不同的topic被分区,每个分区有多个复制。每个broker存储这些复制。有两种复制:leader replica,每个分区只有一个复制,被称为leader,所有的请求通过leader,保证一致性;follower replica,除去leader的复制都称为follower,follower不处理请求,只复制leader最新的数据,当leader崩溃,一个follower会被选为leader。follower需要时刻与leader通信来知道消息的复制是否是最新的。

处理请求:对于一个broker,同一个client的请求按顺序处理,客户端本身也需要维护一个metadata的信息来决定消息的发送到哪一个broker(server会发送partition信息,然后客户端做决定)。

Produce Request:当一个broker上的lead replication收到一个produce请求,检查步骤如下:1 是否有写权限 2 acks数值是否正确 3 如果acks=all,检查是否符合配置参数。如果条件符合,写入本地磁盘。如果acks=0或1,立即返回;如果acks=all,请求会缓存入purgatory,等待所有消息写入后,返回给客户端。

Fetch Request:客户端发送请求,broker检查是否符合条件,数据是否存在。zero-copy使用来提高性能。broker返回数据并不是一次性返回,而是等待buffer中到达一定的size或者一定的时间,再返回。注意high water mark,因为写入数据时需要同时复制到多个replica中,每个replica中的最短的offset设置为hwm,这也是consumer能读取到的最近的数据。

兼容性:先升级broker,再升级client。broker可以保证向下兼容,client则不会。

物理存储:kafka最基本的存储单元是partition replica,意味着不能被分割。

partition allocation:分区的分配,当创建一个topic时,broker=6,replica num=3,partition=10,也就平均5个replica per broker,0号partition的leader在1号broker上,那么0号partition的follower就在2号3号broker上。这个是partition的分配机制,但是每个broker上的磁盘大小并没有考虑!

文件管理:1 文件句柄的设置 2 文件的格式,broker上存储的数据时,把partition分为Segment,1个Segment是1个文件,存储的就是producer发送的原始数据,因此可以零拷贝直接发送给consumer,producer如果压缩了数据,那么在broker上存储的数据也是压缩的。3 索引,由于consumer可以指定offset读取,kafka的数据是有索引的,记录每个分区的segment的offset的索引。

Log Compact:1 日志压缩,本质就是对于相同的key,如果value是不停的变化的,那么只存储最近的值,从而节约空间。2 因此删除数据时,value被设置为null,在kafka内存储一段时间后,才被删除。3 compact只作用于inactive的segment,压缩会影响读写性能。

可靠的数据传递

kafka提供的基本保证:

  • 同一个producer,同一个partition,数据顺序会保证
  • 消息被认为是‘提交的’,当其被写入ISR中
  • 消息不会被丢失,只要至少一个replica还在
  • 消费者只能读取‘提交的’消息

Replication:数据的复制是kafka可靠性的来源,通过把一个partition复制到多个:leader和其follower。leader和zk定时通信,follower和leader定时同步数据。网络通信的状况和GC的参数是影响因素之一,尤其是当partition不停的在in-sync和out-of-sync之间摆动。

Replication的参数:1 replication.factor = N,一个partition在N个broker上被复制N次。一般取3。2 unclean.election 当一个partition的leader不可用时,且其follower上的数据时out-of-sync的,这个时候就得做决定:是等待原先的leader恢复or选取一个follower作为新的leader?每个方法都有优缺点:前者保证数据的一致性但是恢复时间会长,后者保证服务不中断但是数据不一致。

生产者所需提供的保证:

  • 正确的acks参数:acks=0意味着最快,但是最不安全;acks=1意味着只需等待leader的回应,但是仍不安全;acks=all意味着等待所有的数据复制完成,但是最慢。
  • 处理异常:生成者需要处理两者异常错误:可重试和不可重试的,前者可以依靠官方的producer客户端自动重试,后者需要开发者自己处理。

消费者所需提供的保证:

由于kafka自身提供的high water mark,消费者被保证能一直读到一致的数据,因此消费者只需要记住自己处理过哪些消息。这也是为什么消费者最重要的是提交offset。

至于如何保证exactly-once,一般两者方法:1 消费者把数据写入key-value数据库中,当发现有相同的key时,只需要覆盖即可。2 消费者借助事务数据库,把消息和offset在同一个事务中写入以保证同步,当重启时,从数据库中取出最后一个写入的数据的offset,利用consumer.seek来取得数据,

因此官方给出的如何确保系统可靠运转的建议:1. 检查配置,官方有现成的工具org.apache.kafka.tools中VerifiableProducer和VerifiableConsumer可以用来自动测试,官方也有extensive test suits来测试 2. 检查应用本身,要考虑各个部件,消费者(如何处理rebalance,commit offset),生成者,broker的重启,宕机等情况 3. 监控生成环境,比如常见的事件时间戳,落后处理速度等

所以构建一个可靠的系统不仅仅是kafka自己的责任,生产者,消费者的api的使用,监控,具体的使用场景,加在一起,面面俱到,考虑清楚,才能立于不败之地。

    原文作者:Yi Wang
    原文地址: https://zhuanlan.zhihu.com/p/58214176
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞