1. 程式人生 > >MQ訊息佇列三(SpringBoot 整合rocketMq)

MQ訊息佇列三(SpringBoot 整合rocketMq)

一. JMS規範

在瞭解rocketMq之前先了解一下jms規範,rocketmq雖然不完全基於jms規範,但是他參考了jms規範和 CORBA Notification 規範等,可以說是青出於藍而勝於藍。

JMS即Java訊息服務(Java Message Service)應用程式介面,是一個Java平臺中關於面向訊息中介軟體(MOM)的API,用於在兩個應用程式之間,或分散式系統中傳送訊息,進行非同步通訊。Java訊息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支援。

二. RocketMQ

RocketMQ原理:mmap+write,檔案系統,資料儲存結構,佇列,刷盤策略,訊息查詢,訊息過濾,事務訊息,傳送、訂閱負載均衡,同步雙寫/非同步複製,充分利用記憶體,訊息堆積能力以及解決辦法。

RcoketMQ 是一款低延遲、高可靠、可伸縮、易於使用的訊息中介軟體。具有以下特性:

  1. 支援釋出/訂閱(Pub/Sub)和點對點(P2P)訊息模型

    (1)點對點訊息模型:

      點對點就是一對一的關係,一個訊息發出只有一個接受者所處理。每個訊息都被髮送到一個特定的佇列,接收者從佇列中獲取訊息。佇列保留著訊息,直到他們被消費或超時。

     (2)釋出訂閱訊息模型:

       客戶端將訊息傳送到主題。多個釋出者將訊息傳送到Topic,系統將這些訊息傳遞給多個訂閱者。如果你希望傳送的訊息可以不被做任何處理、或者被一個訊息者處理、或者可以被多個消費者處理的話,那麼可以採用Pub/Sub模型。

2.  在一個佇列中可靠的先進先出(FIFO)和嚴格的順序傳遞

3. 支援拉(pull)和推(push)兩種訊息模式

4. 單一佇列百萬訊息的堆積能力

5. 支援多種訊息協議,如 JMS、MQTT 等

6. 分散式高可用的部署架構,滿足至少一次訊息傳遞語義

7. 提供 docker 映象用於隔離測試和雲集群部署

8. 提供配置、指標和監控等功能豐富的 Dashboard

三.怎麼用RocketMQ

1.  在pom檔案中引入rocketMq

<!-- https://mvnrepository.com/artifact/com.alibaba.rocketmq/rocketmq-client -->
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
	<artifactId>rocketmq-client</artifactId>
	<version>3.2.6</version>
</dependency>

2.  構建Producer類(訊息生產者由Session建立,並用於將訊息傳送到Destination。同樣,訊息生產者分兩種型別:QueueSender和TopicPublisher。可以呼叫訊息生產者的方法(send或publish方法)傳送訊息)

public class Producer {


    public void test(String msgString){
        DefaultMQProducer producer = new DefaultMQProducer("Producer_ChenHT_Test");
        producer.setNamesrvAddr("192.168.20.31:9876");
        try {
            producer.start();
            Message msg = new Message("PushTopicChenhaitao",
                    "push",
                    "1",
                    msgString.getBytes());
            SendResult result = producer.send(msg);
            System.out.println("傳送端:"+"id:" + result.getMsgId() +
                    " result:" + result.getSendStatus());
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            producer.shutdown();
        }
    }


}

3. 構建Consumer類,訊息消費者由Session建立,用於接收被髮送到Destination的訊息。兩種型別:QueueReceiver和TopicSubscriber。可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來建立。當然,也可以session的creatDurableSubscriber方法來建立持久化的訂閱者。

public class Consumer {

    public void testConsume(){
        DefaultMQPushConsumer consumer =
                new DefaultMQPushConsumer("consumer_test_msg");
        consumer.setNamesrvAddr("192.168.20.31:9876");
        try {
            //訂閱PushTopic下Tag為push的訊息
            consumer.subscribe("pushTopicHaleyliu", "push");
            //程式第一次啟動從訊息佇列頭取資料
            System.out.println("開始監聽推送topic頻道的push動作。。。。。。");
            consumer.setConsumeFromWhere(
                    ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener(
                    new MessageListenerConcurrently() {
                        public ConsumeConcurrentlyStatus consumeMessage(
                                List<MessageExt> list,
                                ConsumeConcurrentlyContext Context) {
                            Message msg = list.get(0);
                            System.out.println("接收到的訊息是:"+msg.toString());
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                    }
            );
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4.  Service層

public interface RocketMqService {

     void testRocketMQForProduceAndConsume(String msg);
}
@Service
public class RocketMqServiceImpl implements RocketMqService{
    @Override
    public void testRocketMQForProduceAndConsume(String msg){
        Producer producer = new Producer();
        producer.test(msg);
    }
}

5. Controller層

@Controller
public class RocketMqController {

    @Autowired
    private RocketMqService rocketMqService;

    @GetMapping("/mq")
    public String getMq(@RequestParam("msg") String msg){
        rocketMqService.testRocketMQForProduceAndConsume(msg);
        return "完成";
    }
}

如果出現rocketmq執行時提示 No route info of this topic 異常產生的原因可能是:

① Broker禁止自動建立Topic,且使用者沒有通過手工方式建立Topic

     mqbroker.exe -n 電腦ipv4地址:9876 autoCreateTopicEnable=true

② Broker沒有正確連線到Name Server

③ Producer沒有正確連線到Name Server 

④ 防火牆未關閉

⑤ 少 jar 包:fastjson-1.2.29.jar 

⑥ mqnameserver 或者 mqbroker 會報錯記憶體不夠

vim bin/runserver.sh (調整nameserver啟動的記憶體,不調整此檔案,可能導致無法啟動。)
 
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" 
 
vim bin/runbroker.sh     

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"