1. 程式人生 > 其它 >訊息佇列中介軟體 RocketMQ 原始碼分析 —— Message 儲存

訊息佇列中介軟體 RocketMQ 原始碼分析 —— Message 儲存

  • 1、概述
  • 2、CommitLog 結構
  • 3、CommitLog 儲存訊息
    • MappedFile#落盤
    • FlushRealTimeService
    • CommitRealTimeService
    • GroupCommitService
    • CommitLog#putMessage(...)
    • MappedFileQueue#getLastMappedFile(...)
    • MappedFile#appendMessage(...)
    • DefaultAppendMessageCallback#doAppend(...)
    • FlushCommitLogService

1、概述

本文接《RocketMQ 原始碼分析 —— Message 傳送與接收》。 主要解析 CommitLog

儲存訊息部分。

2、CommitLog 結構

CommitLogMappedFileQueueMappedFile 的關係如下:

CommitLog : MappedFileQueue : MappedFile = 1 : 1 : N。

反應到系統檔案如下:

Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd/Users/yunai/store/commitlog
Yunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -ltotal 10485760
-rw-r--r--  1 yunai  staff  1073741824  4 21 16:27 00000000000000000000
-rw-r--r--  1 yunai  staff  1073741824  4 21 16:29 00000000001073741824
-rw-r--r--  1 yunai  staff  1073741824  4 21 16:32 00000000002147483648
-rw-r--r--  1 yunai  staff  1073741824  4 21 16:33 00000000003221225472
-rw-r--r--  1 yunai  staff  1073741824  4 21 16:32 00000000004294967296

CommitLogMappedFileQueueMappedFile 的定義如下:

  • MappedFile :00000000000000000000、00000000001073741824、00000000002147483648等檔案。
  • MappedFileQueueMappedFile 所在的資料夾,對 MappedFile 進行封裝成檔案佇列,對上層提供可無限使用的檔案容量。
    • 每個 MappedFile 統一檔案大小。
    • 檔案命名方式:fileName[n] = fileName[n - 1] + mappedFileSize。在 CommitLog 裡預設為 1GB。
  • CommitLog :針對 MappedFileQueue 的封裝使用。

CommitLog 目前儲存在 MappedFile 有兩種內容型別:

  1. MESSAGE :訊息。
  2. BLANK :檔案不足以儲存訊息時的空白佔位。

CommitLog 儲存在 MappedFile的結構:

MESSAGE[1]

MESSAGE[2]

...

MESSAGE[n - 1]

MESSAGE[n]

BLANK

MESSAGECommitLog 儲存結構:

第幾位

欄位

說明

資料型別

位元組數

1

MsgLen

訊息總長度

Int

4

2

MagicCode

MESSAGE_MAGIC_CODE

Int

4

3

BodyCRC

訊息內容CRC

Int

4

4

QueueId

訊息佇列編號

Int

4

5

Flag

flag

Int

4

6

QueueOffset

訊息佇列位置

Long

8

7

PhysicalOffset

物理位置。在 CommitLog 的順序儲存位置。

Long

8

8

SysFlag

MessageSysFlag

Int

4

9

BornTimestamp

生成訊息時間戳

Long

8

10

BornHost

生效訊息的地址+埠

Long

8

11

StoreTimestamp

儲存訊息時間戳

Long

8

12

StoreHost

儲存訊息的地址+埠

Long

8

13

ReconsumeTimes

重新消費訊息次數

Int

4

14

PreparedTransationOffset

Long

8

15

BodyLength + Body

內容長度 + 內容

Int + Bytes

4 + bodyLength

16

TopicLength + Topic

Topic長度 + Topic

Byte + Bytes

1 + topicLength

17

PropertiesLength + Properties

拓展欄位長度 + 拓展欄位

Short + Bytes

2 + PropertiesLength

BLANKCommitLog 儲存結構:

第幾位

欄位

說明

資料型別

位元組數

1

maxBlank

空白長度

Int

4

2

MagicCode

BLANK_MAGIC_CODE

Int

4

3、CommitLog 儲存訊息

CommitLog#putMessage(...)

// 省略程式碼
  • 說明 :儲存訊息,並返回儲存結果。
  • 第 2 行 :設定儲存時間等。
  • 第 16 至 36 行 :事務訊息相關,暫未了解。
  • 第 45 & 97 行 :獲取鎖與釋放鎖。
  • 第 52 行 :再次設定儲存時間。目前會有多處地方設定儲存時間。
  • 第 55 至 62 行 :獲取 MappedFile,若不存在或已滿,則進行建立。詳細解析見:MappedFileQueue#getLastMappedFile(...)。
  • 第 65 行 :插入訊息MappedFile,解析解析見:MappedFile#appendMessage(...)。
  • 第 69 至 80 行 :MappedFile 已滿,建立新的,再次插入訊息
  • 第 116 至 140 行 :訊息刷盤,即持久化到檔案。上面插入訊息實際未儲存到硬碟。此處,根據不同的刷盤策略,執行會有不同。詳細解析見:FlushCommitLogService。
  • 第 143 至 173 行 :Broker 主從同步。後面的文章會詳細解析?。

MappedFileQueue#getLastMappedFile(...)

// 省略程式碼
  • 說明 :獲取最後一個 MappedFile,若不存在或檔案已滿,則進行建立。
  • 第 5 至 11 行 :計算當檔案不存在或已滿時,新建立檔案的 createOffset
  • 第 14 行 :計算檔名。從此處我們可 以得知,MappedFile的檔案命名規則:
> fileName[n] = fileName[n - 1] + n * mappedFileSize
> fileName[0] = startOffset - (startOffset % this.mappedFileSize)

目前 `CommitLog` 的 `startOffset` 為 0。
此處有個**疑問**,為什麼需要 `(startOffset % this.mappedFileSize)`。例如:

| startOffset  | mappedFileSize | createOffset |
| --- | :-- | :-- |
| 5 | 1 | 5 |
| 5 | 2 | 4 |
| 5 | 3 | 3  |
| 5 | 4 | 4 |
| 5 | > 5 | 0 |

_如果有知道的同學,麻煩提示下。?_*解答:fileName[0] = startOffset - (startOffset % this.mappedFileSize) 計算出來的是,以 `this.mappedFileSize` 為每個檔案大小時,`startOffset` 所在檔案的開始`offset`*
  • 第 30 至 35 行 :設定 MappedFile是否是第一個建立的檔案。該標識用於 ConsumeQueue 對應的 MappedFile ,詳見 ConsumeQueue#fillPreBlank

MappedFile#appendMessage(...)

 // 省略程式碼
  • 說明 :插入訊息MappedFile,並返回插入結果。
  • 第 8 行 :獲取需要寫入的位元組緩衝區。為什麼會有 writeBuffer != null 的判斷後,使用不同的位元組緩衝區,見:FlushCommitLogService。
  • 第 9 至 11 行 :設定寫入 position,執行寫入,更新 wrotePosition(當前寫入位置,下次開始寫入開始位置)。

DefaultAppendMessageCallback#doAppend(...)

// 省略程式碼
  • 說明 :插入訊息到位元組緩衝區。
  • 第 45 行 :計算物理位置。在 CommitLog 的順序儲存位置。
  • 第 47 至 49 行 :計算 CommitLog 裡的 offsetMsgId。這裡一定要和 msgId 區分開。

計算方式

長度

offsetMsgId

Broker儲存時生成

Hex(storeHostBytes, wroteOffset)

32

msgId

Client傳送訊息時生成

Hex(程序編號, IP, ClassLoader, startTime, currentTime, 自增序列)

32

《RocketMQ 原始碼分析 —— Message 基礎》

  • 第 51 至 61 行 :獲取佇列位置(offset)。
  • 第 78 至 95 行 :計算訊息總長度。
  • 第 98 至 112 行 :當檔案剩餘空間不足時,寫入 BLANK 佔位,返回結果。
  • 第 114 至 161 行 :寫入 MESSAGE
  • 第 173 行 :更新佇列位置(offset)。

FlushCommitLogService

執行緒服務

場景

插入訊息效能

CommitRealTimeService

非同步刷盤 && 開啟記憶體位元組緩衝區

第一

FlushRealTimeService

非同步刷盤 && 關閉記憶體位元組緩衝區

第二

GroupCommitService

同步刷盤

第三

MappedFile#落盤

方式

方式一

寫入記憶體位元組緩衝區(writeBuffer)

從記憶體位元組緩衝區(write buffer)提交(commit)到檔案通道(fileChannel)

檔案通道(fileChannel)flush

方式二

寫入對映檔案位元組緩衝區(mappedByteBuffer)

對映檔案位元組緩衝區(mappedByteBuffer)flush

flush相關程式碼

考慮到寫入效能,滿足 flushLeastPages * OS_PAGE_SIZE 才進行 flush

// 省略程式碼

commit相關程式碼:

考慮到寫入效能,滿足 commitLeastPages * OS_PAGE_SIZE 才進行 commit

// 省略程式碼

FlushRealTimeService

訊息插入成功時,非同步刷盤時使用。

// 省略程式碼 
  • 說明:實時 flush 執行緒服務,呼叫 MappedFile#flush 相關邏輯。
  • 第 23 至 29 行 :每 flushPhysicQueueThoroughInterval 週期,執行一次 flush 。因為不是每次迴圈到都能滿足 flushCommitLogLeastPages 大小,因此,需要一定週期進行一次強制 flush 。當然,不能每次迴圈都去執行強制 flush,這樣效能較差。
  • 第 33 行 至 37 行 :根據 flushCommitLogTimed 引數,可以選擇每次迴圈是固定週期還是等待喚醒。預設配置是後者,所以,每次插入訊息完成,會去呼叫 commitLogService.wakeup()
  • 第 45 行 :呼叫 MappedFile 進行 flush
  • 第 61 至 65 行 :Broker 關閉時,強制 flush,避免有未刷盤的資料。

CommitRealTimeService

訊息插入成功時,非同步刷盤時使用。 和 FlushRealTimeService 類似,效能更好。

// 省略程式碼

GroupCommitService

訊息插入成功時,同步刷盤時使用。

// 省略程式碼
  • 說明:批量寫入執行緒服務。
  • 第 16 至 25 行 :新增寫入請求。方法設定了 sync 的原因:this.requestsWrite 會和 this.requestsRead 不斷交換,無法保證穩定的同步。
  • 第 27 至 34 行 :讀寫佇列交換。
  • 第 38 至 60 行 :迴圈寫入佇列,進行 flush
    • 第 43 行 :考慮到有可能每次迴圈的訊息寫入的訊息,可能分佈在兩個 MappedFile(寫第N個訊息時,MappedFile 已滿,建立了一個新的),所以需要有迴圈2次。
    • 第 51 行 :喚醒等待寫入請求執行緒,通過 CountDownLatch 實現
  • 第 61 至 66 行 :直接刷盤。此處是由於傳送的訊息的 isWaitStoreMsgOK 未設定成 TRUE ,導致未走批量提交。
  • 第 73 至 80 行 :每 10ms 執行一次批量提交。當然,如果 wakeup() 時,則會立即進行一次批量提交。當 Broker 設定成同步落盤 && 訊息 isWaitStoreMsgOK=true,訊息需要略大於 10ms 才能傳送成功。當然,效能相對非同步落盤較差,可靠性更高,需要我們在實際使用時去取捨。