1. 程式人生 > >Kafka動態配置實現原理解析

Kafka動態配置實現原理解析

問題導讀

Apache Kafka在全球各個領域各大公司獲得廣泛使用,得益於它強大的功能和不斷完善的生態。其中Kafka動態配置是一個比較高頻好用的功能,下面我們就來一探究竟。

  • 動態配置是如何設計的?
  • 動態配置優先順序是怎樣的?
  • Broker初始化是如何讀取配置的?
  • 動態配置支援哪些特性功能?
  • 動態配置如何使用呢?

前言介紹

 Kafka初始開源的幾個版本,當broker初始化啟動時,所有配置資訊只能從server.properties靜態檔案讀取,以後不再發生任何更改,隨著Kafka逐步迭代,線上業務對穩定性和個性化要求越來越突出,需要能支援線上修改功能動態生效的需求應運而生。例如:按照topic維度清理資料,依據clientid限流,根據使用者名稱稱進行訪問許可權控制等等。目前Kafka最新版本支援以下幾類動態配置。

 

發展歷程

動態配置檔案發展歷程:

kafka在0.8.0對topic的管理功能分佈在三個shell中,它們分別是kafka-list-topic.sh、kafka-create-topic.sh、kafka-delete-topic.sh、kafka-add-partitions.sh,後來社群考慮到topic管理功能過於分散,到了0.8.1版本有關topic所有功能收斂到kafka-topics.sh中。0.8.0中只有topic的建立、刪除和列表及新增分割槽功能,到了0.8.1開始支援topics動態配置了。 

0.9.0.0開始支援client(producer和consumer)客戶端配額限流支援,確保不因為某個或少數幾個topic的客戶端佔滿了broker頻寬資源和磁碟IO資源,影響其他客戶端的正常讀寫,導致叢集內主從同步也受到影響。這個功能對確保系統SLA大有好處,通過服務降級,保證寫/生產不受影響,降低或暫停讀/消費流量更容易解決系統資源瓶頸。

0.9.0版本動態配置與topic管理分離,為了保持向下相容kafka-topics.sh依然包含操作topic動態配置功能,新增kafka-configs.sh支援clients和topics動態配置功能,所以kafka-topics.sh和kafka-configs.sh任意一個都可以修改topic動態配置

0.10.1.0版本新增支援users和brokers動態配置功能,user動態配置用於訪問資源的許可權控制,提升叢集的訪問和資料安全性,例如:使用者對讀/寫/建立/刪除等操作和API、topic、group資源訪問控制。broker動態配置,在不用重啟及影響服務執行情況下,broker級別功能實現動態生效,例如:副本註冊複製速率、磁碟內掛載點間資料遷移速率、網路請求的執行緒數、處理請求的I/O執行緒數等等全域性引數等等。

0.10.1.0~2.3.1版本都支援topics、clients、users、brokers四型別動態配置的11種粒度配置物件,只是配置模組和屬性欄位有增減與調整。

動態配置設計原理

使用者使用kafka-configs.sh指令碼,根據格式和引數規範要求,ConfigCommand類進行相關邏輯處理、json格式和內容校驗,生成notification json,寫入到序列化持久節點上,zk路徑為xxx/config/changes/config_change_seqNo,節點名稱為config_change_seqNo,其中seqNo從1開始的自增序號。kafka叢集中所有broker通過監聽zk上xxx/config/changes的children變化,每次獲得比當前記憶體中last_seqNo大的seqNo的json內容,從中讀取entity_type/entity_name相對路徑,由此判斷如何從xxx/config/topics|clients|users|brokers四種類型中讀取哪個配置路徑。同一個Broker在操作過程中任何時刻只能序列讀寫一種型別的配置,多種配置需要序列操作。

各個角色的作用:

kafka-config.sh: 負責寫dynamic config和notification,寫順序上圖有先後標識。

broker:負責監聽xxx/config/changes子節點變化和讀取entity_type/entity_name路徑節點上內容

zk:負責儲存notification和dynamic config及下發配置給相應的broker

notification json內容:

V1 0.10.0.1及以前版本有效

{
    "version": 1,
    "entity_type": "topics",
    "entity_name":  "finalTest"
}

V2 0.10.1.0~2.3.1 當前最新版本都有效 

以上不管是version 1還是version2,本質上沒有變化。都是通過entity_type/entity_name獲得entity_path的zk相對路徑,全路徑為xxx/config/entityType/entityName,具體請看如下詳圖

entity_type=topics | clients | users | brokers

entity_name=topicName | clientId | userId | (brokerId | <default>)

當entity_type為brokers時,brokerId為broker編號與自己的server.properties對應,只對某個broker生效。“<default>”指對所有broker生效。而entity_type為topics | clients | users對所有broker都生效。通過以上entity_type/entity_name六種組合成六個zk相對路徑。

topics和clients組合原理一樣,但users和brokers卻略有不同,他們各自有2個組合,除了普通組合還有複合組合,兩種型別組合在一起,例如users有users與clients組合,zk路徑為users/<user>/clients/<clientId>;brokers動態配置非常實用,不需要重啟就能動態更改任意數量brokers配置,更改所有brokers為xxx/brokers/<default>

四類動態配置11種zk相對路徑,根據11種zk相對路徑可以讀取11種粒度配置物件dynamic config。

<default>說明:某種型別下所有作用域生效,例如xxx/clients/<default>和xxx/brokers/<default>就是叢集內所有All clients和叢集內所有All brokers配置都會生效,其他同理。

dynamic config內容示例:

entity_type/entity_name=topics/<topic_name>=topics/finalTest

{
    "version": 1,
    "config": {
        "retention.bytes": "102400000",
        "flush.ms": "5000",
        "cleanup.policy": "compact",
        "flush.messages",
        ...
    }
}

entity_type/entity_name=clients/<clientId>=clients/camusall

{
    "version": 1,
    "config": {
        "producer_byte_rate": "20971520",
        "consumer_byte_rate": "20971520"
    }
}

entity_type/entity_name=brokers/all brokers=brokers/<default>

{
    "version": 1,
    "config": {
        "leader.replication.throttled.rate": "5000",
        "follower.replication.throttled.rate": "60000",
        "replica.alter.log.dirs.io.max.bytes.per.second": "5000",
        "log.retention.hours": "24",
        "log.flush.interval.messages": "5000",
        "min.insync.replicas": "2",
        ...
    }
}

entity_type/entity_name=brokers/brokerId     與all brokers的配置形成完全一樣,只是作用域範圍不同而已,此處省略。

寫配置格式校驗

如果寫入配置不進行規範校驗,broker就會讀取處理過程中,就會卡住或阻塞,影響服務執行穩定性。所以配置校驗至關重要,校驗規則如下:

  • 配置引數格式必須合法,否則報錯不予接收
  • 輸入配置項進行校驗,輸入引數必須是kafka包含的配置項,否則過濾掉

config_change_seqNo生成規則

 kafka-configs.sh指令碼每成功執行一次,在zk上就建立一個新的seqNode節點(即/xxx/config/changes/config_change_seqNo),seqNode是zk的持久順序節點(PERSISTENT_SEQUENTIAL),它的組成是seqNode = seqNodePrefix + seqNodeSuffix,config_change_固定為seqNode的字首,seqNodeSuffix = seqNo為seqNode的字尾,seqNo是10位數字的序列號,這個序列號字尾是自增的,由zk服務端自動生成和維護,每次事務請求成功就加1,它與MySQL的自增id原理一樣。

config_change_seqno清除規則 

叢集經過長期執行積累,xxx/config/changes下會留存大量歷史節點,如果不及時清理,會有以下影響:

  • 大量無用的seqNode進行傳輸,會增加網路頻寬負擔
  • 佔用zk服務端記憶體及儲存資源
  • Kafka會做大量無效判斷和計算

綜上所述,因此必須要及時清除無用的seqNode集合,清除公式步驟如下:

  • 當broker監聽到notification變化回撥時,記錄系統時間。
  • 獲取xxx/config/changes下所有子節點,讀取每個seqNode的建立時間
  • 系統時間減去seqNode建立時間,如果時間差值大於過期時間,即changeExpirationMs,就會被刪除
  • changeExpirationMs預設為15分鐘,可由broker配置。

config_change_seqno判斷處理

前面提到每當觸發回撥處理,seqNode節點建立時間過期15分鐘會被刪除,刪除條件是觸發才會被執行,如果長時間不建立就可能有少數幾個seqNode一直保留。如果短時間內(15分鐘內)建立大量seqNode,又不會立即被刪除,只有等到下次觸發達到條件才行,那怎麼判斷哪些會被處理呢?broker快取中維護一個變數lastExecutedSeqNo,它負責儲存執行歷史中seqNode最大順序號,所以每當觸發回撥獲取seqNodeSet列表時,都能輕易判斷出哪些需要處理計算,也會同步更新lastExecutedSeqNo。

notification作用

  • 通知broker有新的動態配置產生,讀取相應的動態配置
  • 不用監聽大量四種類型配置下子節點,每個broker只需要監聽一個notification節點即可,高效且效能也高
  • 大大減少broker監聽數量,如果像controller監聽/xxx/partitions/[0-N]/state一樣,監聽數量就是四種類型配置zk路徑乘以broker數量了

靜態與動態的配置及優先順序

 

 

靜態與動態區別。靜態配置是Broker內建預設配置和靜態配置檔案server.properties,broker啟動前可以任意修改,啟動後不可修改。動態配置是broker啟動執行後,可以線上更新生效,偷偷說一句離線也可以改,就是不生效而已。

配置優先順序。以上4個圖包含4型別配置既有動態也有靜態,那優先順序如何呢?動態配置優先順序高於靜態配置。如上圖1、2、4,環越小優先順序越高,對於動態配置來說,修改配置作用域範圍越小優先順序越高,反之亦然。優先順序最高的,會逐級覆蓋相同配置項。

當broker啟動時。讀取順序依次為broker內建預設配置,broker靜態配置檔案,動態配置。當配置項相同時,高優先順序覆蓋次優先的,其他依次類推

圖示說明:上圖1、2、3、4中,其中圖1、2、4中環數字表示配置優先順序關係,數字從1~5表示優先順序從高到低。圖3為兩級關聯。Users和Clients組合實現配置管理,這兩者組合用於客戶端配額限流,Users與Clients就像兩級目錄一樣,一個User可以包含一個、多個clientId或所有clientId。圖4中既有優先順序關係也有配置引數包含關係,topics型別配置是brokers型別配置的子集,brokers除了包含topic配置外還有DynamicThreadPool、DynamicListenerConfig、DynamicConnectionQuota、LogCleaner配置。

如何使用動態配置

  • 使用指令碼kafka-configs.sh和kafka-topics.sh,kafka-configs.sh支援四種類型,kafka-topics.sh僅支援topics型別
  • 使用Apache-kafka官方提供的java版本客戶端API呼叫
  • 直接寫zk實現,具體遵循如上寫notification和dynamic_config規範

如想更深入瞭解kafka-configs.sh用法,請檢視

總結

  • Kafka配置引數分為靜態配置和動態配置,靜態分為內建預設與外接使用者配置,使用者配置優先於內建配置
  • 動態配置為4型別11個zk相對路徑,即11種粒度配置物件生效,同類型作用域範圍越小優先順序越高
  • 動態配置優先順序比靜態配置優先順序高,動態配置中Users與Clients可以組合配置
  • 從設計原理中瞭解config_change_seqNo生成規則

  • 寫上文中理解了寫Notification的作用,從而理解什麼場景會適合使用zk中持久順序節點(PERSISTENT_SEQUENTIAL)

注意事項:以上配置解析,是基於Kafka-2.3.1版本分析

參考資料

Release Notes - Kafka - Version 0.8.1:https://archive.apache.org/dist/kafka/0.8.1/RELEASE_NOTES.html
Release Notes - Kafka - Version 0.10.1.0:https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html
Release Notes - Kafka - Version 0.10.2.0:https://archive.apache.org/dist/kafka/0.10.2.0/RELEASE_NOTES.html
Release Notes - Kafka - Version 0.10.2.1:https://archive.apache.org/dist/kafka/0.10.2.1/RELEASE_NOTES.html
Release Notes - Kafka - Version 1.1.0:https://archive.apache.org/dist/kafka/1.1.0/RELEASE_NOTES.html
Release Notes - Kafka - Version 1.1.1:https://archive.apache.org/dist/kafka/1.1.1/RELEASE_NOTES.html
Release Notes - Kafka - Version 2.0.0:https://archive.apache.org/dist/kafka/2.0.0/RELEASE_NOTES.html
KIP-21 - Dynamic Configuration:https://cwiki.apache.org/confluence/display/KAFKA/KIP-21+-+Dynamic+Configuration
KIP-226 - Dynamic Broker Configuration:https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration
Make DynamicConfigManager to use the ZkNodeChangeNotificationListener introduced as part of KAFKA-2211:https://issues.apache.org/jira/browse/KAFKA-2547
KIP-257 - Configurable Quota Management:https://cwiki.apache.org/confluence/display/KAFKA/KIP-257+-+Configurable+Quota+Management
KIP-73 Replication Quotas:https://cwiki.apache.org/confluence/display/KAFKA/KIP-73+Replication+Quotas