1. 程式人生 > >關於RocketMQ(Apache接手後)新版本下,producer啟動報錯RocketMQ No route info of this topic問題

關於RocketMQ(Apache接手後)新版本下,producer啟動報錯RocketMQ No route info of this topic問題

坑爹啊,原本以為一會就搞定,結果從上午9點整整搞到下午2點半

producer

package com.zgd.rocketmq.producer;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/** * 傳送訊息的生產者 * @author zgd * @time 2018年8月7日09:50:34 */ public class ExampleProducer { public static void main(String[] args) throws Exception { //生產者,可以指定producer叢集 DefaultMQProducer producer = new DefaultMQProducer("producer_group_name"); //設定name server的地址 producer.
setNamesrvAddr("127.0.0.1:9876"); producer.start(); System.out.println(producer.getNamesrvAddr()); System.out.println(producer.getClientIP()); System.out.println("啟動了生產者producer"); //message必須指定topic,和訊息體body // 可以選擇指定tag,key來進行細分message Message msgA =
new Message("topicA", "這是topicA的訊息,沒有指定tag和key".getBytes(RemotingHelper.DEFAULT_CHARSET)); Message msgB = new Message("topicB", "這是topicB的訊息,沒有指定tag和key".getBytes(RemotingHelper.DEFAULT_CHARSET)); Message msgC = new Message("topicC","tag-a","這是topicC的訊息,指定了tag-a".getBytes(RemotingHelper.DEFAULT_CHARSET)); Message msgD = new Message("topicC","tag-b","這是topicC的訊息,指定了tag-b".getBytes(RemotingHelper.DEFAULT_CHARSET)); Message msgE = new Message("topicC","tag-a","key1","這是topicC的訊息,指定了tag-a和key1".getBytes(RemotingHelper.DEFAULT_CHARSET)); Message[] messages =new Message[]{msgA,msgB,msgC,msgD,msgE}; //傳送訊息 for (Message message : messages) { SendResult result = producer.send(message); System.out.println("訊息傳送成功:id:" + result.getMsgId() + " result:" + result.getSendStatus()); } } }

consumer

package com.zgd.rocketmq.consumer;


import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.List;

/**
 * 接收訊息的消費者
 * @author zgd
 * @time 2018年8月7日09:50:34
 */
public class ExampleConsumer {
    public static void main(String[] args) throws Exception {
        //定義消費者,可以指定消費叢集
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_name");
        //同樣的,指定name server 的地址
        consumer.setNamesrvAddr("127.0.0.1:9876");

      /*
        //訂閱topicA下的所有訊息
        consumer.subscribe("topicA","*");
        //一個consumer可以訂閱多個topic
        consumer.subscribe("topicB","*");
        */

      consumer.subscribe("topicC","tag-a");

        //程式第一次啟動從訊息佇列頭取資料
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //註冊訂閱訊息
        consumer.registerMessageListener(
                new MessageListenerConcurrently() {
                    @Override
                    public ConsumeConcurrentlyStatus consumeMessage(
                            List<MessageExt> list,
                            ConsumeConcurrentlyContext Context) {
                        MessageExt msg = list.get(0);
                        try {
                            System.out.println( new Date().toLocaleString()
                                +"-收到訊息:id-"+msg.getMsgId()
                                +","+ new String(msg.getBody(), "UTF-8")
                                +","+"keys: "+msg.getKeys()
                            );
                            System.out.println("msg全部資訊:"+ msg.toString());

                        } catch (UnsupportedEncodingException e) {
                            e.printStackTrace();
                        }
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                }
        );
        consumer.start();
        System.out.println("consumer消費者啟動");
        while (true){

        }
    }

}

windows部署RocketMQ, ok
啟動namesrv, ok
啟動broker,ok

windows直接點選藍色框框的兩個檔案, linux開啟紅色框框檔案;

sh bin/mqnamesrv &

sh bin/mqbroker -n localhost:9876 &

在這裡插入圖片描述

然後啟動consumer,沒問題
然後啟動producer發訊息,出錯了!

org.apache.rocketmq.client.exception.MQClientException: No route info of this topic, topicA

這裡寫圖片描述

不得不說這個其實很坑.不管大大小小七七八八的問題都是報這個異常, 包括沒開broker和namesrv直接啟動也是報這個異常,所以網上很多解決辦法也千奇百怪

不過主要的就是producer沒有和broker建立長連線導致的

目前網上常見的解決方法:

1. broker沒啟動成功,啟動命令有問題

linux:

nohup sh mqbroker -n 192.168.180.133:9876 autoCreateTopicEnable=true

windows

mqbroker.exe -n localhost:9876autoCreateTopicEnable=true

這裡寫圖片描述

這個也是有問題,比較新版本的已經沒有mqbroker.exe這個了,如圖:
這裡寫圖片描述

直接cmd下./mqbroker -n 127.0.0.1:9876 啟動就好

2. broker沒有自動建立topic

其實這個問題也不存在,新版本中都是預設autoCreateTopicEnable=true

啟動./mqbroker -n 127.0.0.1:9876 -p -p是檢視配置資訊
這裡寫圖片描述
所以新版本都是開啟自動建立topic的

3. 不知道broker是否啟動成功

檢視broker.log,windows的預設在C:\Users\Admin\logs\rocketmqlogs

檢視logback_broker.xml可以看出
這裡寫圖片描述

linux的在~/logs/rocketmqlogs/
這裡寫圖片描述
看到register broker to name server localhost:9876 OK 說明已經成功註冊到name server上了

也可以啟動rocketmq-console
這裡寫圖片描述

看到這個就是成功了.

4. fastjson, slf4j的jar包缺失

其實這個新版本應該沒啥影響,不過以防萬一的話還是都加上, 看上面我貼出來的pom.xml

5. 版本不對

其實以上的問題基本在高版本都解決了

就是這個問題困擾了我一天,硬是把所有的原因都排除了一遍,甚至去排除broker的配置檔案,brokerIP,重啟name server,都不行.後面嘗試著去將pom的版本換成下載rocketmq的版本,結果居然就好了!!!

我的專案pom檔案:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.zgd.rocketmq</groupId>
  <artifactId>rocketmq-study-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>rocketmq-study-demo</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <rocketmq.version>4.3.0</rocketmq.version>
    <slf4j.version>1.7.25</slf4j.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.28.Final</version>
    </dependency>

    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>${rocketmq.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-common</artifactId>
      <version>${rocketmq.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-remoting</artifactId>
      <version>${rocketmq.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.49</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-nop</artifactId>
      <version>${slf4j.version}</version>
    </dependency>

  </dependencies>

  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
        <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.7.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.20.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>


我用的rocketmq版本: 4.3.0

分享一下成功後的截圖
這裡寫圖片描述

這裡寫圖片描述

這裡寫圖片描述

最後說一下windows關閉namesrv和broker

.\mqshutdown namesrv
.\mqshutdown broker

linux

sh mqshutdown namesrv
sh mqshutdown broker

這裡寫圖片描述