RabbitMQ學習(二)工作隊列
阿新 • • 發佈:2017-07-13
lose borde 阻塞 lpad mes getc actor 使用 處理
1.工作隊列(Work Queue)又叫任務隊列(Task Queue)指將任務分發個多個消費者。
2.實際操作:
這裏使用一個生產者產生多條數據提供給3個消費者
生產者代碼:
public class Producter { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("starktan"); factory.setPassword("starktan"); factory.setVirtualHost("/"); //建立連接和通道 Channel channel = connection.createChannel(); //聲明隊列,可以手動在mq中創建 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //寫入10條數據(直接循環寫入) for (int i = 0; i < 10; i++) { System.out.println(" String message = "WorkQueue Message number " + i + " " + System.currentTimeMillis(); channel.basicPublish("", QUEUE_NAME, true, null, message.getBytes()); } channel.close(); connection.close(); } } |
消費者代碼
public class Consumer { //隊列名稱 private final static String QUEUE_NAME = "Work_Queue"; public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { //創建連接和通道 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); ExecutorService service = Executors.newFixedThreadPool(10); for(int i=0;i<3;i++){ final int cur = i; service.submit(new Runnable() { Channel channel = connection.createChannel(); public void run() { //創建隊列消費者 QueueingConsumer consumer = new QueueingConsumer(channel); //指定消費隊列 try { channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { //nextDelivery是一個阻塞方法(內部實現其實是阻塞隊列的take方法) QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("線程 "+cur+" 獲取到消息 " + message + "開始處理"); Thread.sleep(1000*(cur+5)*2); System.out.println("線程 "+cur+" "+message + "處理完成"); } } catch (Exception e) { e.printStackTrace(); } } }); } service.shutdown(); } } |
運行效果:(消費者循環調度)
3.消息確認:
在處理一個比較耗時的任務的時候,如果消費者在中途崩潰掉,則對應的這條數據就丟失了,為了避免消息丟失的情況,RabbitMQ提供了消息確認
使用兩個消費者進行演示,調用方法
public void getConsum() throws IOException, TimeoutException, InterruptedException { |
手動關掉一個消費者
消息被另一個消費者繼續進行處理;
4.公平調度:
channel.basicQos(1);//保證一次只分發一個
5.持久化: 保證當RabbitMQ服務器崩潰關機也不會造成消息丟失
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
第二個參數改為true
RabbitMQ學習(二)工作隊列