1. 程式人生 > 其它 >新浪網面試官:說說Java併發程式設計中CountDownLatch原理與應用

新浪網面試官:說說Java併發程式設計中CountDownLatch原理與應用

前言

控制併發流程的工具類,作用就是幫助我們程式設計師更容易的讓執行緒之間合作,讓執行緒之間相互配合來滿足業務邏輯。比如讓執行緒A等待執行緒B執行完畢後再執行等合作策略。

控制併發流程的工具類主要有:

簡介

背景

  • CountDownLatch是在Java1.5被引入,跟它一起被引入的工具類還有CyclicBarrier、Semaphore、ConcurrenthashMap和BlockingQueue。
  • 在java.util.cucurrent包下。

概念

  • CountDownLatch是一個同步計數器,他允許一個或者多個執行緒在另外一組執行緒執行完成之前一直等待,基於AQS共享模式實現的。
  • 是通過一個計數器來實現的,計數器的初始值是執行緒的數量。每當一個執行緒執行完畢後,計數器的值就-1,當計數器的值為0時,表示所有執行緒都執行完畢,然後在閉鎖上等待的執行緒就可以恢復工作來。

Java併發程式設計實戰筆記,感興趣的可以補補!

應用場景

Zookeeper分散式鎖,Jmeter模擬高併發等

場景1 讓多個執行緒等待:模擬併發,讓併發執行緒一起執行

為了模擬高併發,讓一組執行緒在指定時刻(秒殺時間)執行搶購,這些執行緒在準備就緒後,進行等待(CountDownLatch.await()),直到秒殺時刻的到來,然後一擁而上。這也是本地測試介面併發的一個簡易實現。

在這個場景中,CountDownLatch充當的是一個發令槍的角色;就像田徑賽跑時,運動員會在起跑線做準備動作,等到發令槍一聲響,運動員就會奮力奔跑。和上面的秒殺場景類似。

程式碼實現如下

packagecom.niuh.tools;

importjava.util.concurrent.CountDownLatch;

/**
*<p>
*CountDownLatch示例
*場景1讓多個執行緒等待:模擬併發,讓併發執行緒一起執行
*</p>
*/
publicclassCountDownLatchRunner1{

publicstaticvoidmain(String[]args)throwsInterruptedException{
CountDownLatchcountDownLatch=newCountDownLatch(1);
for(inti=0;i<5;i++){
newThread(()->{
try{
//準備完畢……運動員都阻塞在這,等待號令
countDownLatch.await();
Stringparter="【"+Thread.currentThread().getName()+"】";
System.out.println(parter+"開始執行……");
}catch(InterruptedExceptione){
e.printStackTrace();
}
}).start();
}

Thread.sleep(2000);//裁判準備發令
countDownLatch.countDown();//發令槍:執行發令
}
}

執行結果

【Thread-2】開始執行……
【Thread-4】開始執行……
【Thread-3】開始執行……
【Thread-0】開始執行……
【Thread-1】開始執行……

我們通過CountDownLatch.await(),讓多個參與者執行緒啟動後阻塞等待,然後在主執行緒 呼叫CountDownLatch.countdown(1) 將計數減為0,讓所有執行緒一起往下執行;以此實現了多個執行緒在同一時刻併發執行,來模擬併發請求的目的。

場景2 讓單個執行緒等待:多個執行緒(任務)完成後,進行彙總合併

很多時候,我們的併發任務,存在前後依賴關係;比如資料詳情頁需要同時呼叫多個介面獲取資料,併發請求獲取到資料後、需要進行結果合併;或者多個數據操作完成後,需要資料check;這其實都是:在多個執行緒(任務)完成後,進行彙總合併的場景。

程式碼實現如下

packagecom.niuh.tools;

importjava.util.concurrent.CountDownLatch;
importjava.util.concurrent.ThreadLocalRandom;

/**
*<p>
*CountDownLatch示例
*場景2讓單個執行緒等待:多個執行緒(任務)完成後,進行彙總合併
*</p>
*/
publicclassCountDownLatchRunner2{

publicstaticvoidmain(String[]args)throwsInterruptedException{
CountDownLatchcountDownLatch=newCountDownLatch(5);
for(inti=0;i<5;i++){
finalintindex=i;
newThread(()->{
try{
Thread.sleep(1000+ThreadLocalRandom.current().nextInt(1000));
System.out.println("finish"+index+Thread.currentThread().getName());
countDownLatch.countDown();
}catch(InterruptedExceptione){
e.printStackTrace();
}
}).start();
}

countDownLatch.await();//主執行緒在阻塞,當計數器==0,就喚醒主執行緒往下執行。
System.out.println("主執行緒:在所有任務執行完成後,進行結果彙總");
}
}

執行結果

finish3Thread-3
finish0Thread-0
finish1Thread-1
finish4Thread-4
finish2Thread-2
主執行緒:在所有任務執行完成後,進行結果彙總

在每個執行緒(任務) 完成的最後一行加上CountDownLatch.countDown(),讓計數器-1;當所有執行緒完成-1,計數器減到0後,主執行緒往下執行彙總任務。

原始碼分析

本文基於JDK1.8

CountDownLatch 類圖

從圖中可以看出CountDownLatch是基於Sync類實現的,而Sync繼承AQS,使用的是AQS共享模式。

其內部主要變數和方法如下:

在我們方法中呼叫 awit() 和 countDown() 的時候,發生了幾個關鍵的呼叫關係,如下圖所示:

其與AQS互動原理如下:

建構函式

CountDownLatch類中只提供了一個構造器,引數count為計數器的大小

publicCountDownLatch(intcount){
if(count<0)thrownewIllegalArgumentException("count<0");
this.sync=newSync(count);
}

這裡需要注意,設定state的數量只有在初始化CountDownLatch的時候,如果該state被減成了0,就無法繼續使用這個CountDownLatch了,需要重新new一個,這就是這個類不可重用的原因,有另一個類也實現了類似的功能,但是可以重用,就是CyclicBarrier。

內部同步器

privatestaticfinalclassSyncextendsAbstractQueuedSynchronizer{
privatestaticfinallongserialVersionUID=4982264981922014374L;
//初始化,設定資源個數
Sync(intcount){
setState(count);
}
//獲取共享資源個數
intgetCount(){
returngetState();
}
//嘗試獲取共享鎖,只有當共享資源個數為0的時候,才會返回1,否則為-1
protectedinttryAcquireShared(intacquires){
return(getState()==0)?1:-1;
}
//釋放共享資源,通過CAS每次對state減1
protectedbooleantryReleaseShared(intreleases){
//Decrementcount;signalwhentransitiontozero
for(;;){
intc=getState();
if(c==0)
returnfalse;
intnextc=c-1;
if(compareAndSetState(c,nextc))
returnnextc==0;
}
}
}

主要方法

類中有三個方法是最重要的

//呼叫await()方法的執行緒會被掛起,它會等待直到count值為0才繼續執行
publicvoidawait()throwsInterruptedException{
sync.acquireSharedInterruptibly(1);
}

//和await()方法類似,只不過等待一定的時間後count值還沒變為0的化就會繼續執行
publicbooleanawait(longtimeout,TimeUnitunit)
throwsInterruptedException{
returnsync.tryAcquireSharedNanos(1,unit.toNanos(timeout));
}
//將count值減1
publicvoidcountDown(){
sync.releaseShared(1);
}

await()方法

//呼叫await()方法的執行緒會被掛起,它會等待直到count值為0才繼續執行
publicvoidawait()throwsInterruptedException{
sync.acquireSharedInterruptibly(1);
}

進入

AbstractQueuedSynchronizer #acquireSharedInterruptibly()方法.

publicfinalvoidacquireSharedInterruptibly(intarg)
throwsInterruptedException{
//等待過程不可中斷
if(Thread.interrupted())
thrownewInterruptedException();
//這裡的tryAcquireShared在AbstractQueuedSynchronizer中沒有實現,在上面介紹的Sync中實現的
if(tryAcquireShared(arg)<0)
doAcquireSharedInterruptibly(arg);
}

在上面介紹Sync類的時候#tryAcquireShared(),當AQS的state = 0的時候才會返回1,否則一直返回-1,如果返回-1,要執行#

doAcquireSharedInterruptibly(),進入該方法

privatevoiddoAcquireSharedInterruptibly(intarg)
throwsInterruptedException{
//這裡就把主執行緒加入佇列,佇列中有兩個節點,第一個是虛擬節點,第二個就是主執行緒節點
finalNodenode=addWaiter(Node.SHARED);
booleanfailed=true;
try{
for(;;){
//總共只有兩個節點,主執行緒前一個就是首節點
finalNodep=node.predecessor();
if(p==head){
//這裡又執行到CountDownLatch中Sync類中實現的方法,判斷state是否為0
intr=tryAcquireShared(arg);
if(r>=0){
setHeadAndPropagate(node,r);
p.next=null;//helpGC
failed=false;
return;
}
}
//如果state不為0,這裡會把主執行緒掛起阻塞
if(shouldParkAfterFailedAcquire(p,node)&&
parkAndCheckInterrupt())
thrownewInterruptedException();
}
}finally{
if(failed)
cancelAcquire(node);
}
}

這裡使用AQS很神奇,在阻塞佇列中就只加入了一個主執行緒,但是呢,只要其他執行緒沒有執行完,那state就不為0,那主執行緒就在這裡阻塞著,那問題了,誰來喚醒這個主執行緒呢?就是 countDown() 這個方法。

await(long timeout, TimeUnit unit)方法

該方法就是指定等待時間,如果在規定的等待時間中沒有完成,就直接返回false,在主執行緒中可以根據這個狀態進行後續的處理。

//和await()方法類似,只不過等待一定的時間後count值還沒變為0的化就會繼續執行
publicbooleanawait(longtimeout,TimeUnitunit)
throwsInterruptedException{
returnsync.tryAcquireSharedNanos(1,unit.toNanos(timeout));
}

countDown() 方法

//將count值減1
publicvoidcountDown(){
sync.releaseShared(1);
}

進入

AbstractQueuedSynchronizer #releaseShared方法

publicfinalbooleanreleaseShared(intarg){
//該方法同樣在AbstractQueuedSynchronizer中沒有實現,在CountDownLatch中實現
if(tryReleaseShared(arg)){
//喚醒主執行緒
doReleaseShared();
returntrue;
}
returnfalse;
}

在分析Sync類的時候,介紹了tryReleaseShared(),該方法會把AQS的state減1,如果減1操作成功,執行喚醒主執行緒操作,進入

AbstractQueuedSynchronizer#tryReleaseShared()方法

privatevoiddoReleaseShared(){
for(;;){
Nodeh=head;
if(h!=null&&h!=tail){
intws=h.waitStatus;
//首節點狀態為SIGNAL=-1
if(ws==Node.SIGNAL){
if(!compareAndSetWaitStatus(h,Node.SIGNAL,0))
continue;//looptorecheckcases

//喚醒主執行緒,也就是佇列中的第二個節點,如果執行緒沒有執行完成,主執行緒被喚醒之後,發現state依然不為零,會再次阻塞
unparkSuccessor(h);
}
elseif(ws==0&&
!compareAndSetWaitStatus(h,0,Node.PROPAGATE))
continue;//looponfailedCAS
}
if(h==head)//loopifheadchanged
break;
}
}

總結

CountDownLatch 和 Semaphore 一樣都是共享模式下資源問題,這些原始碼實現AQS的模版方法,然後使用CAS+迴圈重試實現自己的功能。在RT多個資源呼叫,或者執行某種操作依賴其他操作完成下可以發揮這個計數器的作用,小編這裡也總結了一些網際網路大廠經常面試的Java併發程式設計面試真題 123道,感興趣的可以來實戰一下!