1. 程式人生 > >J.U.C原始碼分析:CountDownLatch

J.U.C原始碼分析:CountDownLatch

        CountDownLoad是在併發程式設計中使用較多的一個類,可以完成多個執行緒之間的相互等待和協作,原始碼內容不多但功能強大且使用場景複雜多樣。

        原始碼中對CountDownLoad功能的定義非常簡單:

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
允許一個或多個執行緒等待的同步輔助在其他執行緒中執行的一組操作完成。

        簡單的說CountDownLoad實現了一個計數器的功能,使用CountDownLoad的時候需要設定一個值作為初始化使用。這個值也就是CountDownLoad在計數的時候最終所需要達到的值。

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

        CountDownLoad只有一個建構函式,就是說沒有無參建構函式,必須要在初始化的時候就指定count值。count值必須要大於0,否則丟擲IllegalArgumentException非法引數。count值會被傳給Syuc類。

Syuc是CountDownLoad的靜態內部類,繼承自AbstractQueuedSynchronizer,final修飾不可被繼承。

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
            return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
            return nextc == 0;
        }
    }
}

Syuc初始化的時候會呼叫setState(count)方法,這個方法來自於父類。

protected final void setState(int newState) {
    state = newState;
}

其中的state變數,是父類AbstractQueuedSynchronizer的成員變數,volatile修改保證可見性,讓多執行緒的情況下也能獲取到最新的值。

private volatile int state;

        getCount()獲取最新的state值,判斷與預先設定的值還差多少。

        tryAcquireShared(int acquires)方法,判斷state是否歸零,也就是CountDownLoad設定的預期值是否已經達到。

        tryReleaseShared(int releases)方法,嘗試釋放掉執行緒(這裡的並真的進行釋放,僅僅意味這個執行緒可以被釋放了),如果無需釋放返回false,如果還有需要釋放的執行緒返回false,如果釋放最後一個需要釋放的執行緒則返回true。如果釋放執行緒失敗,將會一直迴圈並嘗試釋放執行緒,直到釋放掉一個執行緒。

        tryReleaseShared採用了compareAndSetState(int expect, int update)方法,將狀態變為關閉,採用CAS原理,增加其效能。

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

CountDownLoad中的await()方法,阻塞當前執行緒,直到count值為0

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

await()方法會通過Syuc的父類的acquireSharedInterruptibly(int arg)來嘗試佔用這個執行緒,造成堵塞(通過tryAcquireShared(arg)方法實現)

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

         doAcquireSharedInterruptibly(arg)方法則會對狀態作出判斷,如果當前計數為0,則理解返回。如果當前計數大於零,執行緒被禁止排程,並且一直睡眠,直到count值歸0或者當前執行緒被其他執行緒中斷。

private void doAcquireSharedInterruptibly(int arg)
     throws InterruptedException {
     final Node node = addWaiter(Node.SHARED); //將當前的節點設定為共享節點
     boolean failed = true;
     try {
         for (;;) {
             final Node p = node.predecessor();//獲取當前節點的前一個節點
             if (p == head) {//如果前一個節點為head節點,按照FIFO的原則,可以直接嘗試獲取鎖。
                 int r = tryAcquireShared(arg);
                 if (r >= 0) {
                     setHeadAndPropagate(node, r); //獲取這個節點並且將它放到AQS的佇列列頭處,AQS列頭處的節點表示正在獲取鎖的節點
                     p.next = null; // help GC
                     failed = false;
                     return;
                 }
             }
             if (shouldParkAfterFailedAcquire(p, node) && //檢查下是否需要將當前節點掛起
                 parkAndCheckInterrupt())
                 throw new InterruptedException();
         }
     } finally {
         if (failed)
             cancelAcquire(node);
     }
 }

        這裡需要補充一下AQS佇列是一個雙向佇列,節點中儲存在next和pre變數分別指向前一個節點和後一個節點,每個節點中都包含一個執行緒和一個表示節點型別的變數:這個變數可以表示是獨佔節點還是共享節點。節點頭中的執行緒表示佔有鎖的執行緒,其他節點中執行緒則等待獲取鎖。

        await(long timeout, TimeUnit unit)類似於await(),可以設定等待時間,當等待時間過期之後,執行緒變繼續執行。

        countDown()方法,會呼叫releaseShared(int arg)方法,會先嚐試獲取一個執行緒並且釋放他,tryReleaseShared()之前有說過只有當所有執行緒都被釋放的瞬間才會為true。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

doReleaseShared()會放出解除阻塞執行緒的訊號(這時候才會將被標記為可以釋放的執行緒釋放掉)。

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue; // loop on failed CAS
        }
        if (h == head) // loop if head changed
            break;
    }
}

總結: 多個執行緒呼叫await()方法被阻塞在一個連結串列裡面,然後這些執行緒會逐一呼叫countDown()方法,每呼叫一次count值便減1,直接count為0這些執行緒將會被釋放