1. 程式人生 > 其它 >Apache Pulsar 框架簡介

Apache Pulsar 框架簡介

一、架構概述

【1】Pulsar租戶架構圖

【2】Pulsar叢集架構

【3】Pulsar關係概念圖

【4】Pulsar訂閱模式圖

二、訂閱模式介紹

【1】四種訂閱模式

【2】Producer

【3】Consumer

【4】Topic

【5】多Topic訂閱

【6】資料分割槽

【7】路由模式

【8】訊息存留、過期、去重

三、對比Kafka

四、Bug修改 :埠號衝突

具 體 內 容

一、Pulsar框架介紹

Pulsar 是一個用於伺服器到伺服器的訊息系統,具有多租戶、高效能等優勢,最初由 Yahoo 開發,目前由 Apache 軟體基金會管理。

  • 跨地域複製(geo-replication),單個例項原生支援多個叢集(跨叢集複製);
  • 支援多種topic訂閱模式:獨佔訂閱、共享訂閱、故障轉移訂閱、鍵共享;
  • 通過Apache BookKeeper提供的持久化訊息儲存機制保證訊息傳遞;

【1】Pulsar關係架構圖:

  • property/tenant:一個property代表一個租戶,一個property可包含多個namesapce;假設部署了一個Pulsar叢集來支援多個應用程式,在企業中每個property都可以代表一個團隊,一個核心的功能,或者一個產品線;

  • namespace:是Pulsar的基本管理單元,在namaspace級別可設定許可權permission,備份fine-tune,跨叢集管理訊息資料的地理複製geo-replication、訊息TTL等;一個namaspace裡的所有topic都繼承相同的設定;

  • topic:一種通道,用作從producer到consumer傳輸訊息:持久(預設,硬碟)和非持久(僅記憶體);

【2】Pulsar叢集架構及術語

Pulsar Cluster~= Broker 叢集 + Bookkeeper +ZooKeeper
叢集間可以通過跨地域複製(Geo-Replication)進行訊息同步

  1. 多個Broker節點組成一個Pulsar Cluster;多個Pulsar Cluster組成一個Pulsar Instance。
  2. Pulsar通過geo-replication支援一個Instance內在不同的叢集傳送和消費訊息。
  • BookKeeper 有狀態叢集,負責訊息資料的持久化儲存(Bookie);
  • Broker 叢集屬於無狀態叢集,只處理業務邏輯;
  • ZooKeeper負責各種與配置和協調相關的任務(localZK負責cluster內部配置,globalZK負責cluster之間的配置);
  • Global replication負責叢集間的資料複製;

【3】Pulsar關係概念圖

【4】Pulsar訂閱模式圖

Producer釋出訊息到topic,Consumer可訂閱這些topic,處理髮布過來的訊息,在處理完成後傳送確認。

(一旦訂閱被建立,所有的訊息都將被Pulsar保留,即使consumer斷開連線。 只有在consumer確認訊息被成功處理後,保留下來的訊息才會被丟棄。)

訊息佇列:

  • Producer是如何生產訊息,傳送到對應的Broker???
  • Broker是如何處理訊息,將高效的持久化以及查詢???
  • Consumer是如何進行消費訊息???

二、訂閱模式:producer-topic-subscription-consumer

Topic支援多種訂閱模式: 獨佔(exclusive), 共享(shared)和災備(failover),key共享(key_shared);

【1】四種訂閱模式:

  • 【獨佔模式(預設)】:只能有一個Consumer消費topic的訊息,超過一個Consumer會收到錯誤;

  • 【災備模式】:同一時刻只有一個有效的Consumer,其餘的Consumer作為備用節點,在Master Consumer不可用後進行替代。

  • 【共享模式】:可以同時存在多個Consumer,每個Consumer處理Topic中一部訊息。訊息通過輪詢機制分發給不同的消費者,並且每個訊息僅會被分發給一個消費者。當某個消費者斷開連線,所有被髮送給它但沒有被確認的訊息將被重新安排,分發給其它可用的消費者(有兩點需注意,1、不保證訊息順序; 2、不能使用累計確認);

  • 【Key-shared模式】: 類似於shared模式,但是相同key的訊息會傳遞給同一個消費者,(該模式限制:訊息必須指定key/orderingKey;不能使用累計確認;該模式目前是測試版,可禁用);

【2】Producer:

  • 傳送模式:同步或非同步
  • 壓縮:訊息在傳送中可壓縮來節省頻寬;
  • 批處理:生產者將在單個請求中傳送批量訊息
  • 分塊(批處理需禁用):訊息分塊傳送至broker(一P對一C、多P對一C)

【3】Consumer:

  • 接收模式:同步或非同步
  • 監聽:客戶端庫為consumers提供listener的實現,(例如Java客戶端,提供MesssageListener介面,實現該介面,一旦接受到新的訊息,received方法將被呼叫。
  • 確認: 當consumer 成功消費掉一條訊息後,會發送一個確認請求到broker,broker會丟棄這條訊息,否則儲存這條訊息。單條確認 或 累計確認(共享模式不支援);
  • 取消確認:當consumer 在一定時間內沒有成功消費訊息,想再次消費該訊息,這個consumer就可以傳送一個否定確認到broker,然後broker重發這條訊息。(獨佔消費模式和災備訂閱模式中,消費者僅僅只能對收到的最後一條訊息進行取消確認)。單條取消確認+累積取消模式;
  • 確認超時:未確認訊息會自動重新交付

【4】Topic:

  • 持久topic(預設):
    Pulsar儲存所有沒有確認的訊息到多個BookKeeper的bookies中,永續性topic上的訊息資料可以在 broker
    重啟和訂閱者故障轉移之後繼續存在。
  • 非持久topic:訊息資料僅存活在記憶體。 如果broker掛掉或者因其他情況不能從記憶體取到,訊息資料就可能丟失。
  • 死信(Dead letter)topic:死信topic使您能夠在消費者無法成功消費某些訊息時消費新訊息。在這種機制中,無法成功消費的訊息儲存在單獨的topic,稱為死信topic---僅僅適用於共享模式;

【5】多topic訂閱(不能保證順序性):

  • 正則表示式(所有的主題必須在同一個namespace);
  • 明確指定topic列表;

【6】資料分割槽

通常一個topic僅被一個broker服務,這限制了topic的最大吞吐量。 分割槽topic是特殊的topic型別,他可以被多個broker處理,這讓topic有更高的吞吐量。分割槽的topic通過N個內部topic實現,N是分割槽的數量。 當向分割槽的topic傳送訊息,每條訊息被路由到其中一個broker。 Pulsar自動處理跨broker的分割槽分佈。

Topic1有5個分割槽(P0到P4),分佈在3個broker上。因為分割槽數量多於broker數量,其中有兩個broker每個處理兩個分割槽,第三個broker則只處理一個。

Topic1的訊息被廣播給兩個consumer,路由模式決定哪個broker處理哪個partition,訂閱模式決定哪條訊息傳送到哪個consumer;

【7】路由模式:訊息應該釋出到哪個分割槽

  • roundRobinPartition(預設):message無key則輪詢,有key則hash(key)指定分割槽
  • SinglePartition:無key,producer將會隨機選擇一個分割槽,把所有的訊息發往該分割槽。
    如果為message指定了key,分割槽的producer會把key做hash,然後分配訊息到指定的分割槽。
  • CustomPartition:使用客製化訊息路由實現,可以決定特定的訊息進入指定的分割槽。

【8】訊息存留/到期/去重(namesapce級別管理)

訊息的順序與路由模式和訊息的key有關;

  • 按key分割槽:相同key的訊息有序,並被髮送到相同的partition(非客戶自定義路由)
  • 按producer:來自相同producer的訊息有序,(singlepartition) Broker預設:

Broker預設:

  • 立即刪除所有已被consumer確認過的訊息;
  • 以訊息backlog的形式,持久儲存所有未被確認的訊息;

存留:可以儲存被consumer確認過的訊息;

過期:可以給未被確認的訊息設定TTL

去重:pulsar保證訊息僅持久化一次(避免生產者冪)

三、與Kafka的不同

1、Pulsar是流式處理(Kafka)和佇列的合體;
2、Pulsar儲存計算分離,其他MQ不是;
3、Pulsar的broker是無狀態的,而Kafka是有狀態的;
4、Pulsar簡單的跨域賦值、擴容簡單,資料處理快;

四、Bug修改 :埠號衝突

1、Pulsar本地啟動時出現埠衝突問題
Apache.Pulsar.zookeeper.BindException in ininating zookeeper: Address already in use:0.0.0.0:2181
問題:pulsar中自帶的zookeeper客戶端埠號被佔用導致啟動失敗
解決方案一:殺掉佔用該埠的其他程序
netstat -anp | grep 2181
kill -9 pid

解決方案二:更改pulsar中的conf檔案修改埠號
grep 2181 *conf
vi *conf
:n 編輯下一個文件
:N 編輯上一個文件
:e# 編輯上一個文件,用於在檔案間相互交換編輯時使用;
?# 編輯前一次編輯的文件