1. 程式人生 > >ReentrantLock在Java中Lock的實現原理拿鎖過程分析

ReentrantLock在Java中Lock的實現原理拿鎖過程分析

import java.util.concurrent.locks.ReentrantLock;

public class App {

    public static void main(String[] args) throws Exception {
        final int[] counter = {0};

        ReentrantLock lock = new ReentrantLock();

        for (int i= 0; i < 50; i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    lock.lock();
                    try {
                        int a = counter[0];
                        counter[0] = a + 1;
                    }finally {
                        lock.unlock();
                    }
                }
            }).start();
        }

        // 主執行緒休眠,等待結果
        Thread.sleep(5000);
        System.out.println(counter[0]);
    }
}

拿鎖過程 lock.lock();

public void lock() {
    sync.lock();
}
final void lock() {
    if (compareAndSetState(0, 1)) //預設非公平鎖,所有執行緒在這裡進行搶佔鎖,直接試圖獲取鎖,如果獲取不成功,再放入隊尾
        setExclusiveOwnerThread(Thread.currentThread()); //拿到鎖之後執行緒操作
    else
        acquire(1); //未拿到鎖的進行操作
}

拿到鎖進行操作

setExclusiveOwnerThread(Thread thread)

為AOS中方法

protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread; //設定當前擁有獨佔訪問的執行緒。
}

看到這裡,我們基本有了一個大概的瞭解,還記得之前AQS中的int型別的state值,這裡就是通過CAS(樂觀鎖)去修改state的值。lock的基本操作還是通過樂觀鎖來實現的

獲取鎖通過CAS,那麼沒有獲取到鎖,等待獲取鎖是如何實現的?我們可以看一下else分支的邏輯,acquire方法:

acquire()在AQS中實現的,它的原始碼如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&   
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
(01) “當前執行緒”首先通過tryAcquire()嘗試獲取鎖。獲取成功的話,直接返回;嘗試失敗的話,進入到等待佇列排序等待(前面還有可能有需要執行緒在等待該鎖)。 
(02) “當前執行緒”嘗試失敗的情況下,先通過addWaiter(Node.EXCLUSIVE)來將“當前執行緒”加入到”CLH佇列(非阻塞的FIFO佇列)”末尾。CLH佇列就是執行緒等待佇列。 

任何時刻拿到鎖的只有一個執行緒,未拿到鎖的執行緒會打包成節點(node),然後將節點通過CAS自旋的方式,從佇列尾部放入同步佇列中。(03) 再執行完addWaiter(Node.EXCLUSIVE)之後,會呼叫acquireQueued()來獲取鎖。(04) “當前執行緒”在執行acquireQueued()時,會進入到CLH佇列中休眠等待,直到獲取鎖了才返回!如果“當前執行緒”在休眠等待過程中被中斷過,acquireQueued會返回true,此時”當前執行緒”會呼叫selfInterrupt()來自己給自己產生一箇中斷。

  1. 呼叫tryAcquire方法,試圖獲取鎖
  2. 當獲取不到鎖,執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)  接著往下走看tryAcquire的實現。
protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }

竟然是一個空實現,說明此處需要子類實現此方法,既然需要子類實現此方法,為什麼不寫成抽象方法呢,當我們瞭解了AQS的實現全貌就全明白了。  這裡我們看下ReentrantLock中公平鎖FairSync的tryAcquire實現。

/**
             * Fair version of tryAcquire.  Don't grant access unless
             * recursive call or no waiters or is first.
             */
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {//首先通過變數state判斷鎖是否被佔用,0代表未被佔用
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {//hasQueuedPredecessors判斷佇列中是否有其他執行緒在等待,如果沒有執行緒在等待,設定鎖的狀態為1(被佔用),因為獲取鎖的過程是多執行緒同時獲取的,所以需要使用CAS。
                        setExclusiveOwnerThread(current);//設定佔用排它鎖的執行緒是當前執行緒
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {//當前鎖已被佔用,但是佔用鎖的是當前執行緒本身
    //還記得我們上面說過ReentrantLock是支援重入的,下面就是重入的實現,當已經獲取鎖的執行緒每多獲取一次鎖,state就執行加1操作
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }

當獲取到鎖的時候返回true,當獲取不到鎖的時候返回false。  我們再來看下AQS中acquire方法的實現,當獲取不到鎖就執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法。  在瞭解acquireQueued方法之前,先看下addWaiter方法的實現。

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
  1. 判斷尾節點是否存在,如果存在,則直接將新節點插入到尾節點之後,然後修改尾節點指向。
  2. 如果尾節點不存在,則執行enq方法,該方法是一個死迴圈,保證新增的節點一定會被加入到佇列中

此時AQS佇列的結構如下 

增加尾節點為什麼要用cas,因為會存在多個執行緒競爭尾節點。

  addWaiter保證了,節點一定會被插入到佇列中。  接著看acquireQueued的實現

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {//當前節點的上個節點如果是頭結點,並且可以獲取鎖,則切換頭結點
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())//鎖獲取失敗,首先,第一次迴圈設定當前節點的前一個節點的waitStatus為SIGNAL(待通知),第二次迴圈執行parkAndCheckInterrupt,掛起當前執行緒LockSupport.park(this);
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
此時AQS佇列的結構如下 
 
如果再有第三個執行緒過來,則AQS佇列結構如下 
 
acquireQueued主要做了兩件事 
1. 使當前節點的上一個節點的狀態為SIGNAL狀態. 
2. 阻塞當前節點的執行緒

unLock的內部實現

unLock內部也是通過sync物件來實現的,具體實現如下:

public void unlock() {
        sync.release(1);
    }

relese方法是AQS的方法

public final boolean release(int arg) {
        if (tryRelease(arg)) {//判斷是否能釋放鎖
    //釋放下一個執行緒,刪除原頭結點的指向
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

在獲取鎖的時候,我們知道tryAcquire是一個空方法,需要子類去實現。可以猜測tryRelease方法應該也是空方法。

protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

果不其然,是空方法,我們來看下ReentrantLock對tryRelase的重寫。

protected final boolean tryRelease(int releases) {
        int c = getState() - releases;//鎖的重入次數減1
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

此時AQS的資料結構如下:   此時僅僅能說明,第二個執行緒可以去獲取鎖,但是並不能代表它已經獲取到鎖,因為頭結點並沒有變。  當第二個執行緒的阻塞狀態被釋放後,acquireQueued方法

for (;;) {
        final Node p = node.predecessor();
        if (p == head && tryAcquire(arg)) {
            setHead(node);
            p.next = null; // help GC
            failed = false;
            return interrupted;
        }
        if (shouldParkAfterFailedAcquire(p, node) &&
            parkAndCheckInterrupt())
            interrupted = true;
    }
此時AQS的資料結構如下: