Kafka LogCleaner Source Code Analysis

1. Kafka has two log retention policies: one retains logs based on time/size, and the other retains them according to the compact policy. The LogCleaner class implements log cleaning for the compact policy. In short, the compact policy keeps only the latest record per key: when several messages share the same key, only the most recent one is retained.
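
To make the effect concrete, here is a minimal, self-contained Scala sketch (illustrative names, not Kafka code) that simulates compaction over a few (offset, key, value) records; only the highest-offset record for each key survives.

object CompactExample extends App {
  // input records as (offset, key, value); offsets 0 and 1 are later superseded
  val records = Seq((0L, "k1", "v1"), (1L, "k2", "v2"), (2L, "k1", "v3"), (3L, "k2", "v4"))

  val compacted = records
    .groupBy { case (_, key, _) => key }             // group records by key
    .values
    .map(_.maxBy { case (offset, _, _) => offset })  // keep the latest record per key
    .toList
    .sortBy { case (offset, _, _) => offset }

  println(compacted)  // List((2,k1,v3), (3,k2,v4))
}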

2. Each log consists of two ranges of segments: the portion that has already been cleaned, called clean, followed by the portion that has not yet been cleaned, called dirty. The dirty portion can be further divided into a cleanable part and an uncleanable part, and the uncleanable part must not be cleaned. The active segment (the one currently being written to) always belongs to the uncleanable part; in addition, because a topic can configure a compaction lag, segments that have not yet passed this lag are also uncleanable.

3. The cleaning process is performed periodically by a group of background threads. Each thread selects the dirtiest log to clean. The dirtiness is computed as the number of bytes in the dirty portion divided by the total number of bytes in the log; the larger this ratio, the dirtier the log.
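
As a rough illustration (parameter names here are mine, not from the source), the ratio used to rank logs by dirtiness can be expressed as follows; in the actual code it corresponds to LogToClean.cleanableRatio.

// Sketch of the dirtiness ratio: bytes in the dirty (cleanable) portion
// divided by the total bytes of the log. Parameter names are illustrative.
def cleanableRatio(cleanBytes: Long, cleanableBytes: Long): Double =
  cleanableBytes.toDouble / (cleanBytes + cleanableBytes)

// e.g. a log with 400 MB clean and 100 MB dirty has ratio 0.2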

4. To perform the cleaning, the LogCleaner first builds a key => last_offset map covering all messages in the dirty portion, where the key is the message key and the value is the message offset.
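
A minimal sketch of that map using an ordinary Scala Map (Kafka's real implementation is a specialized in-memory hash map):

// Build key => last_offset from the dirty portion: for the same key,
// a later offset simply overwrites the earlier one.
val dirtyRecords = Seq((10L, "k1"), (11L, "k2"), (12L, "k1"))  // (offset, key)
val offsetMap: Map[String, Long] =
  dirtyRecords.foldLeft(Map.empty[String, Long]) {
    case (acc, (offset, key)) => acc + (key -> offset)  // last write wins
  }
// offsetMap == Map("k1" -> 12, "k2" -> 11)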

5. Once the offset map has been built, the cleaner recopies the log's segment files, filtering out any message whose key appears in the offset map with a larger offset than the message's own. The recopied segment files therefore no longer contain the obsolete messages.
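
In other words, the decision for each record copied out of the old segments is roughly the following (a hypothetical helper, not the actual method in the cleaner):

// A record is retained only if the offset map does not hold a newer offset for its key.
def shouldRetain(key: String, offset: Long, offsetMap: Map[String, Long]): Boolean =
  offsetMap.get(key) match {
    case Some(latestOffset) => offset >= latestOffset  // this is the latest record for the key
    case None               => true                    // key not seen in the dirty range; keep it
  }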

6. To prevent segment files from becoming too small after repeated cleaning, after each cleaning pass the cleaner merges consecutive small segment files into larger ones.
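
As a simplified sketch of the idea (the real groupSegmentsBySize also caps index sizes and offset ranges), consecutive segments can be grouped until a byte limit is reached, and each group is then merged into one segment:

// Group consecutive segment sizes so that each group stays under maxGroupBytes.
def groupBySize(segmentSizes: List[Long], maxGroupBytes: Long): List[List[Long]] =
  segmentSizes.foldLeft(List.empty[List[Long]]) {
    case (Nil, size) => List(List(size))
    case (current :: done, size) if current.sum + size <= maxGroupBytes =>
      (current :+ size) :: done
    case (done, size) => List(size) :: done
  }.reverse

// groupBySize(List(10L, 20L, 70L, 50L, 40L), 100L) == List(List(10, 20, 70), List(50, 40))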

7. Messages with a null payload (tombstones) are deleted by the LogCleaner. Such messages are retained only for a period of time that is configured on the topic; the clock starts once the segment containing the message enters the clean portion of the log, and when the time is up the message is considered deletable and is removed the next time the log is cleaned.
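
A minimal sketch of that time-based decision, mirroring the deleteHorizonMs / retainDeletes logic shown later in the source (names here are illustrative):

// Tombstones in a segment may be discarded only once the segment's last-modified time
// is at or below the delete horizon: the last clean segment's last-modified time
// minus the topic's delete retention time.
def canDiscardTombstones(segmentLastModifiedMs: Long,
                         lastCleanSegmentLastModifiedMs: Long,
                         deleteRetentionMs: Long): Boolean = {
  val deleteHorizonMs = lastCleanSegmentLastModifiedMs - deleteRetentionMs
  segmentLastModifiedMs <= deleteHorizonMs
}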

8. The cleaning process must also remain compatible with idempotent/transactional producers:

A producer with idempotence enabled maintains consecutive sequence numbers for its messages, so the LogCleaner must retain the last batch of every active producer even if that batch would otherwise be eligible for removal. Such a batch can only be removed once the producer writes new data or becomes inactive.

The LogCleaner only cleans messages belonging to committed or aborted transactions; messages in transactions that have not yet been committed are never cleaned.

Messages in aborted transactions are removed by the cleaner directly.

The cleaner removes a transaction marker only after all messages of that transaction have been removed and it is certain that no consumer will read from that transaction any more.

LogCleaner.scala code

Below is an analysis of the main parts of the source code (Kafka 1.0).

1. The startup method starts the cleaner threads. The number of threads is determined by config.numThreads.

/**
 * Start the background cleaning
 */
def startup() {
  info("Starting the log cleaner")
  (0 until config.numThreads).foreach { i =>
    val cleaner = new CleanerThread(i)
    cleaners += cleaner
    cleaner.start()
  }
}

2. CleanerThread is the thread that actually performs log cleaning. Each thread periodically selects the dirtiest log, cleans it, and swaps the cleaned segments back into the log.

The cleaning logic starts in the cleanOrSleep method. It first calls grabFilthiestCompactedLog to select the dirtiest log; if one is found, it calls cleaner.clean to clean it.

Note that if a log is configured with both the compact and delete cleanup policies, deletion must still run after compaction. That is why the second half of cleanOrSleep contains delete logic: it calls cleanerManager.deletableLogs() to obtain the logs that need old segments deleted, and then calls log.deleteOldSegments() on each of them.

private def cleanOrSleep() {
  val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match {
    case None =>
      false
    case Some(cleanable) =>
      // there's a log, clean it
      var endOffset = cleanable.firstDirtyOffset
      try {
        val (nextDirtyOffset, cleanerStats) = cleaner.clean(cleanable)
        recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats)
        endOffset = nextDirtyOffset
      } catch {
        case _: LogCleaningAbortedException => // task can be aborted, let it go.
        case _: KafkaStorageException => // partition is already offline. let it go.
        case e: IOException =>
          val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${cleanable.log.dir.getParent} due to IOException"
          logDirFailureChannel.maybeAddOfflineLogDir(cleanable.log.dir.getParent, msg, e)
      } finally {
        cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
      }
      true
  }
  val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
  deletable.foreach {
    case (topicPartition, log) =>
      try {
        log.deleteOldSegments()
      } finally {
        cleanerManager.doneDeleting(topicPartition)
      }
  }
  if (!cleaned)
    pause(config.backOffMs, TimeUnit.MILLISECONDS)
}

3. The grabFilthiestCompactedLog method selects the log that should be cleaned next and adds it to the in-progress set.

  def grabFilthiestCompactedLog(time: Time): Option[LogToClean] = {
    inLock(lock) {
      val now = time.milliseconds
      this.timeOfLastRun = now
      val lastClean = allCleanerCheckpoints
      val dirtyLogs = logs.filter {
        case (_, log) => log.config.compact  // match only logs that are marked as compacted
      }.filterNot {
        case (topicPartition, _) => inProgress.contains(topicPartition) // skip any logs already in progress
      }.map {
        case (topicPartition, log) => // create a LogToClean instance for each log
          val (firstDirtyOffset, firstUncleanableDirtyOffset) = LogCleanerManager.cleanableOffsets(log, topicPartition,
            lastClean, now) // compute the first dirty offset and the first uncleanable dirty offset
          LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset) // build the LogToClean instance
      }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
 
      this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0 
      // and must meet the minimum threshold for dirty byte ratio to be eligible for cleaning
      val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
      if(cleanableLogs.isEmpty) { 
        None
      } else {
        // pick the filthiest log and mark it as in progress
        val filthiest = cleanableLogs.max
        inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
        Some(filthiest)
      }
    }
  }

4. Once the dirtiest log has been selected, the clean method is called to clean it. Before doing so, clean first computes a timestamp that is used to remove tombstones from the log, i.e. messages with a null payload that should be deleted.

As mentioned earlier, a tombstone only becomes removable after the segment containing it has entered the clean portion of the log and has stayed there for a certain time (config.deleteRetentionMs). The timestamp is therefore computed as the last-modified time of the last segment in the clean portion minus config.deleteRetentionMs.

After this timestamp has been computed, doClean is called.

  private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = {
    // figure out the timestamp below which it is safe to remove delete tombstones
    // this position is defined to be a configurable time beneath the last modified time of the last clean segment
    val deleteHorizonMs =
      cleanable.log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
        case None => 0L
        case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs
    }
 
    doClean(cleanable, deleteHorizonMs)
  } 

5. The doClean method first builds the offset map, filling it with the records between cleanable.firstDirtyOffset and upperBoundOffset. It then calls groupSegmentsBySize to group the log's segment files; each group is cleaned as a whole by cleanSegments and merged into a single segment.

private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long): (Long, CleanerStats) = {
    info("Beginning cleaning of log %s.".format(cleanable.log.name))
 
    val log = cleanable.log
    val stats = new CleanerStats()
 
    // build the offset map
    info("Building offset map for %s...".format(cleanable.log.name))
    val upperBoundOffset = cleanable.firstUncleanableOffset
    buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap, stats)
    val endOffset = offsetMap.latestOffset + 1
    stats.indexDone()
 
    // determine the timestamp up to which the log will be cleaned
    // this is the lower of the last active segment and the compaction lag
    val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L)
 
 
    // group the segments and clean the groups
    info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs)))
    for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize, cleanable.firstUncleanableOffset))
      cleanSegments(log, group, offsetMap, deleteHorizonMs, stats)
 
    // record buffer utilization
    stats.bufferUtilization = offsetMap.utilization
 
    stats.allDone()
 
    (endOffset, stats)
  }

6. cleanSegments cleans a group of segment files and produces a single new segment.

private[log] def cleanSegments(log: Log,
                                 segments: Seq[LogSegment],
                                 map: OffsetMap,
                                 deleteHorizonMs: Long,
                                 stats: CleanerStats) {
 
    def deleteCleanedFileIfExists(file: File): Unit = {
      Files.deleteIfExists(new File(file.getPath + Log.CleanedFileSuffix).toPath)
    }
 
    // create a new segment with a suffix appended to the name of the log and indexes
    val firstSegment = segments.head
    deleteCleanedFileIfExists(firstSegment.log.file)
    deleteCleanedFileIfExists(firstSegment.offsetIndex.file)
    deleteCleanedFileIfExists(firstSegment.timeIndex.file)
    deleteCleanedFileIfExists(firstSegment.txnIndex.file)
 
    val baseOffset = firstSegment.baseOffset
    // open the new (cleaned) destination segment
    val cleaned = LogSegment.open(log.dir, baseOffset, log.config, time, fileSuffix = Log.CleanedFileSuffix,
      initFileSize = log.initFileSize, preallocate = log.config.preallocate)
 
    try {
      // clean segments into the new destination segment
      val iter = segments.iterator
      var currentSegmentOpt: Option[LogSegment] = Some(iter.next())
      // iterate over the segments to be cleaned, one at a time
      while (currentSegmentOpt.isDefined) {
        val currentSegment = currentSegmentOpt.get
        val nextSegmentOpt = if (iter.hasNext) Some(iter.next()) else None
        // determine the start offset and the upper-bound offset for this segment
        val startOffset = currentSegment.baseOffset
        val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(map.latestOffset + 1)
        // collect the aborted transactions in this offset range
        val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset)
        // build the metadata for the cleaned transactions
        val transactionMetadata = CleanedTransactionMetadata(abortedTransactions, Some(cleaned.txnIndex))
        // decide whether delete tombstones in the current segment should still be retained
        val retainDeletes = currentSegment.lastModified > deleteHorizonMs
        info(s"Cleaning segment $startOffset in log ${log.name} (largest timestamp ${new Date(currentSegment.largestTimestamp)}) " +
          s"into ${cleaned.baseOffset}, ${if(retainDeletes) "retaining" else "discarding"} deletes.")
        // clean the current segment, copying the retained records into the destination segment
        cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletes, log.config.maxMessageSize,
          transactionMetadata, log.activeProducersWithLastSequence, stats)
 
        currentSegmentOpt = nextSegmentOpt
      }
 
      cleaned.onBecomeInactiveSegment()
      // flush new segment to disk before swap
      cleaned.flush()
 
      // update the modification date to retain the last modified date of the original files
      val modified = segments.last.lastModified
      cleaned.lastModified = modified
 
      // swap in new segment
      info(s"Swapping in cleaned segment ${cleaned.baseOffset} for segment(s) ${segments.map(_.baseOffset).mkString(",")} " +
        s"in log ${log.name}")
      log.replaceSegments(cleaned, segments)
    } catch {
      case e: LogCleaningAbortedException =>
        try cleaned.deleteIfExists()
        catch {
          case deleteException: Exception =>
            e.addSuppressed(deleteException)
        } finally throw e
    }
  }

    Original author: wang
    Original article: https://zhuanlan.zhihu.com/p/69315171