使用SpringCloud Stream結合rabbitMQ實現訊息消費失敗重發機制
阿新 • • 發佈:2020-06-21
> 前言:實際專案中經常遇到訊息消費失敗了,要進行訊息的重發。比如支付訊息消費失敗後,要分不同時間段進行N次的訊息重發提醒。
# 本文模擬場景
1. 當金額少於100時,訊息消費成功
1. 當金額大於100,小於200時,會進行3次重發,第一次1秒;第二次2秒;第三次3秒。
1. 當金額大於200時,訊息消費失敗,會進行5次重發,第一次1秒;第二次2秒;第三次3秒;第四次4秒;第五次5秒。重試五次後,訊息自動進入死信佇列,在死信佇列存活60秒後消失。
# 程式碼例項
**`特別注意程式碼與配置檔案中的註釋,各個使用說明都已經詳細寫在配置檔案中`**
## pom包引入
```xml
```
## 配置application.yml檔案
**注意各個配置的縮排格式,別搞錯了**
```yaml
server:
port: 8081
spring:
application:
name: stream-demo
#rabbitmq連線配置
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: 123456
cloud:
stream:
bindings:
#訊息生產者,與DelayDemoTopic介面中的DELAY_DEMO_PRODUCER變數值一致
delay-demo-producer:
#①定義交換機名
destination: demo-delay-queue
#訊息消費者,與DelayDemoTopic介面中的DELAY_DEMO_CONSUMER變數值一致
delay-demo-consumer:
#定義交換機名,與①一致,就可以使傳送和消費都指向一個佇列
destination: demo-delay-queue
#分組,這個配置可以開啟訊息持久化、可以解決在叢集環境下重複消費的問題。
#比如A、B兩臺伺服器叢集,如果沒有這個配置,則A、B都能收到同樣的訊息,如果有該配置則只有其中一臺會收到訊息
group: delay-consumer-group
consumer:
#最大重試次數,預設為3。不使用預設的,這裡定義為1,由我們程式控制傳送時間和次數
maxAttempts: 1
rabbit:
bindings:
#訊息生產者,與DelayDemoTopic介面中的DELAY_DEMO_PRODUCER變數值一致
delay-demo-producer:
producer:
#②申明為延遲佇列
delayedExchange: true
#訊息消費者,與DelayDemoTopic介面中的DELAY_DEMO_CONSUMER變數值一致
delay-demo-consumer:
consumer:
#申明為延遲佇列,與②的配置的成對出現的
delayedExchange: true
#開啟死信佇列
autoBindDlq: true
#死信佇列中訊息的存活時間
dlqTtl: 60000
```
## 定義佇列通道
1. 定義通道
```java
/**
* 定義延遲訊息通道
*/
public interface DelayDemoTopic {
/**
* 生產者,與yml檔案配置對應
*/
String DELAY_DEMO_PRODUCER = "delay-demo-producer";
/**
* 消費者,與yml檔案配置對應
*/
String DELAY_DEMO_CONSUMER = "delay-demo-consumer";
/**
* 定義訊息消費者,在@StreamListener監聽訊息的時候用到
* @return
*/
@Input(DELAY_DEMO_CONSUMER)
SubscribableChannel delayDemoConsumer();
/**
* 定義訊息傳送者,在傳送訊息的時候用到
* @return
*/
@Output(DELAY_DEMO_PRODUCER)
MessageChannel delayDemoProducer();
}
```
2. 繫結通道
```java
/**
* 配置訊息的binding
*
*/
@EnableBinding(value = {DelayDemoTopic.class})
@Component
public class MessageConfig {
}
```
## 訊息傳送模擬
```java
/**
* 傳送訊息
*/
@RestController
public class SendMessageController {
@Autowired
DelayDemoTopic delayDemoTopic;
@GetMapping("send")
public Boolean sendMessage(BigDecimal money) throws JsonProcessingException {