1. 程式人生 > >RabbitMQ(二) ——工作隊列

RabbitMQ(二) ——工作隊列

bin 生產 就是 發送 定時 順序 -h 預取 list

RabbitMQ(二)

——工作隊列

(轉載請附上本文鏈接——linhxx)

一、概述

工作隊列模式(work queue),是有多個消費者的情況下,可以共同消費隊列內的內容,加快消息處理速度。這是RabbitMQ的基本工作模式。

技術分享圖片

二、使用方式

和上一篇中的生產和消費消息的方式一樣,就是需要多在cli進程中打開一個消費者的php文件。即需要打開3個php,一個是生產者的php文件,兩個消費者的php文件(或多個php文件)。

三、工作機制

3.1 輪詢(Round-robin dispatching)

當開啟多個生產者的時候,消費者產生消息並發送到隊列的情況下,隊列會將消息均衡的分發給同時打開的多個消費者。即采用輪詢的方式,假設有兩個消費者c1和c2,第一次有消息給c1,第二次有消息給c2,第三個再給c1,以此類推。

3.2 回饋機制(Message acknowledgment)

為了保證消息的可靠性,RabbitMQ允許用戶采用消費者的ack機制,即只有消費者回饋給隊列ack後,隊列才會將消息從隊列中剔除。這樣,可以確保隊列中的每個消息都是確認被消費者處理完畢的。

開啟的方式,只要將方法basic_consume第四個參數設置成false,就表示開啟ack機制。

開啟ack,就必須要記得在消費者的代碼總,加入回饋的代碼,否則,消息會被隊列認為沒有消費,不斷的堵在隊列中,導致隊列堵塞。

要查看隊列中還沒確認的內容,可以采用RabbitMQ的管理工具——rabbitmqctl。

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

3.3 消息持久化(Message durability)

RabbitMQ具有完善的持久化機制,能夠確保消息的安全性。可以在代碼中開啟持久化。消息持久化需要在隊列和消息分別開啟持久化。

1)隊列持久化:

queue_declare的第三個參數設置為true。

2)消息持久化:

$msg = new AMQPMessage($data,

array(‘delivery_mode‘ => AMQPMessage::DELIVERY_MODE_PERSISTENT)

);

這樣可以確保消息可以保存在本地磁盤,因此即使rabbitmq的服務器宕機的時候,也可以保證消息的安全性。

當然,這也也存在一定的風險,因為操作系統自身的機制,開啟持久化的時候,操作系統為了保證運作速度,消息會保存在操作系統層面的緩存,並且定時將消息存入硬盤。因此,還沒存入磁盤的這一小段間隔(30秒左右),如果服務器宕機,有可能消息丟失。但是這個可能性很低,因此安全性已經很高。

3.4 公平分發(Fair dispatch)與預取機制(prefetch)

消息的輪詢機制,每次有消息,隊列則直接按照排好的順序將消息傳給對應的隊列,有可能出現一部分隊列上一條消息還沒處理完,就出現了下一條的消息,而另外一部分隊列則空閑的情況。

因此,RabbitMQ允許用戶開啟公平分發機制,讓每個消費者只能接收一個消息,如果還沒給隊列回復ack,則消息來的時候,即使順序輪到隊列,也不會分發給它,而是分發給它下一個空隊列。

開啟方式很簡單,在消費者的channel,在消費消息之前(使用basic_consume方法),給channel加一個qos機制:

$channel->basic_qos(null, 1, null);

第二個參數就是要求消費者每次只能處理幾個消息。

這樣也存在一個隱患,即如果所有的消費者都還沒ack,而生產者又不斷的往隊列發數據,則隊列有可能會塞滿。這個處理方案只有增加消費者,或者從代碼、邏輯層面控制消息的產生速度。

——written by linhxx

更多最新文章,歡迎關註微信公眾號“決勝機器學習”,或掃描右邊二維碼。技術分享圖片

RabbitMQ(二) ——工作隊列