MQ訊息佇列三(SpringBoot 整合rocketMq)
一. JMS規範
在瞭解rocketMq之前先了解一下jms規範,rocketmq雖然不完全基於jms規範,但是他參考了jms規範和 CORBA Notification 規範等,可以說是青出於藍而勝於藍。
JMS即Java訊息服務(Java Message Service)應用程式介面,是一個Java平臺中關於面向訊息中介軟體(MOM)的API,用於在兩個應用程式之間,或分散式系統中傳送訊息,進行非同步通訊。Java訊息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支援。
二. RocketMQ
RocketMQ原理:mmap+write,檔案系統,資料儲存結構,佇列,刷盤策略,訊息查詢,訊息過濾,事務訊息,傳送、訂閱負載均衡,同步雙寫/非同步複製,充分利用記憶體,訊息堆積能力以及解決辦法。
RcoketMQ 是一款低延遲、高可靠、可伸縮、易於使用的訊息中介軟體。具有以下特性:
- 支援釋出/訂閱(Pub/Sub)和點對點(P2P)訊息模型
(1)點對點訊息模型:
點對點就是一對一的關係,一個訊息發出只有一個接受者所處理。每個訊息都被髮送到一個特定的佇列,接收者從佇列中獲取訊息。佇列保留著訊息,直到他們被消費或超時。
(2)釋出訂閱訊息模型:
客戶端將訊息傳送到主題。多個釋出者將訊息傳送到Topic,系統將這些訊息傳遞給多個訂閱者。如果你希望傳送的訊息可以不被做任何處理、或者被一個訊息者處理、或者可以被多個消費者處理的話,那麼可以採用Pub/Sub模型。
2. 在一個佇列中可靠的先進先出(FIFO)和嚴格的順序傳遞
3. 支援拉(pull)和推(push)兩種訊息模式
4. 單一佇列百萬訊息的堆積能力
5. 支援多種訊息協議,如 JMS、MQTT 等
6. 分散式高可用的部署架構,滿足至少一次訊息傳遞語義
7. 提供 docker 映象用於隔離測試和雲集群部署
8. 提供配置、指標和監控等功能豐富的 Dashboard
三.怎麼用RocketMQ
1. 在pom檔案中引入rocketMq
<!-- https://mvnrepository.com/artifact/com.alibaba.rocketmq/rocketmq-client --> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.2.6</version> </dependency>
2. 構建Producer類(訊息生產者由Session建立,並用於將訊息傳送到Destination。同樣,訊息生產者分兩種型別:QueueSender和TopicPublisher。可以呼叫訊息生產者的方法(send或publish方法)傳送訊息)
public class Producer {
public void test(String msgString){
DefaultMQProducer producer = new DefaultMQProducer("Producer_ChenHT_Test");
producer.setNamesrvAddr("192.168.20.31:9876");
try {
producer.start();
Message msg = new Message("PushTopicChenhaitao",
"push",
"1",
msgString.getBytes());
SendResult result = producer.send(msg);
System.out.println("傳送端:"+"id:" + result.getMsgId() +
" result:" + result.getSendStatus());
} catch (Exception e) {
e.printStackTrace();
}finally{
producer.shutdown();
}
}
}
3. 構建Consumer類,訊息消費者由Session建立,用於接收被髮送到Destination的訊息。兩種型別:QueueReceiver和TopicSubscriber。可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來建立。當然,也可以session的creatDurableSubscriber方法來建立持久化的訂閱者。
public class Consumer {
public void testConsume(){
DefaultMQPushConsumer consumer =
new DefaultMQPushConsumer("consumer_test_msg");
consumer.setNamesrvAddr("192.168.20.31:9876");
try {
//訂閱PushTopic下Tag為push的訊息
consumer.subscribe("pushTopicHaleyliu", "push");
//程式第一次啟動從訊息佇列頭取資料
System.out.println("開始監聽推送topic頻道的push動作。。。。。。");
consumer.setConsumeFromWhere(
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(
new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> list,
ConsumeConcurrentlyContext Context) {
Message msg = list.get(0);
System.out.println("接收到的訊息是:"+msg.toString());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
);
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
4. Service層
public interface RocketMqService {
void testRocketMQForProduceAndConsume(String msg);
}
@Service
public class RocketMqServiceImpl implements RocketMqService{
@Override
public void testRocketMQForProduceAndConsume(String msg){
Producer producer = new Producer();
producer.test(msg);
}
}
5. Controller層
@Controller
public class RocketMqController {
@Autowired
private RocketMqService rocketMqService;
@GetMapping("/mq")
public String getMq(@RequestParam("msg") String msg){
rocketMqService.testRocketMQForProduceAndConsume(msg);
return "完成";
}
}
如果出現rocketmq執行時提示 No route info of this topic 異常產生的原因可能是:
① Broker禁止自動建立Topic,且使用者沒有通過手工方式建立Topic
mqbroker.exe -n 電腦ipv4地址:9876 autoCreateTopicEnable=true
② Broker沒有正確連線到Name Server
③ Producer沒有正確連線到Name Server
④ 防火牆未關閉
⑤ 少 jar 包:fastjson-1.2.29.jar
⑥ mqnameserver 或者 mqbroker 會報錯記憶體不夠
vim bin/runserver.sh (調整nameserver啟動的記憶體,不調整此檔案,可能導致無法啟動。)
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
vim bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"