1. 程式人生 > 實用技巧 >【日常摘要】- RabbitMq實現延時佇列

【日常摘要】- RabbitMq實現延時佇列

簡介

  • 什麼是延時佇列?

    • 一種帶有延遲功能的訊息佇列

過程:

  • 使用場景

    • 比如存在某個業務場景

      • 發起一個訂單,但是處於未支付的狀態?如何及時的關閉訂單並退還庫存?
      • 如何定期檢查處於退款訂單是否已經成功退款?
    • 為瞭解決上述的場景,就可以通過延時佇列去處理
  • 簡單實現
    /**
* rabbitTemplate
*/
@Autowired
private RabbitTemplate rabbitTemplate; /**
* rabbitAdmin
*/
@Autowired
private RabbitAdmin rabbitAdmin; /**
* messageProperties
*/
@Autowired
private MessageProperties messageProperties; /**
* 傳送延遲佇列訊息
*
* @param exchange
* @param content
* @param millseconds
*/
public void delayPublish(String exchange, String content, int millseconds) {
String delayExchangeName = exchange + "_delay";
String delayQueueName = delayExchangeName + "->queue_delay";
String delayRouteKey = "dead";
Map<String, Object> arguments = new HashMap<>();
arguments.putIfAbsent("x-dead-letter-exchange", exchange);
declareExchange(exchange);
declareExchange(delayExchangeName);
rabbitAdmin.declareQueue(new Queue(delayQueueName, true, false, false, arguments));
rabbitAdmin.declareBinding(new Binding(delayQueueName, Binding.DestinationType.QUEUE, delayExchangeName,
delayRouteKey, Collections.emptyMap()));
MessageProperties messageProperties = getMessageProperties(Collections.emptyMap());
messageProperties.setExpiration(Integer.toString(millseconds));
publish(delayExchangeName, content, messageProperties);
} /**
* sendMessage
*
* @param exchange
* @param content
* @param messageProperties
*/
private void publish(String exchange, String content, MessageProperties messageProperties) {
declareExchange(exchange);
Message message = new Message(content.getBytes(StandardCharsets.UTF_8), messageProperties);
try {
rabbitTemplate.send(exchange, "", message);
log.debug("推送給exchange:{},訊息體:{} 成功", exchange, content);
} catch (Exception e) {
log.error("推送給exchange:{},訊息體:{} 失敗!!", exchange, content, e);
throw e;
}
} /**
* declareExchange
*
* @param exchange
*/
private void declareExchange(String exchange) {
rabbitAdmin.declareExchange(new FanoutExchange(exchange, true, false, null));
} /**
* getMessageProperties
*
* @param header
* @return
*/
private MessageProperties getMessageProperties(Map<String, String> header) {
if (header == null) {
return this.messageProperties;
} MessageProperties customMessageProperties = new MessageProperties();
customMessageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
customMessageProperties.setHeader("content_encoding", "JSON");
for (Map.Entry<String, String> item : header.entrySet()) {
customMessageProperties.setHeader(item.getKey(), item.getValue());
}
return customMessageProperties;
}
  • 通過上述程式碼就可以實現一個簡單的延時佇列訊息的釋出
  • 那麼只需要進行對應的監聽便可以進行消費,達到延時佇列的釋出及消費的功能