1. 程式人生 > 其它 >[Pulsar系列] 10分鐘學會Pulsar訊息系統概念

[Pulsar系列] 10分鐘學會Pulsar訊息系統概念

技術標籤:訊息中介軟體pulsar

Apache Pulsar

Pulsar是一個支援多租戶的、高效能的服務與服務之間訊息通訊的解決方案,最初由雅虎開發,現在由Apache軟體基金會管理。

Pulsar在Yahoo的生產環境運行了三年多,助力Yahoo的主要應用,如Yahoo Mail、Yahoo Finance、Yahoo Sports、Flickr、Gemini廣告平臺和Yahoo分散式鍵值儲存系統Sherpa。

Kafka不夠好,智聯招聘基於Pulsar打造企業級事件中心。

Pulsar的主要特性如下:

  • Pulsar例項原生支援多叢集,能無縫的基於地理位置進行跨叢集備份
  • 非常低的訊息釋出和端到端的延遲
  • 無縫擴充套件到超過百萬個topic
  • 支援Java,Go,Pytho和C++的客戶端
  • Topic支援多種訂閱模式: 獨佔(exclusive), 共享(shared)和災備(failover)
  • 通過Apache BookKeeper提供的持久化訊息儲存機制保證訊息的送達
  • serverless的輕量級計算框架Pulsar Functions提供了原生的流資料處理
  • serverless的聯結器框架Pulsar IO構建於 Pulsar Functions之上,能夠輕鬆的將資料從Pulsar中移入和移出
  • 當資料老化時,分層儲存將資料從熱儲存解除安裝到冷儲存(如S3和GCS)

訊息佇列的使用場景包括非同步處理,應用解耦,流量削鋒和訊息通訊四個場景.

1. 訊息系統概念

Pulsar採用了釋出訂閱的設計模式,也稱作pub-sub。該設計模式中,producer釋出訊息到topic,consumer可以訂閱這些topic,處理髮布過來的訊息,在處理完成後傳送確認。
一旦訂閱被建立,所有的訊息都將被Pulsar保留,即使consumer斷開連線。 只有在consumer確認訊息被成功處理後,保留下來的訊息才會被丟棄。

1.1 Messages

訊息是Pulsar的基礎單元。 訊息就是producer發給topic的內容,以及consumer從topic消費的內容(訊息處理完成後傳送確認)。 訊息類似於郵政系統中的信件。
訊息包含了多個屬性:Value(資料),Key(打標籤,用來壓縮訊息),Properties(可選,使用者自定義key/value),Producer name(生產者名稱,可預設生成,也可指定),Sequence ID(訊息的序列id),Publish time(釋出時間,生產者自動加上),Event time(訊息的可選時間戳)。

1.2 Producers

生產者是關聯topic的程式,它釋出訊息到Pulsar的broker上。
傳送模式:Producer可以以同步(sync)或者非同步(async)的方式釋出訊息到broker。

  • 同步傳送:producer傳送每條訊息後會等待broker的確認,如果沒有收到確認資訊,producer會認為傳送失敗
  • 非同步傳送:Producer將會把訊息放入blocking佇列,然後馬上返回。 然後客戶端在後臺將訊息傳送給broker。如果佇列已滿( 配置的最大數量),根據傳入producer的引數,producer可能阻塞或者直接返回失敗。

壓縮:訊息在傳送過程中可以被壓縮來節省頻寬,pulsar支援LZ4,ZLIB,ZSTD,SNAPPY型別。
批處理:如果啟用了批處理,生產者將在單個請求中傳送批量訊息。批處理大小由最大訊息數和最大發布延遲決定。

1.3 Consumers

消費者是訂閱關聯topic,然後接收訊息的程式。
接收模式:訊息可以通過同步或者非同步的方式從broker接收。

  • 同步接收:同步接收將會阻塞,直到訊息可用
  • 非同步接收:非同步接收立即返回future值,例如java中的CompletableFuture,一旦新訊息可用,它立即完成。

監聽:客戶端庫為consumers提供listener的實現,例如Java客戶端,提供MesssageListener介面,實現該介面,一旦接受到新的訊息,received方法將被呼叫。

void received(Consumer<T> consumer,Message<T> msg);

確認:當一個consumer 成功消費掉一條訊息後,那麼這個consumer會發送一個確認請求到broker,broker會丟棄這條訊息,否則儲存這條訊息。

訊息的確認可以一個接一個,也可以累積一起。 累積確認時,消費者只需要確認最後一條他收到的訊息。 所有之前(包含此條)的訊息,都不會被再次重發給那個消費者。

累積訊息確認不能用於共享訂閱模式,因為共享模式中,一個訂閱會涉及到多個消費者。
共享模式中,多條訊息可以單獨確認。

否定確認:當consumer 在一定時間內沒有成功消費訊息,而想再次消費該條訊息,那麼這個consumer可以傳送一個否定確認到broker,然後broker重發這條訊息。訊息可以一條接一條的否定確認,也可以累積否定確認,這取決於消費訂閱模式。在獨佔和災備模式,消費者只能否定確認其接收的最後一條訊息。在共享模式,消費者可以獨立否定確認。
確認超時:當一條訊息沒有被成功消費,並且您想要觸發broker自動重發訊息時,您可以採用未確認訊息自動重發機制。客戶端將在整個AckTimeout時間範圍內跟蹤未確認的訊息,並在指定確認超時時間自動向broker傳送重發未確認的訊息請求。

在確認超時之前使用否定確認。否定確認以更精確的方式控制單個訊息的重發,並在訊息處理時間超過確認超時時間後,避免無效的重發訊息。

死信(Dead letter)topic:死信topic使您能夠在消費者無法成功消費某些訊息時消費新訊息。在這種機制中,無法消費的訊息儲存在單獨的topic,稱為死信topic。您可以決定如何處理死信topic中的訊息。
在Java客戶端中,可以使用以下例子處理死信topic:

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
          .topic(topic)
          .subscriptionName("my-subscription")
          .subscriptionType(SubscriptionType.Shared)
          .deadLetterPolicy(DeadLetterPolicy.builder()
                .maxRedeliverCount(maxRedeliveryCount)
                .build())
          .subscribe();

死信topic依賴於訊息的重發。您需要確認訊息的重發方法:否定確認或確認超時。在確認超時之前使用否定確認。

目前,死信topic僅適用於共享模式。

1.4 Topics

和其他的釋出訂閱系統一樣,Pulsar 中的 topic 是被命名的通道,用做從producer到 consumer傳輸訊息。 Topic的名稱是具有明確定義結構的URL:

{persistent|non-persistent}://tenant/namespace/topic

persistent/non-persistent:topic的型別,包括持久化和非持久化(預設是持久型別)。topic指定持久化後,所有的訊息會持久化到硬碟(這意味著多塊硬碟,除非是單機模式的broker)。反之,非持久topic的資料不會儲存到硬碟上。

tenant:topic在例項中的租戶,租戶對於Pulsar的多租戶來說是必不可少的,可以分佈在多個叢集中。

namespace:Topic的管理單元,充當關聯topic組的管理機制。 大多數的topic配置在namespace層面生效。 每個tenant可以有多個namespace。

topic:topic名稱是自由定義的,在pulsar例項中無特殊意義。

1.4.1 namespace

名稱空間是租戶內部邏輯上的命名術語。 一個租戶可以通過admin API建立多個名稱空間。 例如,一個對接多個應用的租戶,可以為每個應用建立不同的namespace。 Namespace使得程式可以以層級的方式建立和管理topic。 例如:"my-tenant/app1" ,它的namespace是app1這個應用,對應的租戶是 my-tenant。 你可以在namespace下建立任意數量的topic。

1.4.2 訂閱模型

訂閱是命名好的配置規則,用於確定如何將訊息發給消費者。Pulsar有三種訂閱模式:exclusive(獨佔),shared(共享),failover(災備)。 下圖展示了這三種模式:

1.4.2.1 Exclusive

獨佔模式,只能有一個消費者訂閱topic。 如果多於一個消費者嘗試以同樣方式去訂閱topic,消費者將會收到錯誤。
上面的圖中,只有Consumer A可以消費。

Exclusive模式為預設訂閱模式。

1.4.2.2 Failover

災備模式,多個consumer可以繫結到同一個訂閱。Consumer將會按字典順序排序,第一個consumer被初始化為唯一接受訊息的消費者,這個consumer被稱為master consumer。
當master consumer斷開時,所有的訊息(未被確認和後續進入的)將會被分發給佇列中的下一個consumer。
下圖中,Consumer B-0是master consumer,當Consumer B-0斷開連線時,由於Consumer B-1在佇列中下一個位置,那麼它將會開始接收訊息。

1.4.2.3 Shared

共享模式,多個消費者可以繫結到同一個訂閱上。 訊息通過round robin輪詢機制分發給不同的消費者,並且每個訊息僅會被分發給一個消費者。當消費者斷開連線,所有被髮送給他,但沒有被確認的訊息將被重新安排,分發給其它存活的消費者。
下圖中,topic下有5條訊息,m0~m4,消費者有C1/C2/C3,最終m0和m3分配給C1,m1分給C2,m2和m4分給C3,可以說明每個訊息僅發給一個消費者。

Shared模式的限制
有兩點需注意,1、不保證訊息順序; 2、不能使用累計確認

Key_shared:
在Key-shared模式下,多個消費者可以關聯到同一訂閱。訊息以分散式在消費者之間傳遞,具有相同key/orderingKey 的訊息僅傳遞給一個消費者。無論訊息被重發多少次,它都發給同一個消費者。當消費者連線或斷開連線時,將導致某些訊息的key的消費者變更。

該模式限制:訊息必須指定key/orderingKey;不能使用累計確認;該模式目前是測試版,可以在broker.config禁用。

1.5 多topic訂閱

當consumer訂閱pulsar的topic時,它預設指定訂閱了一個topic,例如:persistent://public/default/my-topic。 從Pulsar的1.23.0-incubating的版本開始,Pulsar消費者可以同時訂閱多個topic。 你可以用以下兩種方式定義topic的列表:

  • 通過最基礎的正則表示式(regex),例如 persistent://public/default/finance-.*
  • 通過明確指定的topic列表

通過正則訂閱多主題時,所有的主題必須在同一個namespace。

當訂閱多主題時,Pulsar客戶端會自動呼叫Pulsar的API來發現匹配表示式或者列表的所有topic,然後全部訂閱。 如果此時有暫不存在的topic,那麼一旦這些topic被建立,conusmer會自動訂閱。

不能保證順序性
當消費者訂閱多topic時,Pulsar所提供對單一topic訂閱的順序保證,就hold不住了。 如果你在使用Pulsar的時候,遇到必須保證順序的需求,強烈建議不要使用此特性。

下面是多主題訂閱在java中的例子:

import java.util.regex.Pattern;
 
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
 
PulsarClient pulsarClient = // 例項化pulsar客戶端
 
// 訂閱一個namespace下的所有topic
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
                .topicsPattern(allTopicsInNamespace)
                .subscriptionName("subscription-1")
                .subscribe();
 
// 根據正則訂閱一個namespace下的多個topic
Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
Consumer<byte[]> someTopicsConsumer = pulsarClient.newConsumer()
                .topicsPattern(someTopicsInNamespace)
                .subscriptionName("subscription-1")
                .subscribe();

通常一個topic僅被一個broker服務,這限制了topic的最大吞吐量。 分割槽topic是特殊的topic型別,他可以被多個broker處理,這讓topic有更高的吞吐量。

其實在背後,分割槽的topic通過N個內部topic實現,N是分割槽的數量。 當向分割槽的topic傳送訊息,每條訊息被路由到其中一個broker。 Pulsar自動處理跨broker的分割槽分佈。

下圖對此做了闡明:



分析上圖可知,Topic1有5個分割槽(P0到P4),分佈在3個broker上。因為分割槽數量多於broker數量,其中有兩個broker每個處理兩個分割槽,第三個broker則只處理一個。(再次強調,分割槽的分佈是Pulsar自動處理的)。

這個topic的訊息被廣播給兩個consumer,路由模式決定哪個broker處理哪個partition,訂閱模式決定哪條訊息傳送到哪個consumer。

大多數境況下,路由和訂閱模式可以分開制定。通常來講,吞吐能力的要求,決定了分割槽/路 的方式。訂閱模式則應該由應用來做決定。

分割槽topic和普通topic,對於訂閱模式如何工作,沒有任何不同。分割槽只是決定了從生產者生產訊息到消費者處理及確認訊息過程中發生的事情。

分割槽topic需要通過admin API顯式建立,建立topic時可以指定分割槽數。

1.6.1 路由模式

釋出到分割槽主題時,必須指定路由模式。路由模式決定每個訊息應該釋出到哪個分割槽,即哪個內部主題。三種路由模式如下:

  • RoundRobinPartition:如果沒有key,所有的訊息通過輪詢方式被路由到不同的分割槽,以達到最大吞吐量。請注意round-robin並不是作用於每條單獨的訊息,而是作用於延遲處理的批次邊界,以確保批處理有效。 如果為message指定了key,分割槽的producer會把key做hash,然後分配訊息到指定的分割槽。 這是預設的模式。
  • SinglePartition:如果沒有key被提供,producer將會隨機選擇一個分割槽,把所有的訊息發往該分割槽。 如果為message指定了key,分割槽的producer會把key做hash,然後分配訊息到指定的分割槽。
  • CustomPartition:使用客製化訊息路由實現,可以決定特定的訊息進入指定的分割槽。 使用者可以建立客製化的路由模式,通過使用 Java client ,實現MessageRouter介面。

1.7 順序保證

訊息的順序與訊息路由模式和訊息的key有關。通常,使用者需要對每個key分割槽的訊息保證順序。
當使用 SinglePartition或者RoundRobinPartition模式時,如果訊息有key,訊息將會被路由到匹配的分割槽,這是基於ProducerBuilder 中HashingScheme 指定的雜湊shema。
順序保證有兩種方式:

  • 按key分割槽:所有擁有相同key的訊息有序, 並且會被髮送至相同的partition。使用SinglePartition或RoundRobinPartition模式, 每條訊息都需要有key。
  • 按producer:來自於相同producer的訊息有序,路由策略為SinglePartition, 且每條訊息都沒有key。
1.7.1 HashingScheme

HashingScheme 是代表一組標準雜湊函式的列舉,為一個指定訊息選擇分割槽時使用。
有兩種可用的雜湊函式:JavaStringHash 和Murmur332Hash,producer 的預設hash函式是JavaStringHash。請注意,當producer可能來自於不同語言客戶端時,JavaStringHash是不起作用的。建議使用Murmur332Hash。

1.8 非持久topic

預設情況下,Pulsar儲存所有沒有確認的訊息到多個BookKeeper的bookies中(儲存節點)。持久topic的訊息資料可以在broker重啟或者訂閱者出問題的情況下存活下來。 因此,永續性topic上的訊息資料可以在 broker 重啟和訂閱者故障轉移之後繼續存在。
但是,Pulsar還支援非永續性topic,這些topic的訊息從不持久化儲存到磁碟,只存在於記憶體中。 Pulsar也提供了非持久topic。非持久topic的訊息不會被儲存在硬碟上,只存活於記憶體中。當使用非持久topic分發時,關掉Pulsar的broker或者關閉訂閱者,此topic( non-persistent))上所有的瞬時訊息都會丟失,意味著客戶端可能會遇到訊息缺失。
非永續性topic具有這種形式的名稱(注意名稱中的 non-persistent):

non-persistent://tenant/namespace/topic

非持久topic中,broker會立即釋出訊息給所有連線的訂閱者,而不會在BookKeeper中儲存。 如果有一個訂閱者斷開連線,broker將無法重發這些瞬時訊息,訂閱者將永遠也不能收到這些訊息了。 去掉持久化儲存的步驟,在某些情況下,使得非持久topic的訊息比持久topic稍微變快。但是同時,Pulsar的一些核心優勢也喪失掉了。

非持久topic,訊息資料僅存活在記憶體。 如果broker掛掉或者因其他情況不能從記憶體取到,你的訊息資料就可能丟失。 只有在真的確信你的使用場景符合,並且你可以忍受時,才可去使用非持久topic。

預設非持久topic在broker上是開啟的。 你可以通過broker的配置關閉。 你可以通過使用pulsar-admin-topics介面管理非持久topic。

1.8.1 效能

非持久訊息通常比持久訊息更快,因為broker無須持久化訊息,當訊息被分發給所有訂閱者時,會立即傳送ack給producer。 非持久topic讓producer有更低的釋出延遲。

1.8.2 客戶端API

Producer和consumer連線持久topic和連線到非持久topic的方式是一樣的。非持久的區別在於,topic的名稱必須以non-persistent開頭。 三種訂閱模式--exclusive,shared,failover對於非持久topic都是支援的。
下面是一個非持久topic的java consumer例子:

PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .build();
String npTopic = "non-persistent://public/default/my-topic"; //這裡表明是非持久化
String subscriptionName = "my-subscription-name";
 
Consumer<byte[]> consumer = client.newConsumer()
    .topic(npTopic)
    .subscriptionName(subscriptionName)
    .subscribe();

這裡還有一個非持久topic的java producer例子:

Producer<byte[]> producer = client.newProducer()
            .topic(npTopic)
            .create();

1.9 訊息保留和到期(retention and expiry)

Pulsar broker預設如下:

  • 立即刪除所有已經被cunsumer確認過的的訊息
  • 以訊息backlog的形式,持久儲存所有的未被確認訊息

Pulsar有兩個特性,讓你可以覆蓋上面的預設行為:

  • 訊息存留讓你可以儲存consumer確認過的訊息
  • 訊息過期讓你可以給未被確認的訊息設定存活時長(TTL) 所有訊息保留和到期都在namespace級別進行管理。有關操作方法,請參閱Message retention and expiry cookbook。
    下圖說明了這兩種概念:

    圖中第一個是訊息存留,存留規則會被用於某namespace下所有的topic,指明哪些訊息會被持久儲存,即使已經被確認過。 沒有被留存規則覆蓋的訊息將會被刪除。 沒有留存規則的話,所有被確認的訊息都會被刪除。
    圖中第二個是訊息過期,有些訊息即使還沒有被確認,也被刪除掉了。因為根據設定在namespace上的TTL,他們已經過期了。(例如,TTL為5分鐘,過了十分鐘訊息還沒被確認)

1.10 重複資料消除(Message deduplication)

當訊息被Pulsar持久化多於一次的時候,訊息就會重複。 訊息去重是Pulsar可選的特性,阻止不必要的訊息重複,每條訊息僅處理一次,即使訊息被接收多次。
下圖說明了禁用和啟用重複資料消除的情況:

上圖第一個場景中,訊息去重被關閉。 Producer釋出訊息1到一個topic,訊息到達broker後,被持久化到BookKeeper。 然後producer又傳送了訊息1(可能因為某些重試邏輯),然後訊息被接收後又持久化在BookKeeper,這意味著訊息重複發生了。
在第二個場景中,producer傳送了訊息1,訊息被broker接收然後持久化,和第一個場景是一樣的。 當producer再次傳送訊息時,broker知道已經收到個訊息1,所以不會再持久化訊息1。

訊息重複資料消除是在namespace級別處理的。

1.10.1 生產者冪等

訊息去重的另外一種方法是確保每條訊息僅生產一次。 這種方法通常被叫做生產者冪等。 這種方式的缺點是,把訊息去重的工作推給了應用去做。 在Pulsar中,去重被broker處理的,這意味著你不需要修改你的客戶端程式碼。 你只需要做一些管理上的變化(參考Managing message deduplication )。

1.10.2 去重和實際一次語義

訊息去重,使Pulsar成為與流處理引擎(SPE)或者其他尋求實際一次處理語義的系統連線的完美訊息系統。 訊息系統若不提供自動訊息去重,則需要SPE或者其他系統保證去重。這意味著嚴格的訊息順序來自於讓程式承擔額外的去重工作。 使用Pulsar,嚴格的順序保證不會帶來任何應用層面的代價。