1. 程式人生 > >ActiveMQ釋出-訂閱訊息模式(同點對點模式的區別)

ActiveMQ釋出-訂閱訊息模式(同點對點模式的區別)

點對點與釋出訂閱最初是由JMS定義的。這兩種模式主要區別或解決的問題就是傳送到佇列的訊息能否重複消費(多訂閱)

點對點: 
訊息生產者生產訊息傳送到queue中,然後訊息消費者從queue中取出並且消費訊息。這裡要注意: 
訊息被消費以後,queue中不再有儲存,所以訊息消費者不可能消費到已經被消費的訊息。 
Queue支援存在多個消費者,但是對一個訊息而言,只會有一個消費者可以消費。 
釋出/訂閱 
訊息生產者(釋出)將訊息釋出到topic中,同時有多個訊息消費者(訂閱)消費該訊息。和點對點方式不同,釋出到topic的訊息會被所有訂閱者消費

1、訊息生產者-訊息釋出-Topic

  1. /**  
  2.  * 訊息生產者-訊息釋出者  
  3.  * @author Administrator  
  4.  *  
  5.  */  
  6. public class JMSProducer {  
  7.     private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 預設的連線使用者名稱  
  8.     private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 預設的連線密碼  
  9.     private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 預設的連線地址  
  10.     private static final int SENDNUM=10; // 傳送的訊息數量  
  11.     public static void main(String[] args) {  
  12.         ConnectionFactory connectionFactory; // 連線工廠  
  13.         Connection connection = null; // 連線  
  14.         Session session; // 會話 接受或者傳送訊息的執行緒  
  15.         Destination destination; // 訊息的目的地  
  16.         MessageProducer messageProducer; // 訊息生產者         
  17.         // 例項化連線工廠  
  18.         connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);  
  19.         try {  
  20.             connection=connectionFactory.createConnection(); // 通過連線工廠獲取連線  
  21.             connection.start(); // 啟動連線  
  22.             session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 建立Session  
  23.             // destination=session.createQueue("FirstQueue1"); // 建立訊息佇列  
  24.             destination=session.createTopic("FirstTopic1");  
  25.             messageProducer=session.createProducer(destination); // 建立訊息生產者  
  26.             sendMessage(session, messageProducer); // 傳送訊息  
  27.             session.commit();  
  28.         } catch (Exception e) {  
  29.             // TODO Auto-generated catch block  
  30.             e.printStackTrace();  
  31.         } finally{  
  32.             if(connection!=null){  
  33.                 try {  
  34.                     connection.close();  
  35.                 } catch (JMSException e) {  
  36.                     // TODO Auto-generated catch block  
  37.                     e.printStackTrace();  
  38.                 }  
  39.             }  
  40.         }  
  41.     }  
  42.     /**  
  43.      * 傳送訊息  
  44.      * @param session  
  45.      * @param messageProducer  
  46.      * @throws Exception  
  47.      */  
  48.     public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{  
  49.         for(int i=0;i<JMSProducer.SENDNUM;i++){  
  50.             TextMessage message=session.createTextMessage("ActiveMQ 傳送的訊息"+i);  
  51.             System.out.println("傳送訊息:"+"ActiveMQ 釋出的訊息"+i);  
  52.             messageProducer.send(message);  
  53.         }  
  54.     }  
  55. }  
2、多個訊息訂閱者-訊息消費者

訊息訂閱者一

  1. /**  
  2.  * 訊息消費者-訊息訂閱者一  
  3.  * @author Administrator  
  4.  *  
  5.  */  
  6. public class JMSConsumer {  
  7.     private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 預設的連線使用者名稱  
  8.     private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 預設的連線密碼  
  9.     private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 預設的連線地址  
  10.     public static void main(String[] args) {  
  11.         ConnectionFactory connectionFactory; // 連線工廠  
  12.         Connection connection = null; // 連線  
  13.         Session session; // 會話 接受或者傳送訊息的執行緒  
  14.         Destination destination; // 訊息的目的地  
  15.         MessageConsumer messageConsumer; // 訊息的消費者  
  16.         // 例項化連線工廠  
  17.         connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);  
  18.         try {  
  19.             connection=connectionFactory.createConnection();  // 通過連線工廠獲取連線  
  20.             connection.start(); // 啟動連線  
  21.             session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 建立Session  
  22.             // destination=session.createQueue("FirstQueue1");  // 建立連線的訊息佇列  
  23.             destination=session.createTopic("FirstTopic1");  
  24.             messageConsumer=session.createConsumer(destination); // 建立訊息消費者  
  25.             messageConsumer.setMessageListener(new Listener()); // 註冊訊息監聽  
  26.         } catch (JMSException e) {  
  27.             // TODO Auto-generated catch block  
  28.             e.printStackTrace();  
  29.         }   
  30.     }  
  31. }  

訊息訂閱者二

  1. /**  
  2.  * 訊息消費者-訊息訂閱者二  
  3.  * @author Administrator  
  4.  *  
  5.  */  
  6. public class JMSConsumer2 {  
  7.     private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 預設的連線使用者名稱  
  8.     private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 預設的連線密碼  
  9.     private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 預設的連線地址  
  10.     public static void main(String[] args) {  
  11.         ConnectionFactory connectionFactory; // 連線工廠  
  12.         Connection connection = null; // 連線  
  13.         Session session; // 會話 接受或者傳送訊息的執行緒  
  14.         Destination destination; // 訊息的目的地  
  15.         MessageConsumer messageConsumer; // 訊息的消費者  
  16.         // 例項化連線工廠  
  17.         connectionFactory=new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);  
  18.         try {  
  19.             connection=connectionFactory.createConnection();  // 通過連線工廠獲取連線  
  20.             connection.start(); // 啟動連線  
  21.             session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 建立Session  
  22.             // destination=session.createQueue("FirstQueue1");  // 建立連線的訊息佇列  
  23.             destination=session.createTopic("FirstTopic1");  
  24.             messageConsumer=session.createConsumer(destination); // 建立訊息消費者  
  25.             messageConsumer.setMessageListener(new Listener2()); // 註冊訊息監聽  
  26.         } catch (JMSException e) {  
  27.             // TODO Auto-generated catch block  
  28.             e.printStackTrace();  
  29.         }   
  30.     }  
  31. }  
     兩個Linsner用於列印不同的標識資訊,故省略。

     注:釋出訂閱模式適用於1個訊息生產者,多個消費者場景,首先啟動訊息訂閱方,在訊息釋出方開始執行後,接收該訊息進行處理。在ActiveMQ管理介面會動態跟進訊息產生-消費(入隊、出隊)情況;以及生產者個數,消費者個數。

http://blog.csdn.net/Daybreak1209/article/details/51672277

http://blog.csdn.net/zbw18297786698/article/details/53000605