1. 程式人生 > 實用技巧 >kafka學習筆記(七)kafka的狀態機模組

kafka學習筆記(七)kafka的狀態機模組

概述

這一篇隨筆介紹kafka的狀態機模組,Kafka 原始碼中有很多狀態機和管理器,比如之前我們學過的 Controller 通道管理器 ControllerChannelManager、處理 Controller 事件的 ControllerEventManager,等等。這些管理器和狀態機,大多與各自的“宿主”元件關係密切,可以說是大小不同、功能各異。就比如 Controller 的這兩個管理器,必須要與 Controller 元件緊耦合在一起才能實現各自的功能。不過,Kafka 中還是有一些狀態機和管理器具有相對獨立的功能框架,不嚴重依賴使用方,也就是我在這個模組為你精選的 TopicDeletionManager(主題刪除管理器)、ReplicaStateMachine(副本狀態機)和 PartitionStateMachine(分割槽狀態機)。TopicDeletionManager:負責對指定 Kafka 主題執行刪除操作,清除待刪除主題在叢集上的各類“痕跡”。

ReplicaStateMachine:負責定義 Kafka 副本狀態、合法的狀態轉換,以及管理狀態之間的轉換。

PartitionStateMachine:負責定義 Kafka 分割槽狀態、合法的狀態轉換,以及管理狀態之間的轉換。

TopicDeletionManager

TopicDeletionManager.scala 這個原始檔,包括 3 個部分。

DeletionClient 介面:負責實現刪除主題以及後續的動作,比如更新元資料等。這個接口裡定義了 4 個方法,分別是 deleteTopic、deleteTopicDeletions、mutePartitionModifications 和 sendMetadataUpdate。我們後面再詳細學習它們的程式碼。

ControllerDeletionClient 類:實現 DeletionClient 介面的類,分別實現了剛剛說到的那 4 個方法。

TopicDeletionManager 類:主題刪除管理器類,定義了若干個方法維護主題刪除前後叢集狀態的正確性。比如,什麼時候才能刪除主題、什麼時候主題不能被刪除、主題刪除過程中要規避哪些操作,等等。

DeletionClient 介面及其實現

DeletionClient 介面定義的方法用於刪除主題,並將刪除主題這件事兒同步給其他 Broker。目前,DeletionClient 這個介面只有一個實現類,即 ControllerDeletionClient。我們看下這個實現類的程式碼:

 1 class ControllerDeletionClient(controller: KafkaController, zkClient: KafkaZkClient) extends DeletionClient {
 2   // 刪除給定主題
 3   override def deleteTopic(topic: String, epochZkVersion: Int): Unit = {
 4     // 刪除/brokers/topics/<topic>節點
 5     zkClient.deleteTopicZNode(topic, epochZkVersion)
 6     // 刪除/config/topics/<topic>節點
 7     zkClient.deleteTopicConfigs(Seq(topic), epochZkVersion)
 8     // 刪除/admin/delete_topics/<topic>節點
 9     zkClient.deleteTopicDeletions(Seq(topic), epochZkVersion)
10   }
11   // 刪除/admin/delete_topics下的給定topic子節點
12   override def deleteTopicDeletions(topics: Seq[String], epochZkVersion: Int): Unit = {
13     zkClient.deleteTopicDeletions(topics, epochZkVersion)
14   }
15   // 取消/brokers/topics/<topic>節點資料變更的監聽
16   override def mutePartitionModifications(topic: String): Unit = {
17     controller.unregisterPartitionModificationsHandlers(Seq(topic))
18   }
19   // 向叢集Broker傳送指定分割槽的元資料更新請求
20   override def sendMetadataUpdate(partitions: Set[TopicPartition]): Unit = {
21     controller.sendUpdateMetadataRequest(
22       controller.controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
23   }
24 }

這個類的建構函式接收兩個欄位。同時,由於是 DeletionClient 介面的實現類,因而該類實現了 DeletionClient 介面定義的四個方法。先來說建構函式的兩個欄位:KafkaController 例項和 KafkaZkClient 例項。KafkaController 例項,我們已經很熟悉了,就是 Controller 元件物件;而 KafkaZkClient 例項,就是 Kafka 與 ZooKeeper 互動的客戶端物件。接下來,我們再結合程式碼看下 DeletionClient 介面實現類 ControllerDeletionClient 定義的 4 個方法。我來簡單介紹下這 4 個方法大致是做什麼的。

1.deleteTopic它用於刪除主題在 ZooKeeper 上的所有“痕跡”。具體方法是,分別呼叫 KafkaZkClient 的 3 個方法去刪除 ZooKeeper 下 /brokers/topics/節點、/config/topics/節點和 /admin/delete_topics/節點。2.deleteTopicDeletions它用於刪除 ZooKeeper 下待刪除主題的標記節點。具體方法是,呼叫 KafkaZkClient 的 deleteTopicDeletions 方法,批量刪除一組主題在 /admin/delete_topics 下的子節點。注意,deleteTopicDeletions 這個方法名結尾的 Deletions,表示 /admin/delete_topics 下的子節點。所以,deleteTopic 是刪除主題,deleteTopicDeletions 是刪除 /admin/delete_topics 下的對應子節點。到這裡,我們還要注意的一點是,這兩個方法裡都有一個 epochZkVersion 的欄位,代表期望的 Controller Epoch 版本號。如果你使用一箇舊的 Epoch 版本號執行這些方法,ZooKeeper 會拒絕,因為和它自己儲存的版本號不匹配。如果一個 Controller 的 Epoch 值小於 ZooKeeper 中儲存的,那麼這個 Controller 很可能是已經過期的 Controller。這種 Controller 就被稱為 Zombie Controller。epochZkVersion 欄位的作用,就是隔離 Zombie Controller 傳送的操作。

3.mutePartitionModifications它的作用是遮蔽主題分割槽資料變更監聽器,具體實現原理其實就是取消 /brokers/topics/節點資料變更的監聽。這樣當該主題的分割槽資料發生變更後,由於對應的 ZooKeeper 監聽器已經被取消了,因此不會觸發 Controller 相應的處理邏輯。那為什麼要取消這個監聽器呢?其實,主要是為了避免操作之間的相互干擾。設想下,使用者 A 發起了主題刪除,而同時使用者 B 為這個主題新增了分割槽。此時,這兩個操作就會相互衝突,如果允許 Controller 同時處理這兩個操作,勢必會造成邏輯上的混亂以及狀態的不一致。為了應對這種情況,在移除主題副本和分割槽物件前,程式碼要先執行這個方法,以確保不再響應使用者對該主題的其他操作。mutePartitionModifications 方法的實現原理很簡單,它會呼叫 unregisterPartitionModificationsHandlers,並接著呼叫 KafkaZkClient 的 unregisterZNodeChangeHandler 方法,取消 ZooKeeper 上對給定主題的分割槽節點資料變更的監聽。

4.sendMetadataUpdate它會呼叫 KafkaController 的 sendUpdateMetadataRequest 方法,給叢集所有 Broker 傳送更新請求,告訴它們不要再為已刪除主題的分割槽提供服務。

TopicDeletionManager 定義及初始化

有了這些鋪墊,我們再來看主題刪除管理器的主要入口:TopicDeletionManager 類。這個類的定義程式碼,如下:

 1 class TopicDeletionManager(
 2   // KafkaConfig類,儲存Broker端引數
 3   config: KafkaConfig, 
 4   // 叢集元資料
 5   controllerContext: ControllerContext,
 6   // 副本狀態機,用於設定副本狀態
 7   replicaStateMachine: ReplicaStateMachine,
 8   // 分割槽狀態機,用於設定分割槽狀態
 9   partitionStateMachine: PartitionStateMachine,
10   // DeletionClient介面,實現主題刪除
11   client: DeletionClient) extends Logging {
12   this.logIdent = s"[Topic Deletion Manager ${config.brokerId}] "
13   // 是否允許刪除主題
14   val isDeleteTopicEnabled: Boolean = config.deleteTopicEnable
15   ......
16 }

該類主要的屬性有 6 個,我們分別來看看。

config:KafkaConfig 例項,可以用作獲取 Broker 端引數 delete.topic.enable 的值。該引數用於控制是否允許刪除主題,預設值是 true,即 Kafka 預設允許使用者刪除主題。

controllerContext:Controller 端儲存的元資料資訊。刪除主題必然要變更叢集元資料資訊,因此 TopicDeletionManager 需要用到 controllerContext 的方法,去更新它儲存的資料。

replicaStateMachine 和 partitionStateMachine:副本狀態機和分割槽狀態機。它們各自負責副本和分割槽的狀態轉換,以保持副本物件和分割槽物件在叢集上的一致性狀態。這兩個狀態機是後面兩講的重要知識點。

client:前面介紹的 DeletionClient 介面。TopicDeletionManager 通過該介面執行 ZooKeeper 上節點的相應更新。

isDeleteTopicEnabled:表明主題是否允許被刪除。它是 Broker 端引數 delete.topic.enable 的值,預設是 true,表示 Kafka 允許刪除主題。原始碼中大量使用這個欄位判斷主題的可刪除性。前面的 config 引數的主要目的就是設定這個欄位的值。被設定之後,config 就不再被原始碼使用了。

TopicDeletionManager 重要方法

最重要的當屬 resumeDeletions 方法。它是重啟主題刪除操作過程的方法。主題因為某些事件可能一時無法完成刪除,比如主題分割槽正在進行副本重分配等。一旦這些事件完成後,主題重新具備可刪除的資格。此時,程式碼就需要呼叫 resumeDeletions 重啟刪除操作。這個方法之所以很重要,是因為它還串聯了 TopicDeletionManager 類的很多方法,如 completeDeleteTopic 和 onTopicDeletion 等。因此,你完全可以從 resumeDeletions 方法開始,逐漸深入到其他方法程式碼的學習。那我們就先學習 resumeDeletions 的實現程式碼吧。

 1 private def resumeDeletions(): Unit = {
 2   // 從元資料快取中獲取要刪除的主題列表
 3   val topicsQueuedForDeletion = Set.empty[String] ++ controllerContext.topicsToBeDeleted
 4   // 待重試主題列表
 5   val topicsEligibleForRetry = mutable.Set.empty[String]
 6   // 待刪除主題列表
 7   val topicsEligibleForDeletion = mutable.Set.empty[String]
 8   if (topicsQueuedForDeletion.nonEmpty)
 9     info(s"Handling deletion for topics ${topicsQueuedForDeletion.mkString(",")}")
10   // 遍歷每個待刪除主題
11   topicsQueuedForDeletion.foreach { topic =>
12     // 如果該主題所有副本已經是ReplicaDeletionSuccessful狀態
13     // 即該主題已經被刪除  
14     if (controllerContext.areAllReplicasInState(topic, ReplicaDeletionSuccessful)) {
15       // 呼叫completeDeleteTopic方法完成後續操作即可
16       completeDeleteTopic(topic)
17       info(s"Deletion of topic $topic successfully completed")
18      // 如果主題刪除尚未開始並且主題當前無法執行刪除的話
19     } else if (!controllerContext.isAnyReplicaInState(topic, ReplicaDeletionStarted)) {
20       if (controllerContext.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
21         // 把該主題加到待重試主題列表中用於後續重試
22         topicsEligibleForRetry += topic
23       }
24     }
25     // 如果該主題能夠被刪除
26     if (isTopicEligibleForDeletion(topic)) {
27       info(s"Deletion of topic $topic (re)started")
28       topicsEligibleForDeletion += topic
29     }
30   }
31   // 重試待重試主題列表中的主題刪除操作
32   if (topicsEligibleForRetry.nonEmpty) {
33     retryDeletionForIneligibleReplicas(topicsEligibleForRetry)
34   }
35   // 呼叫onTopicDeletion方法,對待刪除主題列表中的主題執行刪除操作
36   if (topicsEligibleForDeletion.nonEmpty) {
37     onTopicDeletion(topicsEligibleForDeletion)
38   }
39 }

通過程式碼我們發現,這個方法首先從元資料快取中獲取要刪除的主題列表,之後定義了兩個空的主題列表,分別儲存待重試刪除主題和待刪除主題。然後,程式碼遍歷每個要刪除的主題,去看它所有副本的狀態。如果副本狀態都是 ReplicaDeletionSuccessful,就表明該主題已經被成功刪除,此時,再呼叫 completeDeleteTopic 方法,完成後續的操作就可以了。對於那些刪除操作尚未開始,並且暫時無法執行刪除的主題,原始碼會把這類主題加到待重試主題列表中,用於後續重試;如果主題是能夠被刪除的,就將其加入到待刪除列表中。最後,該方法呼叫 retryDeletionForIneligibleReplicas 方法,來重試待重試主題列表中的主題刪除操作。對於待刪除主題列表中的主題則呼叫 onTopicDeletion 刪除之。值得一提的是,retryDeletionForIneligibleReplicas 方法用於重試主題刪除。這是通過將對應主題副本的狀態,從 ReplicaDeletionIneligible 變更到 OfflineReplica 來完成的。這樣,後續再次呼叫 resumeDeletions 時,會嘗試重新刪除主題。

總結:在主題刪除過程中,Kafka 會調整叢集中三個地方的資料:ZooKeeper、元資料快取和磁碟日誌檔案。刪除主題時,ZooKeeper 上與該主題相關的所有 ZNode 節點必須被清除;Controller 端元資料快取中的相關項,也必須要被處理,並且要被同步到叢集的其他 Broker 上;而磁碟日誌檔案,更是要清理的首要目標。這三個地方必須要統一處理,就好似我們常說的原子性操作一樣。

ReplicaStateMachine

我們看下 ReplicaStateMachine 及其子類 ZKReplicaStateMachine 在程式碼中是如何定義的,請看這兩個程式碼片段:

 1 // ReplicaStateMachine抽象類定義
 2 abstract class ReplicaStateMachine(controllerContext: ControllerContext) extends Logging {
 3   ......
 4 }
 5 
 6 // ZkReplicaStateMachine具體實現類定義
 7 class ZkReplicaStateMachine(config: KafkaConfig, 
 8   stateChangeLogger: StateChangeLogger,
 9   controllerContext: ControllerContext,
10   zkClient: KafkaZkClient,
11   controllerBrokerRequestBatch: ControllerBrokerRequestBatch) 
12   extends ReplicaStateMachine(controllerContext) with Logging {
13   ......
14 }

KafkaController 物件在構建的時候,就會初始化一個 ZkReplicaStateMachine 例項,如下列程式碼所示:

1 val replicaStateMachine: ReplicaStateMachine = new   
2   ZkReplicaStateMachine(config, stateChangeLogger, 
3     controllerContext, zkClient,
4     new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))

你可能會問:“如果一個 Broker 沒有被選舉為 Controller,它也會構建 KafkaController 物件例項嗎?”沒錯!所有 Broker 在啟動時,都會建立 KafkaController 例項,因而也會建立 ZKReplicaStateMachine 例項。每個 Broker 都會建立這些例項,並不代表每個 Broker 都會啟動副本狀態機。事實上,只有在 Controller 所在的 Broker 上,副本狀態機才會被啟動。具體的啟動程式碼位於 KafkaController 的 onControllerFailover 方法。

副本狀態及狀態管理流程

副本狀態機一旦被啟動,就意味著它要行使它最重要的職責了:管理副本狀態的轉換。不過,在學習如何管理狀態之前,我們必須要弄明白,當前都有哪些狀態,以及它們的含義分別是什麼。原始碼中的 ReplicaState 定義了 7 種副本狀態。NewReplica:副本被建立之後所處的狀態。OnlineReplica:副本正常提供服務時所處的狀態。OfflineReplica:副本服務下線時所處的狀態。ReplicaDeletionStarted:副本被刪除時所處的狀態。ReplicaDeletionSuccessful:副本被成功刪除後所處的狀態。ReplicaDeletionIneligible:開啟副本刪除,但副本暫時無法被刪除時所處的狀態。NonExistentReplica:副本從副本狀態機被移除前所處的狀態。具體到程式碼而言,ReplicaState 介面及其實現物件定義了每種狀態的序號,以及合法的前置狀態。我以 OnlineReplica 程式碼為例進行說明:

 1 // ReplicaState介面
 2 sealed trait ReplicaState {
 3   def state: Byte
 4   def validPreviousStates: Set[ReplicaState] // 定義合法的前置狀態
 5 }
 6 
 7 // OnlineReplica狀態
 8 case object OnlineReplica extends ReplicaState {
 9   val state: Byte = 2
10   val validPreviousStates: Set[ReplicaState] = Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible)
11 }

狀態轉換方法定義

在詳細介紹 handleStateChanges 方法前,我稍微花點時間,給你簡單介紹下其他 7 個方法都是做什麼用的。就像前面說過的,這些方法主要是起輔助的作用。只有清楚了這些方法的用途,你才能更好地理解 handleStateChanges 的實現邏輯。logFailedStateChange:僅僅是記錄一條錯誤日誌,表明執行了一次無效的狀態變更。

logInvalidTransition:同樣也是記錄錯誤之用,記錄一次非法的狀態轉換。

logSuccessfulTransition:記錄一次成功的狀態轉換操作。

getTopicPartitionStatesFromZk:從 ZooKeeper 中獲取指定分割槽的狀態資訊,包括每個分割槽的 Leader 副本、ISR 集合等資料。

doRemoveReplicasFromIsr:把給定的副本物件從給定分割槽 ISR 中移除。

removeReplicasFromIsr:呼叫 doRemoveReplicasFromIsr 方法,實現將給定的副本物件從給定分割槽 ISR 中移除的功能。

doHandleStateChanges:執行狀態變更和轉換操作的主力方法。接下來,我們會詳細學習它的原始碼部分。

handleStateChanges 方法

handleStateChange 方法的作用是處理狀態的變更,是對外提供狀態轉換操作的入口方法。其方法如下:

 1 override def handleStateChanges(
 2   replicas: Seq[PartitionAndReplica], 
 3   targetState: ReplicaState): Unit = {
 4   if (replicas.nonEmpty) {
 5     try {
 6       // 清空Controller待發送請求集合
 7       controllerBrokerRequestBatch.newBatch()
 8       // 將所有副本物件按照Broker進行分組,依次執行狀態轉換操作
 9       replicas.groupBy(_.replica).foreach {
10         case (replicaId, replicas) =>
11           doHandleStateChanges(replicaId, replicas, targetState)
12       }
13       // 傳送對應的Controller請求給Broker
14       controllerBrokerRequestBatch.sendRequestsToBrokers(
15         controllerContext.epoch)
16     } catch {
17       // 如果Controller易主,則記錄錯誤日誌然後丟擲異常
18       case e: ControllerMovedException =>
19         error(s"Controller moved to another broker when moving some replicas to $targetState state", e)
20         throw e
21       case e: Throwable => error(s"Error while moving some replicas to $targetState state", e)
22     }
23   }
24 }

程式碼邏輯總體上分為兩步:第 1 步是呼叫 doHandleStateChanges 方法執行真正的副本狀態轉換;第 2 步是給叢集中的相應 Broker 批量傳送請求。在執行第 1 步的時候,它會將 replicas 按照 Broker ID 進行分組。舉個例子,如果我們使用 < 主題名,分割槽號,副本 Broker ID> 表示副本物件,假設 replicas 為集合(<test, 0,="" 0="">,<test, 0,="" 1="">,<test, 1,="" 0="">,<test, 1,="" 1="">),那麼,在呼叫 doHandleStateChanges 方法前,程式碼會將 replicas 按照 Broker ID 進行分組,即變成:Map(0 -> Set(<test, 0,="" 0="">,<test, 1,="" 0="">),1 -> Set(<test, 0,="" 1="">,<test, 1,="" 1="">))。待這些都做完之後,程式碼開始呼叫 doHandleStateChanges 方法,執行狀態轉換操作。這個方法看著很長,其實都是不同的程式碼分支。

我們可以發現,程式碼的第 1 步,會嘗試獲取給定副本物件在 Controller 端元資料快取中的當前狀態,如果沒有儲存某個副本物件的狀態,程式碼會將其初始化為 NonExistentReplica 狀態。第 2 步,程式碼根據不同 ReplicaState 中定義的合法前置狀態集合以及傳入的目標狀態(targetState),將給定的副本物件集合劃分成兩部分:能夠合法轉換的副本物件集合,以及執行非法狀態轉換的副本物件集合。doHandleStateChanges 方法會為後者中的每個副本物件記錄一條錯誤日誌。第 3 步,程式碼攜帶能夠執行合法轉換的副本物件集合,進入到不同的程式碼分支。

由於當前 Kafka 為副本定義了 7 類狀態,因此,這裡的程式碼分支總共有 7 路。我挑選幾路最常見的狀態轉換路徑詳細說明下,包括副本被建立時被轉換到 NewReplica 狀態,副本正常工作時被轉換到 OnlineReplica 狀態,副本停止服務後被轉換到 OfflineReplica 狀態。至於剩下的記錄程式碼,你可以在課後自行學習下,它們的轉換操作原理大致是相同的。

第 1 路:轉換到 NewReplica 狀態

首先,我們先來看第 1 路,即目標狀態是 NewReplica 的程式碼。程式碼如下:

 1 case NewReplica =>
 2   // 遍歷所有能夠執行轉換的副本物件
 3   validReplicas.foreach { replica =>
 4     // 獲取該副本物件的分割槽物件,即<主題名,分割槽號>資料
 5     val partition = replica.topicPartition
 6     // 獲取副本物件的當前狀態
 7     val currentState = controllerContext.replicaState(replica)
 8     // 嘗試從元資料快取中獲取該分割槽當前資訊
 9     // 包括Leader是誰、ISR都有哪些副本等資料
10     controllerContext.partitionLeadershipInfo.get(partition) match {
11       // 如果成功拿到分割槽資料資訊
12       case Some(leaderIsrAndControllerEpoch) =>
13         // 如果該副本是Leader副本
14         if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) {
15           val exception = new StateChangeFailedException(s"Replica $replicaId for partition $partition cannot be moved to NewReplica state as it is being requested to become leader")
16           // 記錄錯誤日誌。Leader副本不能被設定成NewReplica狀態
17           logFailedStateChange(replica, currentState, OfflineReplica, exception)
18         // 否則,給該副本所在的Broker傳送LeaderAndIsrRequest
19         // 向它同步該分割槽的資料, 之後給叢集當前所有Broker傳送
20         // UpdateMetadataRequest通知它們該分割槽資料發生變更
21         } else {
22           controllerBrokerRequestBatch
23             .addLeaderAndIsrRequestForBrokers(
24               Seq(replicaId),
25               replica.topicPartition,
26               leaderIsrAndControllerEpoch,
27               controllerContext.partitionFullReplicaAssignment(
28                 replica.topicPartition),
29               isNew = true)
30           if (traceEnabled)
31             logSuccessfulTransition(
32               stateLogger, replicaId, 
33               partition, currentState, NewReplica)
34           // 更新元資料快取中該副本物件的當前狀態為NewReplica
35           controllerContext.putReplicaState(replica, NewReplica)
36         }
37       // 如果沒有相應資料
38       case None =>
39         if (traceEnabled)
40           logSuccessfulTransition(
41             stateLogger, replicaId, 
42             partition, currentState, NewReplica)
43         // 僅僅更新元資料快取中該副本物件的當前狀態為NewReplica即可
44         controllerContext.putReplicaState(replica, NewReplica)
45     }
46   }

這一路主要做的事情是,嘗試從元資料快取中,獲取這些副本物件的分割槽資訊資料,包括分割槽的 Leader 副本在哪個 Broker 上、ISR 中都有哪些副本,等等。如果找不到對應的分割槽資料,就直接把副本狀態更新為 NewReplica。否則,程式碼就需要給該副本所在的 Broker 傳送請求,讓它知道該分割槽的資訊。同時,程式碼還要給叢集所有執行中的 Broker 傳送請求,讓它們感知到新副本的加入。

第 2 路:轉換到 OnlineReplica 狀態

下面我們來看第 2 路,即轉換副本物件到 OnlineReplica。剛剛我說過,這是副本物件正常工作時所處的狀態。我們來看下要轉換到這個狀態,原始碼都做了哪些事情:

 1 case OnlineReplica =>
 2   validReplicas.foreach { replica =>
 3     // 獲取副本所在分割槽
 4     val partition = replica.topicPartition
 5     // 獲取副本當前狀態
 6     val currentState = controllerContext.replicaState(replica)
 7     currentState match {
 8       // 如果當前狀態是NewReplica
 9       case NewReplica =>
10         // 從元資料快取中拿到分割槽副本列表
11         val assignment = controllerContext
12           .partitionFullReplicaAssignment(partition)
13         // 如果副本列表不包含當前副本,視為異常情況
14         if (!assignment.replicas.contains(replicaId)) {
15           error(s"Adding replica ($replicaId) that is not part of the assignment $assignment")
16           // 將該副本加入到副本列表中,並更新元資料快取中該分割槽的副本列表
17           val newAssignment = assignment.copy(
18             replicas = assignment.replicas :+ replicaId)
19           controllerContext.updatePartitionFullReplicaAssignment(
20             partition, newAssignment)
21         }
22       // 如果當前狀態是其他狀態
23       case _ =>
24         // 嘗試獲取該分割槽當前資訊資料
25         controllerContext.partitionLeadershipInfo
26           .get(partition) match {
27           // 如果存在分割槽資訊
28           // 向該副本物件所在Broker傳送請求,令其同步該分割槽資料
29           case Some(leaderIsrAndControllerEpoch) =>
30             controllerBrokerRequestBatch
31               .addLeaderAndIsrRequestForBrokers(Seq(replicaId),
32                 replica.topicPartition,
33                 leaderIsrAndControllerEpoch,
34                 controllerContext
35                   .partitionFullReplicaAssignment(partition), 
36                 isNew = false)
37           case None =>
38         }
39     }
40     if (traceEnabled)
41       logSuccessfulTransition(
42         stateLogger, replicaId, 
43         partition, currentState, OnlineReplica)
44     // 將該副本物件設定成OnlineReplica狀態
45     controllerContext.putReplicaState(replica, OnlineReplica)
46   }

程式碼依然會對副本物件進行遍歷,並依次執行下面的幾個步驟。

第 1 步,獲取元資料中該副本所屬的分割槽物件,以及該副本的當前狀態。

第 2 步,檢視當前狀態是否是 NewReplica。如果是,則獲取分割槽的副本列表,並判斷該副本是否在當前的副本列表中,假如不在,就記錄錯誤日誌,並更新元資料中的副本列表;如果狀態不是 NewReplica,就說明,這是一個已存在的副本物件,那麼,原始碼會獲取對應分割槽的詳細資料,然後向該副本物件所在的 Broker 傳送 LeaderAndIsrRequest 請求,令其同步獲知,並儲存該分割槽資料。

第 3 步,將該副本物件狀態變更為 OnlineReplica。至此,該副本處於正常工作狀態。

第 3 路:轉換到 OfflineReplica 狀態

最後,再來看下第 3 路分支。這路分支要將副本物件的狀態轉換成 OfflineReplica。我依然以程式碼註釋的方式給出主要的程式碼邏輯:

 1 case OfflineReplica =>
 2   validReplicas.foreach { replica =>
 3     // 向副本所在Broker傳送StopReplicaRequest請求,停止對應副本
 4     controllerBrokerRequestBatch
 5       .addStopReplicaRequestForBrokers(Seq(replicaId), 
 6         replica.topicPartition, deletePartition = false)
 7   }
 8   // 將副本物件集合劃分成有Leader資訊的副本集合和無Leader資訊的副本集合
 9   val (replicasWithLeadershipInfo, replicasWithoutLeadershipInfo) = 
10     validReplicas.partition { replica =>
11       controllerContext.partitionLeadershipInfo
12         .contains(replica.topicPartition)
13     }
14   // 對於有Leader資訊的副本集合而言從,
15   // 它們對應的所有分割槽中移除該副本物件並更新ZooKeeper節點
16   val updatedLeaderIsrAndControllerEpochs = 
17     removeReplicasFromIsr(replicaId,  
18       replicasWithLeadershipInfo.map(_.topicPartition))
19   // 遍歷每個更新過的分割槽資訊
20   updatedLeaderIsrAndControllerEpochs.foreach {
21     case (partition, leaderIsrAndControllerEpoch) =>
22       stateLogger.info(s"Partition $partition state changed to $leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as part of transition to $OfflineReplica")
23       // 如果分割槽對應主題並未被刪除
24       if (!controllerContext.isTopicQueuedUpForDeletion(
25         partition.topic)) {
26         // 獲取該分割槽除給定副本以外的其他副本所在的Broker  
27         val recipients = controllerContext
28           .partitionReplicaAssignment(partition)
29           .filterNot(_ == replicaId)
30         // 向這些Broker傳送請求更新該分割槽更新過的分割槽LeaderAndIsr資料
31         controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(
32           recipients,
33           partition,
34           leaderIsrAndControllerEpoch,
35           controllerContext.partitionFullReplicaAssignment(partition), 
36           isNew = false)
37       }
38       val replica = PartitionAndReplica(partition, replicaId)
39       val currentState = controllerContext.replicaState(replica)
40       if (traceEnabled)
41         logSuccessfulTransition(stateLogger, replicaId, 
42           partition, currentState, OfflineReplica)
43       // 設定該分割槽給定副本的狀態為OfflineReplica
44       controllerContext.putReplicaState(replica, OfflineReplica)
45   }
46   // 遍歷無Leader資訊的所有副本物件
47   replicasWithoutLeadershipInfo.foreach { replica =>
48     val currentState = controllerContext.replicaState(replica)
49     if (traceEnabled)
50       logSuccessfulTransition(stateLogger, replicaId, 
51         replica.topicPartition, currentState, OfflineReplica)
52      // 向叢集所有Broker傳送請求,更新對應分割槽的元資料
53     controllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(
54       controllerContext.liveOrShuttingDownBrokerIds.toSeq,
55       Set(replica.topicPartition))
56     // 設定該分割槽給定副本的狀態為OfflineReplica
57     controllerContext.putReplicaState(replica, OfflineReplica)
58   }

首先,程式碼會給所有符合狀態轉換的副本所在的 Broker,傳送 StopReplicaRequest 請求,顯式地告訴這些 Broker 停掉其上的對應副本。Kafka 的副本管理器元件(ReplicaManager)負責處理這個邏輯。後面我們會用兩節課的時間專門討論 ReplicaManager 的實現,這裡你只需要瞭解,StopReplica 請求被髮送出去之後,這些 Broker 上對應的副本就停止工作了。其次,程式碼根據分割槽是否儲存了 Leader 資訊,將副本集合劃分成兩個子集:有 Leader 副本集合和無 Leader 副本集合。有無 Leader 資訊並不僅僅包含 Leader,還有 ISR 和 controllerEpoch 等資料。不過,你大致可以認為,副本集合是根據有無 Leader 進行劃分的。接下來,原始碼會遍歷有 Leader 的子集合,向這些副本所在的 Broker 傳送 LeaderAndIsrRequest 請求,去更新停止副本操作之後的分割槽資訊,再把這些分割槽狀態設定為 OfflineReplica。最後,原始碼遍歷無 Leader 的子集合,執行與上一步非常類似的操作。只不過,對於無 Leader 而言,因為我們沒有執行任何 Leader 選舉操作,所以給這些副本所在的 Broker 傳送的就不是 LeaderAndIsrRequest 請求了,而是 UpdateMetadataRequest 請求,顯式去告知它們更新對應分割槽的元資料即可,然後再把副本狀態設定為 OfflineReplica。從這段描述中,我們可以知道,把副本狀態變更為 OfflineReplica 的主要邏輯,其實就是停止對應副本 + 更新遠端 Broker 元資料的操作。

PartitionStateMachine

程式碼總共有 5 大部分。PartitionStateMachine:分割槽狀態機抽象類。它定義了諸如 startup、shutdown 這樣的公共方法,同時也給出了處理分割槽狀態轉換入口方法 handleStateChanges 的簽名。ZkPartitionStateMachine:PartitionStateMachine 唯一的繼承子類。它實現了分割槽狀態機的主體邏輯功能。和 ZkReplicaStateMachine 類似,ZkPartitionStateMachine 重寫了父類的 handleStateChanges 方法,並配以私有的 doHandleStateChanges 方法,共同實現分割槽狀態轉換的操作。PartitionState 介面及其實現物件:定義 4 類分割槽狀態,分別是 NewPartition、OnlinePartition、OfflinePartition 和 NonExistentPartition。除此之外,還定義了它們之間的流轉關係。PartitionLeaderElectionStrategy 介面及其實現物件:定義 4 類分割槽 Leader 選舉策略。你可以認為它們是發生 Leader 選舉的 4 種場景。PartitionLeaderElectionAlgorithms:分割槽 Leader 選舉的演算法實現。既然定義了 4 類選舉策略,就一定有相應的實現程式碼,PartitionLeaderElectionAlgorithms 就提供了這 4 類選舉策略的實現程式碼。

每個 Broker 啟動時,都會建立對應的分割槽狀態機和副本狀態機例項,但只有 Controller 所在的 Broker 才會啟動它們。如果 Controller 變更到其他 Broker,老 Controller 所在的 Broker 要呼叫這些狀態機的 shutdown 方法關閉它們,新 Controller 所在的 Broker 呼叫狀態機的 startup 方法啟動它們。

分割槽狀態

既然 ZkPartitionStateMachine 是管理分割槽狀態轉換的,那麼,我們至少要知道分割槽都有哪些狀態,以及 Kafka 規定的轉換規則是什麼。這就是 PartitionState 介面及其實現物件做的事情。和 ReplicaState 類似,PartitionState 定義了分割槽的狀態空間以及流轉規則。我以 OnlinePartition 狀態為例,說明下程式碼是如何實現流轉的:

1 sealed trait PartitionState {
2   def state: Byte // 狀態序號,無實際用途
3   def validPreviousStates: Set[PartitionState] // 合法前置狀態集合
4 }
5 
6 case object OnlinePartition extends PartitionState {
7   val state: Byte = 1
8   val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition, OfflinePartition)
9 }

如程式碼所示,每個 PartitionState 都定義了名為 validPreviousStates 的集合,也就是每個狀態對應的合法前置狀態集。對於 OnlinePartition 而言,它的合法前置狀態集包括 NewPartition、OnlinePartition 和 OfflinePartition。在 Kafka 中,從合法狀態集以外的狀態向目標狀態進行轉換,將被視為非法操作。目前,Kafka 為分割槽定義了 4 類狀態。

NewPartition:分割槽被建立後被設定成這個狀態,表明它是一個全新的分割槽物件。處於這個狀態的分割槽,被 Kafka 認為是“未初始化”,因此,不能選舉 Leader。

OnlinePartition:分割槽正式提供服務時所處的狀態。

OfflinePartition:分割槽下線後所處的狀態。

NonExistentPartition:分割槽被刪除,並且從分割槽狀態機移除後所處的狀態。

分割槽 Leader 選舉的場景及方法

剛剛我們說了兩個狀態機的相同點,接下來,我們要學習的分割槽 Leader 選舉,可以說是 PartitionStateMachine 特有的功能了。每個分割槽都必須選舉出 Leader 才能正常提供服務,因此,對於分割槽而言,Leader 副本是非常重要的角色。既然這樣,我們就必須要了解 Leader 選舉什麼流程,以及在程式碼中是如何實現的。我們重點學習下選舉策略以及具體的實現方法程式碼。

PartitionLeaderElectionStrategy

先明確下分割槽 Leader 選舉的含義,其實很簡單,就是為 Kafka 主題的某個分割槽推選 Leader 副本。那麼,Kafka 定義了哪幾種推選策略,或者說,在什麼情況下需要執行 Leader 選舉呢?這就是 PartitionLeaderElectionStrategy 介面要做的事情,請看下面的程式碼:

 1 // 分割槽Leader選舉策略介面
 2 sealed trait PartitionLeaderElectionStrategy
 3 // 離線分割槽Leader選舉策略
 4 final case class OfflinePartitionLeaderElectionStrategy(
 5   allowUnclean: Boolean) extends PartitionLeaderElectionStrategy
 6 // 分割槽副本重分配Leader選舉策略  
 7 final case object ReassignPartitionLeaderElectionStrategy 
 8   extends PartitionLeaderElectionStrategy
 9 // 分割槽Preferred副本Leader選舉策略
10 final case object PreferredReplicaPartitionLeaderElectionStrategy 
11   extends PartitionLeaderElectionStrategy
12 // Broker Controlled關閉時Leader選舉策略
13 final case object ControlledShutdownPartitionLeaderElectionStrategy 
14   extends PartitionLeaderElectionStrategy

當前,分割槽 Leader 選舉有 4 類場景。

OfflinePartitionLeaderElectionStrategy:因為 Leader 副本下線而引發的分割槽 Leader 選舉。

ReassignPartitionLeaderElectionStrategy:因為執行分割槽副本重分配操作而引發的分割槽 Leader 選舉。

PreferredReplicaPartitionLeaderElectionStrategy:因為執行 Preferred 副本 Leader 選舉而引發的分割槽 Leader 選舉。

ControlledShutdownPartitionLeaderElectionStrategy:因為正常關閉 Broker 而引發的分割槽 Leader 選舉。

PartitionLeaderElectionAlgorithms

針對這 4 類場景,分割槽狀態機的 PartitionLeaderElectionAlgorithms 物件定義了 4 個方法,分別負責為每種場景選舉 Leader 副本,這 4 種方法是:

offlinePartitionLeaderElection;

reassignPartitionLeaderElection;

preferredReplicaPartitionLeaderElection;

controlledShutdownPartitionLeaderElection。

offlinePartitionLeaderElection 方法的邏輯是這 4 個方法中最複雜的,我們就先從它開始學起。

 1 def offlinePartitionLeaderElection(assignment: Seq[Int], 
 2   isr: Seq[Int], liveReplicas: Set[Int], 
 3   uncleanLeaderElectionEnabled: Boolean, controllerContext: ControllerContext): Option[Int] = {
 4   // 從當前分割槽副本列表中尋找首個處於存活狀態的ISR副本
 5   assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse {
 6     // 如果找不到滿足條件的副本,檢視是否允許Unclean Leader選舉
 7     // 即Broker端引數unclean.leader.election.enable是否等於true
 8     if (uncleanLeaderElectionEnabled) {
 9       // 選擇當前副本列表中的第一個存活副本作為Leader
10       val leaderOpt = assignment.find(liveReplicas.contains)
11       if (leaderOpt.isDefined)
12         controllerContext.stats.uncleanLeaderElectionRate.mark()
13       leaderOpt
14     } else {
15       None // 如果不允許Unclean Leader選舉,則返回None表示無法選舉Leader
16     }
17   }
18 }

處理分割槽狀態轉換的方法

掌握了剛剛的這些知識之後,現在,我們正式來看 PartitionStateMachine 的工作原理。

handleStateChanges

如果用一句話概括 handleStateChanges 的作用,應該這樣說:handleStateChanges 把 partitions 的狀態設定為 targetState,同時,還可能需要用 leaderElectionStrategy 策略為 partitions 選舉新的 Leader,最終將 partitions 的 Leader 資訊返回。其中,partitions 是待執行狀態變更的目標分割槽列表,targetState 是目標狀態,leaderElectionStrategy 是一個可選項,如果傳入了,就表示要執行 Leader 選舉。下面是 handleStateChanges 方法的完整程式碼,我以註釋的方式給出了主要的功能說明:

 1 override def handleStateChanges(
 2     partitions: Seq[TopicPartition],
 3     targetState: PartitionState,
 4     partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
 5   ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
 6     if (partitions.nonEmpty) {
 7       try {
 8         // 清空Controller待發送請求集合,準備本次請求傳送
 9         controllerBrokerRequestBatch.newBatch()
10         // 呼叫doHandleStateChanges方法執行真正的狀態變更邏輯
11         val result = doHandleStateChanges(
12           partitions,
13           targetState,
14           partitionLeaderElectionStrategyOpt
15         )
16         // Controller給相關Broker傳送請求通知狀態變化
17         controllerBrokerRequestBatch.sendRequestsToBrokers(
18           controllerContext.epoch)
19         // 返回狀態變更處理結果
20         result
21       } catch {
22         // 如果Controller易主,則記錄錯誤日誌,然後重新丟擲異常
23         // 上層程式碼會捕獲該異常並執行maybeResign方法執行卸任邏輯
24         case e: ControllerMovedException =>
25           error(s"Controller moved to another broker when moving some partitions to $targetState state", e)
26           throw e
27         // 如果是其他異常,記錄錯誤日誌,封裝錯誤返回
28         case e: Throwable =>
29           error(s"Error while moving some partitions to $targetState state", e)
30           partitions.iterator.map(_ -> Left(e)).toMap
31       }
32     } else { // 如果partitions為空,什麼都不用做
33       Map.empty
34     }
35   }

doHandleStateChanges

 1 private def doHandleStateChanges(
 2     partitions: Seq[TopicPartition],
 3     targetState: PartitionState,
 4     partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
 5   ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
 6     val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
 7     val traceEnabled = stateChangeLog.isTraceEnabled
 8     // 初始化新分割槽的狀態為NonExistentPartition
 9     partitions.foreach(partition => controllerContext.putPartitionStateIfNotExists(partition, NonExistentPartition))
10     // 找出要執行非法狀態轉換的分割槽,記錄錯誤日誌
11     val (validPartitions, invalidPartitions) = controllerContext.checkValidPartitionStateChange(partitions, targetState)
12     invalidPartitions.foreach(partition => logInvalidTransition(partition, targetState))
13     // 根據targetState進入到不同的case分支
14     targetState match {
15       ......
16     }
17 }

這個方法首先會做狀態初始化的工作,具體來說就是,在方法呼叫時,不在元資料快取中的所有分割槽的狀態,會被初始化為 NonExistentPartition。接著,檢查哪些分割槽執行的狀態轉換不合法,併為這些分割槽記錄相應的錯誤日誌。之後,程式碼攜合法狀態轉換的分割槽列表進入到 case 分支。由於分割槽狀態只有 4 個,因此,它的 case 分支程式碼遠比 ReplicaStateMachine 中的簡單,而且,只有 OnlinePartition 這一路的分支邏輯相對複雜,其他 3 路僅僅是將分割槽狀態設定成目標狀態而已,所以,我們來深入研究下目標狀態是 OnlinePartition 的分支。

 1 case OnlinePartition =>
 2   // 獲取未初始化分割槽列表,也就是NewPartition狀態下的所有分割槽
 3   val uninitializedPartitions = validPartitions.filter(
 4     partition => partitionState(partition) == NewPartition)
 5   // 獲取具備Leader選舉資格的分割槽列表
 6   // 只能為OnlinePartition和OfflinePartition狀態的分割槽選舉Leader 
 7   val partitionsToElectLeader = validPartitions.filter(
 8     partition => partitionState(partition) == OfflinePartition ||
 9      partitionState(partition) == OnlinePartition)
10   // 初始化NewPartition狀態分割槽,在ZooKeeper中寫入Leader和ISR資料
11   if (uninitializedPartitions.nonEmpty) {
12     val successfulInitializations = initializeLeaderAndIsrForPartitions(uninitializedPartitions)
13     successfulInitializations.foreach { partition =>
14       stateChangeLog.info(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
15         s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
16       controllerContext.putPartitionState(partition, OnlinePartition)
17     }
18   }
19   // 為具備Leader選舉資格的分割槽推選Leader
20   if (partitionsToElectLeader.nonEmpty) {
21     val electionResults = electLeaderForPartitions(
22       partitionsToElectLeader,
23       partitionLeaderElectionStrategyOpt.getOrElse(
24         throw new IllegalArgumentException("Election strategy is a required field when the target state is OnlinePartition")
25       )
26     )
27     electionResults.foreach {
28       case (partition, Right(leaderAndIsr)) =>
29         stateChangeLog.info(
30           s"Changed partition $partition from ${partitionState(partition)} to $targetState with state $leaderAndIsr"
31         )
32         // 將成功選舉Leader後的分割槽設定成OnlinePartition狀態
33         controllerContext.putPartitionState(
34           partition, OnlinePartition)
35       case (_, Left(_)) => // 如果選舉失敗,忽略之
36     }
37     // 返回Leader選舉結果
38     electionResults
39   } else {
40     Map.empty
41   }

第 1 步是為 NewPartition 狀態的分割槽做初始化操作,也就是在 ZooKeeper 中,建立並寫入分割槽節點資料。節點的位置是/brokers/topics//partitions/,每個節點都要包含分割槽的 Leader 和 ISR 等資料。而 Leader 和 ISR 的確定規則是:選擇存活副本列表的第一個副本作為 Leader;選擇存活副本列表作為 ISR。至於具體的程式碼,可以看下 initializeLeaderAndIsrForPartitions 方法程式碼片段的倒數第 5 行:

 1 private def initializeLeaderAndIsrForPartitions(partitions: Seq[TopicPartition]): Seq[TopicPartition] = {
 2   ......
 3     // 獲取每個分割槽的副本列表
 4     val replicasPerPartition = partitions.map(partition => partition -> controllerContext.partitionReplicaAssignment(partition))
 5     // 獲取每個分割槽的所有存活副本
 6     val liveReplicasPerPartition = replicasPerPartition.map { case (partition, replicas) =>
 7         val liveReplicasForPartition = replicas.filter(replica => controllerContext.isReplicaOnline(replica, partition))
 8         partition -> liveReplicasForPartition
 9     }
10     // 按照有無存活副本對分割槽進行分組
11     // 分為兩組:有存活副本的分割槽;無任何存活副本的分割槽
12     val (partitionsWithoutLiveReplicas, partitionsWithLiveReplicas) = liveReplicasPerPartition.partition { case (_, liveReplicas) => liveReplicas.isEmpty }
13     ......
14     // 為"有存活副本的分割槽"確定Leader和ISR
15     // Leader確認依據:存活副本列表的首個副本被認定為Leader
16     // ISR確認依據:存活副本列表被認定為ISR
17     val leaderIsrAndControllerEpochs = partitionsWithLiveReplicas.map { case (partition, liveReplicas) =>
18       val leaderAndIsr = LeaderAndIsr(liveReplicas.head, liveReplicas.toList)
19       ......
20     }.toMap
21     ......
22 }

第 2 步是為具備 Leader 選舉資格的分割槽推選 Leader,程式碼呼叫 electLeaderForPartitions 方法實現。這個方法會不斷嘗試為多個分割槽選舉 Leader,直到所有分割槽都成功選出 Leader。

總結

以後關於kafka系列的總結大部分來自Geek Time的課件,大家可以自行關鍵字搜尋。