Apache Pulsar 框架簡介
一、架構概述
【1】Pulsar租戶架構圖
【2】Pulsar叢集架構
【3】Pulsar關係概念圖
【4】Pulsar訂閱模式圖
二、訂閱模式介紹
【1】四種訂閱模式
【2】Producer
【3】Consumer
【4】Topic
【5】多Topic訂閱
【6】資料分割槽
【7】路由模式
【8】訊息存留、過期、去重
三、對比Kafka
四、Bug修改 :埠號衝突
具 體 內 容
一、Pulsar框架介紹
Pulsar 是一個用於伺服器到伺服器的訊息系統,具有多租戶、高效能等優勢,最初由 Yahoo 開發,目前由 Apache 軟體基金會管理。
- 跨地域複製(geo-replication),單個例項原生支援多個叢集(跨叢集複製);
- 支援多種topic訂閱模式:獨佔訂閱、共享訂閱、故障轉移訂閱、鍵共享;
- 通過Apache BookKeeper提供的持久化訊息儲存機制保證訊息傳遞;
【1】Pulsar關係架構圖:
-
property/tenant:一個property代表一個租戶,一個property可包含多個namesapce;假設部署了一個Pulsar叢集來支援多個應用程式,在企業中每個property都可以代表一個團隊,一個核心的功能,或者一個產品線;
-
namespace:是Pulsar的基本管理單元,在namaspace級別可設定許可權permission,備份fine-tune,跨叢集管理訊息資料的地理複製geo-replication、訊息TTL等;一個namaspace裡的所有topic都繼承相同的設定;
-
topic:一種通道,用作從producer到consumer傳輸訊息:持久(預設,硬碟)和非持久(僅記憶體);
【2】Pulsar叢集架構及術語
Pulsar Cluster~= Broker 叢集 + Bookkeeper +ZooKeeper
叢集間可以通過跨地域複製(Geo-Replication)進行訊息同步
- 多個Broker節點組成一個Pulsar Cluster;多個Pulsar Cluster組成一個Pulsar Instance。
- Pulsar通過geo-replication支援一個Instance內在不同的叢集傳送和消費訊息。
- BookKeeper 有狀態叢集,負責訊息資料的持久化儲存(Bookie);
- Broker 叢集屬於無狀態叢集,只處理業務邏輯;
- ZooKeeper負責各種與配置和協調相關的任務(localZK負責cluster內部配置,globalZK負責cluster之間的配置);
- Global replication負責叢集間的資料複製;
【3】Pulsar關係概念圖
【4】Pulsar訂閱模式圖
Producer釋出訊息到topic,Consumer可訂閱這些topic,處理髮布過來的訊息,在處理完成後傳送確認。
(一旦訂閱被建立,所有的訊息都將被Pulsar保留,即使consumer斷開連線。 只有在consumer確認訊息被成功處理後,保留下來的訊息才會被丟棄。)
訊息佇列:
- Producer是如何生產訊息,傳送到對應的Broker???
- Broker是如何處理訊息,將高效的持久化以及查詢???
- Consumer是如何進行消費訊息???
二、訂閱模式:producer-topic-subscription-consumer
Topic支援多種訂閱模式: 獨佔(exclusive), 共享(shared)和災備(failover),key共享(key_shared);
【1】四種訂閱模式:
-
【獨佔模式(預設)】:只能有一個Consumer消費topic的訊息,超過一個Consumer會收到錯誤;
-
【災備模式】:同一時刻只有一個有效的Consumer,其餘的Consumer作為備用節點,在Master Consumer不可用後進行替代。
-
【共享模式】:可以同時存在多個Consumer,每個Consumer處理Topic中一部訊息。訊息通過輪詢機制分發給不同的消費者,並且每個訊息僅會被分發給一個消費者。當某個消費者斷開連線,所有被髮送給它但沒有被確認的訊息將被重新安排,分發給其它可用的消費者(有兩點需注意,1、不保證訊息順序; 2、不能使用累計確認);
-
【Key-shared模式】: 類似於shared模式,但是相同key的訊息會傳遞給同一個消費者,(該模式限制:訊息必須指定key/orderingKey;不能使用累計確認;該模式目前是測試版,可禁用);
【2】Producer:
- 傳送模式:同步或非同步
- 壓縮:訊息在傳送中可壓縮來節省頻寬;
- 批處理:生產者將在單個請求中傳送批量訊息
- 分塊(批處理需禁用):訊息分塊傳送至broker(一P對一C、多P對一C)
【3】Consumer:
- 接收模式:同步或非同步
- 監聽:客戶端庫為consumers提供listener的實現,(例如Java客戶端,提供MesssageListener介面,實現該介面,一旦接受到新的訊息,received方法將被呼叫。
- 確認: 當consumer 成功消費掉一條訊息後,會發送一個確認請求到broker,broker會丟棄這條訊息,否則儲存這條訊息。單條確認 或 累計確認(共享模式不支援);
- 取消確認:當consumer 在一定時間內沒有成功消費訊息,想再次消費該訊息,這個consumer就可以傳送一個否定確認到broker,然後broker重發這條訊息。(獨佔消費模式和災備訂閱模式中,消費者僅僅只能對收到的最後一條訊息進行取消確認)。單條取消確認+累積取消模式;
- 確認超時:未確認訊息會自動重新交付
【4】Topic:
- 持久topic(預設):
Pulsar儲存所有沒有確認的訊息到多個BookKeeper的bookies中,永續性topic上的訊息資料可以在 broker
重啟和訂閱者故障轉移之後繼續存在。 - 非持久topic:訊息資料僅存活在記憶體。 如果broker掛掉或者因其他情況不能從記憶體取到,訊息資料就可能丟失。
- 死信(Dead letter)topic:死信topic使您能夠在消費者無法成功消費某些訊息時消費新訊息。在這種機制中,無法成功消費的訊息儲存在單獨的topic,稱為死信topic---僅僅適用於共享模式;
【5】多topic訂閱(不能保證順序性):
- 正則表示式(所有的主題必須在同一個namespace);
- 明確指定topic列表;
【6】資料分割槽
通常一個topic僅被一個broker服務,這限制了topic的最大吞吐量。 分割槽topic是特殊的topic型別,他可以被多個broker處理,這讓topic有更高的吞吐量。分割槽的topic通過N個內部topic實現,N是分割槽的數量。 當向分割槽的topic傳送訊息,每條訊息被路由到其中一個broker。 Pulsar自動處理跨broker的分割槽分佈。
Topic1有5個分割槽(P0到P4),分佈在3個broker上。因為分割槽數量多於broker數量,其中有兩個broker每個處理兩個分割槽,第三個broker則只處理一個。
Topic1的訊息被廣播給兩個consumer,路由模式決定哪個broker處理哪個partition,訂閱模式決定哪條訊息傳送到哪個consumer;
【7】路由模式:訊息應該釋出到哪個分割槽
- roundRobinPartition(預設):message無key則輪詢,有key則hash(key)指定分割槽
- SinglePartition:無key,producer將會隨機選擇一個分割槽,把所有的訊息發往該分割槽。
如果為message指定了key,分割槽的producer會把key做hash,然後分配訊息到指定的分割槽。 - CustomPartition:使用客製化訊息路由實現,可以決定特定的訊息進入指定的分割槽。
【8】訊息存留/到期/去重(namesapce級別管理)
訊息的順序與路由模式和訊息的key有關;
- 按key分割槽:相同key的訊息有序,並被髮送到相同的partition(非客戶自定義路由)
- 按producer:來自相同producer的訊息有序,(singlepartition) Broker預設:
Broker預設:
- 立即刪除所有已被consumer確認過的訊息;
- 以訊息backlog的形式,持久儲存所有未被確認的訊息;
存留:可以儲存被consumer確認過的訊息;
過期:可以給未被確認的訊息設定TTL
去重:pulsar保證訊息僅持久化一次(避免生產者冪)
三、與Kafka的不同
1、Pulsar是流式處理(Kafka)和佇列的合體;
2、Pulsar儲存計算分離,其他MQ不是;
3、Pulsar的broker是無狀態的,而Kafka是有狀態的;
4、Pulsar簡單的跨域賦值、擴容簡單,資料處理快;
四、Bug修改 :埠號衝突
1、Pulsar本地啟動時出現埠衝突問題
Apache.Pulsar.zookeeper.BindException in ininating zookeeper: Address already in use:0.0.0.0:2181
問題:pulsar中自帶的zookeeper客戶端埠號被佔用導致啟動失敗
解決方案一:殺掉佔用該埠的其他程序
netstat -anp | grep 2181
kill -9 pid
解決方案二:更改pulsar中的conf檔案修改埠號
grep 2181 *conf
vi *conf
:n 編輯下一個文件
:N 編輯上一個文件
:e# 編輯上一個文件,用於在檔案間相互交換編輯時使用;
?# 編輯前一次編輯的文件