1. 程式人生 > >Kafka 日誌清理機制——LogCompact(七)

Kafka 日誌清理機制——LogCompact(七)

文章目錄

一. 日誌清理是幹什麼的?

kafka的日誌清理機制主要用於縮減日誌的大小,它並不是指通過壓縮演算法對日誌檔案進行壓縮,而是對重複的日誌進行清理來達到目的。在日誌清理過程中,會清理重複的key,最後只會保留最後一條key,可以理解為map的put方法。在清理完後,一些segment的檔案大小就會變小,這時候,kafka會將那些小的檔案再合併成一個大的segment檔案。

另外,通過日誌清理功能,我們可以做到刪除某個key的功能。推送value為null的key到kafka,kafka在做日誌清理時就會將這條key從日誌中刪去。

在這裡插入圖片描述

二. 清理相關原理

對於每一個kafka partition的日誌,以segment為單位,都會被分為兩部分,已清理未清理的部分。同時,未清理的那部分又分為可以清理的不可清理的

每個日誌目錄下都會有一個檔案cleaner-offset-checkpoint來記錄當前清理到哪裡了,這時候kafka就知道哪部分是已經清理的,哪部分是未清理的。

接著,在未清理的segment中,找出可以清理的那部分segment。首先,active segment肯定是不能清理的。接著kafka會根據min.compaction.lag.ms

配置找出不能清理的segment,規則是根據segment最後的一條記錄的插入時間是否已經超過最小保留時間,如果沒有,這個segment就不能清理。這是為了保證日誌至少存留多長時間才會被清理。

找出可以清理的segment後,kafka會構建一個SkimpyOffsetMap物件,這個物件是一個key與offset的對映關係的雜湊表。接著會遍歷可以清理那部分的segment的每一條日誌,然後將key和offset存到SkimpyOffsetMap中。

之後,再遍歷已清理部分可以清理部分的segment的每一條日誌,根據SkimpyOffsetMap來判斷是否保留。假設一條日誌key的offset是1,但是在SkimpyOffsetMap中對應key的offset是100,那麼這條日誌就可以清楚掉了。

最後,再兩次遍歷後,可清理部分的segment已變已清理的segment了。同時cleaner checkpoint會執行已經清理的segment的最後一條offset。

三、墓碑訊息(tombstone)

對於value為null的日誌,kafka稱這種日誌為tombstone,也就是墓碑訊息。在執行日誌清理時,會刪除到期的墓碑訊息。墓碑訊息的存放時間和broker的配置log.cleaner.delete.retention.ms有關,它的預設值是24小時。

kafka做日誌清理時,會根據一些規則判斷是否要保留墓碑訊息。判斷規則如下:

所在LogSegment的lastModifiedTime + deleteRetionMs > 可清理部分中最後一個LogSegment的lastModifiedTime

所以,墓碑訊息的保留時間和已清理部分的最後一個segment有關係。

四、日誌segment合併

再經過一次次清理後,各個segment大小會慢慢變小。為了避免日誌目錄下有過多的小檔案,kafka在每次日誌清理後會進行小檔案日誌合併。kafka會保證合併後的segment大小不超過segmentSize(通過log.segments.bytes設定,預設值是1G),且對應的索引檔案佔用大小之和不超過maxIndexSize(可以通過broker端引數log.index.interval.bytes設定,預設值為10MB)。

下面是日誌合併的示意圖:

在這裡插入圖片描述

五、清理執行緒的啟動

kafka日誌清理是交給LogCleaner元件來完成的。

kafka在啟動LogManager時,如果日誌清理機制開啟的話,就會啟動LogCleaner元件開始定時的清理日誌。是否開啟日誌清理是由broker的log.cleaner.enable來決定的,預設是開啟的。

LogCleaner啟動後,會註冊n個執行緒CleanerThread,開始不斷的檢查日誌並清理。這個執行緒數量和broker的配置log.cleaner.threads有關係,預設值是1。當清理執行緒啟動後,就開始檢查是否有日誌需要清理,接著清理完再檢查是否有日誌需要清理。如果發現沒有需要清理的日誌,這個執行緒會進入休眠,休眠時間根據broker的log.cleaner.backoff.ms來決定,預設值是15000。

//LogCleaner.scala
private[log] val cleanerManager = new LogCleanerManager(logDirs, logs)
private val cleaners = (0 until config.numThreads).map(new CleanerThread(_))
def startup() {
    info("Starting the log cleaner")
    cleaners.foreach(_.start())
}
//CleanerThread.scala
override def doWork() {
   cleanOrSleep()
}
private def cleanOrSleep() {
    //獲取哪些日誌可以清理,grabFilthiestCompactedLog方法只會返回一個partition的日誌
      val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match {
        case None =>
          false
        case Some(cleanable) =>
          //這裡拿到要清理的日誌
          var endOffset = cleanable.firstDirtyOffset
          try {
              //開始清理日誌
            val (nextDirtyOffset, cleanerStats) = cleaner.clean(cleanable)
            recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats)
            endOffset = nextDirtyOffset
          } catch {
            case _: LogCleaningAbortedException => // task can be aborted, let it go.
          } finally {
            cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
          }
          true
      }
    	//刪除一些舊的日誌
      val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
      deletable.foreach{
        case (topicPartition, log) =>
          try {
            log.deleteOldSegments()
          } finally {
            cleanerManager.doneDeleting(topicPartition)
          }
      }
      //如果沒有要清理的日誌,就進入休眠
      if (!cleaned)
        backOffWaitLatch.await(config.backOffMs, TimeUnit.MILLISECONDS)
}

六、通過dirtyRatio獲取要清理的partition日誌

cleanerManager.grabFilthiestCompactedLog方法中,在這裡,kafka會遍歷該broker上所有partition目錄,判斷這些partition是否可以清理,然後從可以清理的那些partition中找出dirtyRatio最高的日誌,開始清理。

//CleanerManager.scala
def grabFilthiestCompactedLog(time: Time): Option[LogToClean] = {
    inLock(lock) {
      val now = time.milliseconds
      this.timeOfLastRun = now
      val lastClean = allCleanerCheckpoints
      val dirtyLogs = logs.filter {
          //判斷這個partition log是否可以清理
        case (_, log) => log.config.compact  // match logs that are marked as compacted
      }.filterNot {
          //可能其他執行緒在清理這個partition log了
        case (topicPartition, _) => inProgress.contains(topicPartition) // skip any logs already in-progress
      }.map {
        case (topicPartition, log) => // create a LogToClean instance for each
          //獲取可清理部分的第一條offset和不可清理部分的第一條offset
          val (firstDirtyOffset, firstUncleanableDirtyOffset) = LogCleanerManager.cleanableOffsets(log, topicPartition,
            lastClean, now)
          LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset)
      }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
      this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0
      // 獲取dirtyRatio最高的partiton log
      val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
      if(cleanableLogs.isEmpty) {
        None
      } else {
        val filthiest = cleanableLogs.max
        inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
        Some(filthiest)
      }
    }
  }

  def cleanableOffsets(log: Log, topicPartition: TopicPartition, lastClean: immutable.Map[TopicPartition, Long], now: Long): (Long, Long) = {
    val lastCleanOffset: Option[Long] = lastClean.get(topicPartition)

    // 找出之前清理到哪個offset了,從而找到未清理部分的第一條offset
    val logStartOffset = log.logSegments.head.baseOffset
    val firstDirtyOffset = {
      val offset = lastCleanOffset.getOrElse(logStartOffset)
      if (offset < logStartOffset) {
        // don't bother with the warning if compact and delete are enabled.
        if (!isCompactAndDelete(log))
          warn(s"Resetting first dirty offset to log start offset $logStartOffset since the checkpointed offset $offset is invalid.")
        logStartOffset
      } else {
        offset
      }
    }

    // 先把active segment排除出去
    val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset)

    val compactionLagMs = math.max(log.config.compactionLagMs, 0L)

    //找出不可清理部分的第一條offset,其中active segment
      //再通過compactionLagMs過濾掉那些不能清理的segment
    val firstUncleanableDirtyOffset: Long = Seq (

        Option(log.activeSegment.baseOffset),
        if (compactionLagMs > 0) {
          dirtyNonActiveSegments.find {
            s =>
              val isUncleanable = s.largestTimestamp > now - compactionLagMs
              isUncleanable
          } map(_.baseOffset)
        } else None
      ).flatten.min

    (firstDirtyOffset, firstUncleanableDirtyOffset)
  }

注意以下幾點:

  1. 是否開啟topic的日誌清理機制和broker的log.cleanup.policy有關。這個配置的預設值是[delete],也就是沒有開啟。但是並不是所有的partition log都會根據這個配置來判斷是否開啟日誌清理。因為每個topic在建立的時候,也會指定是否開啟日誌清理(會覆蓋broker的那個配置)。所以需要遍歷所有的partiton,排除掉那些不用清理的partition。
  2. dirtyRatio的計算規則為dirtyRatio = dirtyBytes / (cleanBytes + dirtyBytes)。其中dirtyBytes表示可清理部分的日誌大小,cleanBytes表示已清理部分的日誌大小。