1. 程式人生 > 程式設計 >AQS原始碼分析及核心方法解析

AQS原始碼分析及核心方法解析

1 概述

AQS提供了一個基於FIFO佇列實現鎖和同步器的基本框架。

j.c.u中使用AQS的類有:

  • CountDownLatch.Sync
  • ThreadPoolExecutor.Worker
  • ReentrantLock.FairSync/NonfairSync
  • ReentrantReadWriteLock.FairSync/NonfairSync
  • Semaphore.Sync

官方檔案:docs.oracle.com/javase/8/do…

2 AQS如何使用

2.1 相關概念

state:同步狀態值,用於子類中實現狀態記錄;

acquire:修改state,可以理解為獲取鎖;

release

:與acquire相反操作的修改state,可以理解為釋放鎖。

predecessor:前驅,successor:後繼

Node.EXCLUSIVE:獨佔模式,Node.SHARED:共享模式

2.2 實現約定

使用AQS實現一個鎖或同步器,需要依據以下幾點去實現:

  • 1 同步器需要以一個單獨的數字表示狀態。

  • 2 同步器需要定義一個繼承AQS的內部類去實現同步屬性。

  • 3 內部類繼承AQS後,必須根據需要實現 tryAcquire*/tryRelease* 去改變state。

  • 4 有exclusive mode(預設)和shared兩種模式:

    • exclusive mode:獨佔模式,其他執行緒嘗試acquire不會成功;
    • shared mode:共享模式,多個執行緒嘗試acquire都會成功。
  • 5 一般只支援其中一種模式即可,但有特例:ReadWriteLock同時支援兩種模式。

  • 6 AbstractQueuedSynchronizer.ConditionObject 可以在子類中作為Condition的實現使用。

2.3 實現步驟

2.3.1 獨佔鎖

基本步驟如下:

  • 1 建立繼承AQS的內部類;
  • 2 實現tryAcquire/tryRelease
  • 3 lock時呼叫acquire,unlock時呼叫release
  • 4 判斷當前是否有任意執行緒獲取鎖時判斷getState() != 0

java.util.concurrent.locks.ReentrantLock

實現公平鎖(FairSync)為例:

1 建立繼承AQS的內部類

abstract static class Sync extends AbstractQueuedSynchronizer{....}
static final class FairSync extends Sync{....}
複製程式碼

2 FairSync實現tryAcquire/tryRelease

tryAcquire:

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0,acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
複製程式碼

tryRelease:

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
複製程式碼

3 lock時呼叫acquire,unlock時呼叫release


    // in ReentrantLock.FairSync
    final void lock() {
        acquire(1);
    }
        
    // in ReentrantLock
    public void unlock() {
        sync.release(1);
    }
複製程式碼

4 判斷當前是否有任意執行緒獲取鎖時判斷getState() != 0

        // in ReentrantLock.Sync
        final boolean isLocked() {
            return getState() != 0;
        }
複製程式碼
2.3.2 共享鎖

基本步驟如下:

  • 1 建立繼承AQS的內部類;
  • 2 實現tryAcquireShared/tryReleaseShared
  • 3 加鎖時呼叫acquireShared,釋放鎖時呼叫releaseShared

java.util.concurrent.CountDownLatch為例:

1 建立繼承AQS的內部類

private static final class Sync extends AbstractQueuedSynchronizer{....}
複製程式碼

2 實現tryAcquireShared/tryReleaseShared

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c,nextc))
                    return nextc == 0;
            }
        }
複製程式碼

3 加鎖時呼叫acquireShared,釋放鎖時呼叫releaseShared

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    public void countDown() {
        sync.releaseShared(1);
    }
複製程式碼

3 主要屬性

FIFO佇列,由Node節點組成的連結串列:

           +------+  prev +-----+       +-----+
      head |      | <---- |     | <---- |     |  tail
           +------+       +-----+       +-----+

3.1 head

private transient volatile Node head;

功能:佇列的頭節點。

說明

  • 延遲初始化(在enq中初始化)。
  • 除了初始化的時候,其他時候只有呼叫setHead(Node)時才會修改。
  • head.waitStatus不會是CANCELLED狀態。

相關方法:

  • setHead(Node node)
  • setHeadAndPropagate(Node node,int propagate):呼叫了setHead,setHeadAndPropagatedoAcquireShared*方法中呼叫;
  • compareAndSetHead(Node update) 僅在enq方法中被呼叫過。

3.2 tail

private transient volatile Node tail;

功能:佇列的尾節點。

說明

  • 延遲初始化(在enq中使用tail = head初始化)。
  • 僅當呼叫方法enq(Node)新增新的等待節點時才會修改。

相關方法:

  • enq(final Node node):將節點加入到隊尾
  • findNodeFromTail(Node node): 從tail開始查詢節點
  • compareAndSetTail(Node expect,Node update)

3.3 state

private volatile int state;

功能:用來標識同步狀態的狀態值。

相關方法

  • getState()
  • setState(int newState)
  • compareAndSetState(int expect,int update)

4 Node

4.1 屬性

4.1.1 waitStatus

volatile int waitStatus:狀態

  • CANCELLED = 1 : 表示當前節點被取消。

當前節點被取消,比如執行緒park後timeout或interrupt,狀態會變更為CANCELLED。

當一個節點狀態變為CANCELLED後,不會再變成其他狀態,其執行緒也不會再次被阻塞。

  • SIGNAL = -1 : 表示當前節點的 後續節點的執行緒 需要被喚醒執行(unpark)。

說明當前節點的 後續節點 是block狀態(或即將是block狀態),即其執行緒被park了。

所以當前節點在release或cancel時,必須unpark它的後續節點。

為了避免競爭,acquire方法必須先表明需要一個訊號,然後再嘗試acquire,失敗則阻塞。

  • CONDITION = -2 :

表示當前節點的執行緒在condition佇列中(ConditionObject類中設定),該節點直到其狀態被設定為0時,才會轉變成為同步佇列中的節點。

這個狀態只在ConditionObject中使用。

  • PROPAGATE = -3 : 當前場景下後續的acquireShared將會無條件被傳播。

releaseShared(共享式的釋放同步狀態)需要被傳播給其他節點;該狀態在 doReleaseShared方法中被設定(僅僅適用於頭節點)以保證傳播,即使其它操作介入也不會中斷。

  • 0 : 當前節點在同步佇列中,等待acquire。

waitStatus預設值:

  • 同步佇列:0;
  • condition佇列:是CONDITION(-2)。
4.1.2 prev

volatile Node prev:前驅節點,只對非ConditionObject的節點定義啟用。

相關方法

  • Node predecessor():返回當前節點的前置節點;
  • boolean hasQueuedPredecessors():查詢同步佇列中,是否有比自己更優先的執行緒在等待。
4.1.3 next

volatile Node next:後繼節點,只對非ConditionObject的節點定義啟用。

相關方法

  • void unparkSuccessor(Node node):uppark節點node的後繼節點
4.1.4 thread

volatile Thread thread:當前節點的執行緒,Initialized on construction and nulled out after use.

4.1.5 nextWaiter

Node nextWaiter:condition佇列中的後繼節點,只對ConditionObject內的節點定義啟用,即當waitStatus==CONDITION時,才有值。

4.2 同步佇列&Condition佇列中Node的區別

比較 同步佇列 Condition佇列
使用的欄位 waitStatus/thread/prev/next/nextWaiter waitStatus/thread/nextWaiter
nextWaiter含義 表示模式,固定為Node.EXCLUSIVE 或 Node.SHARED 下一個節點
waitStatus初始值 0 CONDITION
waitStatus可選值 CANCELLED/SIGNAL/PROPAGATE/0 CANCELLED/CONDITION

5 AQS子類的實現約定:

  • 根據需要,實現tryAcquire*/tryRelease*方法;
  • 使用getState/setState(int)/compareAndSetState(int,int) 去監視或修改同步狀態。

5.1 tryAcquire(int)

獨佔模式下嘗試acquire。

方法實現時,需要查詢當前狀態是否允許acquire,然後再使用compareAndSetState進行acquire。

tryAcquireacquire(int arg)中被呼叫。

如果呼叫失敗,並且呼叫執行緒沒在佇列中,acquire方法會將該執行緒放入佇列,直到被其他執行緒執行release釋放。通常用來實現Lock.tryLock()。

返回值:true表示執行成功(state更新成功,成功acquire)。

5.2 tryRelease(int)

獨佔模式下,嘗試修改state進行release。

返回值:true表示當前物件是一個完全release的狀態,其他任何等待執行緒都可以嘗試acquire。

5.3 tryAcquireShared(int)

共享模式下嘗試acquire。

返回值:

  • 負數:獲取失敗;
  • 0 : 獲取成功,但接下來共享模式下的請求不會成功,即沒有剩餘資源可用;
  • 正數:獲取成功,接下來共享模式下的acquire仍有可能成功, 即仍有剩餘資源可用。 剩下的的等待執行緒必須檢測物件的狀態是否允許acquire。

5.4 tryReleaseShared(int)

共享模式下,嘗試修改state進行release。

返回值:true表示share mode下release成功,即允許一個等待的acquire(shared or exclusive)成功進行。

5.5 isHeldExclusively()

當前執行緒是否以獨佔方式進行了同步(即狀態被佔用),也就是說,當前執行緒是否在獨佔資源

這個方法在ConditionObject的方法中被呼叫,所以只有用到condition才需要去實現。

以上5個方法,預設的實現都是丟擲 UnsupportedOperationException.

6 內部方法

內部方法有的用private修飾,有的沒有。但都是在AQS內部使用的方法。

6.1 addWaiter

說明:

  • 1 使用當前執行緒給定模式建立節點;
  • 2 新建立的節點加在隊尾,成為新的tail;

返回:建立的新節點

引數mode:Node.EXCLUSIVENode.SHARED

private Node addWaiter(Node mode) {
        // 建立節點
        Node node = new Node(Thread.currentThread(),mode);
        
        Node pred = tail;
        
        if (pred != null) {
            // tail不為null,處理流程和enq(Node)中tail!=null時的處理流程一樣。
            
            node.prev = pred; //當前節點prev指向原來的tail
            if (compareAndSetTail(pred,node)) {  //將tail物件由原來的tail修改為當前新節點
                pred.next = node; //原來的tail的next指向當前節點
                return node;
            }
        }
        // tail為null則呼叫enq方法。
        enq(node);
        return node;
    }
複製程式碼

6.2 enq

說明:

  • 1 先判斷tail是否為null;
  • 2 如果tail==null:初始化head,並tail = head,然後重新迴圈;
  • 3 將新節點(引數node)加入到隊尾;

返回:原來的tail(即新節點的前置節點)

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { 
               // tail為null,初始化head/tail。
                if (compareAndSetHead(new Node()))
                    tail = head; //tail指向head
            } else {
                node.prev = t;  //當前節點prev指向原來的tail
                if (compareAndSetTail(t,node)) { // 將tail物件由原來的tail修改為當前新節點
                    t.next = node; //原來的tail的next指向當前節點
                    return t;
                }
            }
        }
    }
複製程式碼

這裡有個疑問,如果tail是null,但是head不是null,則tail不是一直不會被初始化了嗎?

初始化head的方法是compareAndSetHead,而根據compareAndSetHead裡的註釋,只在enq中呼叫:

     /**
     * CAS head field. Used only by enq.
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this,headOffset,null,update);
    }
複製程式碼

所以:head和tail,是在enq方法中同時初始化的。

addWaiterenqtail!=null時處理流程都是一樣的,都是先將當前節點prev指向原來的tail,然後呼叫compareAndSetTail將新節點設定為tail,然後將原來的tail.next指向新節點。

6.3 shouldParkAfterFailedAcquire

說明:

  • 1 該方法的功能是acquire失敗後,判斷node是否應該block
  • 2 引數pred必須是引數node的前置

判斷邏輯

如果node的前置pred的waitStatus == Node.SIGNAL,那麼node應該block;

否則做以下處理:

  • 1 如果pred.waitStatus == Node.CANCELLED:說明前置節點已經取消。持續向前查詢,直到找到一個waitStatus <= 0的節點,作為node的新前置,並把CANCELLED的幾點從佇列中取消;
  • 2 如果pred.waitStatus是0或PROPAGATE(不會是CONDITION):因為0是初始狀態,PROPAGATE是傳播狀態,所以前置如果是這兩種狀態,都應該將前置的狀態修改為SIGNAL,但不應該park。
    private static boolean shouldParkAfterFailedAcquire(Node pred,Node node) {
        
        // 前驅節點的狀態
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            // 當前節點(即pred的下一個節點)已經是等待狀態,需要一個release去喚醒它。所以node可以安全的park.
            return true;
        if (ws > 0) { // CANCELLED
            /*
               waitStatus > 0 說明是canceled狀態。
               前驅是canceled狀態,則表明前驅節點已經超時或者被中斷,需要從同步佇列中取消。
               持續向前找,直到找到waitStatus<=0的節點,作為node的前驅.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
              前驅節點waitStatus為 0 或 PROPAGATE,置換為SIGNAL.
              不會是CONDITION,因為CONDITION用在ConditionObject中。
             */
            compareAndSetWaitStatus(pred,ws,Node.SIGNAL);
        }
        return false;
    }
複製程式碼

6.4 unparkSuccessor

喚醒當前節點的後續節點

處理流程

  • 1 如果節點不是CANCELLED,則將waitStatus設定為0;
  • 2 如果後續節點是null或是CANCELLED狀態,則對node之後第一個滿足條件(waitStatus <= 0)的節點unpark。
private void unparkSuccessor(Node node) {
       
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node,0); // waitStatus設定為0

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            /*
             node的後繼節點是null或是CANCELLED狀態,釋放該後繼節點;
             並從tail開始向前找,找到離node最近的一個非CANCELLED(waitStatus <= 0)的節點,作為要unpark的節點。
            */
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // unpark後繼節點。
        if (s != null)
            LockSupport.unpark(s.thread);
    }
複製程式碼

6.5 acquireQueued

說明

  • 1 獨佔模式下,已經在佇列中的執行緒進行acquire;
  • 2 在acquire方法或者ConditionObject.await*方法中使用。

處理流程

  • 1 只有node的前驅是head時,才輪到node嘗試獲取鎖,然後呼叫setHead將當前節點設定為head;
  • 2 否則檢查node是否應該park(node的前置節點waitStatus是不是等於SIGNAL),是的話則park;然後等待其他執行緒對當前執行緒unpark。

返回:等待時是否中斷過。

    final boolean acquireQueued(final Node node,int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                /*
                只有前驅節點是head時,才輪到該節點爭奪鎖。
                當前節點的前驅是head 並且 tryAcquire成功:
                把當前要獲取的節點設定為head,
                */
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                /*
                如果當前節點不是head 或 tryAcquire失敗,則檢驗當前節點的執行緒(此時即當前執行緒)是否應park。
                parkAndCheckInterrupt會park當前執行緒,因此在這裡block;將來有其他執行緒對當前執行緒unpark時,將繼續此迴圈。
                否則繼續迴圈嘗試acquire。
                */
                if (shouldParkAfterFailedAcquire(p,node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
複製程式碼

6.6 doAcquireShared

說明

  • 1 共享模式下,已經在佇列中的執行緒進行acquire;
  • 2 在acquireShared方法使用。

處理流程

  • 1 通過呼叫addWaiter(Node.SHARED)想隊尾新增一個新的節點;
  • 3 只有node的前驅是head時,才輪到node嘗試獲取鎖,然後呼叫setHeadAndPropagate將當前節點設定為head,並向後傳播tryAcquireShared返回的值;
  • 3 否則檢查node是否應該park(node的前置節點waitStatus是不是等於SIGNAL),是的話則park;然後等待其他執行緒對當前執行緒unpark。
    private void doAcquireShared(int arg) {
        
        // 建立Node.SHARED模式的新節點(新增在隊尾)
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            
            // 迴圈
            for (;;) {
            /*
            1 獲取當前節點的前置節點;
            2 前置節點是head,則tryAcquireShared,如果成功,則將當前節點設定為head,並將結果向後傳播。
            */
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                    
                        //設定head值,並將tryAcquireShared結果向後傳播
                        setHeadAndPropagate(node,r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                
                // 前置節點不是head,當前執行緒是否park;
                // 如果park當前執行緒,將來有其他執行緒對當前執行緒unpark時,將繼續此迴圈。
                if (shouldParkAfterFailedAcquire(p,node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
複製程式碼

acquireQueueddoAcquireShared在判斷處理節點時,流程都是一樣的;不同點是:

  • acquireQueued:使用setHead設定頭節點;
  • doAcquireShared:使用setHeadAndPropagate設定頭節點並傳播tryAcquireShared返回的值。

6.7 isOnSyncQueue

說明

  • 主要用於Condition流程中的判斷,返回節點是否在同步佇列中。

// Internal support methods for Conditions

    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            // waitStatus是CONDITION,或prev是null
            return false;
        if (node.next != null) 
            // 根據文章前面對Node的描述,next只會在同步佇列中有值。
            return true;
        /*
           因為CAS操作替換值的時候可能會失敗,所以有可能出現:
           node.prev不為null,但是沒在同步佇列中。
           
           所以需要從隊尾向前再遍歷一遍。
        */
        return findNodeFromTail(node);
    }

    /**
     從同步佇列的tail開始向前遍歷,查詢是否有節點node。
     */
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }
複製程式碼

7 public方法

public方法中,同步中需要呼叫acquire*/release*相關方法,除此之外,其它一些public方法可能也會需要用到。

下面列出了除acquire*/acquire*之外的public方法。

7.1 hasQueuedPredecessors

查詢同步佇列中,是否有比自己更優先的執行緒在等待。

比自己更優先:其實就是比自己先進入佇列,排在自己前面的執行緒(因為更先進入佇列,所以比自己等待時間更長)。

同步佇列同時滿足下面三個條件,則返回true:

  • 1 head/tail都不為null,且head != tail;
  • 2 head.next != null;
  • 3 head.next.thread != currentThread。
public final boolean hasQueuedPredecessors() {
        Node t = tail; 
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
複製程式碼

7.2 hasQueuedThreads

是否還有等待的執行緒。判斷很簡單,head != tail時,就認為還有等待的執行緒。

    public final boolean hasQueuedThreads() {
        return head != tail;
    }
複製程式碼

7.3 setExclusiveOwnerThread

設定當前擁有獨佔鎖的執行緒,比如Thread.currentThread()。

一般在tryAcquirecompareAndSetState成功後設定;或在tryRelease中設定setExclusiveOwnerThread(null)

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
複製程式碼

7.4 getExclusiveOwnerThread

返回setExclusiveOwnerThread中設定的當前擁有獨佔鎖的執行緒。

一般用於判斷某執行緒(如當前執行緒)是否正在持有當前獨佔鎖,如if (current == getExclusiveOwnerThread()){....}

    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
複製程式碼

7.5 isQueued

判斷執行緒thread是否在同步佇列中

    public final boolean isQueued(Thread thread) {
        if (thread == null)
            throw new NullPointerException();
        
        // 從tail向前查詢
        for (Node p = tail; p != null; p = p.prev)
            if (p.thread == thread)
                return true;
        return false;
    }
複製程式碼

7.6 getExclusiveQueuedThreads

返回獨佔模式的佇列中等待的執行緒

 public final Collection<Thread> getExclusiveQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            if (!p.isShared()) {
                Thread t = p.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }
複製程式碼

7.7 getSharedQueuedThreads

返回共享模式的佇列中等待的執行緒

public final Collection<Thread> getSharedQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            if (p.isShared()) {
                Thread t = p.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }
複製程式碼

8 獨佔模式同步

獨佔模式,需要子類的內部類,實現tryAcquire/tryRelease,並在lock和unlock時呼叫acquire/release,如ReentrantLock.FairSync

8.1 acquire

  • 1 tryAcquire(arg) 是否成功
  • 2 不成功就呼叫addWaiter建立Node,並呼叫acquireQueued加入節點
  • 3 acquireQueued返回true,表示中斷過,則呼叫selfInterrupt。
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
            selfInterrupt();
    }
複製程式碼

8.2 release

  • 1 執行tryRelease,成功則繼續執行,失敗則返回false;
  • 2 head不為空且nextWaiter!=0(0是同步佇列中的節點的nextWaiter的初始值),則執行unparkSuccessor喚醒後繼者,然後返回true。
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);//unpark節點head的後繼者
            return true;
        }
        return false;
    }
複製程式碼

9 共享模式同步

共享模式,需要子類的內部類,實現tryAcquireShared/tryReleaseShared,並在lock和unlock時呼叫acquireShared/tryReleaseShared,如CountDownLatch.Sync

9.1 acquireShared

tryAcquireShared失敗,則執行doAcquireShared。

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
複製程式碼

9.2 releaseShared

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) { // head != tail
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) { // head.waitStatus == Node.SIGNAL
                    if (!compareAndSetWaitStatus(h,Node.SIGNAL,0))
                        continue;            //  如果將head的waitStatus由SIGNAL置換為0失敗,則繼續迴圈。
                    unparkSuccessor(h); // unpark頭節點的後續節點
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h,0,Node.PROPAGATE))
                    continue;                // 如果head的waitStatus由0置換為PROPAGATE失敗,則繼續迴圈。
            }
            if (h == head)                   // 如果head沒有改變,則退出;否則繼續迴圈。
                break;
        }
    }
    
複製程式碼

10 總結

  • 1 AQS使用一個Node連結串列來實現執行緒的排隊執行,即實現同步的功能;
  • 2 對執行緒的控制,都是使用LockSupport.park*/LockSupport.unpark*實現;
  • 3 用shouldParkAfterFailedAcquire來判斷是否park,用LockSupport.park*來實現自旋的目的,如acquireQueued
  • 4 使用unparkSuccessor喚醒後繼節點以便讓後繼節點執行,如releaseunparkSuccessor(head)
  • 5 只有前驅節點是head的節點,才能去爭奪鎖,否則park;
  • 6 總的流程,就是新增執行緒節點到隊尾,然後park該執行緒,等該節點成為head的後繼節點時再喚醒,以達到同步器的目的。