1. 程式人生 > >activemq訊息機制和持久化介紹

activemq訊息機制和持久化介紹

前面一節簡單學習了activemq的使用,我們知道activemq的使用方式非常簡單有如下幾個步驟:

  1. 建立連線工廠
  2. 建立連線
  3. 建立會話
  4. 建立目的地
  5. 建立生產者或消費者
  6. 生產或消費訊息
  7. 關閉生產或消費者、關閉會話、關閉連線

前面我們的例項程式碼中已經按照這個步驟完成了P2P和Pub/Sub模式的訊息傳送和接收。那麼這一節我們就針對他的訊息傳播機制和持久化方式做一個簡單的學習。在會用的同時我們也需要理解一些基本的概念,這樣才不至於在出錯後無從下手。

1.activemq伺服器工作模型

我們先看一下訊息傳送的時序圖:

這裡寫圖片描述

ConnectionFactory 物件建立一個連線工廠,訊息的傳送和接受服務均由此進行;

ConnectionFactory 建立一個活動Connection作為當前使用的連線;

Session 是一個用於生成和使用訊息的單執行緒上下文,它用於建立傳送的生產者和接收訊息的消費者,併為所傳送的訊息定義傳送順序。會話通過大量確認選項或通過事務來支援可靠傳送。

戶端使用 MessageProducer 向指定的物理目標傳送訊息,生產者可指定一個預設傳送模式(永續性訊息與非永續性訊息)、優先順序和有效期值,以控制生產者向物理目標傳送的所有訊息;

消費者可以支援同步或非同步訊息接收。非同步使用可通過向消費者註冊 MessageListener 來實現。當會話執行緒呼叫 MessageListener 物件的 onMessage 方法時,客戶端將使用訊息。

2.ActiveMQ訊息傳送模型

ActiveMQ 支援兩種訊息傳送模型:PTP(即點對點模型)和Pub/Sub(即釋出 /訂閱模型),前面我們已經講過,在此就不贅述。

3.訊息選擇器

ActiveMQ提供了一種機制,使用它,訊息服務可根據訊息選擇器中的標準來執行訊息過濾。生產者可在訊息中放入應用程式特有的屬性,而消費者可使用基於這些屬性的選擇標準來表明對訊息是否感興趣。

訊息選擇器是根據 header 和 properties 允許客戶端選擇性的制定需要接收的訊息,訊息選擇器是無法利用 訊息主體(Body)進行過濾的。無論你的訊息主題是什麼型別, 文字、或者物件、或者鍵值對。下面我們講一下訊息選擇器的語法以及使用規範:

可接收的型別包括:byte,int,double,boolean,String;

屬性識別符號定義:
變數名與java定義一樣; 

要麼在heads中定義 要麼在 properties中定義,如果在sender中是在heads中定義而receiver中卻從properties中尋找的話,找不到的情況下他是不會自動去heads中尋找的,而是會返回null;

根據不同型別的變數選擇不同的方法:

message.setIntProperty("test",14);

那麼在接收端可以對該變數進行攔截:

session.createConsumer(destination,"test > 14");

屬性標誌符是區分大小寫的;

    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
攔截器中的部分表示方式:
可以是條件表示式

可以是算術表示式

可以是比較運算和邏輯運算組成的表示式

    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

支援 () 左右括號;

支援邏輯運算的優先順序表示式 例如: NOT , AND , OR;

比較運算子有: = , > , >= , < , <= , <> (not equal);

eg:

識別符號是null
"prop_name IS NULL"
識別符號非空 not null
"prop_name IS NOT NULL"
"age BETWEEN 15 AND 19" is equivalent to "age >= 15 AND age <= 19"
"Country NOT IN (' UK', 'US', 'France') "

    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

程式碼很簡單,只需要在Sender端做如下改寫:

TextMessage message = session.createTextMessage();
message.setIntProperty("test",14);
message.setText("test");
    
  • 1
  • 2
  • 3

Receiver端:

consumer = session.createConsumer(destination,"test > 14");
    
  • 1

對傳送端的特定字元做一個判斷符合條件即被攔截

4.訊息確認機制

jms訊息只有在被確認之後才認為成功消費了這條訊息。訊息的成功消費通常包括三個步驟:

(1)client接收訊息

(2)client處理訊息

(3)訊息被確認(也就是client給一個確認訊息)

在事務性會話中當一個事務被提交的時候,確認自動發生,和應答模式沒關係,這個值可以隨便寫。(這裡多提一句非同步訊息接收中不能使用事務性會話)。

在非事務性會話中訊息何時被確認取決於建立的session中設定的訊息應答模式(acknowledge model)該引數有三個值:

  1. Session.AUTO_ACKNOWLEDGE:當client端成功的從receive方法或從onMessage(Message message) 方法返回的時候,會話自動確認client收到訊息。
  2. Session.CLIENT_ACKNOWLEDGE: 客戶單通過呼叫acknowledge方法來確認客戶端收到訊息。但需要注意在這種應答模式下,確認是在會話層上進行的,確認一個被消費的訊息將自動確認所有已消費的其他訊息。比如一個消費者已經消費了10條訊息,然後確認了第5條訊息被消費,則這10條都被確認消費了。、

acknowledge()通知方法是在Message物件上,同步接收,呼叫acknowledge()方法進行確認如下所示:

consumer = session.createConsumer(queue);
Message message = consumer.receive();
message.acknowledge();

非同步接受,呼叫acknowledge()方法進行確認:

consumer.setMessageListener(new MessageListener() {
    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            String value = textMessage.getText();
            System.out.println("value: " + value);
            message.acknowledge(); //訊息消費確認通知
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
});

    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

3.Session.DUPS_ACKNOWLEDGE:不是必須簽收,訊息可能會重複傳送。在第二次重新傳送訊息的時候,訊息頭的JmsDelivered會被置為true標示當前訊息已經傳送過一次,客戶端需要進行訊息的重複處理控制。

5. 持久化訊息

JMS 支援以下兩種訊息提交模式:

5.1 ERSISTENT 持久訊息

是activemq預設的傳送方式,此方式下的訊息在配合activemq.xml中配置的訊息儲存方式,會被儲存在特定的地方,直到有消費者將訊息消費或者訊息過期進入DLQ佇列,訊息生命週期才會結束。此模式下可以保證訊息只會被成功傳送一次和成功使用一次,訊息具有可靠性。在訊息傳遞到目標消費者,在消費者沒有成功應答前,訊息不會丟失。所以很自然的,需要一個地方來永續性儲存。如果訊息消費者在進行消費過程發生失敗,則訊息會被再次投遞。

DeliveryMode.PERSISTENT 指示JMS provider持久儲存訊息,以保證訊息不會因為JMS provider的失敗而丟失。 訊息持久化在硬碟中,ActiveMQ持久化有三種方式:AMQ、KahaDB、JDBC。

AMQ

AMQ是一種檔案儲存形式,它具有寫入速度快和容易恢復的特點。訊息儲存在一個個檔案中,檔案的預設大小為32M,如果一條訊息的大小超過了32M,那麼這個值必須設定大一點。當一個儲存檔案中的訊息已經全部被消費,那麼這個檔案將被標識為可刪除,在下一個清除階段,這個檔案被刪除。AMQ適用於ActiveMQ5.3之前的版本。

KahaDB

KahaDB是基於檔案的本地資料庫儲存形式,雖然沒有AMQ的速度快,但是它具有強擴充套件性,恢復的時間比AMQ短,從5.4版本之後KahaDB做為預設的持久化方式。

JDBC

可以將訊息儲存到資料庫中,例如:Mysql、SQL Server、Oracle、DB2。

具體使用方式大家下去查一下,限於篇幅在此就不做太詳細的介紹。

5.2 NON_PERSISTENT 非持久訊息

非持久的訊息適用於不重要的,可以接受訊息丟失的哪一類訊息,這種訊息只會被投遞一次,訊息不會在永續性儲存中儲存,也不會保證訊息丟失後的重新投遞。

DeliveryMode.NON_PERSISTENT 不要求JMS provider持久儲存訊息,訊息存放在記憶體中,讀寫速度快,在JMS服務停止後訊息會消失,沒有持久化到硬碟。

6. ActiveMQ訊息過期設定

允許訊息過期 。預設情況下,訊息永不會過期。如果訊息在特定週期內失去意義,那麼可以設定過期時間。
有兩種方法設定訊息的過期時間,時間單位為毫秒:

  1. 使用 setTimeToLive 方法為所有的訊息設定過期時間;
  2. 使用 send 方法為每一條訊息設定過期時間。

訊息過期時間,send 方法中的 timeToLive 值加上傳送時刻的 GMT 時間值。如果 timeToLive 值等於零,則 JMSExpiration 被設為零,表示該訊息永不過期。如果傳送後,在訊息過期時間之後訊息還沒有被髮送到目的地,則該訊息被清除。

這一節對activemq的訊息機制和持久化我們就簡單介紹到這裡,後面我們結合具體的工程來把它應用到生產中,再來講解如何持久化如何高效的應用於生產環境。

        <link rel="stylesheet" href="https://csdnimg.cn/release/phoenix/template/css/markdown_views-ea0013b516.css">
            </div>

前面一節簡單學習了activemq的使用,我們知道activemq的使用方式非常簡單有如下幾個步驟:

  1. 建立連線工廠
  2. 建立連線
  3. 建立會話
  4. 建立目的地
  5. 建立生產者或消費者
  6. 生產或消費訊息
  7. 關閉生產或消費者、關閉會話、關閉連線

前面我們的例項程式碼中已經按照這個步驟完成了P2P和Pub/Sub模式的訊息傳送和接收。那麼這一節我們就針對他的訊息傳播機制和持久化方式做一個簡單的學習。在會用的同時我們也需要理解一些基本的概念,這樣才不至於在出錯後無從下手。

1.activemq伺服器工作模型

我們先看一下訊息傳送的時序圖:

這裡寫圖片描述

ConnectionFactory 物件建立一個連線工廠,訊息的傳送和接受服務均由此進行;

ConnectionFactory 建立一個活動Connection作為當前使用的連線;

Session 是一個用於生成和使用訊息的單執行緒上下文,它用於建立傳送的生產者和接收訊息的消費者,併為所傳送的訊息定義傳送順序。會話通過大量確認選項或通過事務來支援可靠傳送。

戶端使用 MessageProducer 向指定的物理目標傳送訊息,生產者可指定一個預設傳送模式(永續性訊息與非永續性訊息)、優先順序和有效期值,以控制生產者向物理目標傳送的所有訊息;

消費者可以支援同步或非同步訊息接收。非同步使用可通過向消費者註冊 MessageListener 來實現。當會話執行緒呼叫 MessageListener 物件的 onMessage 方法時,客戶端將使用訊息。

2.ActiveMQ訊息傳送模型

ActiveMQ 支援兩種訊息傳送模型:PTP(即點對點模型)和Pub/Sub(即釋出 /訂閱模型),前面我們已經講過,在此就不贅述。

3.訊息選擇器

ActiveMQ提供了一種機制,使用它,訊息服務可根據訊息選擇器中的標準來執行訊息過濾。生產者可在訊息中放入應用程式特有的屬性,而消費者可使用基於這些屬性的選擇標準來表明對訊息是否感興趣。

訊息選擇器是根據 header 和 properties 允許客戶端選擇性的制定需要接收的訊息,訊息選擇器是無法利用 訊息主體(Body)進行過濾的。無論你的訊息主題是什麼型別, 文字、或者物件、或者鍵值對。下面我們講一下訊息選擇器的語法以及使用規範:

可接收的型別包括:byte,int,double,boolean,String;

屬性識別符號定義:
變數名與java定義一樣; 

要麼在heads中定義 要麼在 properties中定義,如果在sender中是在heads中定義而receiver中卻從properties中尋找的話,找不到的情況下他是不會自動去heads中尋找的,而是會返回null;

根據不同型別的變數選擇不同的方法:

message.setIntProperty("test",14);

那麼在接收端可以對該變數進行攔截:

session.createConsumer(destination,"test > 14");

屬性標誌符是區分大小寫的;

  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
攔截器中的部分表示方式:
可以是條件表示式

可以是算術表示式

可以是比較運算和邏輯運算組成的表示式

  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

支援 () 左右括號;

支援邏輯運算的優先順序表示式 例如: NOT , AND , OR;

比較運算子有: = , > , >= , < , <= , <> (not equal);

eg:

識別符號是null
"prop_name IS NULL"
識別符號非空 not null
"prop_name IS NOT NULL"
"age BETWEEN 15 AND 19" is equivalent to "age >= 15 AND age <= 19"
"Country NOT IN (' UK', 'US', 'France') "

  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

程式碼很簡單,只需要在Sender端做如下改寫:

TextMessage message = session.createTextMessage();
message.setIntProperty("test",14);
message.setText("test");
  
  • 1
  • 2
  • 3

Receiver端:

consumer = session.createConsumer(destination,"test > 14");
  
  • 1

對傳送端的特定字元做一個判斷符合條件即被攔截

4.訊息確認機制

jms訊息只有在被確認之後才認為成功消費了這條訊息。訊息的成功消費通常包括三個步驟:

(1)client接收訊息

(2)client處理訊息

(3)訊息被確認(也就是client給一個確認訊息)

在事務性會話中當一個事務被提交的時候,確認自動發生,和應答模式沒關係,這個值可以隨便寫。(這裡多提一句非同步訊息接收中不能使用事務性會話)。

在非事務性會話中訊息何時被確認取決於建立的session中設定的訊息應答模式(acknowledge model)該引數有三個值:

  1. Session.AUTO_ACKNOWLEDGE:當client端成功的從receive方法或從onMessage(Message message) 方法返回的時候,會話自動確認client收到訊息。
  2. Session.CLIENT_ACKNOWLEDGE: 客戶單通過呼叫acknowledge方法來確認客戶端收到訊息。但需要注意在這種應答模式下,確認是在會話層上進行的,確認一個被消費的訊息將自動確認所有已消費的其他訊息。比如一個消費者已經消費了10條訊息,然後確認了第5條訊息被消費,則這10條都被確認消費了。、

acknowledge()通知方法是在Message物件上,同步接收,呼叫acknowledge()方法進行確認如下所示:

consumer = session.createConsumer(queue);
Message message = consumer.receive();
message.acknowledge();

非同步接受,呼叫acknowledge()方法進行確認:

consumer.setMessageListener(new MessageListener() {
    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            String value = textMessage.getText();
            System.out.println("value: " + value);
            message.acknowledge(); //訊息消費確認通知
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
});

  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

3.Session.DUPS_ACKNOWLEDGE:不是必須簽收,訊息可能會重複傳送。在第二次重新傳送訊息的時候,訊息頭的JmsDelivered會被置為true標示當前訊息已經傳送過一次,客戶端需要進行訊息的重複處理控制。

5. 持久化訊息

JMS 支援以下兩種訊息提交模式:

5.1 ERSISTENT 持久訊息

是activemq預設的傳送方式,此方式下的訊息在配合activemq.xml中配置的訊息儲存方式,會被儲存在特定的地方,直到有消費者將訊息消費或者訊息過期進入DLQ佇列,訊息生命週期才會結束。此模式下可以保證訊息只會被成功傳送一次和成功使用一次,訊息具有可靠性。在訊息傳遞到目標消費者,在消費者沒有成功應答前,訊息不會丟失。所以很自然的,需要一個地方來永續性儲存。如果訊息消費者在進行消費過程發生失敗,則訊息會被再次投遞。

DeliveryMode.PERSISTENT 指示JMS provider持久儲存訊息,以保證訊息不會因為JMS provider的失敗而丟失。 訊息持久化在硬碟中,ActiveMQ持久化有三種方式:AMQ、KahaDB、JDBC。

AMQ

AMQ是一種檔案儲存形式,它具有寫入速度快和容易恢復的特點。訊息儲存在一個個檔案中,檔案的預設大小為32M,如果一條訊息的大小超過了32M,那麼這個值必須設定大一點。當一個儲存檔案中的訊息已經全部被消費,那麼這個檔案將被標識為可刪除,在下一個清除階段,這個檔案被刪除。AMQ適用於ActiveMQ5.3之前的版本。

KahaDB

KahaDB是基於檔案的本地資料庫儲存形式,雖然沒有AMQ的速度快,但是它具有強擴充套件性,恢復的時間比AMQ短,從5.4版本之後KahaDB做為預設的持久化方式。

JDBC

可以將訊息儲存到資料庫中,例如:Mysql、SQL Server、Oracle、DB2。

具體使用方式大家下去查一下,限於篇幅在此就不做太詳細的介紹。

5.2 NON_PERSISTENT 非持久訊息

非持久的訊息適用於不重要的,可以接受訊息丟失的哪一類訊息,這種訊息只會被投遞一次,訊息不會在永續性儲存中儲存,也不會保證訊息丟失後的重新投遞。

DeliveryMode.NON_PERSISTENT 不要求JMS provider持久儲存訊息,訊息存放在記憶體中,讀寫速度快,在JMS服務停止後訊息會消失,沒有持久化到硬碟。

6. ActiveMQ訊息過期設定

允許訊息過期 。預設情況下,訊息永不會過期。如果訊息在特定週期內失去意義,那麼可以設定過期時間。
有兩種方法設定訊息的過期時間,時間單位為毫秒:

  1. 使用 setTimeToLive 方法為所有的訊息設定過期時間;
  2. 使用 send 方法為每一條訊息設定過期時間。

訊息過期時間,send 方法中的 timeToLive 值加上傳送時刻的 GMT 時間值。如果 timeToLive 值等於零,則 JMSExpiration 被設為零,表示該訊息永不過期。如果傳送後,在訊息過期時間之後訊息還沒有被髮送到目的地,則該訊息被清除。

這一節對activemq的訊息機制和持久化我們就簡單介紹到這裡,後面我們結合具體的工程來把它應用到生產中,再來講解如何持久化如何高效的應用於生產環境。

        <link rel="stylesheet" href="https://csdnimg.cn/release/phoenix/template/css/markdown_views-ea0013b516.css">
            </div>