1. 程式人生 > >RabbitMQ必備核心知識

RabbitMQ必備核心知識

黑洞 模式 private 自動 amp text 上一個 rac cep


  現在很多知名的互聯網公司都有用到RabbitMQ,其性能,可擴展性讓很多大公司青睞於使用它,不過想要完全使用好RabbitMQ需要掌握其核心的一些概念,這裏就說說掌握RabbitMQ所需的必要知識

  生產者與消費者

  生產者: 創建消息,然後發送到代理服務器(RabbitMQ)的程序

  消費者:連接到代理服務器,並訂閱到隊列上接收消息

  消息流程

  AMQP協議規定,AMQP消息必須有三部分,交換機,隊列和綁定。生產者把消息發送到交換機,交換機與隊列的綁定關系決定了消息如何路由到特定的隊列,最終被消費者接收。

技術分享圖片

  Note: 消息是不能直接到達隊列(Queue)的

  交換機

  消息實際上投遞到的是交換機,具體路由到那個隊列由交換機根據路由鍵(routing key)完成。

  · 當你發消息到代理服務器時,即便路由鍵是空的,RabbitMQ也會將其和使用的路由鍵進行匹配。如果路由的消息不匹配任何綁定模式,消息將會進入黑洞。

  交換機在隊列與消息中間起到了中間層的作用,有了交換機我們可以實現更靈活的功能,RabbitMQ中有三種常用的交換機類型:

  · direct: 如果路由鍵匹配,消息就投遞到對應的隊列

  · fanout:投遞消息給所有綁定在當前交換機上面的隊列

  · topic:允許實現有趣的消息通信場景,使得5不同源頭的消息能夠達到同一個隊列。topic隊列名稱有兩個特殊的關鍵字。

  o * 可以替換一個單詞

  o # 可以替換所有的單詞

  可以理解,direct為1v1, fanout為1v所有,topic比較靈活,可以1v任意。

技術分享圖片

  虛擬主機

  每一個虛擬主機(vhost)相當於mini版的RabbitMQ服務器,擁有自己的隊列,交換機和綁定,權限… 這使得一個RabbitMQ服務眾多的應用程序,而不會互相沖突。

  rabbitMQ默認的虛擬主機為: “/” ,一般我們在創建Rabbit的用戶時會再給用戶分配一個虛擬主機。

  操作虛擬主機,除了命令行之外還有一個web管理頁面

  #創建虛擬主機

  rabbitmqctl add vhost [vhost_name]

  #刪除虛擬主機

  rabbitmqctl delete vhost [vhost_name]

  #列出虛擬主機

  rabbitmqctl list_vhosts

技術分享圖片

  消息投遞策略

  默認情況下RabbitMQ的隊列和交換機在RabbitMQ服務器重啟之後會消失,原因在於隊列和交換機的durable屬性,該屬性默認情況下為false.

  能從AMQP服務器崩潰中恢復的消息稱為持久化消息,如果想要從崩潰中恢復那麽消息必須

  · 投遞模式設置2,來標記消息為持久化

  · 發送到持久化的交換機

  · 到到持久化的隊列

  缺點:消息寫入磁盤性能差很多。除非特別關鍵的消息會使用

  關鍵API

  以上都是概念性的內容,實際我們還是要通過編程來實現我們的目的,RabbitMQ的客戶端api提供了很多功能,通過看代碼,來了解它的強大之處。

  基本步驟之前的RabbitMQ快速入門已經提過了,Channel類是關鍵的部分:包含了很多我們想要的功能

技術分享圖片

  消息確認

  生成端可以添加監聽事件:

  channel.addConfirmListener(new ConfirmListener() {

  @Override

  public void handleNack(long deliveryTag, boolean multiple) throws IOException {

  System.err.println(-------no ack!-----------);

  }

  @Override

  public void handleAck(long deliveryTag, boolean multiple) throws IOException {

  System.err.println(-------ack!-----------);

  }

  });

  消費端可以確認消息狀態:

  public class MyConsumer extends DefaultConsumer {

  private Channel channel ;

  public MyConsumer(Channel channel) {

  super(channel);

  this.channel = channel;

  }

  @Override

  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

  System.err.println(-----------consume message----------);

  System.err.println(body: + new String(body));

  try {

  Thread.sleep(2000);

  } catch (InterruptedException e) {

  e.printStackTrace();

  }

  if((Integer)properties.getHeaders().get(num) == 0) {

  channel.basicNack(envelope.getDeliveryTag(), false, true);

  } else {

  channel.basicAck(envelope.getDeliveryTag(), false);

  }

  }

  }

  channel.basicAck與basicNack最後一個參數指定消息是否重回隊列。

  監聽不可達消息

  我們的消息生產者通過指定交換機和路由鍵來把消息送到隊列中,但有時候指定的路由鍵不存在,或者交換機不存在,那麽消息就會return,我們可以通過添加return listener來實現:

  channel.addReturnListener(new ReturnListener() {

  @Override

  public void handleReturn(int replyCode, String replyText, String exchange,

  String routingKey, BasicProperties properties, byte[] body) throws IOException {

  System.err.println(---------handle return----------);

  System.err.println(replyCode: + replyCode);

  System.err.println(replyText: + replyText);

  System.err.println(exchange: + exchange);

  System.err.println(routingKey: + routingKey);

  System.err.println(properties: + properties);

  System.err.println(body: + new String(body));

  }

  });

  channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());

  在basicPublish中的Mandatory要設置為true才會生效,否則broker會刪除該消息

  消費端限流

  假設MQ服務器上面囤積了成千上萬條的消息的時候,這個時候突然連接消費端,那麽巨量的消息全部推過來,但是客戶端無法一次性處理這麽多的數據。

  在高並發的時候,瞬間產生的流量很大,消息很大,而MQ有個重要的作用就是限流,限流則是消費端做的。

  RabbitMQ提供了一種Qos(服務質量保證)功能,即在非自動確認消息的前提下,在一定數量的消息未被消費前,不進行消費新的消息。

  // prefetchSize消息的限制大小,一般設置為0,在生產端限制

  // prefetchCount 我們一次最多消費多少條消息,一般設置為1

  // global,一般設置為false,在消費端進行限制

  channel.basicQos(int prefetchSize, int prefetchCount, boolean global)

  // 使用

  channel.basicQos(0, 1, false);

  channel.basicConsume(queueName, false, new MyConsumer(channel));

  Note: autoAck設置為false, 一定要手工簽收消息

  死信隊列(DLX)

  當消息在隊列中變成死信,沒有消費者進行消費的時候,消息可能會被重新發布到另外一個隊列中,這個隊列就是死信隊列。

  以下情況會導致消息進入死信隊列:

  · basic.reject/basic.nack 並且 requeue為false(不重回隊列)的時候,消息就是死信

  · 消息TTL過期

  · 隊列達到最大的長度

  死信隊列也是正常的Exchange,和一般的Exchange沒什麽區別,不過要做一點操作。

  設置死信隊列包括:

  · 設置Exchange(dlx.exchange名稱隨意),設置Queue(dlx.queue),設置RoutingKey(#)

  · 創建正常的交換機,隊列,綁定,只不過加上一個參數 arguments.put(“x-dead-letter-exchange”,“dlx.exchange”)

  // 這就是一個普通的交換機 和 隊列 以及路由

  String exchangeName = test_dlx_exchange;

  String routingKey = dlx.#;

  String queueName = test_dlx_queue;

  channel.exchangeDeclare(exchangeName, topic, true, false, null);

  Map agruments = new HashMap();

  agruments.put(x-dead-letter-exchange, dlx.exchange);

  //這個agruments屬性,要設置到聲明隊列上

  channel.queueDeclare(queueName, true, false, false, agruments);

  channel.queueBind(queueName, exchangeName, routingKey);

  //要進行死信隊列的聲明:

  channel.exchangeDeclare(dlx.exchange, topic, true, false, null);

  channel.queueDeclare(dlx.queue, true, false, false, null);

  channel.queueBind(dlx.queue, dlx.exchange, #);

  最後

  這裏主要講了一些使用RabbitMQ中經常涉及到的概念,懂了概念,在進行應用的時候才不至於糊塗。然後列舉了MQ的Java客戶端重要的幾個API。

?

RabbitMQ必備核心知識