1. 程式人生 > >RabbitMQ入門:主題路由器(Topic Exchange)

RabbitMQ入門:主題路由器(Topic Exchange)

AI orange topic 都是 erro col host nfa 匹配

上一篇博文中,我們使用direct exchange 代替了fanout exchange,這次我們來看下topic exchange。

一、Topic Exchange介紹

topic exchange和direct exchange類似,都是通過routing key和binding key進行匹配,不同的是topic exchange可以為routing key設置多重標準。

direct路由器類似於sql語句中的精確查詢;topic 路由器有點類似於sql語句中的模糊查詢。

還記得嗎?我們在《RabbitMQ入門:發布/訂閱(Publish/Subscribe)》中對exchange的分類進行過介紹:

Direct:完全根據key進行投遞的,例如,綁定時設置了routing key為”abc”,那麽客戶端提交的消息,只有設置了key為”abc”的才會投遞到隊列。
Topic:對key進行模式匹配後進行投遞,符號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。
Fanout:不需要key,它采取廣播模式,一個消息進來時,投遞到與該交換機綁定的所有隊列。
Headers:我們可以不考慮它。

下面是官網給出的工作模型(P代表生產者,X代表exhange,紅色的Q代表隊列,C代表消費者):

技術分享圖片

我們來分析下這個模型。

它發送的消息是用來描述動物的。路由鍵有三個單詞:<speed>.<color>.<species>,第一個單詞描述了速度,第二個描述了顏色,第三個描述了物種。
有三個綁定鍵,Q1綁定鍵為*.orange.*(關註所有顏色為orange的動物); Q2的綁定鍵有兩個,分別是*.*.rabbit(關註所有的兔子)和lazy.#(關註所有速度為lazy的動物)。

因此,路由鍵為quick.orange.rabbit的消息將發送到Q1和Q2,路由鍵為quick.orange.fox的消息將發送到Q1,路由鍵為lazy.brown.fox的消息將發送到Q2。路由鍵為lazy.pink.rabbit的消息將發送到Q2,但是註意,它只會到達Q2一次,盡管它匹配了兩個綁定鍵。路由鍵為quick.brown.fox的消息因為不和任意的綁定鍵匹配,所以將會被丟棄。

如果有人手一抖發了個lazy.orange.male.rabbit這種四個單詞的,這個怎麽辦呢? 由於它和lazy.#匹配,因此將發送到Q2。

二、代碼示例

接下來我們看下代碼

  1. 生產者
    public class LogTopicSender {
        // exchange名字
        public static String EXCHANGE_NAME = "topicExchange";
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.創建連接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.為通道聲明topic類型的exchange
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
                
                // 3.發送消息到指定的exchange,隊列指定為空,由exchange根據情況判斷需要發送到哪些隊列
                String routingKey = "info";
    //            String routingKey = "log4j.error";
    //            String routingKey = "logback.error";
    //            String routingKey = "log4j.warn";
                String msg = " hello rabbitmq, I am " + routingKey;
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
                System.out.println("product send a msg: " + msg);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                // 4.關閉連接
    
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
    
        }
    }

  2. 消費者
    public class LogTopicReciver {
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.創建連接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.為通道聲明topic類型的exchange
                channel.exchangeDeclare(LogTopicSender.EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
                // 3.創建隨機名字的隊列
                String queueName = channel.queueDeclare().getQueue();
    
                // 4.建立exchange和隊列的綁定關系
                String[] bindingKeys = { "#" };
    //            String[] bindingKeys = { "log4j.*", "#.error" };
    //            String[] bindingKeys = { "*.error" };
    //            String[] bindingKeys = { "log4j.warn" };
                for (int i = 0; i < bindingKeys.length; i++) {
                    channel.queueBind(queueName, LogTopicSender.EXCHANGE_NAME, bindingKeys[i]);
                    System.out.println(" **** LogTopicReciver keep alive ,waiting for " + bindingKeys[i]);
                }
    
                // 5.通過回調生成消費者並進行監聽
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                            com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                        // 獲取消息內容然後處理
                        String msg = new String(body, "UTF-8");
                        System.out.println("*********** LogTopicReciver" + " get message :[" + msg + "]");
                    }
                };
                // 6.消費消息
                channel.basicConsume(queueName, true, consumer);
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }

  3. 啟動消費者,作為消費者1
  4. 分別將String[] bindingKeys = { "#" };改為String[] bindingKeys = { "log4j.*", "#.error" };/String[] bindingKeys = { "*.error" };/String[] bindingKeys = { "log4j.warn" };,然後啟動作為消費者2、消費者3、消費者4
  5. 啟動4次生產者,routing key分別為String routingKey = "info";、String routingKey = "log4j.error";、String routingKey = "logback.error";、String routingKey = "log4j.warn";
  6. 觀察控制臺log
    生產者:
    product send a msg:  hello rabbitmq, I am info
    product send a msg:  hello rabbitmq, I am log4j.error
    product send a msg:  hello rabbitmq, I am logback.error
    product send a msg:  hello rabbitmq, I am log4j.warn
    
    消費者1:
     **** LogTopicReciver keep alive ,waiting for #
    *********** LogTopicReciver get message :[ hello rabbitmq, I am info]
    *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.error]
    *********** LogTopicReciver get message :[ hello rabbitmq, I am logback.error]
    *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.warn]
    
    消費者2:
     **** LogTopicReciver keep alive ,waiting for log4j.*
     **** LogTopicReciver keep alive ,waiting for #.error
    *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.error]
    *********** LogTopicReciver get message :[ hello rabbitmq, I am logback.error]
    *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.warn] 消費者3: **** LogTopicReciver keep alive ,waiting for *.error *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.error] *********** LogTopicReciver get message :[ hello rabbitmq, I am logback.error] 消費者4: **** LogTopicReciver keep alive ,waiting for log4j.warn *********** LogTopicReciver get message :[ hello rabbitmq, I am log4j.warn]

  7. 觀察RabbitMQ管理頁面技術分享圖片

    技術分享圖片

    技術分享圖片

RabbitMQ入門:主題路由器(Topic Exchange)