1. 程式人生 > 程式設計 >詳解Springboot整合ActiveMQ(Queue和Topic兩種模式)

詳解Springboot整合ActiveMQ(Queue和Topic兩種模式)

寫在前面: 從2018年底開始學習SpringBoot,也用SpringBoot寫過一些專案。這裡對學習Springboot的一些知識總結記錄一下。如果你也在學習SpringBoot,可以關注我,一起學習,一起進步。

ActiveMQ簡介

1、ActiveMQ簡介

Apache ActiveMQ是Apache軟體基金會所研發的開放原始碼訊息中介軟體;由於ActiveMQ是一個純Java程式,因此只需要作業系統支援Java虛擬機器,ActiveMQ便可執行。

2、ActiveMQ下載

下載地址:http://activemq.apache.org/components/classic/download/

詳解Springboot整合ActiveMQ(Queue和Topic兩種模式)

下載完成後解壓雙擊activemq.bat檔案開啟(不用安裝,直接使用),目錄和開啟後效果如下:

詳解Springboot整合ActiveMQ(Queue和Topic兩種模式)

詳解Springboot整合ActiveMQ(Queue和Topic兩種模式)

執行後,瀏覽器訪問http://localhost:8161/地址進入一下介面。

詳解Springboot整合ActiveMQ(Queue和Topic兩種模式)

點選Manage ActiveMQ broker登入到ActiveMQ管理頁面,預設賬號和密碼都是admin。管理頁面如下:

詳解Springboot整合ActiveMQ(Queue和Topic兩種模式)

SpringBoot整合ActiveMQ

1、新建SpringBoot專案

新建Springboot專案,新增對應的依賴。專案完整的pom.xml檔案如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.2.5.RELEASE</version>
  <relativePath/> <!-- lookup parent from repository -->
 </parent>
 <groupId>com.mcy</groupId>
 <artifactId>springboot-mq</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <name>springboot-mq</name>
 <description>Demo project for Spring Boot</description>

 <properties>
  <java.version>1.8</java.version>
 </properties>

 <dependencies>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
  </dependency>

  <!--Activemq依賴-->
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-activemq</artifactId>
  </dependency>

  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
   <scope>test</scope>
   <exclusions>
    <exclusion>
     <groupId>org.junit.vintage</groupId>
     <artifactId>junit-vintage-engine</artifactId>
    </exclusion>
   </exclusions>
  </dependency>
 </dependencies>

 <build>
  <plugins>
   <plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
   </plugin>
  </plugins>
 </build>
</project>

2、專案結構

3、相關配置資訊

在application.properties類中新增ActiveMQ相關的配置資訊

server.port=8080
server.servlet.context-path=/mq

#MQ伺服器地址
spring.activemq.broker-url=tcp://localhost:61616
#使用者名稱
spring.activemq.user=admin
#密碼
spring.activemq.password=admin
#設定是Queue佇列還是Topic,false為Queue,true為Topic,預設false-Queue
spring.jms.pub-sub-domain=false
#spring.jms.pub-sub-domain=true

#變數,定義佇列和topic的名稱
myqueue: activemq-queue
mytopic: activemq-topic

4、ActiveMQ配置類

ActiveMQ配置類ConfigBean,配置了Queue佇列和topic兩種模式,程式碼如下:

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.stereotype.Component;
import javax.jms.Topic;

/**
 1. MQ配置類
 */
@Component
@EnableJms
public class ConfigBean {
 @Value("${myqueue}")
 private String myQueue;
 @Value("${mytopic}")
 private String topicName;

 //佇列
 @Bean
 public ActiveMQQueue queue(){
  return new ActiveMQQueue(myQueue);
 }

 //topic
 @Bean
 public Topic topic(){
  return new ActiveMQTopic(topicName);
 }
}

Queue佇列模式

佇列模式即點對點傳輸。
點對點訊息傳遞域的特點如下:

每個訊息只能有一個消費者,類似於1對1的關係。好比個人快遞自己領自己的。

訊息的生產者和消費者之間沒有時間上的相關性。無論消費者在生產者傳送訊息的時候是否處於執行狀態,消費者都可以提取訊息。好比我們的傳送簡訊,傳送者傳送後不見得接收者會即收即看。

訊息被消費後佇列中不會再儲存,所以消費者不會消費到已經被消費掉的訊息。

1、佇列生產者

QueueProducerController類為佇列生產者控制器,主要向訊息佇列中傳送訊息。程式碼如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.jms.Queue;

/*
 * 佇列訊息生產者
 */
@RestController
public class QueueProducerController {
 @Autowired
 private JmsMessagingTemplate jmsMessagingTemplate;

 @Autowired
 private Queue queue;

 /*
  * 訊息生產者
  */
 @RequestMapping("/sendmsg")
 public void sendmsg(String msg) {
  System.out.println("傳送訊息到佇列:" + msg);
  // 指定訊息傳送的目的地及內容
  this.jmsMessagingTemplate.convertAndSend(this.queue,msg);
 }
}

2、佇列消費者

QueueConsumerController類為佇列消費者控制器,具體程式碼如下:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.web.bind.annotation.RestController;

/*
 1. 佇列queue消費者控制器
 */
@RestController
public class QueueConsumerController {
 /*
  * 消費者接收訊息
  */
 @JmsListener(destination="${myqueue}")
 public void readActiveQueue(String message) {
  System.out.println("接受到:" + message);
 }
}

3、測試效果

執行專案在瀏覽器中訪問http://localhost:8080/mq/sendmsg?msg=123。向訊息佇列中傳送123。控制檯輸出效果:

詳解Springboot整合ActiveMQ(Queue和Topic兩種模式)

ActiveMQ控制檯顯示:

詳解Springboot整合ActiveMQ(Queue和Topic兩種模式)

  • Number Of Pending Messages:訊息佇列中待處理的訊息
  • Number Of Consumers:消費者的數量
  • Messages Enqueued:累計進入過訊息佇列的總量
  • Messages Dequeued:累計消費過的訊息總量

【注】佇列模式時,配置檔案application.properties中spring.jms.pub-sub-domain屬性必須設定為false。

Topic模式

topic模式基於釋出/訂閱模式的傳輸。
基於釋出/訂閱模式的傳輸的特點如下:

  • 生產者將訊息釋出到topic中,每個訊息可以有多個消費者,屬於1:N的關係;
  • 生產者和消費者之間有時間上的相關性。訂閱某一個主題的消費者只能消費自它訂閱之後釋出的訊息。
  • 生產者生產時,topic不儲存訊息它是無狀態的不落地,假如無人訂閱就去生產,那就是一條廢訊息。

1、topic生產者

TopicProducerController類為topic生產者控制器,主要向訊息佇列中傳送訊息。程式碼如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.jms.Queue;
import javax.jms.Topic;

/*
* topic訊息生產者
*/
@RestController
public class TopicProducerController {
 @Autowired
 private JmsMessagingTemplate jmsMessagingTemplate;

 @Autowired
 private Topic topic;

 /*
 * 訊息生產者
 */
 @RequestMapping("/topicsendmsg")
 public void sendmsg(String msg) {
  System.out.println("傳送訊息到MQ:" + msg);
  // 指定訊息傳送的目的地及內容
  this.jmsMessagingTemplate.convertAndSend(this.topic,msg);
 }
}

2、topic消費者

TopicConsumerController類為topic消費者控制器,其中寫了兩個消費者方法,可以理解為有兩個使用者訂閱。具體程式碼如下:

import org.springframework.jms.annotation.JmsListener;
import org.springframework.web.bind.annotation.RestController;

/*
 1. topic消費者控制器
 */
@RestController
public class TopicConsumerController {
 /*
  * 消費者接收訊息
  */
 @JmsListener(destination="${mytopic}")
 public void readActiveQueue(String message) {
  System.out.println("接受到:" + message);
 }

 @JmsListener(destination="${mytopic}")
 public void readActiveQueue1(String message) {
  System.out.println("接受到:" + message);
 }
}

3、測試效果

執行專案在瀏覽器中訪問http://localhost:8080/mq/topicsendmsg?msg=123。向訊息佇列中傳送123。控制檯輸出效果(有兩個消費者方法):

詳解Springboot整合ActiveMQ(Queue和Topic兩種模式)

ActiveMQ控制檯顯示:

詳解Springboot整合ActiveMQ(Queue和Topic兩種模式)

  • Number Of Consumers:消費者的數量
  • Messages Enqueued:累計進入過訊息佇列的總量
  • Messages Dequeued:累計消費過的訊息總量

【注】Topic模式時,配置檔案application.properties中spring.jms.pub-sub-domain屬性必須設定為true。

到此這篇關於詳解Springboot整合ActiveMQ(Queue和Topic兩種模式)的文章就介紹到這了,更多相關Springboot整合ActiveMQ內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!