RabbitMQ ——與SpringBoot整合並實現訊息確認機制
阿新 • • 發佈:2018-12-24
不囉嗦直接上程式碼
目錄結構如下:
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.test</groupId> <artifactId>RabbitMQ_MQTT</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>RabbitMQ_MQTT</name> <url>http://maven.apache.org</url> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.6.RELEASE</version> <relativePath /> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- <dependency> <groupId>org.fusesource.mqtt-client</groupId> <artifactId>mqtt-client</artifactId> <version>1.12</version> </dependency> --> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <fork>true</fork> </configuration> </plugin> </plugins> </build> </project>
application.properties
server.port=8080
spring.rabbitmq.queues=topic.1,mqtt.test.*,mqtt.test.dd
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/
Application.java
package com.gm; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.gm.rabbit.CallBackSender; @Configuration @RestController @EnableAutoConfiguration @ComponentScan @SpringBootApplication public class Application { @Autowired private CallBackSender sender; public static void main(String[] args) { SpringApplication.run(Application.class, args); } @RequestMapping("/callback") public void callbak() { sender.send("topic.baqgl.admin.1", "測試訊息"); } }
RabbitConfig.java
package com.gm.rabbit;
import java.util.ArrayList;
import java.util.List;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
@Configuration
public class RabbitConfig {
@Value("${spring.rabbitmq.host}")
private String addresses;
@Value("${spring.rabbitmq.port}")
private String port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
@Value("${spring.rabbitmq.publisher-confirms}")
private boolean publisherConfirms;
@Value("${spring.rabbitmq.queues}")
private String queues;
final static String EXCHANGE_NAME = "amq.topic";
final static String QUEUE_NAME = "topic.baqgl.*.*";
final static String ROUTING_KEY = "topic.baqgl.#";
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(addresses + ":" + port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
/** 如果要進行訊息回撥,則這裡必須要設定為true */
connectionFactory.setPublisherConfirms(publisherConfirms);
return connectionFactory;
}
@Bean
/** 因為要設定回撥類,所以應是prototype型別,如果是singleton型別,則回撥類為最後一次設定 */
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
@Bean
TopicExchange exchange() {
return new TopicExchange(EXCHANGE_NAME);
}
@Bean
public Queue queue() {
return new Queue(QUEUE_NAME, true);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY);
}
@Bean
public SimpleMessageListenerContainer messageContainer() {
/*Queue[] q = new Queue[queues.split(",").length];
for (int i = 0; i < queues.split(",").length; i++) {
q[i] = new Queue(queues.split(",")[i]);
}*/
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
try {
System.out.println(
"消費端接收到訊息:" + message.getMessageProperties() + ":" + new String(message.getBody()));
System.out.println("topic:"+message.getMessageProperties().getReceivedRoutingKey());
// deliveryTag是訊息傳送的次數,我這裡是為了讓訊息佇列的第一個訊息到達的時候丟擲異常,處理異常讓訊息重新回到佇列,然後再次丟擲異常,處理異常拒絕讓訊息重回佇列
/*if (message.getMessageProperties().getDeliveryTag() == 1
|| message.getMessageProperties().getDeliveryTag() == 2) {
throw new Exception();
}*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // false只確認當前一個訊息收到,true確認所有consumer獲得的訊息
} catch (Exception e) {
e.printStackTrace();
if (message.getMessageProperties().getRedelivered()) {
System.out.println("訊息已重複處理失敗,拒絕再次接收...");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒絕訊息
} else {
System.out.println("訊息即將再次返回佇列處理...");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // requeue為是否重新回到佇列
}
}
}
});
return container;
}
}
CallBackSender.java
package com.gm.rabbit;
import java.util.UUID;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class CallBackSender implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String topic, String message) {
rabbitTemplate.setConfirmCallback(this);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
System.out.println("訊息id:" + correlationData.getId());
//用RabbitMQ傳送MQTT需將exchange配置為amq.topic
this.rabbitTemplate.convertAndSend("amq.topic", topic, message, correlationData);
}
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("訊息id:" + correlationData.getId());
if (ack) {
System.out.println("訊息傳送確認成功");
} else {
System.out.println("訊息傳送確認失敗:" + cause);
}
}
}
ApplicationTests.java
package com.gm;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Smoke test: passes when the Spring Boot application context starts successfully.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

    /** Runs inside the loaded context and prints a fixed greeting. */
    @Test
    public void contextLoads() {
        final String greeting = "hello world";
        System.out.println(greeting);
    }
}
TopicTest.java
package com.gm.rabbit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Integration test that publishes one message through {@link CallBackSender}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class TopicTest {

    /** Sender bean under test, injected from the Spring context. */
    @Autowired
    private CallBackSender sender;

    /**
     * Publishes a fixed payload under the routing key {@code topic.baqgl.admin.1}.
     *
     * @throws Exception declared by the test signature; propagated to the runner if thrown
     */
    @Test
    public void topic() throws Exception {
        final String routingKey = "topic.baqgl.admin.1";
        final String payload = "測試訊息";
        sender.send(routingKey, payload);
    }
}
本文選擇的是RabbitMQ整合MQTT,並實現訊息持久化,如不需要整合MQTT只需修改RabbitConfig.java中的EXCHANGE_NAME即可。
整合MQTT相關配置:
建立使用者:
建立賬號
rabbitmqctl add_user admin 123456
設定使用者角色
rabbitmqctl set_user_tags admin administrator
設定使用者許可權
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
設定完成後可以檢視當前使用者和角色(需要開啟服務)
rabbitmqctl list_users
安裝外掛:
rabbitmq-plugins enable rabbitmq_management
rabbitmq-plugins enable rabbitmq_mqtt
預設配置。Windows下,rabbitmq的配置檔案在C:\Users\Administrator\AppData\Roaming\RabbitMQ下。沒配置的情況下,採用如下配置:
[{rabbit, [{tcp_listeners, [5672]}]},
{rabbitmq_mqtt, [{default_user, <<"admin">>},
{default_pass, <<"123456">>},
{allow_anonymous, true},
{vhost, <<"/">>},
{exchange, <<"amq.topic">>},
{subscription_ttl, 1800000},
{prefetch, 10},
{ssl_listeners, []},
%% Default MQTT with TLS port is 8883
%% {ssl_listeners, [8883]}
{tcp_listeners, [1883]},
{tcp_listen_options, [{backlog, 128},
{nodelay, true}]}]}
].