多執行緒消費阻塞佇列(生產者消費者模型)
阿新 • • 發佈:2019-01-22
一.幾種主要的阻塞佇列
ArrayBlockingQueue:基於陣列實現的一個阻塞佇列,在建立ArrayBlockingQueue物件時必須制定容量大小。並且可以指定公平性與非公平性,預設情況下為非公平的,即不保證等待時間最長的佇列最優先能夠訪問佇列。
LinkedBlockingQueue:基於連結串列實現的一個阻塞佇列,在建立LinkedBlockingQueue物件時如果不指定容量大小,則預設大小為Integer.MAX_VALUE。
PriorityBlockingQueue:以上2種佇列都是先進先出佇列,而PriorityBlockingQueue卻不是,它會按照元素的優先順序對元素進行排序,按照優先順序順序出隊,每次出隊的元素都是優先順序最高的元素。注意,此阻塞佇列為無界阻塞佇列,即容量沒有上限(通過原始碼就可以知道,它沒有容器滿的訊號標誌),前面2種都是有界佇列。
DelayQueue:基於PriorityQueue,一種延時阻塞佇列,DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從佇列中獲取到該元素。DelayQueue也是一個無界佇列,因此往佇列中插入資料的操作(生產者)永遠不會被阻塞,而只有獲取資料的操作(消費者)才會被阻塞。
- LinkedTransferQueue:一個由連結串列結構組成的無界阻塞佇列。
- LinkedBlockingDeque:一個由連結串列結構組成的雙向阻塞佇列
阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。這兩個附加的操作是:在佇列為空時,獲取元素的執行緒會等待佇列變為非空。當佇列滿時,儲存元素的執行緒會等待佇列可用。阻塞佇列常用於生產者和消費者的場景,生產者是往佇列裡新增元素的執行緒,消費者是從佇列裡拿元素的執行緒。阻塞佇列就是生產者存放元素的容器,而消費者也只從容器裡拿元素。
阻塞佇列提供了四種處理方法:
方法\處理方式 | 丟擲異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
檢查方法 | element() | peek() | 不可用 | 不可用 |
- 丟擲異常:是指當阻塞佇列滿時候,再往佇列裡插入元素,會丟擲IllegalStateException(“Queue full”)異常。當佇列為空時,從佇列裡獲取元素時會丟擲NoSuchElementException異常 。
- 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從佇列裡拿出一個元素,如果沒有則返回null
- 一直阻塞:當阻塞佇列滿時,如果生產者執行緒往佇列裡put元素,佇列會一直阻塞生產者執行緒,直到拿到資料,或者響應中斷退出。當佇列空時,消費者執行緒試圖從佇列裡take元素,佇列也會阻塞消費者執行緒,直到佇列可用。
- 超時退出:當阻塞佇列滿時,佇列會阻塞生產者執行緒一段時間,如果超過一定的時間,生產者執行緒就會退出。
1、訊息生產者
package com.es.queue;
/**
* Created by Administrator on 2018/7/1 0001.
*/
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 生產者執行緒
*
* @author jackyuj
*/
public class Producer implements Runnable {
private volatile boolean isRunning = true;//是否在執行標誌
private BlockingQueue queue;//阻塞佇列
private static AtomicInteger count = new AtomicInteger();//自動更新的值
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
//建構函式
public Producer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
String data = null;
Random r = new Random();
System.out.println("啟動生產者執行緒!");
try {
while (isRunning) {
System.out.println("正在生產資料...");
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));//取0~DEFAULT_RANGE_FOR_SLEEP值的一個隨機數
data = "data:" + count.incrementAndGet();//以原子方式將count當前值加1
System.out.println("將資料:" + data + "放入佇列...");
if (!queue.offer(data, 2, TimeUnit.SECONDS)) {//設定的等待時間為2s,如果超過2s還沒加進去返回true
System.out.println("放入資料失敗:" + data);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出生產者執行緒!");
}
}
public void stop() {
isRunning = false;
}
}
2、訊息消費者
package com.es.queue;
/**
* Created by Administrator on 2018/7/1 0001.
*/
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 消費者執行緒
*
* @author jackyuj
*/
public class Consumer implements Runnable {
private BlockingQueue<String> queue;
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
//建構函式
public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
System.out.println("啟動消費者執行緒!");
Random r = new Random();
boolean isRunning = true;
try {
while (isRunning) {
System.out.println("正從佇列獲取資料...");
String data = queue.poll(2, TimeUnit.SECONDS);//有資料時直接從佇列的隊首取走,無資料時阻塞,在2s內有資料,取走,超過2s還沒資料,返回失敗
if (null != data) {
System.out.println("拿到資料:" + data);
System.out.println("正在消費資料:" + data);
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
} else {
// 超過2s還沒資料,認為所有生產執行緒都已經退出,自動退出消費執行緒。
isRunning = false;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出消費者執行緒!");
}
}
}
3、測試程式入口
package com.es.queue;
/**
* Created by Administrator on 2018/7/1 0001.
*/
import com.es.queue.Consumer;
import com.es.queue.Producer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
// 宣告一個容量為10的快取佇列
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
//new了三個生產者和一個消費者
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer = new Consumer(queue);
// 藉助Executors
ExecutorService service = Executors.newCachedThreadPool();
// 啟動執行緒
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(consumer);
// 執行10s
Thread.sleep(10 * 1000);
producer1.stop();
producer2.stop();
producer3.stop();
Thread.sleep(2000);
// 退出Executor
service.shutdown();
}
}