KafkaRequestHandlerPool 是一个简易版的线程池,管理所有的 KafkaRequestHandler。
KafkaRequestHandler 从 RequestChannel 获取请求并调用 KafkaApis 的 handle() 方法处理请求。
run()
{
while(true)
{
//获取请求
val req = requestChannel.receiveRequest(300)
//KafkaApis 是 kafka 服务器处理请求的入口,它负责将请求分发到不同的 handle*() 进行处理
apis.handle(request)
}
}
消息存储
kafka 采用日志文件的方式保存生产者发送的消息。每条消息都有一个 offset 的值来表示它在分区中的偏移量,这个 offset 是逻辑值,并不是消息实际存放的物理地址,类似于数据库中的主键。为提高写入的性能,同一个分区的消息是顺序写入的。日志文件目录的命名规则为 “topicname-paritionid” , 比如 jfw-0 。 kafka 通过分段的方式将 log 分为多个 LogSegment ,包括一个日志文件和相对应的索引文件,日志文件用于记录消息,索引文件用于保存消息的索引。日志文件命名规则为 “baseoffset.log” ,其中 baseoffset 是日志文件中的第一个消息的 offset 。索引文件并没有为每条消息都建立索引,而是采用稀疏索引方式为日志中的部分消息建立了索引。
LogSegment 对应磁盘上一个日志文件和索引文件
log: FileRecords //日志文件
File file // 指向磁盘上对应的日志文件
FileChannel channel //用于读写对应的日志文件
AtomicInteger size //日志文件大小
boolean isSlice //是否分片
int start //分片起始位置
int end //分片结束位置
index: OffsetIndex //索引文件
_file: File // 指向磁盘上对应的索引文件
baseOffset: Long //第一条消息的 offset
mmap: MappedByteBuffer //用于读写索引文件
baseOffset: Long //第一条消息的 offset
kafka 使用 Message 类表示消息, Message 使用 ByteBuffer 保存数据,其格式如下:
offset|size|crc32|magic|attributes|timestamp|keylength|key|valuelength|value
kafka可以对批量消息进行压缩,减少数据大小,进而减少网络传输,提高消息处理性能.
1.当生产者创建压缩消息时,对消息设置的 offset 是内部 offset , 即 0,1,2,3…
2.服务端为消息分配外层 offset 时,会根据内层压缩消息的个数进行累加,也就是最后一个消息实际的 offset
offset|size|crc32|magic|attributes|timestamp|keylength|null|valuelength|value
offset|size|message
offset|size|message
3.消费者获取到压缩消息进行解压后,就可以根据外层 offset 、内层相对 offset 计算时间每个消息的 offset
索引项为 8 个字节,其中前 4 个字节是相对日志文件中第一条消息 baseOffset 的偏移量(节省了存储空间)。后 4 个字节是消息在日志文件中的具体位置。
LogSegment 源码分析
追加消息
append(firstOffset: Long, //第一条消息的 offset
largestOffset: Long, //最后一条消息的 offset
largestTimestamp: Long, //
shallowOffsetOfMaxTimestamp: Long, //
records: MemoryRecords //
)
{
//追加日志文件 FileRecords
val appendedBytes = log.append(records)
int written = records.writeFullyTo(channel);
//全部写入到 MemoryRecords 的 buffer 中
written += channel.write(buffer);
//追加索引文件
index.append(firstOffset, physicalPosition)
//写入 MappedByteBuffer
mmap.putInt((offset – baseOffset).toInt)
mmap.putInt(position)
}
读消息
read(startOffset: Long, //读取起始消息的 offset
maxOffset: Option[Long], // 读取结束消息的 offset
maxSize: Int, // 读取的最大字节数
maxPosition: Long = size, //读取的最大物理地址
minOneMessage: Boolean = false) //
{
将 startOffset 转换成物理地址,下面中的出现的小于或者等于的场景,主要是考虑到消息压缩,外层 offset 为最后一个消息的 offset
val startOffsetAndSize = translateOffset(startOffset)
//先从索引文件中 查找到最大的那个小于等于 startOffset 的索引列,找到日志文件中的物理地址
val mapping = index.lookup(offset)
//根据 上面定位到的 物理 position 依次向前找到日志文件中等于或者大于 offset 那个消息块
log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
//minOneMessage 为 true,表示至少要返回一个消息块,即使指定了要获取的字节数小于消息块的大小,也要返回整个消息块。
val adjustedMaxSize =
if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
else maxSize
//根据 maxOffset 计算读取消息的长度 fetchSize
// log.read(…) 返回 FileRecords 分片,并没有将数据读到内存
FetchDataInfo(offsetMetadata, log.read(startPosition, fetchSize), firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
}
Log 管理多个 LogSegment
dir: File // log对应的磁盘目录
logStartOffset: Long // 暴漏给客户端最早的 offset
recoveryPoint: Long //恢复操作的起始 offset ,也就是没有刷新到磁盘的第一个 offset
nextOffsetMetadata: LogOffsetMetadata // 下一个消息的 offset 元数据
segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment] //
//将消息追加到 active LogSegment (也就是最后一个正在写的 LogSegment),如果满足条件则新建一个 LogSegment 进行写入。
append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int)
//读消息, 通过 segments 调表快速定位到读取的起始 LogSegment 并从中读取消息
read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false, isolationLevel: IsolationLevel)
LogManager 管理一个 broker 上所有的 log,包括 log 的加载、创建、删除、查询等功能,并且启动了几个周期性的后台任务以及 cleaner 线程(日志清理)
logDirs: Seq[File] // server.properties 配置文件中 log.dir 指定的log 目录集合
ioThreads: Int // 完成 log 加载相关操作,每个 log 目录下分配指定的线程执行加载
scheduler: Scheduler // KafkaScheduler 对象,用于执行周期性任务的线程池
logs = new Pool[TopicPartition, Log] //用于管理 TopicPartition 与 Log 的映射关系
recoveryPointCheckpoints Map[File, OffsetCheckpoint] //管理每个 log 目录与其下的 RecoveryPointCheckpoint 文件之间的映射关系
任务
log-retention (日志保留 cleanupLogs):根据 LogSegment 的存活时间删除过期的 LogSegment;根据整个 Log的大小决定是否删除最旧的 LogSegment
log-flusher(日志刷写 flushDirtyLogs): 将超过刷新时长的数据刷新到磁盘
recovery-point-checkpoint(恢复检查点刷新 checkpointLogRecoveryOffsets): 将所有日志文件的 recovery point 写到 log 目录下的文件,避免启动时恢复所有的日志。
log-start-offset-checkpoint(起始检查点刷新checkpointLogStartOffsets): 将所有日志文件的 start offset 写到 log 目录下的文件,避免暴露已经被 DeleteRecordsRequest 删除的消息。
delete-logs(日志删除 deleteLogs): 删除已经被标记位删除的日志
LogManager 初始化过程
loadLogs()
{
//遍历所有 log 目录
for (dir <- liveLogDirs)
{
//为每个 log 目录分配一个有 ioThreads 条线程的线程池
val pool = Executors.newFixedThreadPool(ioThreads)
threadPools.append(pool)
//载入每个 Log 的 recoveryPoints
recoveryPoints = this.recoveryPointCheckpoints(dir).read
//载入每个 Log 的 StartOffsets
logStartOffsets = this.logStartOffsetCheckpoints(dir).read
//遍历所有的 log 目录的子文件,为每个 Log 文件夹创建一个 Runnable 任务
val jobsForDir = for {
dirContent <- Option(dir.listFiles).toList
logDir <- dirContent if logDir.isDirectory
} yield {
CoreUtils.runnable {
loadLog(logDir, recoveryPoints, logStartOffsets)
// 从目录名可以解析出 topic 和 分区编号
val topicPartition = Log.parseTopicPartitionName(logDir)
val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L)
//构造 Log 对象
val current = Log(…);
//添加到 logs
val previous = this.logs.put(topicPartition, current)
}
}
// 将 jobsForDir 中的所有的任务放到线程池中执行,并将 Future 形成 Seq , 保存到 jobs 中
jobs(cleanShutdownFile) = jobsForDir.map(pool.submit)
// 等待 jobs 中的 runnable 完成
for ((cleanShutdownFile, dirJobs) <- jobs)
{
dirJobs.foreach(_.get)
}
}
}
Log的初始化的过程会调用 Log.loadSegments 加载磁盘中的日志文件,并返回 next offset
loadSegments(): Long = {
//删除临时文件(.cleaned 和 .delete)并收集 swap 文件
val swapFiles = removeTempFilesAndCollectSwapFiles()
//.cleaned 日志压缩过程中宕机,文件中数据的状态不明确,无法进行恢复
//.swap 日志压缩已经完成,但是在 swap 的过程中宕机,文件中保存了日志压缩中的完整消息,可进行恢复
//.delete 需要删除的日志文件或索引文件
//加载日志文件和索引文件,
loadSegmentFiles()
//如果索引文件没有对应的日志文件,则删除索引文件
//如果日志文件没有对应的索引文件,则重建索引文件
recoverSegment(segment)
//将 LogSegment 放到 segments 跳表中管理
addSegment(segment)
this.segments.put(segment.baseOffset, segment)
//完成之前被打断的日志文件交换
completeSwapOperations(swapFiles)
//创建 LogSegment
val swapSegment = new LogSegment(…)
//重建索引文件
recoverSegment(swapSegment)
//查找 swapSegment 对应的日志压缩前的 LogSegment 集合
val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.nextOffset())
//异步删除原有 oldSegments ,将 .swap 后缀名去掉
replaceSegments(swapSegment, oldSegments.toSeq, isRecoveredSwapFile = true)
//如果没有 segment 则新建一个
addSegment(new LogSegment(…))
//如果有 segment并且不是删除的目录,则进行恢复,并重置 activesegemnt 索引日志的大小
val nextOffset = recoverLog()
//如果 broker 是非正常关闭,获取全部未刷新的 LogSegment ,即 recoveryPoint 后所有的 LogSegment
val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
//循环处理上面的 unflushed 重建索引文件并验证日志文件,验证失败的部分则截断
recoverSegment(segment, Some(leaderEpochCache))
}