訊息佇列中介軟體 RocketMQ 原始碼分析 —— Message 儲存
- 1、概述
- 2、CommitLog 結構
- 3、CommitLog 儲存訊息
- MappedFile#落盤
- FlushRealTimeService
- CommitRealTimeService
- GroupCommitService
- CommitLog#putMessage(...)
- MappedFileQueue#getLastMappedFile(...)
- MappedFile#appendMessage(...)
- DefaultAppendMessageCallback#doAppend(...)
- FlushCommitLogService
1、概述
本文接《RocketMQ 原始碼分析 —— Message 傳送與接收》。
主要解析 CommitLog
2、CommitLog 結構
CommitLog
、MappedFileQueue
、MappedFile
的關係如下:
CommitLog
: MappedFileQueue
: MappedFile
= 1 : 1 : N。
反應到系統檔案如下:
Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd/Users/yunai/store/commitlog Yunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -ltotal 10485760 -rw-r--r-- 1 yunai staff 1073741824 4 21 16:27 00000000000000000000 -rw-r--r-- 1 yunai staff 1073741824 4 21 16:29 00000000001073741824 -rw-r--r-- 1 yunai staff 1073741824 4 21 16:32 00000000002147483648 -rw-r--r-- 1 yunai staff 1073741824 4 21 16:33 00000000003221225472 -rw-r--r-- 1 yunai staff 1073741824 4 21 16:32 00000000004294967296
CommitLog
、MappedFileQueue
、MappedFile
的定義如下:
-
MappedFile
:00000000000000000000、00000000001073741824、00000000002147483648等檔案。 -
MappedFileQueue
:MappedFile
所在的資料夾,對MappedFile
進行封裝成檔案佇列,對上層提供可無限使用的檔案容量。- 每個
MappedFile
統一檔案大小。 - 檔案命名方式:fileName[n] = fileName[n - 1] + mappedFileSize。在
CommitLog
裡預設為 1GB。
- 每個
-
CommitLog
:針對MappedFileQueue
的封裝使用。
CommitLog
目前儲存在 MappedFile
有兩種內容型別:
- MESSAGE :訊息。
- BLANK :檔案不足以儲存訊息時的空白佔位。
CommitLog
儲存在 MappedFile
的結構:
MESSAGE[1] |
MESSAGE[2] |
... |
MESSAGE[n - 1] |
MESSAGE[n] |
BLANK |
---|
MESSAGE
在 CommitLog
儲存結構:
第幾位 |
欄位 |
說明 |
資料型別 |
位元組數 |
---|---|---|---|---|
1 |
MsgLen |
訊息總長度 |
Int |
4 |
2 |
MagicCode |
MESSAGE_MAGIC_CODE |
Int |
4 |
3 |
BodyCRC |
訊息內容CRC |
Int |
4 |
4 |
QueueId |
訊息佇列編號 |
Int |
4 |
5 |
Flag |
flag |
Int |
4 |
6 |
QueueOffset |
訊息佇列位置 |
Long |
8 |
7 |
PhysicalOffset |
物理位置。在 CommitLog 的順序儲存位置。 |
Long |
8 |
8 |
SysFlag |
MessageSysFlag |
Int |
4 |
9 |
BornTimestamp |
生成訊息時間戳 |
Long |
8 |
10 |
BornHost |
生效訊息的地址+埠 |
Long |
8 |
11 |
StoreTimestamp |
儲存訊息時間戳 |
Long |
8 |
12 |
StoreHost |
儲存訊息的地址+埠 |
Long |
8 |
13 |
ReconsumeTimes |
重新消費訊息次數 |
Int |
4 |
14 |
PreparedTransationOffset |
Long |
8 |
|
15 |
BodyLength + Body |
內容長度 + 內容 |
Int + Bytes |
4 + bodyLength |
16 |
TopicLength + Topic |
Topic長度 + Topic |
Byte + Bytes |
1 + topicLength |
17 |
PropertiesLength + Properties |
拓展欄位長度 + 拓展欄位 |
Short + Bytes |
2 + PropertiesLength |
BLANK
在 CommitLog
儲存結構:
第幾位 |
欄位 |
說明 |
資料型別 |
位元組數 |
---|---|---|---|---|
1 |
maxBlank |
空白長度 |
Int |
4 |
2 |
MagicCode |
BLANK_MAGIC_CODE |
Int |
4 |
3、CommitLog 儲存訊息
CommitLog#putMessage(...)
// 省略程式碼
- 說明 :儲存訊息,並返回儲存結果。
- 第 2 行 :設定儲存時間等。
- 第 16 至 36 行 :事務訊息相關,暫未了解。
- 第 45 & 97 行 :獲取鎖與釋放鎖。
- 第 52 行 :再次設定儲存時間。目前會有多處地方設定儲存時間。
- 第 55 至 62 行 :獲取
MappedFile
,若不存在或已滿,則進行建立。詳細解析見:MappedFileQueue#getLastMappedFile(...)。 - 第 65 行 :插入訊息到
MappedFile
,解析解析見:MappedFile#appendMessage(...)。 - 第 69 至 80 行 :
MappedFile
已滿,建立新的,再次插入訊息。 - 第 116 至 140 行 :訊息刷盤,即持久化到檔案。上面插入訊息實際未儲存到硬碟。此處,根據不同的刷盤策略,執行會有不同。詳細解析見:FlushCommitLogService。
- 第 143 至 173 行 :
Broker
主從同步。後面的文章會詳細解析?。
MappedFileQueue#getLastMappedFile(...)
// 省略程式碼
- 說明 :獲取最後一個
MappedFile
,若不存在或檔案已滿,則進行建立。 - 第 5 至 11 行 :計算當檔案不存在或已滿時,新建立檔案的
createOffset
。 - 第 14 行 :計算檔名。從此處我們可
以得知,
MappedFile
的檔案命名規則:
> fileName[n] = fileName[n - 1] + n * mappedFileSize
> fileName[0] = startOffset - (startOffset % this.mappedFileSize)
目前 `CommitLog` 的 `startOffset` 為 0。
此處有個**疑問**,為什麼需要 `(startOffset % this.mappedFileSize)`。例如:
| startOffset | mappedFileSize | createOffset |
| --- | :-- | :-- |
| 5 | 1 | 5 |
| 5 | 2 | 4 |
| 5 | 3 | 3 |
| 5 | 4 | 4 |
| 5 | > 5 | 0 |
_如果有知道的同學,麻煩提示下。?_*解答:fileName[0] = startOffset - (startOffset % this.mappedFileSize) 計算出來的是,以 `this.mappedFileSize` 為每個檔案大小時,`startOffset` 所在檔案的開始`offset`*
- 第 30 至 35 行 :設定
MappedFile
是否是第一個建立的檔案。該標識用於ConsumeQueue
對應的MappedFile
,詳見ConsumeQueue#fillPreBlank
。
MappedFile#appendMessage(...)
// 省略程式碼
- 說明 :插入訊息到
MappedFile
,並返回插入結果。 - 第 8 行 :獲取需要寫入的位元組緩衝區。為什麼會有
writeBuffer != null
的判斷後,使用不同的位元組緩衝區,見:FlushCommitLogService。 - 第 9 至 11 行 :設定寫入
position
,執行寫入,更新wrotePosition
(當前寫入位置,下次開始寫入開始位置)。
DefaultAppendMessageCallback#doAppend(...)
// 省略程式碼
- 說明 :插入訊息到位元組緩衝區。
- 第 45 行 :計算物理位置。在
CommitLog
的順序儲存位置。 - 第 47 至 49 行 :計算
CommitLog
裡的offsetMsgId
。這裡一定要和msgId
區分開。
計算方式 |
長度 |
|||
---|---|---|---|---|
offsetMsgId |
Broker儲存時生成 |
Hex(storeHostBytes, wroteOffset) |
32 |
|
msgId |
Client傳送訊息時生成 |
Hex(程序編號, IP, ClassLoader, startTime, currentTime, 自增序列) |
32 |
《RocketMQ 原始碼分析 —— Message 基礎》 |
- 第 51 至 61 行 :獲取佇列位置(offset)。
- 第 78 至 95 行 :計算訊息總長度。
- 第 98 至 112 行 :當檔案剩餘空間不足時,寫入
BLANK
佔位,返回結果。 - 第 114 至 161 行 :寫入
MESSAGE
。 - 第 173 行 :更新佇列位置(offset)。
FlushCommitLogService
執行緒服務 |
場景 |
插入訊息效能 |
---|---|---|
CommitRealTimeService |
非同步刷盤 && 開啟記憶體位元組緩衝區 |
第一 |
FlushRealTimeService |
非同步刷盤 && 關閉記憶體位元組緩衝區 |
第二 |
GroupCommitService |
同步刷盤 |
第三 |
MappedFile#落盤
方式 |
|||
---|---|---|---|
方式一 |
寫入記憶體位元組緩衝區(writeBuffer) |
從記憶體位元組緩衝區(write buffer)提交(commit)到檔案通道(fileChannel) |
檔案通道(fileChannel)flush |
方式二 |
寫入對映檔案位元組緩衝區(mappedByteBuffer) |
對映檔案位元組緩衝區(mappedByteBuffer)flush |
flush相關程式碼
考慮到寫入效能,滿足 flushLeastPages * OS_PAGE_SIZE
才進行 flush
。
// 省略程式碼
commit相關程式碼:
考慮到寫入效能,滿足 commitLeastPages * OS_PAGE_SIZE
才進行 commit
。
// 省略程式碼
FlushRealTimeService
訊息插入成功時,非同步刷盤時使用。
// 省略程式碼
- 說明:實時
flush
執行緒服務,呼叫MappedFile#flush
相關邏輯。 - 第 23 至 29 行 :每
flushPhysicQueueThoroughInterval
週期,執行一次flush
。因為不是每次迴圈到都能滿足flushCommitLogLeastPages
大小,因此,需要一定週期進行一次強制flush
。當然,不能每次迴圈都去執行強制flush
,這樣效能較差。 - 第 33 行 至 37 行 :根據
flushCommitLogTimed
引數,可以選擇每次迴圈是固定週期還是等待喚醒。預設配置是後者,所以,每次插入訊息完成,會去呼叫commitLogService.wakeup()
。 - 第 45 行 :呼叫
MappedFile
進行flush
。 - 第 61 至 65 行 :
Broker
關閉時,強制flush
,避免有未刷盤的資料。
CommitRealTimeService
訊息插入成功時,非同步刷盤時使用。
和 FlushRealTimeService
類似,效能更好。
// 省略程式碼
GroupCommitService
訊息插入成功時,同步刷盤時使用。
// 省略程式碼
- 說明:批量寫入執行緒服務。
- 第 16 至 25 行 :新增寫入請求。方法設定了
sync
的原因:this.requestsWrite
會和this.requestsRead
不斷交換,無法保證穩定的同步。 - 第 27 至 34 行 :讀寫佇列交換。
- 第 38 至 60 行 :迴圈寫入佇列,進行
flush
。- 第 43 行 :考慮到有可能每次迴圈的訊息寫入的訊息,可能分佈在兩個
MappedFile
(寫第N個訊息時,MappedFile
已滿,建立了一個新的),所以需要有迴圈2次。 - 第 51 行 :喚醒等待寫入請求執行緒,通過
CountDownLatch
實現
- 第 43 行 :考慮到有可能每次迴圈的訊息寫入的訊息,可能分佈在兩個
- 第 61 至 66 行 :直接刷盤。此處是由於傳送的訊息的
isWaitStoreMsgOK
未設定成TRUE
,導致未走批量提交。 - 第 73 至 80 行 :每 10ms 執行一次批量提交。當然,如果
wakeup()
時,則會立即進行一次批量提交。當Broker
設定成同步落盤 && 訊息isWaitStoreMsgOK=true
,訊息需要略大於 10ms 才能傳送成功。當然,效能相對非同步落盤較差,可靠性更高,需要我們在實際使用時去取捨。