1. 程式人生 > >RabbitMQ學習——常見概念詳解

RabbitMQ學習——常見概念詳解

文章目錄

Exchange

描述

用於接收訊息,並根據路由鍵轉發訊息到所繫結的佇列

結構圖

在這裡插入圖片描述

藍色框表示傳送訊息,然後訊息通過路由關係路由到Queue1或Queue2

綠色框表示接收訊息,消費者與佇列建立監聽,去消費訊息

黃色框表示路由鍵繫結關係

交換機屬性

  • Name:交換機名稱
  • Type:交換機型別direct、topic、fanout、headers
  • Durablity:是否需要持久化 true|false
  • Auto Delete:當最後一個繫結到Exchange上的佇列刪除後,自動刪除該Exchange
  • Internal:當前Exchange是否用於RabbitMQ內部使用,預設為false
  • Arguments:擴充套件引數,用於擴充套件AMQP協議自制定使用

Direct Exchange

  • 所有傳送到Direct Exchange的訊息被轉發到RoutingKey中指定的Queue

在這裡插入圖片描述

如圖所示,路由到佇列名為Key的佇列中去了

生產者程式碼:

public class Producer4DirectExchange {
    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtil.getConn();

        Channel channel = connection.createChannel
(); //宣告 String exchangeName = "test_direct_exchange"; String routingKey = "test.direct"; //傳送 String msg = "Hello, I am Producer4DirectExchange"; channel.basicPublish(exchangeName,routingKey,null,msg.getBytes()); } }

消費者程式碼:

    public static void main(String[] args) throws IOException, InterruptedException {
        Connection connection = ConnectionUtil.getConn();

        Channel channel = connection.createChannel();
        //宣告
        String exchangeName = "test_direct_exchange";
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routingKey = "test.direct";

        //宣告Exchange
        channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);

        //宣告一個佇列
        channel.queueDeclare(queueName,false,false,false,null);

        //建立一個繫結關係
        channel.queueBind(queueName,exchangeName,routingKey);

        //durable 是否持久化訊息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //引數:佇列名稱,是否自動ACK,Consumer
        channel.basicConsume(queueName,true,consumer);
        while (true) {
            //阻塞獲取訊息
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到訊息:"+msg);
        }


    }

公共類:

public class ConnectionUtil {
    public static final String MQ_HOST = "192.168.222.101";
    public static final String MQ_VHOST = "/";
    public static final int MQ_PORT = 5672;

    public static Connection getConn() {
        //1. 建立一個ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(MQ_HOST);//配置host
        connectionFactory.setPort(MQ_PORT);//配置port
        connectionFactory.setVirtualHost(MQ_VHOST);//配置vHost

        //2. 通過連線工廠建立連線
        try {
            return connectionFactory.newConnection();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

登入控制檯,可以看到名為test_direct_exchange的交換機通過路由鍵test.direct繫結到test_direct_queue

在這裡插入圖片描述

Topic Exchange

  • 所有傳送到Topic Exchange的訊息都被轉發到所有關心RoutingKey中指定Topic的佇列上
  • Exchange將RoutingKey和某Topic進行模糊匹配,此時佇列需要繫結一個Topic

可以使用萬用字元進行模糊匹配:
# 匹配一個或多個詞(單詞的意思,不是一個字元)
*匹配一個詞
log.# 能匹配到"log.info.a"
log.* 只匹配"log.error"

在這裡插入圖片描述

如上圖,比如,user.news和user.weather能路由到第一個佇列

生成者程式碼:

public class Producer4TopicExchange {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConn();

        Channel channel = connection.createChannel();

        //宣告
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.find.abc";


        //傳送
        String msg = "Hello, I am Producer4TopicExchange";
        channel.basicPublish(exchangeName,routingKey1,null,msg.getBytes());
        channel.basicPublish(exchangeName,routingKey2,null,msg.getBytes());
        channel.basicPublish(exchangeName,routingKey3,null,msg.getBytes());

        CloseTool.closeElegantly(channel,connection);
    }
}

消費者程式碼:


public class Consumer4TopicExchange {
    public static void main(String[] args) throws IOException, InterruptedException {
        Connection connection = ConnectionUtil.getConn();

        Channel channel = connection.createChannel();
        //宣告
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        String routingKey = "user.#";

        //宣告Exchange
        channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);

        //宣告一個佇列
        channel.queueDeclare(queueName,false,false,false,null);

        //建立一個繫結關係
        channel.queueBind(queueName,exchangeName,routingKey);

        //durable 是否持久化訊息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //引數:佇列名稱,是否自動ACK,Consumer
        channel.basicConsume(queueName,true,consumer);
        while (true) {
            //阻塞獲取訊息
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到訊息:"+msg);
        }

    }
}

先啟動消費者,然後發現下面兩個繫結關係:

在這裡插入圖片描述

佇列

在這裡插入圖片描述

再啟動生產者,從消費者端可以看到如下輸出:

收到訊息:Hello, I am Producer4TopicExchange
收到訊息:Hello, I am Producer4TopicExchange
收到訊息:Hello, I am Producer4TopicExchange

說明3條訊息都收到了,接下來,我們改一下消費者的路由鍵,改為:String routingKey = "user.*";

再次啟動

收到訊息:Hello, I am Producer4TopicExchange
收到訊息:Hello, I am Producer4TopicExchange
收到訊息:Hello, I am Producer4TopicExchange

奇怪,怎麼還是收到3條,帶著疑問我們去看控制檯

在這裡插入圖片描述
可以看到,之前的路由規則繫結還在。因此可以解釋為啥能收到3條。

點選unbind解綁"user.#"

然後繼續操作一把,檢視輸出

收到訊息:Hello, I am Producer4TopicExchange
收到訊息:Hello, I am Producer4TopicExchange

此時,只收到兩條了。

Fanout Exchange

  • 不處理路由鍵,只需要簡單的將佇列繫結到交換機上
  • 傳送到交換機的訊息都會被轉發到與該互動機繫結的所有佇列上
  • 轉發訊息是最快的

在這裡插入圖片描述

訊息不走路由鍵,只要佇列與交換機有繫結關係就能收到。

生產者:

public class Producer4FanoutExchange {
    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtil.getConn();

        Channel channel = connection.createChannel();

        //宣告
        String exchangeName = "test_fanout_exchange";
        String routingKey = "nothing";

        //傳送
        String msg = "Hello, I am Producer4FanoutExchange";
        channel.basicPublish(exchangeName,routingKey,null,msg.getBytes());

        CloseTool.closeElegantly(channel,connection);
    }
}

消費者:


public class Consumer4FanoutExchange {
    public static void main(String[] args) throws IOException, InterruptedException {
        Connection connection = ConnectionUtil.getConn();

        Channel channel = connection.createChannel();
        //宣告
        String exchangeName = "test_fanout_exchange";
        String exchangeType = "fanout";
        String queueName = "test_fanout_queue";
        String routingKey = "";//不設定路由鍵

        //宣告Exchange
        channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
        //宣告一個佇列
        channel.queueDeclare(queueName,false,false,false,null);
        //建立一個繫結關係
        channel.queueBind(queueName,exchangeName,routingKey);

        //durable 是否持久化訊息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //引數:佇列名稱,是否自動ACK,Consumer
        channel.basicConsume(queueName,true,consumer);
        while (true) {
            //阻塞獲取訊息
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到訊息:"+msg);
        }



    }
}

在這裡插入圖片描述

在這裡插入圖片描述

Binding

  • Exchange和Exchange、Queue之間的連線關係
  • Binding中可以包含RoutingKey或引數

Queue

  • 訊息佇列,實際儲存訊息

屬性

  • Durability:是否持久化
  • Auto delete: 若為yes,代表當最後一個監聽被移除後,該Queue會自動被刪除

Message

  • 伺服器和應用程式之間傳遞的資料
  • 本質就是一段資料,由Properties和Payload(Body)組成

屬性

  • delivery mode
  • headers(自定義屬性放到這裡面)
  • content_type
  • content_encoding
  • priority
  • correlation_id
  • reply_to:指令訊息失敗返回的佇列
  • expiration:過期時間
  • message_id:訊息ID

Producer:


public class Producer {
    public static final String MQ_HOST = "192.168.222.101";
    public static final String MQ_VHOST = "/";
    public static final int MQ_PORT = 5672;

    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立一個ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(MQ_HOST);//配置host
        connectionFactory.setPort(MQ_PORT);//配置port
        connectionFactory.setVirtualHost(MQ_VHOST);//配置vHost

        //2. 通過連線工廠建立連線
        Connection connection = connectionFactory.newConnection();
        //3. 通過connection建立一個Channel
        Channel channel = connection.createChannel();

        Map<String,Object> headers = new HashMap<>();
        headers.put("var1","abc");
        headers.put("var2","sdd");
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .deliveryMode(2) //2:持久化投遞;1:非持久化(未消費的訊息重啟後就沒了)
                .contentEncoding("UTF-8")
                .expiration("5000")//5s
                .headers(headers)
                .build();

        //4. 通過Channel傳送資料
        for (int i = 0; i < 10; i++) {
            String message = "Hello" + i;
            //exchange為"",則通過routingKey取尋找佇列
            channel.basicPublish("","testQueue",properties,message.getBytes());
        }

        //5. 關閉連線
        channel.close();
        connection.close();

    }
}

consumer:

public class Consumer {


    public static final String QUEUE_NAME = "testQueue";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConn();

        //3. 通過connection建立一個Channel
        Channel channel = connection.createChannel();

        //4. 宣告(建立)一個佇列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        //5. 建立消費者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        //6. 設定Channel
        channel.basicConsume(QUEUE_NAME,true,queueingConsumer);

        int num = 0;
        //7. 獲取訊息
        while (true) {
            //nextDelivery 會阻塞直到有訊息過來
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("收到:" + message);
            Map<String,Object > headers = delivery.getProperties().getHeaders();
            System.out.println("headers get va1 :" + headers.get("var1"));
        }


    }
}

Virtual host - 虛擬主機

  • 虛擬地址,用於進行邏輯隔離,最上層的訊息路由
  • 一個Virtuall Host裡面可以有若干個Exchange和Queue
  • 同一個Virtual Host裡面不能有相同名稱的Exchange或Queue