java併發程式設計一一執行緒池原理分析(一)
1、併發包
1、CountDownLatch(計數器)
CountDownLatch 類位於 java.util.concurrent 包下,利用它可以實現類似於計數器的功能。
比如有一個任務A,它要等待其他4個任務執行完成之後才能執行,此時就可以利用CountDownLatch
來實現這種功能了。CountDownLatch是通過一個計數器來實現的,計數器的初始值為執行緒的數量。
每當一個執行緒完成了自己的任務後,計數器的值就會減1,當計數器的值到達0 時,他表示所有的執行緒已經完成了任務,然後在閉鎖上等待的執行緒就可以恢復執行任務了。
package com.javagi.test.thread; import java.util.concurrent.CountDownLatch; /** * @author kuiwang Created by Administrator on 2018/10/19. */ public class CountDownLatchTest { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(2); new Thread(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + ",子執行緒開始執行..."); countDownLatch.countDown(); System.out.println(Thread.currentThread().getName() + ",子執行緒結束執行..."); } }).start(); new Thread(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + ",子執行緒開始執行..."); countDownLatch.countDown();// 計數器值每次減去1 System.out.println(Thread.currentThread().getName() + ",子執行緒結束執行..."); } }).start(); countDownLatch.await();// 減去為0,恢復任務繼續執行 System.out.println("兩個子執行緒執行完畢...."); System.out.println("主執行緒繼續執行....."); for (int i = 0; i < 10; i++) { System.out.println("main,i:" + i); } } }
2、CyclicBarrier(屏障)
CyclicBarrier初始化時規定一個數目,然後計算呼叫了CyclicBarrier.wait() 進入等帶的執行緒數。
當執行緒數達到了這個數目時,所有進入等帶狀態的執行緒數被喚醒並繼續。
CyclicBarrier 就像他的名字一樣,可以看成是個障礙,所有的執行緒必須到齊後才能一起通過這
障礙。
CyclicBarrier初始化時還可以帶一個Runnable的引數,此Runnable任務在CyclicBarrier的數目達到後,
所有其它執行緒被喚醒前被執行。
package com.javagi.test.thread; import java.util.concurrent.CyclicBarrier; class Writer extends Thread { private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("執行緒" + Thread.currentThread().getName() + ",正在寫入資料"); try { Thread.sleep(3000); } catch (Exception e) { // TODO: handle exception } System.out.println("執行緒" + Thread.currentThread().getName() + ",寫入資料成功....."); try { cyclicBarrier.await(); } catch (Exception e) { } System.out.println("所有執行緒執行完畢.........."); } } public class CyclicBarrierTest { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(5); for (int i = 0; i < 5; i++) { Writer writer = new Writer(cyclicBarrier); writer.start(); } } }
3、Semaphore(計數訊號量)
Semaphore是一種基於計數的訊號量。它可以設定一個閾值,基於此,多個執行緒競爭獲取許可訊號,
做自己的申請後歸還,超過閾值後,執行緒申請許可訊號將會被阻塞。Semaphore可以用來構建一些
物件池和資源池之類的,比如資料連線池,我們也可以建立計數為1 的Semaphore,將其作為
一種類似於互斥鎖的機制,這也叫二元訊號量,便是兩種互斥狀態。用法如下:
avaliablePermits 函式用來獲取當前可用的資源數量
wc.acquire(); // 申請資源
wc.release(); // 釋放資源
package com.javagi.test.thread; import java.util.concurrent.Semaphore; /** * 可以用來控制程式的併發量 * 當多餘的執行緒進來時 在後面排隊等待執行。 * @author kuiwang * */ public class SemaphoreTest { public static void main(String[] args) { // 建立一個計數閾值為5的訊號量物件 // 只能5個執行緒同時訪問 Semaphore semp = new Semaphore(5); try { // 申請許可 semp.acquire(); try { // 業務邏輯 } catch (Exception e) { } finally { // 釋放許可 semp.release(); } } catch (InterruptedException e) { } } }
案例
需求: 一個廁所只有3個坑位,但是有10個人來上廁所,那怎麼辦?假設10的人的編號分別為1-10,並且1號先到廁所,10號最後到廁所。那麼1-3號來的時候必然有可用坑位,順利如廁,4號來的時候需要看看前面3人是否有人出來了,如果有人出來,進去,否則等待。同樣的道理,4-10號也需要等待正在上廁所的人出來後才能進去,並且誰先進去這得看等待的人是否有素質,是否能遵守先來先上的規則。
程式碼示例:
package com.javagi.test.thread;
import java.util.Random;
import java.util.concurrent.Semaphore;
class ThradDemo001 extends Thread {
private String name;
private Semaphore wc;
public ThradDemo001(String name, Semaphore wc) {
this.name = name;
this.wc = wc;
}
@Override
public void run() {
// 剩下的資源
int availablePermits = wc.availablePermits();
if (availablePermits > 0) {
System.out.println(name + "天助我也,終於有茅坑了.....");
} else {
System.out.println(name + "怎麼沒有茅坑了...");
}
try {
// 申請資源
wc.acquire();
} catch (InterruptedException e) {
}
System.out.println(name + "終於上廁所啦.爽啊" + ",剩下廁所:" + wc.availablePermits());
try {
Thread.sleep(new Random().nextInt(1000));
} catch (Exception e) {
// TODO: handle exception
}
System.out.println(name + "廁所上完啦!");
// 釋放資源
wc.release();
}
}
public class SemaphoreTest2 {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 10; i++) {
ThradDemo001 thradDemo001 = new ThradDemo001("第" + i + "個人", semaphore);
thradDemo001.start();
}
}
}
4、併發佇列
在併發佇列上JDK提供了兩套實現,一個是ConcurrentLinkedQueue 為代表的高效能佇列非阻塞,一個是
以BlockingQueue介面為代表的阻塞佇列,無論哪種都繼承Queue。
1、阻塞佇列與非阻塞佇列
阻塞佇列與普通佇列的區別在於,當佇列是空的時候,從佇列中獲取元素的操作將會被阻塞,或者當佇列是
滿時,往佇列裡面新增元素的操作會內阻塞,檢視從空的阻塞佇列中獲取元素的執行緒將會被阻塞,直到其它的執行緒往空的佇列插入新的元素。同樣,檢視往已滿餓阻塞佇列中新增新元素的執行緒同時也會被阻塞,直到
其它的執行緒使佇列重新變得空閒起來,如從佇列中移除一個或者多個元素,或者完全清空佇列。
- ArrayDeque 陣列雙端佇列
- PriorityQueue 優先順序佇列
- ConcurrentLinkedQueue 基於連結串列的併發佇列
- DelayQueue 延期阻塞佇列 阻塞佇列實現了BlockingQueue介面
- ArrayBlockingQueue 基於資料的併發阻塞佇列
- LinkedBlockingQueue 基於連結串列的FIFO 阻塞佇列
- LlinkedBlockingDeque 基於連結串列的FIFO雙端阻塞佇列
- PriorityBlockingDeque 待優先順序的無界阻塞佇列
- SynchronousQueue 併發同步阻塞佇列
2、ConcurrentLinkedDeque
ConcurrentLinkedDeque:是一個使用於高併發場景下的佇列,通過無鎖的方式,實現了高併發狀態下的
高效能,通常ConcurrentLinkedDeque 效能好於 BlockingQueue 。它是一個基於連結節點的無界執行緒安全佇列。該佇列的元素遵循了先進先出的原則, 頭是最先加入的,尾是最近加入的,該佇列不允許null元素。
ConcurrentLinkedDeque重要方法:
add 和 offer () 都是加入元素的方法(在ConcurrentLinkedDeque 中這兩個方法沒有任何區別)
poll 和 peek() 都是取頭元素節點, 區別在於前者會刪除元素,後者不會。
public class ConcurrentLinkedDequeTest {
public static void main(String[] args) {
ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();
q.offer("若成風");
q.offer("碼雲");
q.offer("javagi");
q.offer("張傑");
q.offer("艾姐");
//從頭獲取元素,刪除該元素
System.out.println(q.poll());
//獲取總長度
System.out.println(q.size());
//從頭獲取元素,不刪除該元素
System.out.println(q.peek());
//獲取總長度
System.out.println(q.size());
}
}
3、BlockingQueue
阻塞佇列BlockingQueue:是一個支援兩個附件操作的佇列。這兩個附件的操作是:
在佇列為空時,獲取元素的執行緒會等待佇列變為非空。
當佇列滿時,儲存元素的執行緒會等待佇列可用。
阻塞佇列通常用於生產者和消費者的場景, 生產者是往佇列裡面新增元素的執行緒,
消費者是從佇列裡面拿出元素的執行緒。阻塞佇列就是生產者存放元素的容器,而消費者
也只是從容器裡拿出元素
BlockingQueue 即阻塞佇列,從阻塞這個詞可以看出,在某些情況下對阻塞佇列的訪問可能會造成阻塞。
被阻塞的情況主要分為兩種:
- 當佇列滿了的時候進行入佇列的操作
- 當佇列空了的時候進行初初佇列的操作
因此,當一個執行緒試圖對一個已經滿了的佇列進行入佇列操作時,他將會被阻塞,除非有另一個執行緒做了
出佇列的操作,同樣。當一個執行緒檢視對一個空佇列進行出佇列操作時,他將會被阻塞, 除非有另一個執行緒進行了入佇列的操作。
在Java中,BlockingQueue的介面位於java.util.concurrent 包中(在Java5版本開始提供),由上面介紹的阻塞佇列的特性可知,阻塞佇列是執行緒安全的。
在新增的Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何高效安全“傳輸”資料的問題。通過這些高效並且執行緒安全的佇列類,為我們快速搭建高質量的多執行緒程式帶來極大的便利。本文詳細介紹了BlockingQueue家庭中的所有成員,包括他們各自的功能以及常見使用場景。
認識BlockingQueue
阻塞佇列,顧名思義,首先它是一個佇列,而一個佇列在資料結構中所起的作用大致如下圖所示:
從上圖我們可以很清楚看到,通過一個共享的佇列,可以使得資料由佇列的一端輸入,從另外一端輸出;
常用的佇列主要有以下兩種:(當然通過不同的實現方式,還可以延伸出很多不同型別的佇列,DelayQueue就是其中的一種)
先進先出(FIFO):先插入的佇列的元素也最先出佇列,類似於排隊的功能。從某種程度上來說這種佇列也體現了一種公平性。
後進先出(LIFO):後插入佇列的元素最先出佇列,這種佇列優先處理最近發生的事件。
多執行緒環境中,通過佇列可以很容易實現資料共享,比如經典的“生產者”和“消費者”模型中,通過佇列可以很便利地實現兩者之間的資料共享。假設我們有若干生產者執行緒,另外又有若干個消費者執行緒。如果生產者執行緒需要把準備好的資料共享給消費者執行緒,利用佇列的方式來傳遞資料,就可以很方便地解決他們之間的資料共享問題。但如果生產者和消費者在某個時間段內,萬一發生資料處理速度不匹配的情況呢?理想情況下,如果生產者產出資料的速度大於消費者消費的速度,並且當生產出來的資料累積到一定程度的時候,那麼生產者必須暫停等待一下(阻塞生產者執行緒),以便等待消費者執行緒把累積的資料處理完畢,反之亦然。然而,在concurrent包釋出以前,在多執行緒環境下,我們每個程式設計師都必須去自己控制這些細節,尤其還要兼顧效率和執行緒安全,而這會給我們的程式帶來不小的複雜度。好在此時,強大的concurrent包橫空出世了,而他也給我們帶來了強大的BlockingQueue。(在多執行緒領域:所謂阻塞,在某些情況下會掛起執行緒(即阻塞),一旦條件滿足,被掛起的執行緒又會自動被喚醒)
下面兩幅圖演示了BlockingQueue的兩個常見阻塞場景:
1.ArrayBlockingQueue
ArrayBlockingQueue 是一個有邊界的阻塞佇列,他的內部實現是一個數組,有邊界的意思是他的容量是有險的
我們必須在其初始化的時候指定它的容量大小,容量大小一旦指定就不可以改變。
ArrayBlockingQueue 是以先進先出的方式儲存資料,最新插入的物件是尾部,最新移出的物件是頭部,下面是初始化和使用ArrayBlockingQueue 的例子:
public class ArrayBlockingQueueTest {
public static void main(String[] args) {
ArrayBlockingQueue<String> arrays = new ArrayBlockingQueue<String>(3);
arrays.add("張三");
arrays.add("李四");
arrays.add("王五");
// 新增阻塞佇列
try {
arrays.offer("若成風", 1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2.LinkedBlockingQueue
LinkedBlockingQueue阻塞佇列大小的配置是可選的,如果我們初始化時指定一個大小,它就是有邊界的,如果不指定,它就是無邊界的。說是無邊界,其實是採用了預設大小為Integer.MAX_VALUE的容量 。它的內部實現是一個連結串列。
和ArrayBlockingQueue一樣,LinkedBlockingQueue 也是以先進先出的方式儲存資料,最新插入的物件是尾部,最新移出的物件是頭部。下面是一個初始化和使LinkedBlockingQueue的例子:
LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(3);
linkedBlockingQueue.add("張三");
linkedBlockingQueue.add("李四");
linkedBlockingQueue.add("李四");
System.out.println(linkedBlockingQueue.size());
3.PriorityBlockingQueue
PriorityBlockingQueue是一個沒有邊界的佇列,它的排序規則和 java.util.PriorityQueue一樣。需要注
意,PriorityBlockingQueue中允許插入null物件。
所有插入PriorityBlockingQueue的物件必須實現 java.lang.Comparable介面,佇列優先順序的排序規則就
是按照我們對這個介面的實現來定義的。
另外,我們可以從PriorityBlockingQueue獲得一個迭代器Iterator,但這個迭代器並不保證按照優先順序順
序進行迭代。
下面我們舉個例子來說明一下,首先我們定義一個物件型別,這個物件需要實現Comparable介面:
4. SynchronousQueue
SynchronousQueue佇列內部僅允許容納一個元素。當一個執行緒插入一個元素後會被阻塞,除非這個元素被另一個執行緒消費。
4、使用BlockingQueue模擬生產者和消費者
package com.javagi.threadpool;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by Administrator on 2018/10/25.
*/
class ProducerThread implements Runnable {
private BlockingQueue<String> blockingQueue;
private AtomicInteger count = new AtomicInteger();
private volatile boolean FLAG = true;
public ProducerThread(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "生產者開始啟動。。。");
while (true) {
String data = count.incrementAndGet() + "";
try {
boolean offer = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
if (offer) {
System.out.println(Thread.currentThread().getName() + ",生產佇列" + data + "成功..");
} else {
System.out.println(Thread.currentThread().getName() + ",生產佇列" + data + "失敗..");
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "生產者執行緒結束。。。");
}
}
public void stop() {
this.FLAG = FLAG;
}
}
class ConsumerThread implements Runnable {
private volatile boolean FLAG = true;
private BlockingQueue<String> blockingQueue;
public ConsumerThread(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "消費者開始啟動....");
while (FLAG) {
try {
String data = blockingQueue.poll(2, TimeUnit.SECONDS);
if (data == null || data == "") {
FLAG = false;
System.out.println("消費者超過2秒時間未獲取到訊息.");
return;
}
System.out.println("消費者獲取到佇列資訊成功,data:" + data);
} catch (Exception e) {
// TODO: handle exception
}
}
}
}
public class BlockingQueueTest {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(3);
ProducerThread producerThread = new ProducerThread(blockingQueue);
ConsumerThread consumerThread = new ConsumerThread(blockingQueue);
Thread t1 = new Thread(producerThread);
Thread t2 = new Thread(consumerThread);
t1.start();
t2.start();
//10秒後 停止執行緒..
try {
Thread.sleep(10*1000);
producerThread.stop();
} catch (Exception e) {
// TODO: handle exception
}
}
}