1. 程式人生 > 程式設計 >Streams:深入理解Redis5.0新特性

Streams:深入理解Redis5.0新特性

概述

相較於Redis4.0,Redis5.0增加了很多新的特性,而streams是其中最重要的特性之一。streams是redis 的一種基本資料結構,它是一個新的強大的支援多播的可持久化的訊息佇列,在設計上借鑑了kafaka。streams的資料型別本身非常簡單,有點類似於hash結構,但是它的額外特性異常強大且複雜:

  • 支援持久化。streams能持久化儲存資料,不同於pub/sub機制和list 訊息被消費後就會被刪除,streams消費過的資料會被持久化的儲存在歷史中。
  • 支援多播。 這一點跟 pub/sub有些類似。
  • 支援消費者組。streams 允許同一消費組內的消費者競爭訊息,並提供了一系列機制允許消費者檢視自己的歷史消費訊息。並允許監控streams的消費者組資訊,消費者組內消費者資訊,也可以監控streams內訊息的狀態。

基礎內容

資料 ID

streams 提供了預設的id模式用來唯一標識streams中的每一條資料,由兩部分組成:
<millisecondsTime>-<sequenceNumber>
millisecondsTime是redis服務所在機器的時間,sequenceNumber用於同一毫秒建立的資料。需要注意的一點是streams的id總是單調增長的,即使redis服務所在的伺服器時間異常。如果當前的毫秒數小於以前的毫秒數,就會使用歷史記錄中最大的毫秒數,然後序列號遞增。而這樣做的原因是因為streams的機制允許根據時間區間或者某一個時間節點或者某一id查詢資料。

向streams插入資料

streams 的基礎寫命令為XADD,其語法為XADD key ID field value [field value ...]

127.0.0.1:6379> XADD mystream * name dwj age 18
"1574925508730-0"
127.0.0.1:6379>
複製程式碼

上面的例子使用XADD向名為mystream的streams中添加了一條資料,ID使用*表示使用streams使用預設的ID,在本例中redis返回的1574925508730-0就是redis為我們插入的資料生成的ID。

另外streams 檢視streams長度的命令為XLEN

127.0.0.1:6379> XLEN mystream
(integer
) 3 127.0.0.1:6379> 複製程式碼

從streams中讀取資料

從streams中讀取資料會比寫資料複雜很多,用日誌檔案進行對比,我們可以檢視歷史日誌,可以根據範圍查詢日誌,我們可以通過unix的命令tail -f來監聽日誌,可以多個使用者檢視到同一份日誌,也可以多個使用者只能檢視到自己有許可權檢視的那一部分日誌。

按範圍查詢: XRANGE 和 XREVRANGE

首先來介紹一下 根據範圍查詢,這兩種操作都比較簡單,以XRANGE為例,它的語法格式為XRANGE key start end [COUNT count],我們只需要提供兩個id,startend,返回的將是一個包含startend的閉區間。兩個特殊的ID-+分別表示可能的最小ID和最大ID。

127.0.0.1:6379> XRANGE mystream - +
1) 1) "1574835253335-0"
   2) 1) "name"
      2) "bob"
      3) "age"
      4) "23"
2) 1) "1574925508730-0"
   2) 1) "name"
      2) "dwj"
      3) "age"
      4) "18"
127.0.0.1:6379>
複製程式碼

我們前邊提到過資料id中包含了建立資料的時間資訊,這意味著我們可以根據時間範圍查詢資料,為了根據時間範圍查詢,我們省略掉ID的序列號部分,如果省略,對於start ID會使用0作為預設的序列號,對於end ID會使用最大序列號作為預設值,這樣的話我們使用兩個unix時間戳去查詢資料就可以得到那個時間區間內所有的資料。

1) 1) "1574835253335-0"
   2) 1) "name"
      2) "bob"
      3) "age"
      4) "23"
127.0.0.1:6379>
複製程式碼

可能還會有同學注意到語法的最後邊還有count引數,這個引數允許我們一次只返回固定數量的資料,然後根據返回資料的last_id,作為下一次查詢的start,這樣就允許我們在一個量非常大的streams裡批量返回資料。
XREVRANGE命令與XRANGE相同,但是以相反的順序返回元素,就不重複介紹了。

通過XREAD讀取資料

XREAD允許我們從某一結點開始從streams中讀取資料,它的語法為XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...],我們在這裡主要將的是通過XREAD來訂閱到達streams新的資料。這種操作可能跟REDIS中原有的pub/sub機制或者阻塞佇列的概念有些類似,都是等待一個key然後獲取到新的資料,但是跟這兩種有著本質的差別:

  • streams跟pub/sub阻塞佇列允許多個客戶端一起等待資料,預設情況下,streams會把訊息推送給所有等待streams資料的客戶端,這個能力跟pub/sub有點類似,但是streams也允許把訊息通過競爭機制推送給其中的一個客戶端(這種模式需要用到消費者組的概念,會在後邊講到)。
  • pub/sub的訊息是fire and forget並且從不儲存,你只可以訂閱到在你訂閱時間之後產生的訊息,並且訊息只會推送給客戶端一次,不能檢視歷史記錄。以及使用阻塞佇列時,當客戶端收到訊息時,這個元素會從佇列中彈出,換句話說,不能檢視某個消費者消費訊息的歷史。而在streams中所有的訊息會被無限期的加入到streams中(訊息可以被顯式的刪除並且存在淘汰機制),客戶端需要記住收到的最後一條訊息,用於獲取到節點之後的新訊息。
  • Streams 消費者組提供了一種Pub/Sub或者阻塞列表都不能實現的控制級別,同一個Stream不同的群組,顯式地確認已經處理的專案,檢查待處理的專案的能力,申明未處理的訊息,以及每個消費者擁有連貫歷史可見性,單個客戶端只能檢視自己過去的訊息歷史記錄。
    從streams中讀取資料
    127.0.0.1:6379> XREAD COUNT 2 STREAMS mystream 0
    1) 1) "mystream"
     2) 1) 1) "1574835253335-0"
           2) 1) "name"
              2) "bob"
              3) "age"
              4) "23"
        2) 1) "1574925508730-0"
           2) 1) "name"
              2) "dwj"
              3) "age"
              4) "18"
    127.0.0.1:6379>
    複製程式碼
    同list結構一樣,streams也提供了阻塞讀取的命令
    XREAD BLOCK 0 STREAMS mystream
    複製程式碼
    在上邊的命令中指定了BLOCK選項,超時時間為0毫秒(意味著永不會過期)。此外,這個地方使用了特殊的id $,這個特殊的id代表著當前streams中最大的id,這就意味著你只會讀取streams中在你監聽時間以後的訊息。有點類似於Unix的tail -f。另外XREAD可以同時監聽多個流中的資料。

消費者組

如果我們想要的不是多個客戶端處理相同的訊息,而是多個客戶端從streams中獲取到不同的訊息進行處理。也就是我們常用的生產者-消費者模型。假如想象我們具有兩個生產者p1,p2,三個消費者c1,c2,c3以及7個商品。我們想按照下面的效果進行處理

p1 =>item1 => c1
p2 =>item2 => c2
p1 =>item3 => c3
p2 =>item4 => c1
p1 =>item5 => c2
p2 =>item6 => c3
p1 =>item7 => c1
複製程式碼

為瞭解決這種場景,redis使用了一個名為消費者的概念,有點類似於kafka,但只是表現上。消費者組就像是一個偽消費者,它從流內讀取資料,然後分發給組內的消費者,並記錄該消費者組消費了哪些資料,處理了那些資料,並提供了一系列功能。

  1. 每條訊息都提供給不同的消費者,因此不可能將相同的訊息傳遞給多個消費者。
  2. 消費者在消費者組中通過名稱來識別,該名稱是實施消費者的客戶必須選擇的區分大小寫的字串。這意味著即便斷開連線過後,消費者組仍然保留了所有的狀態,因為客戶端會重新申請成為相同的消費者。 然而,這也意味著由客戶端提供唯一的識別符號。
  3. 每一個消費者組都有一個第一個ID永遠不會被消費的概念,這樣一來,當消費者請求新訊息時,它能提供以前從未傳遞過的訊息。
  4. 消費訊息需要使用特定的命令進行顯式確認,表示:這條訊息已經被正確處理了,所以可以從消費者組中逐出。
  5. 消費者組跟蹤所有當前所有待處理的訊息,也就是,訊息被傳遞到消費者組的一些消費者,但是還沒有被確認為已處理。由於這個特性,當訪問一個Stream的歷史訊息的時候,每個消費者將只能看到傳遞給它的訊息。

它的模型類似於如下

| consumer_group_name: mygroup           |
| consumer_group_stream: somekey         |
| last_delivered_id: 1292309234234-92    |
|                                        |
| consumers:                             |
|    "consumer-1" with pending messages  |
|       1292309234234-4                  |
|       1292309234232-8                  |
|    "consumer-42" with pending messages |
|       ... (and so forth)               |
複製程式碼

從上邊的模型中我們可以看出消費者組記錄處理的最後一條訊息,將訊息分發給不同的消費者,每個消費者只能看到自己的訊息。如果把消費者組看做streams的輔助資料結構,我們可以看出一個streams可以擁有多個消費者組,一個消費者組內可以擁有多個消費者。實際上,一個streams允許客戶端使用XREAD讀取的同時另一個客戶端通過消費者群組讀取資料。

建立一個消費者群組

我們首先建立一個包含了一些資料的streams

127.0.0.1:6379> XADD fruit * message apple
"1574935311149-0"
127.0.0.1:6379> XADD fruit * message banada
"1574935315886-0"
127.0.0.1:6379> XADD fruit * message pomelo
"1574935323628-0"
複製程式碼

然後建立一個消費者組

127.0.0.1:6379> XGROUP CREATE fruit mygroup $
OK
複製程式碼

注意我們需要指定一個id,這裡我們使用的是特殊id$,我們也可以使用0或者一個unix時間戳,這樣,消費者組只會讀取這個節點之後的訊息。

現在消費者組建立好了,我們可以使用XREADGROUP命令立即開始嘗試通過消費者組讀取訊息。
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...],與XREAD類似,提供了BLOCK選項。假設指定消費者分別是Alice和Bob,來看看系統會怎樣返回不同訊息給Alice和Bob。

127.0.0.1:6379> XREADGROUP GROUP  mygroup Alice COUNT 1 STREAMS fruit >
1) 1) "fruit"
   2) 1) 1) "1574936034258-0"
         2) 1) "message"
            2) "apple"
127.0.0.1:6379>
複製程式碼

上邊命令代表的資訊是:我要通過mygroup讀取streams fruit中的資料,我在群組中的身份是Alice,請給我一條資料。 >操作符只在消費者組的上線文中有效,代表訊息到目前為止沒有交給其它消費者處理過。
我們也可以使用一個有效的id,在這種情況下,消費者組會告訴我們的歷史待處理訊息,而不會告訴我們新的訊息。這個特性也是很有用的,當消費者因為某些原因重新啟動後,我們可以檢視自己的歷史待處理訊息,處理完待處理訊息後再去處理新的訊息。
我們可以通過XACK命令告訴消費者組某條訊息已經被正確處理,不要顯示在我的歷史待處理訊息列表中。XACK的語法為XACK key group ID [ID ...]

127.0.0.1:6379> XACK fruit mygroup 1574936034258-0
(integer) 1
複製程式碼

有幾件事需要記住:

  1. 消費者是在他們第一次被提及的時候自動建立的,不需要顯式建立。
  2. 即使使用XREADGROUP,你也可以同時從多個key中讀取,但是要讓其工作,你需要給每一個Stream建立一個名稱相同的消費者組。這並不是一個常見的需求,但是需要說明的是,這個功能在技術上是可以實現的。
  3. XREADGROUP命令是一個寫命令,因為當它從Stream中讀取訊息時,消費者組被修改了,所以這個命令只能在master節點呼叫。

從永久失敗中恢復

在一個消費者群組中可能存在多個消費者消費訊息,但是也可能會存在某一個消費者永久退出消費者群組的情況,這樣我們就需要一種機制,把該消費者的待處理訊息分配給消費者群組的另一個消費者。這就需要我們具有檢視待處理訊息的能力以及把某個訊息分配給指定消費者的能力。前者是通過一個叫XPENDING的命令,它的語法為XPENDING key group [start end count] [consumer]

127.0.0.1:6379> XPENDING fruit mygroup
1) (integer) 1
2) "1574936042937-0"
3) "1574936042937-0"
4) 1) 1) "Alice"
      2) "1"
複製程式碼

上述返回結果代表的是消費者群組有1條待處理命令,待處理訊息的起始id為1574936042937-0,結束id為1574936042937-0,名為Alice的消費者有一個待處理命令,可能有人會好奇我們在前邊往fruit放入了3個水果,使用XACK處理了一個水果,消費者待處理列表中應該有兩個水果,而事實上消費者群組的待處理列表為該群組下消費者待處理訊息的合集,當有消費者通過群組獲取訊息的時候會改變消費者群組的狀態,這也是前邊提到的為什麼XREADGROUP必須在master節點進行呼叫。
我們可以使用start end count 引數來檢視某個範圍內訊息的狀態

127.0.0.1:6379> XPENDING fruit mygroup - + 10 Alice
1) 1) "1574936042937-0"
   2) "Alice"
   3) (integer) 903655
   4) (integer) 1
2) 1) "1574936052018-0"
   2) "Alice"
   3) (integer) 491035
   4) (integer) 1
複製程式碼

這樣我們就看到了一條訊息的詳細資訊,id為1574936042937-0的訊息的消費者為Alice,它的pending時間為903655,這個訊息被分配了1次。
我們會發現第一條訊息的處理時間有點長,我們懷疑Alice已經不能處理這條訊息了,於是我們想把這條訊息分配給Bob,這種場景下就需要用到了XCLAIM命令,它的語法為XCLAIM ...,其中min-idle-time為訊息的最小空閒時間,只有訊息的空閒時間大於這個值訊息才會被分配,因為訊息被分配的時候會重置訊息的空閒時間,如果有同時把一條訊息分配給兩個客戶端,只會第一條命令生效,因為當訊息分配給第一個客戶端的時候重置空閒時間,第二條命令則會失效。
我們也可以使用一個獨立的程式來不斷尋找超時的訊息,並把它分配給活躍的消費者,不過需要注意的是,如果訊息的分配次數達到某個闕值,不應該把訊息再分配出去,而是應該放到別的地方。

streams的可觀察性

streams具有不錯的可觀察性,前邊的XPENDING命令允許我們檢視streams在某個消費者群組內待處理訊息的狀態。但是我們想看的更多,比如在這個streams下有多少個group,在這個group下有多少消費者。這就要用到XINFO命令:
檢視streams資訊:

127.0.0.1:6379> XINFO STREAM mystream
 1) "length"
 2) (integer) 2
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "groups"
 8) (integer) 1
 9) "last-generated-id"
10) "1574925508730-0"
11) "first-entry"
12) 1) "1574835253335-0"
    2) 1) "name"
       2) "bob"
       3) "age"
       4) "23"
13) "last-entry"
14) 1) "1574925508730-0"
    2) 1) "name"
       2) "dwj"
       3) "age"
       4) "18"
複製程式碼

輸出中會告訴我們streams的長度,群組數量,第一條和最後一條資訊的詳情。下面看一下streams下群組的資訊:

127.0.0.1:6379> XINFO GROUPS fruit
1) 1) "name"
   2) "mygroup"
   3) "consumers"
   4) (integer) 1
   5) "pending"
   6) (integer) 2
   7) "last-delivered-id"
   8) "1574936052018-0"
2) 1) "name"
   2) "mygroup-1"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "0-0"
複製程式碼

我們可以從輸出中看到fruit下有兩個群組,群組的名稱以及待處理訊息的數量,處理的最後一條訊息。我們可以在詳細的檢視下消費者群組內消費者的狀態。

127.0.0.1:6379> XINFO CONSUMERS fruit mygroup
1) 1) "name"
   2) "Alice"
   3) "pending"
   4) (integer) 2
   5) "idle"
   6) (integer) 1990242
2) 1) "name"
   2) "Bob"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 9178
複製程式碼

從輸出中可以看到消費者待處理訊息的數量以及消費者的閒置時間。

設定streams上限

如果從streams可以檢視到歷史記錄,我們可能會有疑惑,如果streams無限期的加入記憶體會不會夠用,一旦訊息數量達到上限,將訊息永久刪除或者持久化到資料庫都是有必要的,redis也提供了諸如此類場景的支援。
一種方法是我們使用XADD的時候指定streams的最大長度,XADD mystream MAXLEN ~ 1000其中的數值前可以加上~標識不需要精確的將長度保持在1000,比1000多一些也可以接受。如果不使用該標識,效能會差一些。另一種方法是使用XTRIM,該命令也是使用MAXLEN選項,> XTRIM mystream MAXLEN ~ 10

一些特殊的id

前面提到了在streams API裡邊存在一些特殊的id。
首先是-+,這兩個ID在XRANGE命令中使用,分別代表最小的id和最大的id。-代表0-1+代表18446744073709551615-18446744073709551615,從使用上方便了很多。在XPENDING等範圍查詢中都可以使用。
$代表streams中當前存在的最大的id,在XREADXGROUP中代表只獲取新到的訊息。需要注意的是$+的含義並不一致。
還有一個特殊的id是>,這個id只能夠在XREADGROUP命令中使用,意味著在這個消費者群組中,從來沒有分配給其他的消費者,所以總是使用>作為群組中的last delivered ID

持久化,複製和訊息安全性

與redis的其它資料結構一樣,streams會非同步複製到從節點,並持久化到AOF和RDB檔案中,並且消費者群組的狀態也會按照此機制進行持久化。
需要注意的幾點是:

  • 如果訊息的持久化以及狀態很重要,則AOF必須使用強fsync配合(AOF記錄每一條更改redis資料的命令,有很多種持久化機制,在這個地方要用到的是appendfsync always 這樣會嚴重降低Redis的速度)
  • 預設情況下,非同步複製不能保證從節點的資料與主節點保持一致,在故障轉移以後可能會丟失一些內容,這跟從節點從主節點接受資料的能力有關。
  • WAIT命令可以用於強制將更改傳輸到一組從節點上。雖然這使得資料不太可能會丟失,但是redis的Sentinel和cluster在進行故障轉移的時候不一定會使用具有最新資料的從節點,在一些特殊故障下,反而會使用缺少一些資料的從節點。
    因此在使用redis streams和消費者群組在設計程式的時候,確保瞭解你的應用程式在故障期間的應對策略,並進行相應地配置,評估它對你的程式是否足夠安全。

從streams中刪除資料

刪除streams中的資料使用XDEL命令,其語法為XDEL key ID [ID ...],需要注意的是在當前的實現中,在巨集節點完全為空之前,記憶體並沒有真正回收,所以你不應該濫用這個特性。

streams的效能

streams的不阻塞命令,比如XRANGE或者不使用BLOCK選項的XREADXREADGROUP跟redis普通命令一致,所以沒有必要討論。如果有興趣的話可以在redis的檔案中檢視到對應命令的時間複雜度。streams命令的速度在一定範圍內跟set是一致的,XADD命令的速度非常快,在一個普通的機器上,一秒鐘可以插入50w~100w條資料。
我們感興趣的是在消費者群組的阻塞場景下,從通過XADD命令向streams中插入一條資料,到消費者通過群組讀取到這條訊息的效能。
為了測試訊息從產生到消費間的延遲,我們使用ruby程式進行測試,將訊息的產生時間作為訊息的一個欄位,然後把訊息推送到streams中,客戶端收到訊息後使用當前時間跟生產時間進行對比,從而計算出訊息的延遲時間。這個程式未進行效能優化,執行在一個雙核的機器上,同時redis也執行在這臺機器上,以此來模擬不是理想條件下的場景。訊息每秒鐘產生1w條,群組內有10個消費者消費資料。測試結果如下:

Processed between 0 and 1 ms -> 74.11%
Processed between 1 and 2 ms -> 25.80%
Processed between 2 and 3 ms -> 0.06%
Processed between 3 and 4 ms -> 0.01%
Processed between 4 and 5 ms -> 0.02%
複製程式碼

99.9%的請求的延遲小於等於2毫秒,而且異常值非常接近平均值。另外需要注意的兩點:

  1. 消費者每次處理1w條訊息,這樣增加了一些延遲,這樣做是為了消費速度較慢的消費者能夠保持保持訊息流。
  2. 用來做測試的系統相比於現在的系統非常慢。


原文連結: redis.io/topics/stre…

本文譯者:Worktile工程師 杜文傑

文章來源:Worktile技術部落格

歡迎訪問交流更多關於技術及協作的問題。

文章轉載請註明出處。