1. 程式人生 > >Spring整合ActiveMQ(topic 詳細配置步驟)

Spring整合ActiveMQ(topic 詳細配置步驟)

maven工程新增依賴

<dependencies>
   <!-- Spring -->
<dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-web</artifactId>
   </dependency>
   <!-- ActiveMQ依賴 -->
<dependency>
      <groupId>org.apache.activemq</groupId
> <artifactId>activemq-client</artifactId> </dependency> <!-- 加入spring-jms依賴 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> </dependency> </dependencies>

一、建立生產者: applicationContext-activemq-topic-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!-- 真正可以產生Connection
ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://xxx.xxx.xxx.xxx:61616"/> </bean> <!-- Spring用於管理真正的ConnectionFactoryConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的可以產生JMS ConnectionConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> </bean> <!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 --> <property name="connectionFactory" ref="connectionFactory"/> <!-- 訂閱模式 --> <property name="pubSubDomain" value="true"/> </bean> <!--訂閱模式,一對多 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic.item"/> </bean> </beans>

二、建立消費者1    applicationContext-activemq-topic-consumer1.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!-- 真正可以產生ConnectionConnectionFactory,由對應的 JMS服務廠商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://xxx.xxx.xxx.xxx:61616"/>
    </bean>

    <!-- Spring用於管理真正的ConnectionFactoryConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目標ConnectionFactory對應真實的可以產生JMS ConnectionConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
        <!-- 消費者標識 -->
<property name="clientId" value="topic-consumer-a"/>
    </bean>

    <!--訂閱模式,一對多的 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic.item"/>
    </bean>
    <!-- messageListener實現類 -->
<bean id="topicMessageListener1" class="it.hehe.spring.topic.TopicMessageListener1"/>

    <!-- 配置一個jms監聽容器 -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="topicDestination"/>
        <property name="messageListener" ref="topicMessageListener1"/>
        <!-- 訊息持久化 -->
<property name="subscriptionDurable" value="true"/>
        <!-- 消費者標識 -->
<property name="clientId" value="topic-consumer-a"/>
        <!-- 訂閱者標識(與消費者識別符號可以不一樣;預設為監聽器全限定名) -->
<property name="durableSubscriptionName" value="topic-consumer-a"/>
        <!-- 設定為釋出/訂閱模式 -->
<property name="pubSubDomain" value="true"/>
        <!-- 是否開啟事務 -->
<property name="sessionTransacted" value="false"/>
        <!-- 客戶端訊息確認模式名稱 -->
<property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"/>
    </bean>

</beans>

二、建立消費者2    applicationContext-activemq-topic-consumer2.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- 真正可以產生ConnectionConnectionFactory,由對應的 JMS服務廠商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://xxx.xxx.xxx.xxx:61616"/>
    </bean>

    <!-- Spring用於管理真正的ConnectionFactoryConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目標ConnectionFactory對應真實的可以產生JMS ConnectionConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
        <!-- 消費者標識 -->
<property name="clientId" value="topic-consumer-b"/>
    </bean>

    <!--訂閱模式,一對多的 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic.item"/>
    </bean>

    <!-- messageListener實現類 -->
<bean id="topicMessageListener2" class="it
.hehe.spring.topic.TopicMessageListener2"/>
<!-- 配置一個jms監聽容器 -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="topicDestination"/>
    <property name="messageListener" ref="topicMessageListener2"/>
    <!-- 訊息持久化 -->
<property name="subscriptionDurable" value="true"/>
    <!-- 消費者標識 -->
<property name="clientId" value="topic-consumer-b"/>
    <!-- 訂閱者標識(與消費者識別符號可以不一樣;預設為監聽器全限定名) -->
<property name="durableSubscriptionName" value="topic-consumer-b"/>
    <!-- 設定為釋出/訂閱模式 -->
<property name="pubSubDomain" value="true"/>
    <!-- 是否開啟事務 -->
<property name="sessionTransacted" value="false"/>
    <!-- 客戶端訊息確認模式名稱 -->
<property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"/>
</bean>

三、建立消費者2監聽器類  

TopicMessageListener2.java
package cn.itcast.activemq.spring.topic;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener;

public class TopicMessageListener2 extends AbstractAdaptableMessageListener {

   @Override
public void onMessage(Message message, Session session) throws JMSException {
      // 判斷訊息型別是TextMessage
if (message instanceof TextMessage) {
         // 如果是,則進行強轉
TextMessage textMessage = (TextMessage) message;
         try {
            // 消費訊息,列印訊息內容
String text = textMessage.getText();
            System.out.println("TopicMessageListener2-消費者2訊息監聽器接收到訊息;訊息內容為:" + text);
         } catch (Exception e) {
            e.printStackTrace();
         }
         /**
          * spring的配置檔案配置監聽容器的時候如果 AcknowledgeMode配置為CLIENT_ACKNOWLEDGE的話:
* 那麼在監聽器程式碼中丟擲異常或者執行session.recover();則會將資訊重新發送6次(預設每秒發一個訊息)
* 在重發6次後訊息還是處理失敗,那麼訊息將自動到DLQ-死信佇列(Dead Letter Queue用來儲存處理失敗或者過期的訊息;
* 預設在ActiveMQ佇列裡面的名稱為:ActiveMQ.DLQ)
          */
session.recover();
         
         /**
          * 什麼時候會重發:
* Messages are redelivered to a client when any of the following occurs:
             A transacted session is used and rollback() is called.
             A transacted session is closed before commit() is called.
             A session is using CLIENT_ACKNOWLEDGE and Session.recover() is called.
          */
}
   }
}
四、建立消費者1監聽器類  
TopicMessageListener1.java
package cn.itcast.activemq.spring.topic;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener;

public class TopicMessageListener1 extends AbstractAdaptableMessageListener {

   @Override
public void onMessage(Message message, Session session) throws JMSException {
      // 判斷訊息型別是TextMessage
if (message instanceof TextMessage) {
         // 如果是,則進行強轉
TextMessage textMessage = (TextMessage) message;
         try {
            // 消費訊息,列印訊息內容
String text = textMessage.getText();
            System.out.println("TopicMessageListener1-消費者1 訊息監聽器接收到訊息;訊息內容為:" + text);
         } catch (Exception e) {
            e.printStackTrace();
         }
      }
   }
}

五、建立測試生產者類

Producer.java
package cn.itcast.activemq.spring.topic;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class Producer {

   public static void main(String[] args) {
      // 建立spring容器
ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext-activemq-topic-producer.xml");

      // spring容器中獲取JMSTemplate,這個物件是用於傳送訊息的
JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);

      //建立訊息模式
Destination destination = (Destination)context.getBean("topicDestination");
      
      // 使用JMSTemplate傳送訊息
jmsTemplate.send(destination, new MessageCreator() {

         @Override
public Message createMessage(Session session) throws JMSException {
            TextMessage textMessage = new ActiveMQTextMessage();
            textMessage.setText("---spring-topic的方式傳送。訂閱主題的名稱為:topic.item");

            System.out.println("已傳送訊息...");

            return textMessage;
         }
      });
   }
}

六、分別建立消費者測試類1、2

package cn.itcast.activemq.spring.topic;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Consumer1 {

   public static void main(String[] args) {
      // 建立spring容器
new ClassPathXmlApplicationContext("applicationContext-activemq-topic-consumer1.xml");
   }

}
package cn.itcast.activemq.spring.topic;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Consumer2 {

   public static void main(String[] args) {
      //