1. 程式人生 > >死磕Java併發:J.U.C之AQS同步狀態的獲取與釋放

死磕Java併發:J.U.C之AQS同步狀態的獲取與釋放

640?wx_fmt=jpeg

本文轉載自公號:Java技術驛站

在前面提到過,AQS是構建Java同步元件的基礎,我們期待它能夠成為實現大部分同步需求的基礎。AQS的設計模式採用的模板方法模式,子類通過繼承的方式,實現它的抽象方法來管理同步狀態,對於子類而言它並沒有太多的活要做,AQS提供了大量的模板方法來實現同步,主要是分為三類:獨佔式獲取和釋放同步狀態、共享式獲取和釋放同步狀態、查詢同步佇列中的等待執行緒情況。自定義子類使用AQS提供的模板方法就可以實現自己的同步語義。

獨佔式

獨佔式,同一時刻僅有一個執行緒持有同步狀態。

1、獨佔式同步狀態獲取

acquire(int arg)方法為AQS提供的模板方法,該方法為獨佔式獲取同步狀態,但是該方法對中斷不敏感,也就是說由於執行緒獲取同步狀態失敗加入到CLH同步佇列中,後續對執行緒進行中斷操作時,執行緒不會從同步佇列中移除。程式碼如下:

publicfinalvoid acquire(int arg){if(!tryAcquire(arg)&&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();}

各個方法定義如下:

  1. tryAcquire:去嘗試獲取鎖,獲取成功則設定鎖狀態並返回true,否則返回false。該方法自定義同步元件自己實現,該方法必須要保證執行緒安全的獲取同步狀態。

  2. addWaiter:如果tryAcquire返回FALSE(獲取同步狀態失敗),則呼叫該方法將當前執行緒加入到CLH同步佇列尾部。

  3. acquireQueued:當前執行緒會根據公平性原則來進行阻塞等待(自旋),直到獲取鎖為止;並且返回當前執行緒在等待過程中有沒有中斷過。

  4. selfInterrupt:產生一箇中斷。

  5. acquireQueued方法為一個自旋的過程,也就是說當前執行緒(Node)進入同步佇列後,就會進入一個自旋的過程,每個節點都會自省地觀察,當條件滿足,獲取到同步狀態後,就可以從這個自旋過程中退出,否則會一直執行下去。如下:

finalboolean acquireQueued(finalNode node,int arg){boolean failed =true;try{//中斷標誌boolean interrupted 
=false;/*             * 自旋過程,其實就是一個死迴圈而已             */for(;;){//當前執行緒的前驅節點finalNode p = node.predecessor();//當前執行緒的前驅節點是頭結點,且同步狀態成功if(p == head && tryAcquire(arg)){                    setHead(node);                    p.next=null;// help GC                    failed =false;return interrupted;}//獲取失敗,執行緒等待--具體後面介紹if(shouldParkAfterFailedAcquire(p, node)&&                        parkAndCheckInterrupt())                    interrupted =true;}}finally{if(failed)                cancelAcquire(node);}}

從上面程式碼中可以看到,當前執行緒會一直嘗試獲取同步狀態,當然前提是隻有其前驅節點為頭結點才能夠嘗試獲取同步狀態,理由:保持FIFO同步佇列原則。

頭節點釋放同步狀態後,將會喚醒其後繼節點,後繼節點被喚醒後需要檢查自己是否為頭節點。

acquire(int arg)方法流程圖如下:

640?wx_fmt=png

2、獨佔式獲取響應中斷

AQS提供了acquire(int arg)方法以供獨佔式獲取同步狀態,但是該方法對中斷不響應,對執行緒進行中斷操作後,該執行緒會依然位於CLH同步佇列中等待著獲取同步狀態。為了響應中斷,AQS提供了acquireInterruptibly(int arg)方法,該方法在等待獲取同步狀態時,如果當前執行緒被中斷了,會立刻響應中斷丟擲異常InterruptedException。

publicfinalvoid acquireInterruptibly(int arg)throwsInterruptedException{if(Thread.interrupted())thrownewInterruptedException();if(!tryAcquire(arg))
            doAcquireInterruptibly(arg);}

首先校驗該執行緒是否已經中斷了,如果是則丟擲InterruptedException,否則執行tryAcquire(int arg)方法獲取同步狀態,如果獲取成功,則直接返回,否則執行doAcquireInterruptibly(int arg)。doAcquireInterruptibly(int arg)定義如下:

privatevoid doAcquireInterruptibly(int arg)throwsInterruptedException{finalNode node = addWaiter(Node.EXCLUSIVE);boolean failed =true;try{for(;;){finalNode p = node.predecessor();if(p == head && tryAcquire(arg)){
                    setHead(node);
                    p.next=null;// help GC
                    failed =false;return;}if(shouldParkAfterFailedAcquire(p, node)&&
                    parkAndCheckInterrupt())thrownewInterruptedException();}}finally{if(failed)
                cancelAcquire(node);}}

doAcquireInterruptibly(int arg)方法與acquire(int arg)方法僅有兩個差別。

1.方法宣告丟擲InterruptedException異常;

2.在中斷方法處不再是使用interrupted標誌,而是直接丟擲InterruptedException異常。

3、獨佔式超時獲取

AQS除了提供上面兩個方法外,還提供了一個增強版的方法:tryAcquireNanos(int arg,long nanos)。該方法為acquireInterruptibly方法的進一步增強,它除了響應中斷外,還有超時控制。即如果當前執行緒沒有在指定時間內獲取同步狀態,則會返回false,否則返回true。如下

publicfinalboolean tryAcquireNanos(int arg,long nanosTimeout)throwsInterruptedException{if(Thread.interrupted())thrownewInterruptedException();return tryAcquire(arg)||
            doAcquireNanos(arg, nanosTimeout);}

tryAcquireNanos(int arg, long nanosTimeout)方法超時獲取最終是在doAcquireNanos(int arg, long nanosTimeout)中實現的,如下:

privateboolean doAcquireNanos(int arg,long nanosTimeout)throwsInterruptedException{//nanosTimeout <= 0if(nanosTimeout <=0L)returnfalse;//超時時間finallong deadline =System.nanoTime()+ nanosTimeout;//新增Node節點finalNode node = addWaiter(Node.EXCLUSIVE);boolean failed =true;try{//自旋for(;;){finalNode p = node.predecessor();//獲取同步狀態成功if(p == head && tryAcquire(arg)){
                    setHead(node);
                    p.next=null;// help GC
                    failed =false;returntrue;}/*
                 * 獲取失敗,做超時、中斷判斷
                 *///重新計算需要休眠的時間
                nanosTimeout = deadline -System.nanoTime();//已經超時,返回falseif(nanosTimeout <=0L)returnfalse;//如果沒有超時,則等待nanosTimeout納秒//注:該執行緒會直接從LockSupport.parkNanos中返回,//LockSupport為JUC提供的一個阻塞和喚醒的工具類,後面做詳細介紹if(shouldParkAfterFailedAcquire(p, node)&&
                        nanosTimeout > spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);//執行緒是否已經中斷了if(Thread.interrupted())thrownewInterruptedException();}}finally{if(failed)
                cancelAcquire(node);}}

針對超時控制,程式首先記錄喚醒時間deadline ,deadline = System.nanoTime() + nanosTimeout(時間間隔)。如果獲取同步狀態失敗,則需要計算出需要休眠的時間間隔nanosTimeout(= deadline – System.nanoTime()),如果nanosTimeout <= 0 表示已經超時了,返回false,如果大於spinForTimeoutThreshold(1000L)則需要休眠nanosTimeout ,如果nanosTimeout <= spinForTimeoutThreshold ,就不需要休眠了,直接進入快速自旋的過程。

原因在於 spinForTimeoutThreshold 已經非常小了,非常短的時間等待無法做到十分精確,如果這時再次進行超時等待,相反會讓nanosTimeout 的超時從整體上面表現得不是那麼精確,所以在超時非常短的場景中,AQS會進行無條件的快速自旋。

整個流程如下:

640?wx_fmt=png

4、獨佔式同步狀態釋放

當執行緒獲取同步狀態後,執行完相應邏輯後就需要釋放同步狀態。AQS提供了release(int arg)方法釋放同步狀態:

publicfinalboolean release(int arg){if(tryRelease(arg)){Node h = head;if(h !=null&& h.waitStatus !=0)
                unparkSuccessor(h);returntrue;}returnfalse;}

該方法同樣是先呼叫自定義同步器自定義的tryRelease(int arg)方法來釋放同步狀態,釋放成功後,會呼叫unparkSuccessor(Node node)方法喚醒後繼節點(如何喚醒LZ後面介紹)。

這裡稍微總結下:

在AQS中維護著一個FIFO的同步佇列,當執行緒獲取同步狀態失敗後,則會加入到這個CLH同步佇列的對尾並一直保持著自旋。在CLH同步佇列中的執行緒在自旋時會判斷其前驅節點是否為首節點,如果為首節點則不斷嘗試獲取同步狀態,獲取成功則退出CLH同步佇列。當執行緒執行完邏輯後,會釋放同步狀態,釋放後會喚醒其後繼節點。

共享式

共享式與獨佔式的最主要區別在於同一時刻獨佔式只能有一個執行緒獲取同步狀態,而共享式在同一時刻可以有多個執行緒獲取同步狀態。例如讀操作可以有多個執行緒同時進行,而寫操作同一時刻只能有一個執行緒進行寫操作,其他操作都會被阻塞。

1、共享式同步狀態獲取

AQS提供acquireShared(int arg)方法共享式獲取同步狀態:

publicfinalvoid acquireShared(int arg){if(tryAcquireShared(arg)<0)//獲取失敗,自旋獲取同步狀態
            doAcquireShared(arg);}

從上面程式可以看出,方法首先是呼叫tryAcquireShared(int arg)方法嘗試獲取同步狀態,如果獲取失敗則呼叫doAcquireShared(int arg)自旋方式獲取同步狀態,共享式獲取同步狀態的標誌是返回 >= 0 的值表示獲取成功。自選式獲取同步狀態如下:

privatevoid doAcquireShared(int arg){/共享式節點
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //前驅節點finalNode p = node.predecessor();//如果其前驅節點,獲取同步狀態if(p == head){//嘗試獲取同步int r = tryAcquireShared(arg);if(r >=0){
                        setHeadAndPropagate(node, r);
                        p.next=null;// help GCif(interrupted)
                            selfInterrupt();
                        failed =false;return;}}if(shouldParkAfterFailedAcquire(p, node)&&
                        parkAndCheckInterrupt())
                    interrupted =true;}}finally{if(failed)
                cancelAcquire(node);}}

tryAcquireShared(int arg)方法嘗試獲取同步狀態,返回值為int,當其 >= 0 時,表示能夠獲取到同步狀態,這個時候就可以從自旋過程中退出。

acquireShared(int arg)方法不響應中斷,與獨佔式相似,AQS也提供了響應中斷、超時的方法,分別是:acquireSharedInterruptibly(int arg)、tryAcquireSharedNanos(int arg,long nanos),這裡就不做解釋了。

2、共享式同步狀態釋放

獲取同步狀態後,需要呼叫release(int arg)方法釋放同步狀態,方法如下:

publicfinalboolean releaseShared(int arg){if(tryReleaseShared(arg)){
            doReleaseShared();returntrue;}returnfalse;}

因為可能會存在多個執行緒同時進行釋放同步狀態資源,所以需要確保同步狀態安全地成功釋放,一般都是通過CAS和迴圈來完成的。

- END -

 往期推薦:

  • 死磕Java系列:

……

  • Spring系列:

……

號外:

最近在做幾個有意思的開源專案,感興趣的朋友可以看看。

地址:

https://github.com/dyc87112/swagger-butler

可關注我的公眾號

640?wx_fmt=jpeg

深入交流、更多福利

掃碼加入我的知識星球

640?wx_fmt=png

點選“閱讀原文”