1. Kafka has two log cleanup policies: one removes log segments based on time/size retention, the other retains data according to the compact policy. The LogCleaner class implements the compact policy. In short, compaction keeps only the latest message for each key: when several messages share the same key, only the most recent one survives. For example, given the messages (k1, v1), (k2, v2), (k1, v3) in offset order, compaction eventually leaves only (k2, v2) and (k1, v3).
2. Every log is made up of two parts: the part that has already been cleaned, called clean, followed by the part that has not yet been cleaned, called dirty. The dirty part can be further divided into a cleanable portion and an uncleanable portion, and the uncleanable portion is never cleaned. The active segment (the one currently being written to) is always uncleanable; in addition, since a compaction lag can be configured per topic (min.compaction.lag.ms), segments that have not yet aged past that lag are also uncleanable.
3. Cleaning is carried out periodically by a pool of background threads. Each thread picks the dirtiest log and cleans it. Dirtiness is measured as the number of bytes in the dirty part divided by the total number of bytes in the log; the larger this ratio, the dirtier the log. A minimal sketch of the formula is shown below.
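This is the idea behind LogToClean.cleanableRatio; the snippet below is only a simplified sketch of the formula, not the actual Kafka code (the real version sums segment sizes on either side of firstDirtyOffset):

object CleanableRatioSketch {
  // cleanBytes: total size of the segments below firstDirtyOffset
  // dirtyBytes: total size of the cleanable segments from firstDirtyOffset onwards
  def cleanableRatio(cleanBytes: Long, dirtyBytes: Long): Double =
    dirtyBytes.toDouble / (cleanBytes + dirtyBytes)
}

A log only becomes eligible for cleaning once this ratio exceeds min.cleanable.dirty.ratio (0.5 by default), as seen later in grabFilthiestCompactedLog.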
4. To carry out the cleaning, the LogCleaner first builds a key => last_offset map covering every message in the dirty part: the key is the message key and the value is that key's latest offset.
5. Once the offset map has been built, the cleaner recopies the log's segment files, filtering out every message whose key is present in the offset map with a larger offset, i.e. messages that have been superseded by a newer message with the same key. The recopied segment files therefore no longer contain those obsolete messages. A toy sketch of this two-pass idea follows.
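The following is a minimal, self-contained sketch of the build-then-filter idea with toy types; it is not the actual OffsetMap/cleanInto code:

object CompactionSketch {
  // Toy record: key, offset, value (in Kafka a null value marks a tombstone).
  final case class Record(key: String, offset: Long, value: String)

  // Pass 1: scan the dirty portion and remember the latest offset per key.
  def buildOffsetMap(dirty: Seq[Record]): Map[String, Long] =
    dirty.foldLeft(Map.empty[String, Long])((m, r) => m.updated(r.key, r.offset))

  // Pass 2: recopy the log, dropping records superseded by a newer record with the same key.
  def compact(log: Seq[Record], offsetMap: Map[String, Long]): Seq[Record] =
    log.filter(r => offsetMap.get(r.key).forall(r.offset >= _))
}

With the records (k1, 0, v1), (k2, 1, v2), (k1, 2, v3) as the dirty part, buildOffsetMap yields Map(k1 -> 2, k2 -> 1), and compact keeps only the records at offsets 1 and 2.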
6. To keep segment files from shrinking ever smaller after repeated cleanings, the cleaner merges small consecutive segment files into a larger one on every cleaning pass (see the grouping sketch below).
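The grouping is done by groupSegmentsBySize, which also bounds the index sizes and the offset range of each group; the sketch below keeps only the core greedy idea and uses made-up types:

object GroupingSketch {
  // Toy segment: its base offset and its size in bytes.
  final case class Seg(baseOffset: Long, sizeInBytes: Int)

  // Greedily group consecutive segments so each group's total size stays under
  // maxGroupBytes; a single oversized segment still forms a group of its own.
  def groupBySize(segments: List[Seg], maxGroupBytes: Long): List[List[Seg]] = {
    if (segments.isEmpty) Nil
    else {
      var group = List(segments.head)
      var total = segments.head.sizeInBytes.toLong
      var rest = segments.tail
      while (rest.nonEmpty && total + rest.head.sizeInBytes <= maxGroupBytes) {
        group = group :+ rest.head
        total += rest.head.sizeInBytes
        rest = rest.tail
      }
      group :: groupBySize(rest, maxGroupBytes)
    }
  }
}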
7. Messages with a null payload (tombstones) are eventually removed by the LogCleaner. Such messages are retained only for a period of time configured per topic (delete.retention.ms); the clock starts once the segment containing the message has entered the clean part of the log. After that period has elapsed, the message is considered deletable and is removed the next time the log is cleaned.
8. The cleaning process also has to stay compatible with idempotent/transactional producers:
When idempotence is enabled, the producer maintains a contiguous sequence number for its messages, so the LogCleaner must keep the last batch of every active producer even if all of its messages are otherwise obsolete. That batch can only be removed once the producer writes newer messages or becomes inactive (expires).
The LogCleaner only compacts messages belonging to committed or aborted transactions; messages of transactions that are still open are never cleaned.
Messages of aborted transactions are removed by the cleaner directly.
A transaction marker is removed only after all messages of that transaction have been removed and it is certain that no consumer will still read data from that transaction.
LogCleaner.scala source code
Below is an analysis of the main parts of the source code (Kafka 1.0).
1. The startup method starts the cleaner threads. The number of threads is determined by config.numThreads.
/**
 * Start the background cleaning
 */
def startup() {
  info("Starting the log cleaner")
  (0 until config.numThreads).foreach { i =>
    val cleaner = new CleanerThread(i)
    cleaners += cleaner
    cleaner.start()
  }
}
2. CleanerThread is the thread that actually performs log cleaning. Each thread periodically picks the dirtiest log, cleans it, and swaps in the cleaned segments.
The cleaning logic starts in the cleanOrSleep method: it first calls grabFilthiestCompactedLog to pick the dirtiest log and, if one is found, calls cleaner.clean on it.
Note that if a log is configured with both the compact and delete cleanup policies, a delete pass still has to run after the clean; that is why the second half of cleanOrSleep contains deletion logic: it calls cleanerManager.deletableLogs() to get the logs that need deleting and then calls log.deleteOldSegments() on each of them.
private def cleanOrSleep() {
  val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match {
    case None =>
      false
    case Some(cleanable) =>
      // there's a log, clean it
      var endOffset = cleanable.firstDirtyOffset
      try {
        val (nextDirtyOffset, cleanerStats) = cleaner.clean(cleanable)
        recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats)
        endOffset = nextDirtyOffset
      } catch {
        case _: LogCleaningAbortedException => // task can be aborted, let it go.
        case _: KafkaStorageException => // partition is already offline. let it go.
        case e: IOException =>
          val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${cleanable.log.dir.getParent} due to IOException"
          logDirFailureChannel.maybeAddOfflineLogDir(cleanable.log.dir.getParent, msg, e)
      } finally {
        cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
      }
      true
  }
  val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
  deletable.foreach {
    case (topicPartition, log) =>
      try {
        log.deleteOldSegments()
      } finally {
        cleanerManager.doneDeleting(topicPartition)
      }
  }
  if (!cleaned)
    pause(config.backOffMs, TimeUnit.MILLISECONDS)
}
3. The grabFilthiestCompactedLog method selects the log to be cleaned next and adds it to the in-progress set.
def grabFilthiestCompactedLog(time: Time): Option[LogToClean] = {
  inLock(lock) {
    val now = time.milliseconds
    this.timeOfLastRun = now
    val lastClean = allCleanerCheckpoints
    val dirtyLogs = logs.filter {
      case (_, log) => log.config.compact // match logs that are marked as compacted
    }.filterNot {
      case (topicPartition, _) => inProgress.contains(topicPartition) // skip any logs already in progress
    }.map {
      case (topicPartition, log) => // create a LogToClean instance for each
        // compute the first offset of the dirty part and the first offset of the uncleanable part
        val (firstDirtyOffset, firstUncleanableDirtyOffset) = LogCleanerManager.cleanableOffsets(log, topicPartition,
          lastClean, now)
        LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset)
    }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs

    this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0
    // a log must meet the minimum threshold for the dirty byte ratio to be eligible for cleaning
    val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
    if (cleanableLogs.isEmpty) {
      None
    } else {
      // pick the dirtiest one and mark it as in progress
      val filthiest = cleanableLogs.max
      inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
      Some(filthiest)
    }
  }
}
4. Once the dirtiest log has been selected, the clean method is called to clean it. Before cleaning, clean first computes a timestamp that is used to purge tombstones from the log, i.e. the messages with a null payload that need to be deleted.
As mentioned earlier, these tombstones can only be removed after the segment containing them has entered the clean part of the log and has stayed there for a certain amount of time (config.deleteRetentionMs). The timestamp is therefore computed as the last modified time of the last segment in the clean part minus config.deleteRetentionMs. For example, if that segment was last modified at time T and deleteRetentionMs is the default 24 hours, tombstones are only discarded from segments whose last modified time is older than T minus 24 hours.
After this timestamp has been computed, doClean is called to continue.
private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = {
  // figure out the timestamp below which it is safe to remove delete tombstones
  // this position is defined to be a configurable time beneath the last modified time of the last clean segment
  val deleteHorizonMs =
    cleanable.log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
      case None => 0L
      case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs
    }

  doClean(cleanable, deleteHorizonMs)
}
5. The doClean method first builds the offsetMap, filling it with the data between the two offsets cleanable.firstDirtyOffset and upperBoundOffset. It then calls groupSegmentsBySize to split the log's segment files into groups, and each group as a whole is passed to cleanSegments, which cleans it and merges it into a single segment.
private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long): (Long, CleanerStats) = {
  info("Beginning cleaning of log %s.".format(cleanable.log.name))

  val log = cleanable.log
  val stats = new CleanerStats()

  // build the offset map
  info("Building offset map for %s...".format(cleanable.log.name))
  val upperBoundOffset = cleanable.firstUncleanableOffset
  buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap, stats)
  val endOffset = offsetMap.latestOffset + 1
  stats.indexDone()

  // determine the timestamp up to which the log will be cleaned
  // this is the lower of the last active segment and the compaction lag
  val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L)

  // group the segments and clean the groups
  info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))
  for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize, cleanable.firstUncleanableOffset))
    cleanSegments(log, group, offsetMap, deleteHorizonMs, stats)

  // record buffer utilization
  stats.bufferUtilization = offsetMap.utilization

  stats.allDone()

  (endOffset, stats)
}
6. cleanSegments cleans a group of segment files and produces a single new segment.
private[log] def cleanSegments(log: Log,
                               segments: Seq[LogSegment],
                               map: OffsetMap,
                               deleteHorizonMs: Long,
                               stats: CleanerStats) {
  def deleteCleanedFileIfExists(file: File): Unit = {
    Files.deleteIfExists(new File(file.getPath + Log.CleanedFileSuffix).toPath)
  }

  // create a new segment with a suffix appended to the name of the log and indexes
  val firstSegment = segments.head
  deleteCleanedFileIfExists(firstSegment.log.file)
  deleteCleanedFileIfExists(firstSegment.offsetIndex.file)
  deleteCleanedFileIfExists(firstSegment.timeIndex.file)
  deleteCleanedFileIfExists(firstSegment.txnIndex.file)

  val baseOffset = firstSegment.baseOffset
  // open the new destination segment
  val cleaned = LogSegment.open(log.dir, baseOffset, log.config, time, fileSuffix = Log.CleanedFileSuffix,
    initFileSize = log.initFileSize, preallocate = log.config.preallocate)

  try {
    // clean segments into the new destination segment
    val iter = segments.iterator
    var currentSegmentOpt: Option[LogSegment] = Some(iter.next())
    // iterate over the segments to be cleaned, one at a time
    while (currentSegmentOpt.isDefined) {
      val currentSegment = currentSegmentOpt.get
      val nextSegmentOpt = if (iter.hasNext) Some(iter.next()) else None

      // the start offset and upper bound offset for this pass
      val startOffset = currentSegment.baseOffset
      val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(map.latestOffset + 1)
      // collect the aborted transactions in this offset range
      val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset)
      // build the metadata for those transactions
      val transactionMetadata = CleanedTransactionMetadata(abortedTransactions, Some(cleaned.txnIndex))

      // should tombstones in the current segment still be retained?
      val retainDeletes = currentSegment.lastModified > deleteHorizonMs
      info(s"Cleaning segment $startOffset in log ${log.name} (largest timestamp ${new Date(currentSegment.largestTimestamp)}) " +
        s"into ${cleaned.baseOffset}, ${if(retainDeletes) "retaining" else "discarding"} deletes.")
      // clean the current segment, copying the retained messages into the destination segment
      cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletes, log.config.maxMessageSize,
        transactionMetadata, log.activeProducersWithLastSequence, stats)

      currentSegmentOpt = nextSegmentOpt
    }

    cleaned.onBecomeInactiveSegment()
    // flush new segment to disk before swap
    cleaned.flush()

    // update the modification date to retain the last modified date of the original files
    val modified = segments.last.lastModified
    cleaned.lastModified = modified

    // swap in new segment
    info(s"Swapping in cleaned segment ${cleaned.baseOffset} for segment(s) ${segments.map(_.baseOffset).mkString(",")} " +
      s"in log ${log.name}")
    // replace the old segments with the new one
    log.replaceSegments(cleaned, segments)
  } catch {
    case e: LogCleaningAbortedException =>
      try cleaned.deleteIfExists()
      catch {
        case deleteException: Exception =>
          e.addSuppressed(deleteException)
      } finally throw e
  }
}
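cleanInto (not shown here) is where the per-record retention decision is made. The sketch below is a simplified, hypothetical version of that decision, using toy names rather than Kafka's API and ignoring batch- and control-record handling; it ties together the offset map, the delete horizon (retainDeletes) and the active-producer rule from point 8 above:

object RetentionSketch {
  // Toy view of a record, just for this sketch (not Kafka's Record class).
  final case class Rec(key: Option[String], offset: Long, isTombstone: Boolean)

  // offsetMap: latest offset per key in the dirty section.
  // retainDeletes: true while the source segment is newer than the delete horizon.
  // isLastBatchOfActiveProducer: see point 8 above -- such batches must be kept.
  def shouldRetain(rec: Rec,
                   offsetMap: Map[String, Long],
                   retainDeletes: Boolean,
                   isLastBatchOfActiveProducer: Boolean): Boolean =
    if (isLastBatchOfActiveProducer) true
    else rec.key match {
      case None => false // keyless records cannot be compacted; they are dropped
      case Some(k) =>
        // drop the record if the offset map has seen a newer record with the same key
        val isLatestForKey = offsetMap.get(k).forall(rec.offset >= _)
        // tombstones are additionally dropped once the delete horizon has passed
        if (rec.isTombstone) isLatestForKey && retainDeletes else isLatestForKey
    }
}

In the actual source this logic lives in cleanInto and its record-retention helper, which also handle transaction markers and batch-level bookkeeping.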