概述
模式
- 发布订阅
- 采用传统消息队列思想
- 消费者从主题中读取消息,
- 生产者向消息队列的某个主题发送消息
- 消费者订阅的主题有更新, 不通知消费者,消费者自己来读取
- 队列模式
- 多个消费者读取消息队列
- 一条消息只发给一个消费者
组成
- 消息队列
- 存储系统
- 流处理系统
kafka创新
- 顺序性
- 每个主题分区内的消息是有序的
- 消费者的读取点是任意的
- 读取点后是顺序读取的
- 设置消息持久化状态
- 持久化时间
- 持久化的消息数量
- 要求消费者管理即将处理的下一条消息的偏移量
- zookeeper来处理
- kafka是顺序列入 顺序读取的
- 相互独立的消费者的偏移量都是不相关的
概念
消息
- 数据单元称为消息
- 消息持久化到磁盘
批次
- 一组消息
消息模式
- 有格式的数据
主题
- kafka消息是通过主题来分类的
- 类比,数据库的表, 文件系统的文件夹
- 逻辑上的概念
- 主题内,无序 只有分区内有序
- 保留期
- topic中的消息在规定时间内存储, 有时间限制
- 空间保留策略
- 当空间使用大小达到阈值, 可以配置来清除消息
分区
- 一个主题一般包含数个分区
- 每个分区是主题的物理划分
- 偏移量是分区中的概念
- 不同分区的消息是不同的
- offset是在一个分区内的偏移量
- offset不跨分区
- 即: 只保证分区内有序,不是主题有序
- 保证了高吞吐量
- 分区是实现kafka并行化的一种方式
- 不同分区上的写操作是并行的
- 副本机制
- 为一个分区的消息, 提供多副本备份
- 一主多从的关系
- leader副本
- follower副本
- 基于 Quorum 的方法
- 在这种方法中, leader 只会在大多数复制收到消息时标记消息提交状态
- 如果 leader 失败 选举新 leader 只会在 followers 之间进行协调。这 里有许多算法用于选举 leader
- Kafka 的 leader 在标记消息 为提交状态之前等待所有 followers 的确认
生产者
- 创建消息
- 可以有多个
- 可以有多个副本
- 提供容灾能力
消费者
- 消费消息
- Kafka 消费端也具备一 定 的 容灾能力。
- 使用pull模式拉取
- 并且保存消费的位置, offset
- 可以有多个
broker
- 一个独立的kafka服务器被称为broker
- broker 是集群的组成部分
- 服务节点
zookeeper
- 负责kafka元数据的管理
- 控制器的选举
连接器
- connector
- 将kafka主题与已有数据源进行连接
- 数据可以互相导入导出
多集群
- 随着kafka部署数量的增加, 多个集群能更好的备灾
- 多个集群之间通过MirrorMaker复制消息
集群原理
- broker
- 一个集群由多个broker组成
- 每个broker都是无状态的
- 有一个broker会被选为控制器
- 负责管理整个急群众所有分区和副本的状态
- 某个分区的leader副本发生故障时,由控制器负责为该分区选择新的leader副本
- zookeeper
- 在没有 Zookeeper 服务器的情况下, Kafka 集群无法运行,这与 Kafka 集群安装紧密藕合
- Zookeeper 是一个 Kafka集群协调器 C Coordinator)
- 通过 Kafka 管理参与消息传输的 brokers,生产者和消费者
- 帮助管理topic相关信息
- 选举topic leader
- 功能
- 集中式服务
- 管理集群成员
- 相关配置
- 集群注册功能
- 老版本还维护topic的offset
- 每个topic分区都是有状态的
- 有的topic分区为leader
- 有的分区为fellower
生产者
- 概述
- 老的Scala生产者客户端
- 新的java生产者客户端
- ProducerRecord
- topic
- partition
- header
- k
- v
- timestamp
- 参数
- boostrap server
- key.serializer
- value.serializer
- 架构
- 主线程中kafkaProducer生成消息
- 拦截器
- 序列化器
- 分区器
- 消息累计器
- 用来缓存消息, 进行批量发送
- 是个双向队列
- 发送模块
- sender线程
- 发送到kafka中
- 执行流程
- 实例化和初始化
- 配置Kafka连接信息
- kafka metrics 用于对kafka集群相关指标进行追踪
- 实例化分区器
- 实例化 key/value序列化器
- 实例化拦截器
- 实例化消息存储的RecordAccumulator
- send过程分析
- 调用send方法进行发送
- 获取 MetaData
- 获取到 Metadata 元数据信息才能真正进行消息的投递,
- 包括集群信息, 各个TopicPartition 的leader信息
- 序列化
- 获取分区。计算 ProducerRecord 将被发往的分区对应的 partitionld
- 创建 TopicPartition 对象
- 写 BufferPool 操作。这一步是调用 RecordAccumulator. append()方法将 ProducerRecord写入 RecordAccumulator 的 BufferPool 中。
- sender发送消息
- send操作并没有发送网络请求,只是发送到了缓冲区
- 网络请求是由 KafkaProducer 实例化时创建的 Sender 线程来完成的
- 获取 Cluster 信息。从 MetaData 中获取集群 Cluster 信息。
- 获取各 TopicPartition 分区的 Leader 节点集合。
- 直接向leader写入
消费者
- 消费者和消费者组
- 消费者( Consumer )负责订阅 Kafka 中的主题( Topic ),并且从订阅的主题上拉取消息。
- 当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者 。
- 默认每个消费者, 分配一个消费者组
- 消费者组
- 提供点对点功能
- 提供再均衡功能
- Low Level Consumer低级API
- 通常针对特殊的消费逻辑(比如客消费者只想要消费某些特定的Partition),
- 低级API的客户端代码需要自己实现一些和Kafka服务端相关的底层逻辑,比如选择Partition的Leader,处理Leader的故障转移等。
- 低级API主要针对SimpleConsumer,不过选举Leader,拉取消息这些都要自己去实现
- 低级消费者流程
- 低级消费者直接与指定的代理通过BlockingChannel 创建一条 Socket 连接
- 低级消费者提供了 一种灵活控制数据消费的操作
- 同一条消息多次消费、只读取某个分区信息、消费指定位置的消息等场景
- spark streaming
- Hight Level Consumer高级API
- 概述
- 提供了一个从Kafka消费数据的高层抽象,消费者客户端代码不需要管理offset的提交
- 采用了消费组的自动负载均衡功能,确保消费者的增减不会影响消息的消费
- 高级API主要使用了ConsumerGroup语义实现消费者的自动负责均衡
- 方法
- subscribe
- 来指定订阅主区
- 由同一个消费组的leader消费组根据分区分配策略, 来分配分区
- assign
- 订阅主题的某些分区
- 指定
- poll()方法,用于拉取消息:
- 指定消费起始位置:
- seek()方法、 seekToBeginning()方法和 seekToEnd()方法
- commitSync()方法和 commitAsync()方法,分别用来以同步和异步方式提交消费偏移量
- 偏移量
- 旧版本保存在zk中
- 消费位移存储在 Kafka 内 部的 主题 consumer offsets 中
- 当前消费者需要提交的消 费位移并不是 x,而是 x+ l ,对应于图中的 position ,它表示下一条需要拉取 的消息的位置
- 自动提交
- 在 Kafka 消费的编程逻辑中位移提交是一大难点
- 消费者每隔 5 秒会将拉取到的每个分区中最大的消息位移进行提交
- 自动提交消费位移的方式非常简便,它 免去了复杂的位移提交逻辑,让编码更简洁
- 但随之而来的是重复消费和消息丢失的问题。假 设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前, 消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现 象(对于再均衡的情况同样适用)
- Seek 指定位移消费
- 在 Kafka 中 每当消费者查找不到所记录的消费位移 时, 就会根据消费者客户端参数
- aut o . offset.reset 的配置来决定从何处开始进行消费,这个参数的默认值为“ latest ”,表示从分区末尾开始消费消息
- 有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而 KafkaConsumer 中的 seek()方法正好提供了这个功能,让我们得 以追前消费或回溯 消费
- spark集成
- Receiver-based
- 使用 Kafka 高级消费者 API 来实现 Receiver
- 从 Kafka topic分区接收的数据存储在 Spark executors
- 开启WAL 才能保证 exactly-one 的处理
客户端原理
- 生产者与分区
- 如果在发消息的时候指定了分区,则消息投递到指定的分区
- 如果没有指定分区,但是消息的key不为空,则基于key的哈希值来选择一个分区
- 如果既没有指定分区,且消息的key也是空,则用轮询的方式选择一个分区
- 分区分配策略
- 消费者的分配策略
- 同一时刻,一条消息只能被组中的一个消费者实例消费
- RangeAssignor
- 按照消 费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配, 以保证分区尽可能均匀地分配给所有的消费者
- RoundRobinAssignor
- 是将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式逐个将分区依次分配给每个消费者。
- StickyAssignor 分配策略
- (l ) 分区的分配要尽可能均匀 。
- ( 2 )分区的分配尽可能与上次分配的保持相同。
生产者正好一次
- 0.11提供
- 幂等生产者
- 对接口的多次调用和调用一次,结果是一样的
- 避免写入重试造成的重复消息
- 流程
- 为了实现Producer的幂等性,Kafka引入了Producer ID(即PID)和Sequence Number。
- PID。每个新的Producer在初始化的时候会被分配一个唯一的PID,这个PID对用户是不可见的。
- Sequence Numbler。(对于每个PID,该Producer发送数据的每个都对应一个从0开始单调递增的Sequence Number
- Broker端在缓存中保存了这seq number,对于接收的每条消息,如果其序号比Broker缓存中序号大于1则接受它,否则将其丢弃。
- 这样就可以实现了消息重复提交了。但是,只能保证单个Producer对于同一个的Exactly Once语义。不能保证同一个Producer一个topic不同的partion幂等。
- 在事务属性之前先引入了生产者幂等性,它的作用为
- 生产者多次发送消息可以封装成一个原子操作,要么都成功,要么失败
- consumer-transform-producer模式下,因为消费者提交偏移量出现问题,导致在重复消费消息时,生产者重复生产消息
- 需要将这个模式下消费者提交偏移量操作和生成者一系列生成消息的操作封装成一个原子操作
事务
- kafka事务属性是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务,或者说是是一个原子操作),同时成功或者失败
主题与分区
- 主题和分区都是逻辑上的概念
- 分区可以有一至多个副本
- 只有一个副本可以工作!
- 每个副本对应一个日志文件
- 每个日志文件对应一至多个日志分段
- 每个日志分段还可以细分为索引文件、日志存储文件和快照文件
- 副本
- 概述
- 分区使用多副本机制来提升可靠性,但只有 leader 副本对外提供读写服务,
- follower 副本只负责在内部进行消息的同步
- 如果一个分区的 leader 副本不可用,那么就意味着整个分区变得不可用
- 此时就需要 Kafka 从剩余的 follower 副本中挑选一个新的 leader 副本来继续对外提供服务
- ,但从某种程度上说 , broker 节点中 leader 副本个数的多少决定了这个节点负载的高低。
- 优先副本的选举
- 所谓的优先副本是指在 AR 集合列表中的第一个副本
- 理想情况下,优先副本就是该分区的 leader 副本,所 以也可以称之为 preferred leader
- Kafka 要确保所有主题的优先副本在 Kafka 集群中均匀分布,这样就保证 了所有分区 的 leader 均衡分布
- 分区重分配
- Kafka 并不会将这些失效的分区副本自动地 迁移到集群中剩余的可用 broker 节点上
- 当要对集群中的一个节点进行有计划的下线操作时,为了保证分区及副本的合理分配,我 们也希望通过某种方式能够将该节点上的分区副本迁移到其他的可用节点上。
- 为了解决上述问题,需要让分区副本再次进行合理的分配,也就是所谓的分区重分配
- 复制限流
- 数据复制会占用额外的资源,如果重分配的量太大必然会严重影响整体的性能,尤其是处于业务高峰期的时候。减小重分配的粒度, 以小批次的 方式来操作是一种可行的解决思路 。
- 如果集群中某个主题或某个分区 的流量在某段时间内特别大,那么只靠减小粒度是不足以应对的,这时就需要有一个限流的机制,可以对副本间的复制流量加以限制来保证重分配期间整体服务不会受太大的影响 。
- 分区优化
- 性能测试工具
- 生产者性能测试的 kafka-producerperιtest.sh
- 消费者性能测试的 kafka-consumer-perf-test. sh
- 对生产者而言,每一个分区的数据写入是完全可以并行化的
- 对消费者而言, Kafka 只 允许单个分区 中的消息被一个消费者线程消费, 一个消费 组的消费并行度完全依赖于所消费的分区数
日志存储
- 文件目录布局
- 一个分区对应一个日志( Log)
- Log 在物理上只以文件夹的形式存储
- Kafka 又引入了日志分段( LogSegment )的概念,将 Log 切分为多个 LogSegment
- 每个LogSegment 对应于磁盘上的一个日志文件和两个索引文件
- 以及可能的其他文件(比如以“ .txnindex ”为后缀的事务索引文件〉
- 向 Log 中追加消息时是顺序写入的,只有最后一个 LogSegment 才能执行写入操作
- 我们将最后一个 LogSegment 称为 “ activeSegment ”,即表示当前活跃的日志分段
- 每个 LogSegment 中的日志文件(以“ .log”为文件后缀)都有对应的两个索引文件
- 偏移量索 引 文件
- 时间戳索引文件
- 日志清理
- 日志删除
- 基于时间
- 基于大小
- 基于起始偏移量
- 日志压缩
- 按照key整合
- 保留最后一个版本
- 磁盘存储
- 疑问
- 为什么用磁盘, 会不会很慢?
- 磁盘顺序写入速度600MB/s
- 随机写入100KB/s
- 特性
- 高消息写入吞吐量
- 高消息读取吞吐量
- 高容量的复制速度
- 高磁盘刷新或 I/O
- 顺序消息追加
- Kafka 在设计时采用了文件追加的方式来写入消息,
- 即只能在日志文件的尾部追加新的消息,井且也不允许修改己写入的消息,这种方式属于典型的顺序写盘的操作
- 就算 Kafka使用磁盘作为存储介质,它所能承载的吞吐量也不容小觑
- 页缓存
- 页缓存是操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘 I/O 的操作
- 把磁盘中的数据缓存到内存, 把对磁盘的访问转换为对内存的访问
- Kafka 中大量使用了页 缓存 ,这是 Kafka 实现高吞吐的重要因素之一。
- 消息都是先被写入页缓存,然后由操作系统负责具体的刷盘任务的
- Kafka 中同样提供了同步刷盘和间断性强制刷盘功能
- 零拷贝
- 所谓的零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手
- 零拷贝大大提高了应用程序的性能,减少了内核和用户模式之间的上下文切换
- 对 Linux操作系统而言,零拷贝技术依赖于底层的 sendfile() 方法实现 。 对应于 Java 语言, Fi l eChannal.transferTo()方法的底层实现就是 sendfile()方法 。
关键流程
事务
spark streaming
两种消费模式
Receiver接收固定时间间隔的数据(放在内存中的)
- Receiver是基于kafka的高层Consumer API来实现的
- Receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。
- 基于Receiver方式读取数据,用户可以专注于所读数据,而不用关注或维护consumer的offsets,这减少了用户的工作以及代码量,而且相对比较简单。
直连的方式直接连接kafka的broker并且自己管理offset
- 替代使用Receiver来接收数据,这种方式会周期性地查询kafka,来获得每个topic+partition的最新的offset
- 高性能
- Spark 使用一个低级 消 费者API, 并直接从 Kafka 获取指定区 间的偏移量消息
- 并行度是由Kafka 的一个分区定义的, Spark direct approach 利用 了分区 的优势。
- 特点
- 并行度和吞吐量
- 没有使用 write-ahead log
- 没有 Zookeeper
- exactly-one 处理, 通过自己维护offset和RDD的性质来保证
分区数
- 每一个分区副本必须在一台机器上
一些问题
Kafka 分布式的情况下,如何保证消息的顺序?
- Kafka 分布式的单位是 Partition。如何保证消息有序,需要分几个情况讨论
- 同一个 Partition 用一个 write ahead log 组织,所以可以保证 FIFO 的顺序。
- 不同 Partition 之间不能保证顺序。但是绝大多数用户都可以通过 message key 来定义,因为同一个 key 的 message 可以保证只发送到同一个 Partition。比如说 key 是 user id,table row id 等等,所以同一个 user 或者同一个 record 的消息永远只会发送到同一个 Partition上,保证了同一个 user 或 record 的顺序。
- 当然,如果你有 key skewness 就有些麻烦,需要特殊处理。
什么是AR? ISR? OSR?
- AR
在Kafka中维护了一个AR列表,包括所有的分区的副本。AR又分为ISR和OSR。
AR = ISR + OSR。
AR、ISR、OSR、LEO、HW这些信息都被保存在Zookeeper中。
2.ISR
ISR中的副本都要同步leader中的数据,只有都同步完成了数据才认为是成功提交了,成功提交之后才能供外界访问。
在这个同步的过程中,数据即使已经写入也不能被外界访问,这个过程是通过LEO-HW机制来实现的。
3. OSR
OSR内的副本是否同步了leader的数据,不影响数据的提交,OSR内的follower尽力的去同步leader,可能数据版本会落后。
最开始所有的副本都在ISR中,在kafka工作的过程中,如果某个副本同步速度慢于replica.lag.time.max.ms指定的阈值,则被踢出ISR存入OSR,如果后续速度恢复可以回到ISR中。
ISR在Kafka环境中代表什么?
答:ISR指的是同步副本。这些通常被分类为一组消息副本,它们被同步为领导者。
leader会追踪和维护ISR中所有follower的滞后状态。如果滞后太多(数量滞后和时间滞后两个维度,replica.lag.time.max.ms和replica.lag.max.message可配置),leader会把该replica从ISR中移除。被移除ISR的replica一直在追赶leader。如下图,leader写入数据后并不会commit,只有ISR列表中的所有folower同步之后才会commit,把滞后的follower移除ISR主要是避免写消息延迟。设置ISR主要是为了broker宕掉之后,重新选举partition的leader从ISR列表中选择。
同步复制: 只有所有的follower把数据拿过去后才commit,一致性好,可用性不高。
异步复制: 只要leader拿到数据立即commit,等follower慢慢去复制,可用性高,立即返回,一致性差一些。
Commit:是指leader告诉客户端,这条数据写成功了。kafka尽量保证commit后立即leader挂掉,其他flower都有该条数据。
kafka不是完全同步,也不是完全异步,是一种ISR机制:
1. leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica),每个Partition都会有一个ISR,而且是由leader动态维护
2. 如果一个flower比一个leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其重ISR中移除
3. 当ISR中所有Replica都向Leader发送ACK时,leader才commit
Kafka中是怎么体现消息顺序性的?
kafka每个partition中的消息在写入时都是有序的,消费时,每个partition只能被每一个group中的一个消费者消费,保证了消费时也是有序的。
整个topic不保证有序
Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?
分区器:根据键值确定消息应该处于哪个分区中,默认情况下使用轮询分区,可以自行实现分区器接口自定义分区逻辑
序列化器:键序列化器和值序列化器,将键和值都转为二进制流 还有反序列化器 将二进制流转为指定类型数据
拦截器:两个方法 doSend()方法会在序列化之前完成 onAcknowledgement()方法在消息确认或失败时调用 可以添加多个拦截器按顺序执行
调用顺序: 拦截器doSend() -> 序列化器 -> 分区器