1. 程式人生 > 實用技巧 >kafka學習筆記(四)kafka的日誌模組

kafka學習筆記(四)kafka的日誌模組

概述

日誌段及其相關程式碼是 Kafka 伺服器原始碼中最為重要的元件程式碼之一。你可能會非常關心,在 Kafka 中,訊息是如何被儲存和組織在一起的。畢竟,不管是學習任何訊息引擎,弄明白訊息建模方式都是首要的問題。因此,你非常有必要學習日誌段這個重要的子模組的原始碼實現。今天,我會帶你詳細看下日誌段部分的原始碼。不過在此之前,你需要先了解一下 Kafka 的日誌結構日誌是 Kafka 伺服器端程式碼的重要元件之一,很多其他的核心元件都是以日誌為基礎的,比如後面要講到的狀態管理機和副本管理器等。

總的來說,Kafka 日誌物件由多個日誌段物件組成,而每個日誌段物件會在磁碟上建立一組檔案,包括訊息日誌檔案(.log)、位移索引檔案(.index)、時間戳索引檔案(.timeindex)以及已中止(Aborted)事務的索引檔案(.txnindex)。當然,如果你沒有使用 Kafka 事務,已中止事務的索引檔案是不會被創建出來的。圖中的一串數字 0 是該日誌段的起始位移值(Base Offset),也就是該日誌段中所存的第一條訊息的位移值。

日誌段核心程式碼

日誌段原始碼位於 Kafka 的 core 工程下,具體檔案位置是 core/src/main/scala/kafka/log/LogSegment.scala。實際上,所有日誌結構部分的原始碼都在 core 的 kafka.log 包下。該檔案下定義了三個 Scala 物件:LogSegment class;LogSegment object;LogFlushStats object。LogFlushStats 結尾有個 Stats,它是做統計用的,主要負責為日誌落盤進行計時。每個日誌段由兩個核心元件構成:日誌和索引。當然,這裡的索引泛指廣義的索引檔案。另外,這段註釋還給出了一個重要的事實:每個日誌段都有一個起始位移值(Base Offset),而該位移值是此日誌段所有訊息中最小的位移值,同時,該值卻又比前面任何日誌段中訊息的位移值都大。

下面,我分批次給出比較關鍵的程式碼片段,並對其進行解釋。首先,我們看下 LogSegment 的定義:

1 class LogSegment private[log] (val log: FileRecords,
2                                val lazyOffsetIndex: LazyIndex[OffsetIndex],
3                                val lazyTimeIndex: LazyIndex[TimeIndex],
4                                val txnIndex: TransactionIndex,
5 val baseOffset: Long, 6 val indexIntervalBytes: Int, 7 val rollJitterMs: Long, 8 val time: Time) extends Logging { … }

就像我前面說的,一個日誌段包含訊息日誌檔案、位移索引檔案、時間戳索引檔案、已中止事務索引檔案等。這裡的 FileRecords 就是實際儲存 Kafka 訊息的物件。專欄後面我將專門討論 Kafka 是如何儲存具體訊息的,也就是 FileRecords 及其家族的實現方式。同時,我還會給你介紹一下社群在持久化訊息這塊是怎麼演進的,你一定不要錯過那部分的內容。下面的 lazyOffsetIndex、lazyTimeIndex 和 txnIndex 分別對應於剛才所說的 3 個索引檔案。不過,在實現方式上,前兩種使用了延遲初始化的原理,降低了初始化時間成本。後面我們在談到索引的時候再詳細說。

每個日誌段物件儲存自己的起始位移 baseOffset——這是非常重要的屬性!事實上,你在磁碟上看到的檔名就是 baseOffset 的值。每個 LogSegment 物件例項一旦被建立,它的起始位移就是固定的了,不能再被更改。

對於一個日誌段而言,最重要的方法就是寫入訊息和讀取訊息了,它們分別對應著原始碼中的 append 方法和 read 方法。另外,recover 方法同樣很關鍵,它是 Broker 重啟後恢復日誌段的操作邏輯。

append 方法

我們先來看 append 方法,瞭解下寫入訊息的具體操作。append 方法接收 4 個引數,分別表示待寫入訊息批次中訊息的最大位移值、最大時間戳、最大時間戳對應訊息的位移以及真正要寫入的訊息集合。下面這張圖展示了 append 方法的完整執行流程:

第一步:在原始碼中,首先呼叫 log.sizeInBytes 方法判斷該日誌段是否為空,如果是空的話, Kafka 需要記錄要寫入訊息集合的最大時間戳,並將其作為後面新增日誌段倒計時的依據。

第二步:程式碼呼叫 ensureOffsetInRange 方法確保輸入引數最大位移值是合法的。那怎麼判斷是不是合法呢?標準就是看它與日誌段起始位移的差值是否在整數範圍內,即 largestOffset - baseOffset 的值是不是介於 [0,Int.MAXVALUE] 之間。在極個別的情況下,這個差值可能會越界,這時,append 方法就會丟擲異常,阻止後續的訊息寫入。一旦你碰到這個問題,你需要做的是升級你的 Kafka 版本,因為這是由已知的 Bug 導致的。

第三步:待這些做完之後,append 方法呼叫 FileRecords 的 append 方法執行真正的寫入。前面說過了,專欄後面我們會詳細介紹 FileRecords 類。這裡你只需要知道它的工作是將記憶體中的訊息物件寫入到作業系統的頁快取就可以了。

第四步:再下一步,就是更新日誌段的最大時間戳以及最大時間戳所屬訊息的位移值屬性。每個日誌段都要儲存當前最大時間戳資訊和所屬訊息的位移資訊。還記得 Broker 端提供定期刪除日誌的功能嗎?比如我只想保留最近 7 天的日誌,沒錯,當前最大時間戳這個值就是判斷的依據;而最大時間戳對應的訊息的位移值則用於時間戳索引項。雖然後面我會詳細介紹,這裡我還是稍微提一下:時間戳索引項儲存時間戳與訊息位移的對應關係。在這步操作中,Kafka 會更新並儲存這組對應關係。

第五步:append 方法的最後一步就是更新索引項和寫入的位元組數了。我在前面說過,日誌段每寫入 4KB 資料就要寫入一個索引項。當已寫入位元組數超過了 4KB 之後,append 方法會呼叫索引物件的 append 方法新增索引項,同時清空已寫入位元組數,以備下次重新累積計算。

read 方法

好了,append 方法我就解釋完了。下面我們來看 read 方法,瞭解下讀取日誌段的具體操作。

read 方法接收 4 個輸入引數。startOffset:要讀取的第一條訊息的位移;maxSize:能讀取的最大位元組數;maxPosition :能讀到的最大檔案位置;minOneMessage:是否允許在訊息體過大時至少返回第一條訊息。前 3 個引數的含義很好理解,我重點說下第 4 個。當這個引數為 true 時,即使出現訊息體位元組數超過了 maxSize 的情形,read 方法依然能返回至少一條訊息。引入這個引數主要是為了確保不出現消費餓死的情況。

邏輯很簡單,我們一步步來看下。第一步是呼叫 translateOffset 方法定位要讀取的起始檔案位置 (startPosition)。輸入引數 startOffset 僅僅是位移值,Kafka 需要根據索引資訊找到對應的物理檔案位置才能開始讀取訊息。待確定了讀取起始位置,日誌段程式碼需要根據這部分資訊以及 maxSize 和 maxPosition 引數共同計算要讀取的總位元組數。舉個例子,假設 maxSize=100,maxPosition=300,startPosition=250,那麼 read 方法只能讀取 50 位元組,因為 maxPosition - startPosition = 50。我們把它和 maxSize 引數相比較,其中的最小值就是最終能夠讀取的總位元組數。最後一步是呼叫 FileRecords 的 slice 方法,從指定位置讀取指定大小的訊息集合。

recover 方法

除了 append 和 read 方法,LogSegment 還有一個重要的方法需要我們關注,它就是 recover 方法,用於恢復日誌段。下面的程式碼是 recover 方法原始碼。什麼是恢復日誌段呢?其實就是說, Broker 在啟動時會從磁碟上載入所有日誌段資訊到記憶體中,並建立相應的 LogSegment 物件例項。在這個過程中,它需要執行一系列的操作。

recover 開始時,程式碼依次呼叫索引物件的 reset 方法清空所有的索引檔案,之後會開始遍歷日誌段中的所有訊息集合或訊息批次(RecordBatch)。對於讀取到的每個訊息集合,日誌段必須要確保它們是合法的,這主要體現在兩個方面:該集合中的訊息必須要符合 Kafka 定義的二進位制格式;該集合中最後一條訊息的位移值不能越界,即它與日誌段起始位移的差值必須是一個正整數值。

校驗完訊息集合之後,程式碼會更新遍歷過程中觀測到的最大時間戳以及所屬訊息的位移值。同樣,這兩個資料用於後續構建索引項。再之後就是不斷累加當前已讀取的訊息位元組數,並根據該值有條件地寫入索引項。最後是更新事務型 Producer 的狀態以及 Leader Epoch 快取。不過,這兩個並不是理解 Kafka 日誌結構所必需的元件,因此,我們可以忽略它們。遍歷執行完成後,Kafka 會將日誌段當前總位元組數和剛剛累加的已讀取位元組數進行比較,如果發現前者比後者大,說明日誌段寫入了一些非法訊息,需要執行截斷操作,將日誌段大小調整回合法的數值。同時, Kafka 還必須相應地調整索引檔案的大小。把這些都做完之後,日誌段恢復的操作也就宣告結束了。

日誌Log基礎知識

你可以認為,日誌是日誌段的容器,裡面定義了很多管理日誌段的操作。坦率地說,如果看 Kafka 原始碼卻不看 Log,就跟你買了這門課卻不知道作者是誰一樣。在我看來,Log 物件是 Kafka 原始碼(特別是 Broker 端)最核心的部分,沒有之一。

Log 原始碼結構

Log 原始碼位於 Kafka core 工程的 log 原始碼包下,檔名是 Log.scala。總體上,該檔案定義了 10 個類和物件,如下圖所示:

圖中括號裡的 C 表示 Class,O 表示 Object。還記得我在上節課提到過的伴生物件嗎?沒錯,同時定義同名的 Class 和 Object,就屬於 Scala 中的伴生物件用法。我們先來看伴生物件,也就是 LogAppendInfo、Log 和 RollParams。

LogAppendInfo

LogAppendInfo(C):儲存了一組待寫入訊息的各種元資料資訊。比如,這組訊息中第一條訊息的位移值是多少、最後一條訊息的位移值是多少;再比如,這組訊息中最大的訊息時間戳又是多少。總之,這裡面的資料非常豐富(下節課我再具體說說)。LogAppendInfo(O): 可以理解為其對應伴生類的工廠方法類,裡面定義了一些工廠方法,用於建立特定的 LogAppendInfo 例項。

Log

Log(C): Log 原始碼中最核心的程式碼。這裡我先賣個關子,一會兒細聊。Log(O):同理,Log 伴生類的工廠方法,定義了很多常量以及一些輔助方法。

RollParams

RollParams(C):定義用於控制日誌段是否切分(Roll)的資料結構。

RollParams(O):同理,RollParams 伴生類的工廠方法。

除了這 3 組伴生物件之外,還有 4 類原始碼。LogMetricNames:定義了 Log 物件的監控指標。LogOffsetSnapshot:封裝分割槽所有位移元資料的容器類。LogReadInfo:封裝讀取日誌返回的資料及其元資料。CompletedTxn:記錄已完成事務的元資料,主要用於構建事務索引。

Log Class & Object

下面,我會按照這些類和物件的重要程度,對它們一一進行拆解。首先,咱們先說說 Log 類及其伴生物件。考慮到伴生物件多用於儲存靜態變數和靜態方法(比如靜態工廠方法等),因此我們先看伴生物件(即 Log Object)的實現。

 1 object Log {
 2   val LogFileSuffix = ".log"
 3   val IndexFileSuffix = ".index"
 4   val TimeIndexFileSuffix = ".timeindex"
 5   val ProducerSnapshotFileSuffix = ".snapshot"
 6   val TxnIndexFileSuffix = ".txnindex"
 7   val DeletedFileSuffix = ".deleted"
 8   val CleanedFileSuffix = ".cleaned"
 9   val SwapFileSuffix = ".swap"
10   val CleanShutdownFile = ".kafka_cleanshutdown"
11   val DeleteDirSuffix = "-delete"
12   val FutureDirSuffix = "-future"
13 ……
14 }

這是 Log Object 定義的所有常量。如果有面試官問你 Kafka 中定義了多少種檔案型別,你可以自豪地把這些說出來。耳熟能詳的.log、.index、.timeindex 和.txnindex 我就不解釋了,我們來了解下其他幾種檔案型別。

.snapshot 是 Kafka 為冪等型或事務型 Producer 所做的快照檔案。鑑於我們現在還處於閱讀原始碼的初級階段,事務或冪等部分的原始碼我就不詳細展開講了。

.deleted 是刪除日誌段操作建立的檔案。目前刪除日誌段檔案是非同步操作,Broker 端把日誌段檔案從.log 字尾修改為.deleted 字尾。如果你看到一大堆.deleted 字尾的檔名,別慌,這是 Kafka 在執行日誌段檔案刪除。

.cleaned 和.swap 都是 Compaction 操作的產物,等我們講到 Cleaner 的時候再說。

-delete 則是應用於資料夾的。當你刪除一個主題的時候,主題的分割槽資料夾會被加上這個字尾。

-future 是用於變更主題分割槽資料夾地址的,屬於比較高階的用法。

總之,記住這些常量吧。記住它們的主要作用是,以後不要被面試官唬住!開玩笑,其實這些常量最重要的地方就在於,它們能夠讓你瞭解 Kafka 定義的各種檔案型別。Log Object 還定義了超多的工具類方法。由於它們都很簡單,這裡我只給出一個方法的原始碼,我們一起讀一下。

1 def filenamePrefixFromOffset(offset: Long): String = {
2     val nf = NumberFormat.getInstance()
3     nf.setMinimumIntegerDigits(20)
4     nf.setMaximumFractionDigits(0)
5     nf.setGroupingUsed(false)
6     nf.format(offset)
7   }

這個方法的作用是通過給定的位移值計算出對應的日誌段檔名。Kafka 日誌檔案固定是 20 位的長度,filenamePrefixFromOffset 方法就是用前面補 0 的方式,把給定位移值擴充成一個固定 20 位長度的字串。

下面我們來看 Log 原始碼部分的重頭戲:Log 類。這是一個 2000 多行的大類。放眼整個 Kafka 原始碼,像 Log 這麼大的類也不多見,足見它的重要程度。我們先來看這個類的定義:

 1 class Log(@volatile var dir: File,
 2           @volatile var config: LogConfig,
 3           @volatile var logStartOffset: Long,
 4           @volatile var recoveryPoint: Long,
 5           scheduler: Scheduler,
 6           brokerTopicStats: BrokerTopicStats,
 7           val time: Time,
 8           val maxProducerIdExpirationMs: Int,
 9           val producerIdExpirationCheckIntervalMs: Int,
10           val topicPartition: TopicPartition,
11           val producerStateManager: ProducerStateManager,
12           logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
13 ……
14 }

看著好像有很多屬性,但其實,你只需要記住兩個屬性的作用就夠了:dir 和 logStartOffset。dir 就是這個日誌所在的資料夾路徑,也就是主題分割槽的路徑。而 logStartOffset,表示日誌的當前最早位移。dir 和 logStartOffset 都是 volatile var 型別,表示它們的值是變動的,而且可能被多個執行緒更新。你可能聽過日誌的當前末端位移,也就是 Log End Offset(LEO),它是表示日誌下一條待插入訊息的位移值,而這個 Log Start Offset 是跟它相反的,它表示日誌當前對外可見的最早一條訊息的位移值。我用一張圖來標識它們的區別:

有意思的是,Log End Offset 可以簡稱為 LEO,但 Log Start Offset 卻不能簡稱為 LSO。因為在 Kafka 中,LSO 特指 Log Stable Offset,屬於 Kafka 事務的概念。

其實,除了 Log 類簽名定義的這些屬性之外,Log 類還定義了一些很重要的屬性,比如下面這段程式碼:

1     @volatile private var nextOffsetMetadata: LogOffsetMetadata = _
2     @volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
3     private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
4     @volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None

第一個屬性 nextOffsetMetadata,它封裝了下一條待插入訊息的位移值,你基本上可以把這個屬性和 LEO 等同起來。

第二個屬性 highWatermarkMetadata,是分割槽日誌高水位值。

第三個屬性 segments,我認為這是 Log 類中最重要的屬性。它儲存了分割槽日誌下所有的日誌段資訊,只不過是用 Map 的資料結構來儲存的。Map 的 Key 值是日誌段的起始位移值,Value 則是日誌段物件本身。Kafka 原始碼使用 ConcurrentNavigableMap 資料結構來儲存日誌段物件,就可以很輕鬆地利用該類提供的執行緒安全和各種支援排序的方法,來管理所有日誌段物件。

第四個屬性是 Leader Epoch Cache 物件。Leader Epoch 是社群於 0.11.0.0 版本引入原始碼中的,主要是用來判斷出現 Failure 時是否執行日誌截斷操作(Truncation)。之前靠高水位來判斷的機制,可能會造成副本間資料不一致的情形。這裡的 Leader Epoch Cache 是一個快取類資料,裡面儲存了分割槽 Leader 的 Epoch 值與對應位移值的對映關係,我建議你檢視下 LeaderEpochFileCache 類,深入地瞭解下它的實現原理.

LOG物件基礎操作

我一般習慣把 Log 的常見操作分為 4 大部分。高水位管理操作:高水位的概念在 Kafka 中舉足輕重,對它的管理,是 Log 最重要的功能之一。日誌段管理:Log 是日誌段的容器。高效組織與管理其下轄的所有日誌段物件,是原始碼要解決的核心問題。關鍵位移值管理:日誌定義了很多重要的位移值,比如 Log Start Offset 和 LEO 等。確保這些位移值的正確性,是構建訊息引擎一致性的基礎。讀寫操作:所謂的操作日誌,大體上就是指讀寫日誌。讀寫操作的作用之大,不言而喻。

高水位管理操作

原始碼中日誌物件定義高水位的語句只有一行:

@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)

這行語句傳達了兩個重要的事實:高水位值是 volatile(易變型)的。因為多個執行緒可能同時讀取它,因此需要設定成 volatile,保證記憶體可見性。另外,由於高水位值可能被多個執行緒同時修改,因此原始碼使用 Java Monitor 鎖來確保併發修改的執行緒安全。高水位值的初始值是 Log Start Offset 值。每個 Log 物件都會維護一個 Log Start Offset 值。當首次構建高水位時,它會被賦值成 Log Start Offset 值。你可能會關心 LogOffsetMetadata 是什麼物件。因為它比較重要,我們一起來看下這個類的定義:

1 case class LogOffsetMetadata(messageOffset: Long,
2                              segmentBaseOffset: Long = Log.UnknownOffset, relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition)

顯然,它就是一個 POJO 類,裡面儲存了三個重要的變數。

messageOffset:訊息位移值,這是最重要的資訊。我們總說高水位值,其實指的就是這個變數的值。

segmentBaseOffset:儲存該位移值所在日誌段的起始位移。日誌段起始位移值輔助計算兩條訊息在物理磁碟檔案中位置的差值,即兩條訊息彼此隔了多少位元組。這個計算有個前提條件,即兩條訊息必須處在同一個日誌段物件上,不能跨日誌段物件。否則它們就位於不同的物理檔案上,計算這個值就沒有意義了。這裡的 segmentBaseOffset,就是用來判斷兩條訊息是否處於同一個日誌段的。

relativePositionSegment:儲存該位移值所在日誌段的物理磁碟位置。這個欄位在計算兩個位移值之間的物理磁碟位置差值時非常有用。你可以想一想,Kafka 什麼時候需要計算位置之間的位元組數呢?答案就是在讀取日誌的時候。假設每次讀取時只能讀 1MB 的資料,那麼,原始碼肯定需要關心兩個位移之間所有訊息的總位元組數是否超過了 1MB。

LogOffsetMetadata 類的所有方法,都是圍繞這 3 個變數展開的工具輔助類方法,非常容易理解。我會給出一個方法的詳細解釋,剩下的你可以舉一反三。

1 def onSameSegment(that: LogOffsetMetadata): Boolean = {
2     if (messageOffsetOnly)
3       throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")
4 
5     this.segmentBaseOffset == that.segmentBaseOffset
6   }

看名字我們就知道了,這個方法就是用來判斷給定的兩個 LogOffsetMetadata 物件是否處於同一個日誌段的。判斷方法很簡單,就是比較兩個 LogOffsetMetadata 物件的 segmentBaseOffset 值是否相等。

獲取和設定高水位值,關於獲取高水位值的方法,其實很好理解,我就不多說了。設定高水位值的方法,也就是 Setter 方法更復雜一些,為了方便你理解,我用註釋的方式來解析它的作用。

 1 // getter method:讀取高水位的位移值
 2 def highWatermark: Long = highWatermarkMetadata.messageOffset
 3 
 4 // setter method:設定高水位值
 5 private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
 6     if (newHighWatermark.messageOffset < 0) // 高水位值不能是負數
 7       throw new IllegalArgumentException("High watermark offset should be non-negative")
 8 
 9     lock synchronized { // 保護Log物件修改的Monitor鎖
10       highWatermarkMetadata = newHighWatermark // 賦值新的高水位值
11       producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset) // 處理事務狀態管理器的高水位值更新邏輯,忽略它……
12       maybeIncrementFirstUnstableOffset() // First Unstable Offset是Kafka事務機制的一部分,忽略它……
13     }
14     trace(s"Setting high watermark $newHighWatermark")
15   }

更新高水位值,除此之外,原始碼還定義了兩個更新高水位值的方法:updateHighWatermark 和 maybeIncrementHighWatermark。從名字上來看,前者是一定要更新高水位值的,而後者是可能會更新也可能不會。我們分別看下它們的實現原理。

其實,這兩個方法有著不同的用途。updateHighWatermark 方法,主要用在 Follower 副本從 Leader 副本獲取到訊息後更新高水位值。一旦拿到新的訊息,就必須要更新高水位值;而 maybeIncrementHighWatermark 方法,主要是用來更新 Leader 副本的高水位值。需要注意的是,Leader 副本高水位值的更新是有條件的——某些情況下會更新高水位值,某些情況下可能不會。就像我剛才說的,Follower 副本成功拉取 Leader 副本的訊息後必須更新高水位值,但 Producer 端向 Leader 副本寫入訊息時,分割槽的高水位值就可能不需要更新——因為它可能需要等待其他 Follower 副本同步的進度。因此,原始碼中定義了兩個更新的方法,它們分別應用於不同的場景。

讀取高水位值關於高水位值管理的最後一個操作是 fetchHighWatermarkMetadata 方法。它不僅僅是獲取高水位值,還要獲取高水位的其他元資料資訊,即日誌段起始位移和物理位置資訊。

日誌段管理

前面我反覆說過,日誌是日誌段的容器,那它究竟是如何承擔起容器一職的呢?

1 private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

可以看到,原始碼使用 Java 的 ConcurrentSkipListMap 類來儲存所有日誌段物件。ConcurrentSkipListMap 有 2 個明顯的優勢。它是執行緒安全的,這樣 Kafka 原始碼不需要自行確保日誌段操作過程中的執行緒安全;它是鍵值(Key)可排序的 Map。Kafka 將每個日誌段的起始位移值作為 Key,這樣一來,我們就能夠很方便地根據所有日誌段的起始位移值對它們進行排序和比較,同時還能快速地找到與給定位移值相近的前後兩個日誌段。

關鍵位移值管理

Log 物件維護了一些關鍵位移值資料,比如 Log Start Offset、LEO 等。其實,高水位值也算是關鍵位移值,只不過它太重要了,所以,我單獨把它拎出來作為獨立的一部分來講了。

Log 物件中的 LEO 永遠指向下一條待插入訊息,也就是說,LEO 值上面是沒有訊息的!原始碼中定義 LEO 的語句很簡單:這裡的 nextOffsetMetadata 就是我們所說的 LEO,它也是 LogOffsetMetadata 型別的物件。Log 物件初始化的時候,原始碼會載入所有日誌段物件,並由此計算出當前 Log 的下一條訊息位移值。之後,Log 物件將此位移值賦值給 LEO。

實際上,LEO 物件被更新的時機有 4 個。Log 物件初始化時:當 Log 物件初始化時,我們必須要建立一個 LEO 物件,並對其進行初始化。寫入新訊息時:這個最容易理解。以上面的圖為例,當不斷向 Log 物件插入新訊息時,LEO 值就像一個指標一樣,需要不停地向右移動,也就是不斷地增加。Log 物件發生日誌切分(Log Roll)時:日誌切分是啥呢?其實就是建立一個全新的日誌段物件,並且關閉當前寫入的日誌段物件。這通常發生在當前日誌段物件已滿的時候。一旦發生日誌切分,說明 Log 物件切換了 Active Segment,那麼,LEO 中的起始位移值和段大小資料都要被更新,因此,在進行這一步操作時,我們必須要更新 LEO 物件。日誌截斷(Log Truncation)時:這個也是顯而易見的。日誌中的部分訊息被刪除了,自然可能導致 LEO 值發生變化,從而要更新 LEO 物件。

現在,我們再來思考一下,Kafka 什麼時候需要更新 Log Start Offset 呢?我們一一來看下。Log 物件初始化時:和 LEO 類似,Log 物件初始化時要給 Log Start Offset 賦值,一般是將第一個日誌段的起始位移值賦值給它。日誌截斷時:同理,一旦日誌中的部分訊息被刪除,可能會導致 Log Start Offset 發生變化,因此有必要更新該值。Follower 副本同步時:一旦 Leader 副本的 Log 物件的 Log Start Offset 值發生變化。為了維持和 Leader 副本的一致性,Follower 副本也需要嘗試去更新該值。刪除日誌段時:這個和日誌截斷是類似的。凡是涉及訊息刪除的操作都有可能導致 Log Start Offset 值的變化。刪除訊息時:嚴格來說,這個更新時機有點本末倒置了。在 Kafka 中,刪除訊息就是通過抬高 Log Start Offset 值來實現的,因此,刪除訊息時必須要更新該值。

讀寫操作

最後,我重點說說針對 Log 物件的讀寫操作。

寫操作

在 Log 中,涉及寫操作的方法有 3 個:appendAsLeader、appendAsFollower 和 append。appendAsLeader 是用於寫 Leader 副本的,appendAsFollower 是用於 Follower 副本同步的。它們的底層都呼叫了 append 方法。

Kafka 訊息格式經歷了兩次大的變遷,目前是 0.11.0.0 版本引入的 Version 2 訊息格式。我們沒有必要詳細瞭解這些格式的變遷,你只需要知道,在 0.11.0.0 版本之後,lastOffset 和 lastOffsetOfFirstBatch 都是指向訊息集合的最後一條訊息即可。它們的區別主要體現在 0.11.0.0 之前的版本。

讀操作

read 方法的流程相對要簡單一些,首先來看它的方法簽名:

1 def read(startOffset: Long,
2            maxLength: Int,
3            isolation: FetchIsolation,
4            minOneMessage: Boolean): FetchDataInfo = {
5            ......
6 }

它接收 4 個引數,含義如下:startOffset,即從 Log 物件的哪個位移值開始讀訊息。maxLength,即最多能讀取多少位元組。isolation,設定讀取隔離級別,主要控制能夠讀取的最大位移值,多用於 Kafka 事務。minOneMessage,即是否允許至少讀一條訊息。設想如果訊息很大,超過了 maxLength,正常情況下 read 方法永遠不會返回任何訊息。但如果設定了該引數為 true,read 方法就保證至少能夠返回一條訊息。read 方法的返回值是 FetchDataInfo 類,也是一個 POJO 類,裡面最重要的資料就是讀取的訊息集合,其他資料還包括位移等元資料資訊。

日誌中的索引應用

在 Kafka 原始碼中,跟索引相關的原始碼檔案有 5 個,它們都位於 core 包的 /src/main/scala/kafka/log 路徑下。我們一一來看下。AbstractIndex.scala:它定義了最頂層的抽象類,這個類封裝了所有索引型別的公共操作。LazyIndex.scala:它定義了 AbstractIndex 上的一個包裝類,實現索引項延遲載入。這個類主要是為了提高效能。OffsetIndex.scala:定義位移索引,儲存“< 位移值,檔案磁碟物理位置 >”對。TimeIndex.scala:定義時間戳索引,儲存“< 時間戳,位移值 >”對。TransactionIndex.scala:定義事務索引,為已中止事務(Aborted Transcation)儲存重要的元資料資訊。只有啟用 Kafka 事務後,這個索引才有可能出現。這些類的關係如下圖所示:

其中,OffsetIndex、TimeIndex 和 TransactionIndex 都繼承了 AbstractIndex 類,而上層的 LazyIndex 僅僅是包裝了一個 AbstractIndex 的實現類,用於延遲載入。就像我之前說的,LazyIndex 的作用是為了提升效能,並沒有什麼功能上的改進。

AbstractIndex 程式碼結構

AbstractIndex 定義了 4 個屬性欄位。由於是一個抽象基類,它的所有子類自動地繼承了這 4 個欄位。也就是說,Kafka 所有型別的索引物件都定義了這些屬性。我先給你解釋下這些屬性的含義。

索引檔案(file)。每個索引物件在磁碟上都對應了一個索引檔案。你可能注意到了,這個欄位是 var 型,說明它是可以被修改的。難道索引物件還能動態更換底層的索引檔案嗎?是的。自 1.1.0 版本之後,Kafka 允許遷移底層的日誌路徑,所以,索引檔案自然要是可以更換的。

起始位移值(baseOffset)。索引物件對應日誌段物件的起始位移值。舉個例子,如果你檢視 Kafka 日誌路徑的話,就會發現,日誌檔案和索引檔案都是成組出現的。比如說,如果日誌檔案是 00000000000000000123.log,正常情況下,一定還有一組索引檔案 00000000000000000123.index、00000000000000000123.timeindex 等。這裡的“123”就是這組檔案的起始位移值,也就是 baseOffset 值。

索引檔案最大位元組數(maxIndexSize)。它控制索引檔案的最大長度。Kafka 原始碼傳入該引數的值是 Broker 端引數 segment.index.bytes 的值,即 10MB。這就是在預設情況下,所有 Kafka 索引檔案大小都是 10MB 的原因。

索引檔案開啟方式(writable)。“True”表示以“讀寫”方式開啟,“False”表示以“只讀”方式開啟。如果我沒記錯的話,這個引數應該是我加上去的,就是為了修復我剛剛提到的那個 Bug。

AbstractIndex 是抽象的索引物件類。可以說,它是承載索引項的容器,而每個繼承它的子類負責定義具體的索引項結構。比如,OffsetIndex 的索引項是 < 位移值,物理磁碟位置 > 對,TimeIndex 的索引項是 < 時間戳,位移值 > 對。基於這樣的設計理念,AbstractIndex 類中定義了一個抽象方法 entrySize 來表示不同索引項的大小。

子類實現該方法時需要給定自己索引項的大小,對於 OffsetIndex 而言,該值就是 8;對於 TimeIndex 而言,該值是 12。說到這兒,你肯定會問,為什麼是 8 和 12 呢?我來解釋一下。在 OffsetIndex 中,位移值用 4 個位元組來表示,物理磁碟位置也用 4 個位元組來表示,所以總共是 8 個位元組。你可能會說,位移值不是長整型嗎,應該是 8 個位元組才對啊。還記得 AbstractIndex 已經儲存了 baseOffset 了嗎?這裡的位移值,實際上是相對於 baseOffset 的相對位移值,即真實位移值減去 baseOffset 的值。下節課我會給你重點講一下它,這裡你只需要知道使用相對位移值能夠有效地節省磁碟空間就行了。而 Broker 端引數 log.segment.bytes 是整型,這說明,Kafka 中每個日誌段檔案的大小不會超過 2^32,即 4GB,這就說明同一個日誌段檔案上的位移值減去 baseOffset 的差值一定在整數範圍內。因此,原始碼只需要 4 個位元組儲存就行了。同理,TimeIndex 中的時間戳型別是長整型,佔用 8 個位元組,位移依然使用相對位移值,佔用 4 個位元組,因此總共需要 12 個位元組。

如果有人問你,Kafka 中的索引底層的實現原理是什麼?你可以大聲地告訴他:記憶體對映檔案,即 Java 中的 MappedByteBuffer。使用記憶體對映檔案的主要優勢在於,它有很高的 I/O 效能,特別是對於索引這樣的小檔案來說,由於檔案記憶體被直接對映到一段虛擬記憶體上,訪問記憶體對映檔案的速度要快於普通的讀寫檔案速度。另外,在很多作業系統中(比如 Linux),這段對映的記憶體區域實際上就是核心的頁快取(Page Cache)。這就意味著,裡面的資料不需要重複拷貝到使用者態空間,避免了很多不必要的時間、空間消耗。在 AbstractIndex 中,這個 MappedByteBuffer 就是名為 mmap 的變數。

二分查詢演算法

到目前為止,從已排序陣列中尋找某個數字最快速的演算法就是二分查找了,它能做到 O(lgN) 的時間複雜度。Kafka 的索引元件就應用了二分查詢演算法。我先給出原版的實現演算法程式碼。

 1   private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {
 2         // 第1步:如果當前索引為空,直接返回<-1,-1>對
 3         if(_entries == 0)
 4           return (-1, -1)
 5     
 6     
 7         // 第2步:要查詢的位移值不能小於當前最小位移值
 8         if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
 9           return (-1, 0)
10     
11     
12         // binary search for the entry
13         // 第3步:執行二分查詢演算法
14         var lo = 0
15         var hi = _entries - 1
16         while(lo < hi) {
17           val mid = ceil(hi/2.0 + lo/2.0).toInt
18           val found = parseEntry(idx, mid)
19           val compareResult = compareIndexEntry(found, target, searchEntity)
20           if(compareResult > 0)
21             hi = mid - 1
22           else if(compareResult < 0)
23             lo = mid
24           else
25             return (mid, mid)
26         }
27     
28     
29         (lo, if (lo == _entries - 1) -1 else lo + 1)

這段程式碼的核心是,第 3 步的二分查詢演算法。熟悉 Binary Search 的話,你對這段程式碼一定不會感到陌生。講到這裡,似乎一切很完美:Kafka 索引應用二分查詢演算法快速定位待查詢索引項位置,之後呼叫 parseEntry 來讀取索引項。不過,這真的就是無懈可擊的解決方案了嗎?

改進版二分查詢演算法

大多數作業系統使用頁快取來實現記憶體對映,而目前幾乎所有的作業系統都使用 LRU(Least Recently Used)或類似於 LRU 的機制來管理頁快取。Kafka 寫入索引檔案的方式是在檔案末尾追加寫入,而幾乎所有的索引查詢都集中在索引的尾部。這麼來看的話,LRU 機制是非常適合 Kafka 的索引訪問場景的。但,這裡有個問題是,當 Kafka 在查詢索引的時候,原版的二分查詢演算法並沒有考慮到快取的問題,因此很可能會導致一些不必要的缺頁中斷(Page Fault)。此時,Kafka 執行緒會被阻塞,等待對應的索引項從物理磁碟中讀出並放入到頁快取中。下面我舉個例子來說明一下這個情況。假設 Kafka 的某個索引佔用了作業系統頁快取 13 個頁(Page),如果待查詢的位移值位於最後一個頁上,也就是 Page 12,那麼標準的二分查詢演算法會依次讀取頁號 0、6、9、11 和 12。

通常來說,一個頁上儲存了成百上千的索引項資料。隨著索引檔案不斷被寫入,Page #12 不斷地被填充新的索引項。如果此時索引查詢方都來自 ISR 副本或 Lag 很小的消費者,那麼這些查詢大多集中在對 Page #12 的查詢,因此,Page #0、6、9、11、12 一定經常性地被原始碼訪問。也就是說,這些頁一定儲存在頁快取上。後面當新的索引項填滿了 Page #12,頁快取就會申請一個新的 Page 來儲存索引項,即 Page #13。現在,最新索引項儲存在 Page #13 中。如果要查詢最新索引項,原版二分查詢演算法將會依次訪問 Page #0、7、10、12 和 13。此時,問題來了:Page 7 和 10 已經很久沒有被訪問過了,它們大概率不在頁快取中,因此,一旦索引開始徵用 Page #13,就會發生 Page Fault,等待那些冷頁資料從磁碟中載入到頁快取。根據國外使用者的測試,這種載入過程可能長達 1 秒。

顯然,這是一個普遍的問題,即每當索引檔案佔用 Page 數發生變化時,就會強行變更二分查詢的搜尋路徑,從而出現不在頁快取的冷資料必須要載入到頁快取的情形,而這種載入過程是非常耗時的。基於這個問題,社群提出了改進版的二分查詢策略,也就是快取友好的搜尋演算法。總體的思路是,程式碼將所有索引項分成兩個部分:熱區(Warm Area)和冷區(Cold Area),然後分別在這兩個區域內執行二分查詢演算法。

乍一看,該演算法並沒有什麼高大上的改進,僅僅是把搜尋區域分成了冷、熱兩個區域,然後有條件地在不同區域執行普通的二分查詢演算法罷了。實際上,這個改進版演算法提供了一個重要的保證:它能保證那些經常需要被訪問的 Page 組合是固定的。想想剛才的例子,同樣是查詢最熱的那部分資料,一旦索引佔用了更多的 Page,要遍歷的 Page 組合就會發生變化。這是導致效能下降的主要原因。這個改進版演算法的最大好處在於,查詢最熱那部分資料所遍歷的 Page 永遠是固定的,因此大概率在頁快取中,從而避免無意義的 Page Fault。

位移索引和時間戳索引

Kafka 索引型別有三大類:位移索引、時間戳索引和已中止事務索引。相比於最後一類索引,前兩類索引的出鏡率更高一些。在 Kafka 的資料路徑下,你肯定看到過很多.index 和.timeindex 字尾的檔案。不知你是否有過這樣的疑問:“這些檔案是用來做什麼的呢?” 現在我可以明確告訴你:.index 檔案就是 Kafka 中的位移索引檔案,而.timeindex 檔案則是時間戳索引檔案。

位移索引

位移索引也就是所謂的 OffsetIndex,它可是一個老資歷的元件了。如果我沒記錯的話,國內大面積使用 Kafka 應該是在 0.8 時代。從那個時候開始,OffsetIndex 就已經存在了。每當 Consumer 需要從主題分割槽的某個位置開始讀取訊息時,Kafka 就會用到 OffsetIndex 直接定位物理檔案位置,從而避免了因為從頭讀取訊息而引入的昂貴的 I/O 操作。不同索引型別儲存不同的<key, value="">對。就 OffsetIndex 而言,Key 就是訊息的相對位移,Value 是儲存該訊息的日誌段檔案中該訊息第一個位元組的物理檔案位置。

為什麼是 8 呢?相對位移是一個整型(Integer),佔用 4 個位元組,物理檔案位置也是一個整型,同樣佔用 4 個位元組,因此總共是 8 個位元組。那相對位移是什麼值呢?我們知道,Kafka 中的訊息位移值是一個長整型(Long),應該佔用 8 個位元組才對。在儲存 OffsetIndex 的<key, value="">對時,Kafka 做了一些優化。每個 OffsetIndex 物件在建立時,都已經儲存了對應日誌段物件的起始位移,因此,OffsetIndex 索引項沒必要儲存完整的 8 位元組位移值。相反地,它只需要儲存與起始位移的差值(Delta)就夠了,而這個差值是可以被整型容納的。這種設計可以讓 OffsetIndex 每個索引項都節省 4 個位元組。

當讀取 OffsetIndex 時,原始碼還需要將相對位移值還原成之前的完整位移。這個是在 parseEntry 方法中實現的。

這個方法返回一個 OffsetPosition 型別。該類有兩個方法,分別返回索引項的 Key 和 Value。這裡的 parseEntry 方法,就是要構造 OffsetPosition 所需的 Key 和 Value。Key 是索引項中的完整位移值,程式碼使用 baseOffset + relativeOffset(buffer, n) 的方式將相對位移值還原成完整位移值;Value 是這個位移值上訊息在日誌段檔案中的物理位置,程式碼呼叫 physical 方法計算這個物理位置並把它作為 Value。最後,parseEntry 方法把 Key 和 Value 封裝到一個 OffsetPosition 例項中,然後將這個例項返回。

寫入索引項好了,有了這些基礎,下面的內容就很容易理解了。我們來看下 OffsetIndex 中最重要的操作——寫入索引項 append 方法的實現。

 1 def append(offset: Long, position: Int): Unit = {
 2   inLock(lock) {
 3     // 索引檔案如果已經寫滿,直接丟擲異常
 4     require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
 5     // 要保證待寫入的位移值offset比當前索引檔案中所有現存的位移值都要大
 6     // 這主要是為了維護索引的單調增加性
 7     if (_entries == 0 || offset > _lastOffset) {
 8       trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}")
 9       mmap.putInt(relativeOffset(offset)) // 向mmap寫入相對位移值
10       mmap.putInt(position) // 向mmap寫入物理檔案位置
11       _entries += 1 // 更新索引項個數
12       _lastOffset = offset // 更新當前索引檔案最大位移值
13       // 確保寫入索引項格式符合要求
14       require(_entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.")
15     } else {
16       throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" +
17         s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.")
18     }
19   }
20 }

append 方法接收兩個引數:Long 型的位移值和 Integer 型的物理檔案位置。該方法最重要的兩步,就是分別向 mmap 寫入相對位移值和物理檔案位置。除了 append 方法,索引還有一個常見的操作:截斷操作(Truncation)。截斷操作是指,將索引檔案內容直接裁剪掉一部分。比如,OffsetIndex 索引檔案中當前儲存了 100 個索引項,我想只保留最開始的 40 個索引項。

這個方法接收 entries 引數,表示要擷取到哪個槽,主要的邏輯實現是呼叫 mmap 的 position 方法。原始碼中的 _entries * entrySize 就是 mmap 要擷取到的位元組處。下面,我來說說 OffsetIndex 的使用方式。既然 OffsetIndex 被用來快速定位訊息所在的物理檔案位置,那麼必然需要定義一個方法執行對應的查詢邏輯。這個方法就是 lookup。

方法返回的,是不大於給定位移值 targetOffset 的最大位移值,以及對應的物理檔案位置。你大致可以把這個方法,理解為位移值的 FLOOR 函式。

時間戳索引

與 OffsetIndex 不同的是,TimeIndex 儲存的是 < 時間戳,相對位移值 > 對。時間戳需要一個長整型來儲存,相對位移值使用 Integer 來儲存。因此,TimeIndex 單個索引項需要佔用 12 個位元組。這也揭示了一個重要的事實:在儲存同等數量索引項的基礎上,TimeIndex 會比 OffsetIndex 佔用更多的磁碟空間。

寫入索引項TimeIndex 也有 append 方法,只不過它叫作 maybeAppend。我們來看下它的實現邏輯。

和 OffsetIndex 類似,向 TimeIndex 寫入索引項的主體邏輯,是向 mmap 分別寫入時間戳和相對位移值。只不過,除了校驗位移值的單調增加性之外,TimeIndex 還會確保順序寫入的時間戳也是單調增加的。

我帶你詳細分析了 OffsetIndex 和 TimeIndex,以及它們的不同之處。雖然 OffsetIndex 和 TimeIndex 是不同型別的索引,但 Kafka 內部是把二者結合使用的。通常的流程是,先使用 TimeIndex 尋找滿足時間戳要求的訊息位移值,然後再利用 OffsetIndex 去定位該位移值所在的物理檔案位置。因此,它們其實是合作的關係。最後,我還想提醒你一點:不要對索引檔案做任何修改!我碰到過因使用者擅自重新命名索引檔案,從而導致 Broker 崩潰無法啟動的場景。另外,雖然 Kafka 能夠重建索引,但是隨意地刪除索引檔案依然是一個很危險的操作。在生產環境中,我建議你儘量不要執行這樣的操作。

總結

以後關於kafka系列的總結大部分來自Geek Time的課件,大家可以自行關鍵字搜尋。