1. 程式人生 > >使用SpringCloud Stream結合rabbitMQ實現訊息消費失敗重發機制

使用SpringCloud Stream結合rabbitMQ實現訊息消費失敗重發機制

> 前言:實際專案中經常遇到訊息消費失敗了,要進行訊息的重發。比如支付訊息消費失敗後,要分不同時間段進行N次的訊息重發提醒。 # 本文模擬場景 1. 當金額少於100時,訊息消費成功 1. 當金額大於100,小於200時,會進行3次重發,第一次1秒;第二次2秒;第三次3秒。 1. 當金額大於200時,訊息消費失敗,會進行5次重發,第一次1秒;第二次2秒;第三次3秒;第四次4秒;第五次5秒。重試五次後,訊息自動進入死信佇列,在死信佇列存活60秒後消失。 # 程式碼例項 **`特別注意程式碼與配置檔案中的註釋,各個使用說明都已經詳細寫在配置檔案中`** ## pom包引入 ```xml ``` ## 配置application.yml檔案 **注意各個配置的縮排格式,別搞錯了** ```yaml server: port: 8081 spring: application: name: stream-demo #rabbitmq連線配置 rabbitmq: host: 127.0.0.1 port: 5672 username: admin password: 123456 cloud: stream: bindings: #訊息生產者,與DelayDemoTopic介面中的DELAY_DEMO_PRODUCER變數值一致 delay-demo-producer: #①定義交換機名 destination: demo-delay-queue #訊息消費者,與DelayDemoTopic介面中的DELAY_DEMO_CONSUMER變數值一致 delay-demo-consumer: #定義交換機名,與①一致,就可以使傳送和消費都指向一個佇列 destination: demo-delay-queue #分組,這個配置可以開啟訊息持久化、可以解決在叢集環境下重複消費的問題。 #比如A、B兩臺伺服器叢集,如果沒有這個配置,則A、B都能收到同樣的訊息,如果有該配置則只有其中一臺會收到訊息 group: delay-consumer-group consumer: #最大重試次數,預設為3。不使用預設的,這裡定義為1,由我們程式控制傳送時間和次數 maxAttempts: 1 rabbit: bindings: #訊息生產者,與DelayDemoTopic介面中的DELAY_DEMO_PRODUCER變數值一致 delay-demo-producer: producer: #②申明為延遲佇列 delayedExchange: true #訊息消費者,與DelayDemoTopic介面中的DELAY_DEMO_CONSUMER變數值一致 delay-demo-consumer: consumer: #申明為延遲佇列,與②的配置的成對出現的 delayedExchange: true #開啟死信佇列 autoBindDlq: true #死信佇列中訊息的存活時間 dlqTtl: 60000 ``` ## 定義佇列通道 1. 定義通道 ```java /** * 定義延遲訊息通道 */ public interface DelayDemoTopic { /** * 生產者,與yml檔案配置對應 */ String DELAY_DEMO_PRODUCER = "delay-demo-producer"; /** * 消費者,與yml檔案配置對應 */ String DELAY_DEMO_CONSUMER = "delay-demo-consumer"; /** * 定義訊息消費者,在@StreamListener監聽訊息的時候用到 * @return */ @Input(DELAY_DEMO_CONSUMER) SubscribableChannel delayDemoConsumer(); /** * 定義訊息傳送者,在傳送訊息的時候用到 * @return */ @Output(DELAY_DEMO_PRODUCER) MessageChannel delayDemoProducer(); } ``` 2. 繫結通道 ```java /** * 配置訊息的binding * */ @EnableBinding(value = {DelayDemoTopic.class}) @Component public class MessageConfig { } ``` ## 訊息傳送模擬 ```java /** * 傳送訊息 */ @RestController public class SendMessageController { @Autowired DelayDemoTopic delayDemoTopic; @GetMapping("send") public Boolean sendMessage(BigDecimal money) throws JsonProcessingException {