1. 程式人生 > >RabbitMQ學習(二)工作隊列

RabbitMQ學習(二)工作隊列

lose borde 阻塞 lpad mes getc actor 使用 處理

1.工作隊列(Work Queue)又叫任務隊列(Task Queue)指將任務分發個多個消費者。

2.實際操作:

這裏使用一個生產者產生多條數據提供給3個消費者

生產者代碼:

public class Producter {
//隊列名稱
private final static String QUEUE_NAME = "Work_Queue";
public static void main(String[] args) throws IOException, TimeoutException {
//

配置rabbitmq服務器地址
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("starktan");
factory.setPassword("starktan");
factory.setVirtualHost("/");
//建立連接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//聲明隊列,可以手動在mq中創建
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//寫入10條數據(直接循環寫入)
for (int i = 0; i < 10; i++) {
System.out.println("
發送第" + i + "信息!");
String message = "WorkQueue Message number " + i + " " + System.currentTimeMillis();
channel.basicPublish("", QUEUE_NAME, true, null, message.getBytes());
}
channel.close();
connection.close();
}
}

消費者代碼

public class Consumer {
//隊列名稱
private final static String QUEUE_NAME = "Work_Queue";
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
//創建連接和通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
ExecutorService service = Executors.newFixedThreadPool(10);
for(int i=0;i<3;i++){
final int cur = i;
service.submit(new Runnable() {
Channel channel = connection.createChannel();
public void run() {
//創建隊列消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
//指定消費隊列
try {
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true)
{
//nextDelivery是一個阻塞方法(內部實現其實是阻塞隊列的take方法)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("線程 "+cur+" 獲取到消息 " + message + "開始處理");
Thread.sleep(1000*(cur+5)*2);
System.out.println("線程 "+cur+" "+message + "處理完成");
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
service.shutdown();
}
}


技術分享運行效果:(消費者循環調度)

3.消息確認:

在處理一個比較耗時的任務的時候,如果消費者在中途崩潰掉,則對應的這條數據就丟失了,為了避免消息丟失的情況,RabbitMQ提供了消息確認

使用兩個消費者進行演示,調用方法

public void getConsum() throws IOException, TimeoutException, InterruptedException {
//創建連接和通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//聲明隊列,主要為了防止消息接收者先運行此程序,隊列還不存在時創建隊列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//創建隊列消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
//指定消費隊列,且改為手動確認
channel.basicConsume(QUEUE_NAME, false, consumer);
while (true)
{
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" 獲取到消息 " + message + "開始處理");
try{
Thread.sleep(10000);
}catch (InterruptedException e){}
finally {
channel.basicAck(delivery.getEnvelope().getDeliveryTag()
, false);
}
System.out.println( message + "處理完成");
}
}

技術分享

手動關掉一個消費者

技術分享

消息被另一個消費者繼續進行處理;

4.公平調度:

        channel.basicQos(1);//保證一次只分發一個

5.持久化: 保證當RabbitMQ服務器崩潰關機也不會造成消息丟失

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

第二個參數改為true

RabbitMQ學習(二)工作隊列