1. 程式人生 > >【RabbitMQ】三種Exchange模式——訂閱、路由、萬用字元模式

【RabbitMQ】三種Exchange模式——訂閱、路由、萬用字元模式

   前兩篇部落格介紹了兩種佇列模式,這篇部落格介紹訂閱、路由和萬用字元模式,之所以放在一起介紹,是因為這三種模式都是用了Exchange交換機,訊息沒有直接傳送到佇列,而是傳送到了交換機,經過佇列繫結交換機到達佇列。

一、訂閱模式(Fanout Exchange):

   一個生產者,多個消費者,每一個消費者都有自己的一個佇列,生產者沒有將訊息直接傳送到佇列,而是傳送到了交換機,每個佇列繫結交換機,生產者傳送的訊息經過交換機,到達佇列,實現一個訊息被多個消費者獲取的目的。需要注意的是,如果將訊息傳送到一個沒有佇列繫結的exchange上面,那麼該訊息將會丟失,這是因為在rabbitMQ中exchange不具備儲存訊息的能力,只有佇列具備儲存訊息的能力。

      

         

示例程式碼:

生產者:

public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        //從連線中建立通道
        Channel channel = connection.createChannel();

        // 宣告exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 訊息內容
        String message = "商品已經新增,id = 1000";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}
消費者1:
public class Recv {

    private final static String QUEUE_NAME = "test_queue_fanout_1";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {

        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 繫結佇列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 同一時刻伺服器只會發一條訊息給消費者
        channel.basicQos(1);

        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽佇列,手動返回完成
        channel.basicConsume(QUEUE_NAME, true, consumer);

        // 獲取訊息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" 前臺系統: '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

  消費者2的程式碼和消費者1的程式碼大致相同,只是佇列的名稱不一樣,這樣兩個消費者有自己的佇列,都可以接收到生產者傳送的訊息


  但是如果生產者有新增商品,修改商品,刪除商品的訊息,消費者包快前臺系統和搜尋系統,要求前臺系統接收修改和刪除商品的訊息,搜尋系統接收新增商品、修改商品和刪除商品的訊息。所以使用這種訂閱模式實現商品資料的同步並不合理。因此我們介紹下一種模式:路由模式。

二、路由模式(Direct Exchange)

  這種模式添加了一個路由鍵,生產者釋出訊息的時候新增路由鍵,消費者繫結佇列到交換機時新增鍵值,這樣就可以接收到需要接收的訊息。

       


示例程式碼:

生產者:
public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        // 訊息內容
        String message = "刪除商品, id = 1001";
        channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}
消費者1:接收更新和刪除訊息
public class Recv {

    private final static String QUEUE_NAME = "test_queue_direct_1";

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {

        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 繫結佇列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");

        // 同一時刻伺服器只會發一條訊息給消費者
        channel.basicQos(1);

        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽佇列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取訊息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" 前臺系統: '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
消費者2:接收insert,update,delete的訊息
public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_direct_2";

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {

        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 繫結佇列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");

        // 同一時刻伺服器只會發一條訊息給消費者
        channel.basicQos(1);

        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽佇列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取訊息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" 搜尋系統: '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
  如果生產者釋出了insert訊息,那麼消費者2可以收到,消費者 1收不到,如果釋出了update或者delete訊息,兩個消費者都可以收到。如果釋出ABC訊息兩個消費者都收不到,因為沒有繫結這個鍵值。這種模式基本滿足了我們的需求,但是還不夠靈活,下面介紹另外一個模式。

三、萬用字元模式(Topic Exchange)

   基本思想和路由模式是一樣的,只不過路由鍵支援模糊匹配,符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞

       

示例程式碼:

生產者:

public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {
        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        // 訊息內容
        String message = "刪除商品,id = 1001";
        channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

消費者1:
public class Recv {

    private final static String QUEUE_NAME = "test_queue_topic_1";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 繫結佇列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");

        // 同一時刻伺服器只會發一條訊息給消費者
        channel.basicQos(1);

        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽佇列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取訊息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" 前臺系統: '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}


消費者2:

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_topic_2";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 獲取到連線以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 宣告佇列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 繫結佇列到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#");

        // 同一時刻伺服器只會發一條訊息給消費者
        channel.basicQos(1);

        // 定義佇列的消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 監聽佇列,手動返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 獲取訊息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" 搜尋系統: '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

  消費者1是按需索取,並沒有使用萬用字元模式,而是用的完全匹配,消費者2使用萬用字元模式,這樣以item.開頭的訊息都會全部接收。

小結:

  1.與簡單模式和work模式對比,前面兩種同一個訊息只能被一個消費者獲取,而今天的這三種模式,可以實現一個訊息被多個消費者 獲取。

  2.fanout這種模式沒有加入路由器,佇列與exchange繫結後,就會接收到所有的訊息,其餘兩種增加了路由鍵,並且第三種增加萬用字元,更加便利。