1. 程式人生 > >Kafka設計解析(十三)Kafka消費組(consumer group)

Kafka設計解析(十三)Kafka消費組(consumer group)

信息 格式 eve 引擎 區分 展開 rebalance 4.5 內容

轉載自 huxihx,原文鏈接 Kafka消費組(consumer group)

一直以來都想寫一點關於kafka consumer的東西,特別是關於新版consumer的中文資料很少。最近Kafka社區郵件組已經在討論是否應該正式使用新版本consumer替換老版本,筆者也覺得時機成熟了,於是寫下這篇文章討論並總結一下新版本consumer的些許設計理念,希望能把consumer這點事說清楚,從而對廣大使用者有所幫助。

在開始之前,我想花一點時間先來明確一些概念和術語,這會極大地方便我們下面的討論。另外請原諒這文章有點長,畢竟要討論的東西很多,雖然已然刪除了很多太過細節的東西。

目錄

一、 誤區澄清與概念明確

1. Kafka的版本

2. 新版本 VS 老版本

二、消費者組 (Consumer Group)

1. 什麽是消費者組

2. 消費者位置(consumer position)

3. 位移管理(offset management)

3.1 自動VS手動

3.2 位移提交

4. Rebalance

4.1 什麽是rebalance?

4.2 什麽時候rebalance?

4.3 如何進行組內分區分配?

4.4 誰來執行rebalance和consumer group管理?

4.5 如何確定coordinator?

4.6 Rebalance Generation

4.7 協議(protocol)

4.8 liveness

4.9 Rebalance過程

4.10 consumer group狀態機

三、rebalance場景剖析

1. 新成員加入組(member join)

2. 組成員崩潰(member failure)

3. 組成員主動離組(member leave group)

4. 提交位移(member commit offset)

一、 誤區澄清與概念明確

1. Kafka的版本

很多人在Kafka中國社區(替群主做個宣傳,QQ號:162272557)提問時的開頭經常是這樣的:“我使用的kafka版本是2.10/2.11, 現在碰到一個奇怪的問題。。。。” 無意冒犯,但這裏的2.10/2.11不是kafka的版本,而是編譯kafka的Scala版本。Kafka的server端代碼是由Scala語言編寫的,目前Scala主流的3個版本分別是2.10、2.11和2.12。實際上Kafka現在每個PULL request都已經自動增加了這三個版本的檢查。下圖是我的一個PULL request,可以看到這個fix會同時使用3個scala版本做編譯檢查:

技術分享圖片

2. 新版本 VS 老版本

“我的kafkaoffsetmonitor為什麽無法監控到offset了?”——這是我在Kafka中國社區見到最多的問題,沒有之一!實際上,Kafka 0.9開始提供了新版本的consumer及consumer group,位移的管理與保存機制發生了很大的變化——新版本consumer默認將不再保存位移到zookeeper中,而目前kafkaoffsetmonitor還沒有應對這種變化(雖然已經有很多人在要求他們改了,詳見https://github.com/quantifind/KafkaOffsetMonitor/issues/79),所以很有可能是因為你使用了新版本的consumer才無法看到的。關於新舊版本,這裏統一說明一下:kafka0.9以前的consumer是使用Scala編寫的,包名結構是kafka.consumer.*,分為high-level consumer和low-level consumer兩種。我們熟知的ConsumerConnector、ZookeeperConsumerConnector以及SimpleConsumer就是這個版本提供的;自0.9版本開始,Kafka提供了java版本的consumer,包名結構是o.a.k.clients.consumer.*,熟知的類包括KafkaConsumer和ConsumerRecord等。新版本的consumer可以單獨部署,不再需要依賴server端的代碼。

二、消費者組 (Consumer Group)

1. 什麽是消費者組

其實對於這些基本概念的普及,網上資料實在太多了。我本不應該再畫蛇添足了,但為了本文的完整性,我還是要花一些篇幅來重談consumer group,至少可以說說我的理解。值得一提的是,由於我們今天基本上只探討consumer group,對於單獨的消費者不做過多討論。

什麽是consumer group? 一言以蔽之,consumer group是kafka提供的可擴展且具有容錯性的消費者機制。既然是一個組,那麽組內必然可以有多個消費者或消費者實例(consumer instance),它們共享一個公共的ID,即group ID。組內的所有消費者協調在一起來消費訂閱主題(subscribed topics)的所有分區(partition)。當然,每個分區只能由同一個消費組內的一個consumer來消費。(網上文章中說到此處各種炫目多彩的圖就會緊跟著拋出來,我這裏就不畫了,請原諒)。個人認為,理解consumer group記住下面這三個特性就好了:

  • consumer group下可以有一個或多個consumer instance,consumer instance可以是一個進程,也可以是一個線程
  • group.id是一個字符串,唯一標識一個consumer group
  • consumer group下訂閱的topic下的每個分區只能分配給某個group下的一個consumer(當然該分區還可以被分配給其他group)

2. 消費者位置(consumer position)

消費者在消費的過程中需要記錄自己消費了多少數據,即消費位置信息。在Kafka中這個位置信息有個專門的術語:位移(offset)。很多消息引擎都把這部分信息保存在服務器端(broker端)。這樣做的好處當然是實現簡單,但會有三個主要的問題:1. broker從此變成有狀態的,會影響伸縮性;2. 需要引入應答機制(acknowledgement)來確認消費成功。3. 由於要保存很多consumer的offset信息,必然引入復雜的數據結構,造成資源浪費。而Kafka選擇了不同的方式:每個consumer group保存自己的位移信息,那麽只需要簡單的一個整數表示位置就夠了;同時可以引入checkpoint機制定期持久化,簡化了應答機制的實現。

3. 位移管理(offset management)

3.1 自動VS手動

Kafka默認是定期幫你自動提交位移的(enable.auto.commit = true),你當然可以選擇手動提交位移實現自己控制。另外kafka會定期把group消費情況保存起來,做成一個offset map,如下圖所示:

技術分享圖片

上圖中表明了test-group這個組當前的消費情況。

3.2 位移提交

老版本的位移是提交到zookeeper中的,圖就不畫了,總之目錄結構是:/consumers/<group.id>/offsets/<topic>/<partitionId>,但是zookeeper其實並不適合進行大批量的讀寫操作,尤其是寫操作。因此kafka提供了另一種解決方案:增加__consumeroffsets topic,將offset信息寫入這個topic,擺脫對zookeeper的依賴(指保存offset這件事情)。__consumer_offsets中的消息保存了每個consumer group某一時刻提交的offset信息。依然以上圖中的consumer group為例,格式大概如下:

技術分享圖片

__consumers_offsets topic配置了compact策略,使得它總是能夠保存最新的位移信息,既控制了該topic總體的日誌容量,也能實現保存最新offset的目的。compact的具體原理請參見:Log Compaction

至於每個group保存到__consumers_offsets的哪個分區,如何查看的問題請參見這篇文章:Kafka設計解析(十二)Kafka 如何讀取offset topic內容 (__consumer_offsets)

4. Rebalance

4.1 什麽是rebalance?

rebalance本質上是一種協議,規定了一個consumer group下的所有consumer如何達成一致來分配訂閱topic的每個分區。比如某個group下有20個consumer,它訂閱了一個具有100個分區的topic。正常情況下,Kafka平均會為每個consumer分配5個分區。這個分配的過程就叫rebalance。

4.2 什麽時候rebalance?

這也是經常被提及的一個問題。rebalance的觸發條件有三種:

  • 組成員發生變更(新consumer加入組、已有consumer主動離開組或已有consumer崩潰了——這兩者的區別後面會談到)
  • 訂閱主題數發生變更——這當然是可能的,如果你使用了正則表達式的方式進行訂閱,那麽新建匹配正則表達式的topic就會觸發rebalance
  • 訂閱主題的分區數發生變更

4.3 如何進行組內分區分配?

之前提到了group下的所有consumer都會協調在一起共同參與分配,這是如何完成的?Kafka新版本consumer默認提供了兩種分配策略:range和round-robin。當然Kafka采用了可插拔式的分配策略,你可以創建自己的分配器以實現不同的分配策略。實際上,由於目前range和round-robin兩種分配器都有一些弊端,Kafka社區已經提出第三種分配器來實現更加公平的分配策略,只是目前還在開發中。我們這裏只需要知道consumer group默認已經幫我們把訂閱topic的分區分配工作做好了就行了。

簡單舉個例子,假設目前某個consumer group下有兩個consumer: A和B,當第三個成員加入時,kafka會觸發rebalance並根據默認的分配策略重新為A、B和C分配分區,如下圖所示:

技術分享圖片

4.4 誰來執行rebalance和consumer group管理?

Kafka提供了一個角色:coordinator來執行對於consumer group的管理。坦率說kafka對於coordinator的設計與修改是一個很長的故事。最新版本的coordinator也與最初的設計有了很大的不同。這裏我只想提及兩次比較大的改變。

首先是0.8版本的coordinator,那時候的coordinator是依賴zookeeper來實現對於consumer group的管理的。Coordinator監聽zookeeper的/consumers/<group>/ids的子節點變化以及/brokers/topics/<topic>數據變化來判斷是否需要進行rebalance。group下的每個consumer都自己決定要消費哪些分區,並把自己的決定搶先在zookeeper中的/consumers/<group>/owners/<topic>/<partition>下註冊。很明顯,這種方案要依賴於zookeeper的幫助,而且每個consumer是單獨做決定的,沒有那種“大家屬於一個組,要協商做事情”的精神。

基於這些潛在的弊端,0.9版本的kafka改進了coordinator的設計,提出了group coordinator——每個consumer group都會被分配一個這樣的coordinator用於組管理和位移管理。這個group coordinator比原來承擔了更多的責任,比如組成員管理、位移提交保護機制等。當新版本consumer group的第一個consumer啟動的時候,它會去和kafka server確定誰是它們組的coordinator。之後該group內的所有成員都會和該coordinator進行協調通信。顯而易見,這種coordinator設計不再需要zookeeper了,性能上可以得到很大的提升。後面的所有部分我們都將討論最新版本的coordinator設計。

4.5 如何確定coordinator?

上面簡單討論了新版coordinator的設計,那麽consumer group如何確定自己的coordinator是誰呢? 簡單來說分為兩步:

  • 確定consumer group位移信息寫入__consumers_offsets的哪個分區。具體計算公式:
    •   __consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount) 註意:groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默認是50個分區。
  • 該分區leader所在的broker就是被選定的coordinator

4.6 Rebalance Generation

JVM GC的分代收集就是這個詞(嚴格來說是generational),我這裏把它翻譯成“屆”好了,它表示了rebalance之後的一屆成員,主要是用於保護consumer group,隔離無效offset提交的。比如上一屆的consumer成員是無法提交位移到新一屆的consumer group中。我們有時候可以看到ILLEGAL_GENERATION的錯誤,就是kafka在抱怨這件事情。每次group進行rebalance之後,generation號都會加1,表示group進入到了一個新的版本,如下圖所示: Generation 1時group有3個成員,隨後成員2退出組,coordinator觸發rebalance,consumer group進入Generation 2,之後成員4加入,再次觸發rebalance,group進入Generation 3。

技術分享圖片

4.7 協議(protocol)

前面說過了, rebalance本質上是一組協議。group與coordinator共同使用它來完成group的rebalance。目前kafka提供了5個協議來處理與consumer group coordination相關的問題:

  • Heartbeat請求:consumer需要定期給coordinator發送心跳來表明自己還活著
  • LeaveGroup請求:主動告訴coordinator我要離開consumer group
  • SyncGroup請求:group leader把分配方案告訴組內所有成員
  • JoinGroup請求:成員請求加入組
  • DescribeGroup請求:顯示組的所有信息,包括成員信息,協議名稱,分配方案,訂閱信息等。通常該請求是給管理員使用

Coordinator在rebalance的時候主要用到了前面4種請求。

4.8 liveness

consumer如何向coordinator證明自己還活著? 通過定時向coordinator發送Heartbeat請求。如果超過了設定的超時時間,那麽coordinator就認為這個consumer已經掛了。一旦coordinator認為某個consumer掛了,那麽它就會開啟新一輪rebalance,並且在當前其他consumer的心跳response中添加“REBALANCE_IN_PROGRESS”,告訴其他consumer:不好意思各位,你們重新申請加入組吧!

4.9 Rebalance過程

終於說到consumer group執行rebalance的具體流程了。很多用戶估計對consumer內部的工作機制也很感興趣。下面就跟大家一起討論一下。當然我必須要明確表示,rebalance的前提是coordinator已經確定了。

總體而言,rebalance分為2步:Join和Sync

1 Join, 顧名思義就是加入組。這一步中,所有成員都向coordinator發送JoinGroup請求,請求入組。一旦所有成員都發送了JoinGroup請求,coordinator會從中選擇一個consumer擔任leader的角色,並把組成員信息以及訂閱信息發給leader——註意leader和coordinator不是一個概念。leader負責消費分配方案的制定。

2 Sync,這一步leader開始分配消費方案,即哪個consumer負責消費哪些topic的哪些partition。一旦完成分配,leader會將這個方案封裝進SyncGroup請求中發給coordinator,非leader也會發SyncGroup請求,只是內容為空。coordinator接收到分配方案之後會把方案塞進SyncGroup的response中發給各個consumer。這樣組內的所有成員就都知道自己應該消費哪些分區了。

還是拿幾張圖來說明吧,首先是加入組的過程:

技術分享圖片

值得註意的是, 在coordinator收集到所有成員請求前,它會把已收到請求放入一個叫purgatory(煉獄)的地方。記得國內有篇文章以此來證明kafka開發人員都是很有文藝範的,寫得也是比較有趣,有興趣可以去搜搜。
然後是分發分配方案的過程,即SyncGroup請求:

技術分享圖片

註意!! consumer group的分區分配方案是在客戶端執行的!Kafka將這個權利下放給客戶端主要是因為這樣做可以有更好的靈活性。比如這種機制下我可以實現類似於Hadoop那樣的機架感知(rack-aware)分配方案,即為consumer挑選同一個機架下的分區數據,減少網絡傳輸的開銷。Kafka默認為你提供了兩種分配策略:range和round-robin。由於這不是本文的重點,這裏就不再詳細展開了,你只需要記住你可以覆蓋consumer的參數:partition.assignment.strategy來實現自己分配策略就好了。

4.10 consumer group狀態機

和很多kafka組件一樣,group也做了個狀態機來表明組狀態的流轉。coordinator根據這個狀態機會對consumer group做不同的處理,如下圖所示(完全是根據代碼註釋手動畫的,多見諒吧)

技術分享圖片

簡單說明下圖中的各個狀態:

  • Dead:組內已經沒有任何成員的最終狀態,組的元數據也已經被coordinator移除了。這種狀態響應各種請求都是一個response: UNKNOWN_MEMBER_ID
  • Empty:組內無成員,但是位移信息還沒有過期。這種狀態只能響應JoinGroup請求
  • PreparingRebalance:組準備開啟新的rebalance,等待成員加入
  • AwaitingSync:正在等待leader consumer將分配方案傳給各個成員
  • Stable:rebalance完成!可以開始消費了~

至於各個狀態之間的流程條件以及action,這裏就不具體展開了。

三、rebalance場景剖析

上面詳細闡述了consumer group是如何執行rebalance的,可能依然有些雲裏霧裏。這部分對其中的三個重要的場景做詳盡的時序展開,進一步加深對於consumer group內部原理的理解。由於圖比較直觀,所有的描述都將以圖的方式給出,不做過多的文字化描述了。

1. 新成員加入組(member join)

技術分享圖片

2. 組成員崩潰(member failure)

前面說過了,組成員崩潰和組成員主動離開是兩個不同的場景。因為在崩潰時成員並不會主動地告知coordinator此事,coordinator有可能需要一個完整的session.timeout周期才能檢測到這種崩潰,這必然會造成consumer的滯後。可以說離開組是主動地發起rebalance;而崩潰則是被動地發起rebalance。okay,直接上圖:

技術分享圖片

3. 組成員主動離組(member leave group)

技術分享圖片

4. 提交位移(member commit offset)

技術分享圖片

總結一下,本文著重討論了一下新版本的consumer group的內部設計原理,特別是consumer group與coordinator之間的交互過程,希望對各位有所幫助。

Kafka設計解析(十三)Kafka消費組(consumer group)