kafka2

KafkaRequestHandlerPool 是一个简易版的线程池,管理所有的 KafkaRequestHandler。

KafkaRequestHandler 从 RequestChannel 获取请求并调用 KafkaApis 的 handle() 方法处理请求。

run()

{

while(true)

{

//获取请求

val req = requestChannel.receiveRequest(300)

//KafkaApis 是 kafka 服务器处理请求的入口,它负责将请求分发到不同的 handle*() 进行处理

apis.handle(request)

}

}

消息存储

kafka 采用日志文件的方式保存生产者发送的消息。每条消息都有一个 offset 的值来表示它在分区中的偏移量,这个 offset 是逻辑值,并不是消息实际存放的物理地址,类似于数据库中的主键。为提高写入的性能,同一个分区的消息是顺序写入的。日志文件目录的命名规则为 “topicname-paritionid” , 比如 jfw-0 。 kafka 通过分段的方式将 log 分为多个 LogSegment ,包括一个日志文件和相对应的索引文件,日志文件用于记录消息,索引文件用于保存消息的索引。日志文件命名规则为 “baseoffset.log” ,其中 baseoffset 是日志文件中的第一个消息的 offset 。索引文件并没有为每条消息都建立索引,而是采用稀疏索引方式为日志中的部分消息建立了索引。

LogSegment 对应磁盘上一个日志文件和索引文件

log: FileRecords //日志文件

File file // 指向磁盘上对应的日志文件

FileChannel channel //用于读写对应的日志文件

AtomicInteger size //日志文件大小

boolean isSlice //是否分片

int start //分片起始位置

int end //分片结束位置

index: OffsetIndex //索引文件

_file: File // 指向磁盘上对应的索引文件

baseOffset: Long //第一条消息的 offset

mmap: MappedByteBuffer //用于读写索引文件

baseOffset: Long //第一条消息的 offset

kafka 使用 Message 类表示消息, Message 使用 ByteBuffer 保存数据,其格式如下:

offset|size|crc32|magic|attributes|timestamp|keylength|key|valuelength|value

kafka可以对批量消息进行压缩,减少数据大小,进而减少网络传输,提高消息处理性能.

1.当生产者创建压缩消息时,对消息设置的 offset 是内部 offset , 即 0,1,2,3…

2.服务端为消息分配外层 offset 时,会根据内层压缩消息的个数进行累加,也就是最后一个消息实际的 offset

offset|size|crc32|magic|attributes|timestamp|keylength|null|valuelength|value

offset|size|message

offset|size|message

3.消费者获取到压缩消息进行解压后,就可以根据外层 offset 、内层相对 offset 计算时间每个消息的 offset

索引项为 8 个字节,其中前 4 个字节是相对日志文件中第一条消息 baseOffset 的偏移量(节省了存储空间)。后 4 个字节是消息在日志文件中的具体位置。

LogSegment 源码分析

追加消息

append(firstOffset: Long, //第一条消息的 offset

largestOffset: Long, //最后一条消息的 offset

largestTimestamp: Long, //

shallowOffsetOfMaxTimestamp: Long, //

records: MemoryRecords //

)

{

//追加日志文件 FileRecords

val appendedBytes = log.append(records)

int written = records.writeFullyTo(channel);

//全部写入到 MemoryRecords 的 buffer 中

written += channel.write(buffer);

//追加索引文件

index.append(firstOffset, physicalPosition)

//写入 MappedByteBuffer

mmap.putInt((offset – baseOffset).toInt)

mmap.putInt(position)

}

读消息

read(startOffset: Long, //读取起始消息的 offset

maxOffset: Option[Long], // 读取结束消息的 offset

maxSize: Int, // 读取的最大字节数

maxPosition: Long = size, //读取的最大物理地址

minOneMessage: Boolean = false) //

{

将 startOffset 转换成物理地址,下面中的出现的小于或者等于的场景,主要是考虑到消息压缩,外层 offset 为最后一个消息的 offset

val startOffsetAndSize = translateOffset(startOffset)

//先从索引文件中 查找到最大的那个小于等于 startOffset 的索引列,找到日志文件中的物理地址

val mapping = index.lookup(offset)

//根据 上面定位到的 物理 position 依次向前找到日志文件中等于或者大于 offset 那个消息块

log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))

//minOneMessage 为 true,表示至少要返回一个消息块,即使指定了要获取的字节数小于消息块的大小,也要返回整个消息块。

val adjustedMaxSize =

if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)

else maxSize

//根据 maxOffset 计算读取消息的长度 fetchSize

// log.read(…) 返回 FileRecords 分片,并没有将数据读到内存

FetchDataInfo(offsetMetadata, log.read(startPosition, fetchSize), firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)

}

Log 管理多个 LogSegment

dir: File // log对应的磁盘目录

logStartOffset: Long // 暴漏给客户端最早的 offset

recoveryPoint: Long //恢复操作的起始 offset ,也就是没有刷新到磁盘的第一个 offset

nextOffsetMetadata: LogOffsetMetadata // 下一个消息的 offset 元数据

segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment] //

//将消息追加到 active LogSegment (也就是最后一个正在写的 LogSegment),如果满足条件则新建一个 LogSegment 进行写入。

append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int)

//读消息, 通过 segments 调表快速定位到读取的起始 LogSegment 并从中读取消息

read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false, isolationLevel: IsolationLevel)

LogManager 管理一个 broker 上所有的 log,包括 log 的加载、创建、删除、查询等功能,并且启动了几个周期性的后台任务以及 cleaner 线程(日志清理)

logDirs: Seq[File] // server.properties 配置文件中 log.dir 指定的log 目录集合

ioThreads: Int // 完成 log 加载相关操作,每个 log 目录下分配指定的线程执行加载

scheduler: Scheduler // KafkaScheduler 对象,用于执行周期性任务的线程池

logs = new Pool[TopicPartition, Log] //用于管理 TopicPartition 与 Log 的映射关系

recoveryPointCheckpoints Map[File, OffsetCheckpoint] //管理每个 log 目录与其下的 RecoveryPointCheckpoint 文件之间的映射关系

任务

log-retention (日志保留 cleanupLogs):根据 LogSegment 的存活时间删除过期的 LogSegment;根据整个 Log的大小决定是否删除最旧的 LogSegment

log-flusher(日志刷写 flushDirtyLogs): 将超过刷新时长的数据刷新到磁盘

recovery-point-checkpoint(恢复检查点刷新 checkpointLogRecoveryOffsets): 将所有日志文件的 recovery point 写到 log 目录下的文件,避免启动时恢复所有的日志。

log-start-offset-checkpoint(起始检查点刷新checkpointLogStartOffsets): 将所有日志文件的 start offset 写到 log 目录下的文件,避免暴露已经被 DeleteRecordsRequest 删除的消息。

delete-logs(日志删除 deleteLogs): 删除已经被标记位删除的日志

LogManager 初始化过程

loadLogs()

{

//遍历所有 log 目录

for (dir <- liveLogDirs)

{

//为每个 log 目录分配一个有 ioThreads 条线程的线程池

val pool = Executors.newFixedThreadPool(ioThreads)

threadPools.append(pool)

//载入每个 Log 的 recoveryPoints

recoveryPoints = this.recoveryPointCheckpoints(dir).read

//载入每个 Log 的 StartOffsets

logStartOffsets = this.logStartOffsetCheckpoints(dir).read

//遍历所有的 log 目录的子文件,为每个 Log 文件夹创建一个 Runnable 任务

val jobsForDir = for {

dirContent <- Option(dir.listFiles).toList

logDir <- dirContent if logDir.isDirectory

} yield {

CoreUtils.runnable {

loadLog(logDir, recoveryPoints, logStartOffsets)

// 从目录名可以解析出 topic 和 分区编号

val topicPartition = Log.parseTopicPartitionName(logDir)

val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)

val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)

val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L)

//构造 Log 对象

val current = Log(…);

//添加到 logs

val previous = this.logs.put(topicPartition, current)

}

}

// 将 jobsForDir 中的所有的任务放到线程池中执行,并将 Future 形成 Seq , 保存到 jobs 中

jobs(cleanShutdownFile) = jobsForDir.map(pool.submit)

// 等待 jobs 中的 runnable 完成

for ((cleanShutdownFile, dirJobs) <- jobs)

{

dirJobs.foreach(_.get)

}

}

}

Log的初始化的过程会调用 Log.loadSegments 加载磁盘中的日志文件,并返回 next offset

loadSegments(): Long = {

//删除临时文件(.cleaned 和 .delete)并收集 swap 文件

val swapFiles = removeTempFilesAndCollectSwapFiles()

//.cleaned 日志压缩过程中宕机,文件中数据的状态不明确,无法进行恢复

//.swap 日志压缩已经完成,但是在 swap 的过程中宕机,文件中保存了日志压缩中的完整消息,可进行恢复

//.delete 需要删除的日志文件或索引文件

//加载日志文件和索引文件,

loadSegmentFiles()

//如果索引文件没有对应的日志文件,则删除索引文件

//如果日志文件没有对应的索引文件,则重建索引文件

recoverSegment(segment)

//将 LogSegment 放到 segments 跳表中管理

addSegment(segment)

this.segments.put(segment.baseOffset, segment)

//完成之前被打断的日志文件交换

completeSwapOperations(swapFiles)

//创建 LogSegment

val swapSegment = new LogSegment(…)

//重建索引文件

recoverSegment(swapSegment)

//查找 swapSegment 对应的日志压缩前的 LogSegment 集合

val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.nextOffset())

//异步删除原有 oldSegments ,将 .swap 后缀名去掉

replaceSegments(swapSegment, oldSegments.toSeq, isRecoveredSwapFile = true)

//如果没有 segment 则新建一个

addSegment(new LogSegment(…))

//如果有 segment并且不是删除的目录,则进行恢复,并重置 activesegemnt 索引日志的大小

val nextOffset = recoverLog()

//如果 broker 是非正常关闭,获取全部未刷新的 LogSegment ,即 recoveryPoint 后所有的 LogSegment

val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator

//循环处理上面的 unflushed 重建索引文件并验证日志文件,验证失败的部分则截断

recoverSegment(segment, Some(leaderEpochCache))

}

    原文作者:jfwseu
    原文地址: https://zhuanlan.zhihu.com/p/35308499
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞