訊息佇列-ActiveMQ學習筆記(三)-釋出-訂閱訊息模式實現
阿新 • • 發佈:2019-02-07
釋出-訂閱訊息模式與點對點模式類似,只不過在session建立訊息佇列時,由session.createQuene()變為session.createTopic()。
訊息釋出者程式碼:
package com.feiyang.activemq2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * 訊息釋出者 * * @author MCL * */ public class JMSProducer { private static final String USERNAME = ActiveMQConnectionFactory.DEFAULT_USER;// 預設的連線的使用者名稱 private static final String PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;// 預設的連線密碼 private static final String BROKEURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;// 預設的連線地址 private static final int SENDNUM = 10;// 傳送訊息的數量 public static void main(String[] args) { ConnectionFactory connectionFactory;// 連線工廠 Connection connection = null;// 連線 Session session;// 會話 傳送或者接受訊息的執行緒 Destination destination;// 訊息的目的地 MessageProducer messageProducer;// 訊息生產者 // 例項化連線工廠 connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL); try { // 通過連線工廠獲取連線 connection = connectionFactory.createConnection(); // 啟動連線 connection.start(); session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); destination = session.createTopic("FirstTopic");// 建立訊息佇列 messageProducer = session.createProducer(destination);// 建立訊息生產者 sendMessage(session, messageProducer); // 由於設定新增事務,這裡需要使用提交才能將資料傳送出去 session.commit(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } // 傳送訊息 public static void sendMessage(Session session, MessageProducer messageProducer) { for (int i = 0; i < JMSProducer.SENDNUM; i++) { try { TextMessage message = session.createTextMessage("Active MQ傳送訊息" + i); System.out.println("釋出訊息:Active MQ傳送訊息"); messageProducer.send(message); } catch (JMSException e) { // TODO Auto-generated catc h block e.printStackTrace(); } } } }
訊息訂閱者程式碼:
package com.feiyang.activemq2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 訊息訂閱者一 * @author MCL * */ public class JMSConsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 預設的連線的使用者名稱 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 預設的連線密碼 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;// 預設的連線地址 public static void main(String[] args) { ConnectionFactory connectionFactory; // 連線工廠 Connection connection = null; // 連線 Session session; // 會話 接受或者傳送訊息的執行緒 Destination destination; // 訊息的目的地 MessageConsumer messageConsumer; // 訊息的消費者 connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL); try { // 通過連線工廠獲取連線 connection=connectionFactory.createConnection(); // 通過連線工廠獲取連線 connection.start(); // 啟動連線 session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 建立Session destination=session.createTopic("FirstTopic"); messageConsumer=session.createConsumer(destination); // 建立訊息消費者 messageConsumer.setMessageListener(new Listener()); // 註冊訊息監聽 } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
監聽器程式碼:
package com.feiyang.activemq2; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * 訊息監聽器 * @author MCL * */ public class Listener implements MessageListener{ @Override public void onMessage(Message message) { // TODO Auto-generated method stub try { System.out.println("訊息訂閱者一收到的訊息:"+((TextMessage)message).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
我們可以定義多個訊息訂閱者及其監聽器。這裡定義了兩個訂閱者,由於程式碼相似所以只貼上一份。
由於釋出-訂閱模型的關係,需要先進行訂閱後,才能接收發布者的訊息。
先啟動 訂閱者一和訂閱者二的執行緒,然後用釋出者 釋出訊息。開啟後臺管理介面,點選Topics
有上述圖片可以看出,FirstTopic中,訊息釋出者釋出了10條資訊,並由兩個訂閱者進行消費,每人消費10條資訊。