1. 程式人生 > 程式設計 >ReentrantLock原始碼簡析

ReentrantLock原始碼簡析

Structure

ReentrantLock的結構如下圖所示,其中Sync是ReentrantLock的抽象內部靜態類,繼承自AQS。FairSync和NonfairSync繼承Sync.

image-20190421141918076
AQS使用了模板方法的模式,一個模板方法是定義在抽象類中的,把基本操作方法組合在一起形成一個總演演算法或一個總行為的方法。一個抽象類可以有任意多個模板方法,而不限於一個。每一個模板方法都可以呼叫任意多個具體方法。 AQS中的模板方法有 acquire,acquireShared,release,releaseShared等,而模板方法通過呼叫子類實現的具體方法(鉤子)來實現。 AQS中定義的鉤子方法:

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

鉤子方法不需要全部重寫,沒重寫的鉤子方法預設會丟擲UnsupportedOperationException。

image-20190421145309101

Lock

ReentrantLok建構函式時候可以指定使用公平性,不指定的話預設使用非公平鎖。所謂公平就是按鎖等待時間排序,非公平就是lock時候立即嘗試獲取一次能搶到就用,搶不到再去acquire排隊。非公平效率更高一些所以預設使用非公平。但是非公平可能導致執行緒飢餓,即一個執行緒一直被其他執行緒搶佔鎖導致長時間獲取不到鎖。

public
ReentrantLock()
{ sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } 複製程式碼
//ReentrantLock的lock方法
public void lock() {
        sync.lock();
		}
//NoneFairSycn的lock實現
 final void lock() {
   			// 先立即嘗試一下加鎖(非公平)
        if (compareAndSetState(0
,1)) setExclusiveOwnerThread(Thread.currentThread()); else //獲取不到再去acquire acquire(1) } //compareAndSetState通過unsafe的CAS實現(jdk9之後用的是VarHandler) protected final boolean compareAndSetState(int expect,int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this,stateOffset,expect,update); } 複製程式碼

ReentrantLock中的lock方法最後會呼叫AQS中的模板方法acquire,acquire方法會呼叫tryAcquire嘗試獲取鎖,獲取不到(返回false)的話再使用acquireQueue方法排隊等待。注意這裡selfInterrupt()不會中斷,只會設定下中斷標記,只有使用acquireInterruptibly時才會響應中斷,不過那個方法也不會調到這裡來而是呼叫doAcquireInterruptibly()

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
      //acquireQueued這個方法會block住,如果block的過程中被中斷了會返回true,此時if成立 執行selfInterrupt(Thread.interrupt,不會拋異常)
        selfInterrupt();
}
複製程式碼

非公平鎖的tryAcquire實現:

//NonfairSync 中的tryAcquire實現
protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
  //state為0說明沒有執行緒佔用鎖
    if (c == 0) {
      //直接cas嘗試佔用
        if (compareAndSetState(0,acquires)) {
          //exclusiveOwnerThread是非volatile的,其getter和setter也是未加鎖的
          //所以使用時要確認不會有其他執行緒嘗試set
          //這裡(compareAndSetState(0,acquires)保證只有當前執行緒操作
            setExclusiveOwnerThread(current);
            return true;
        }
    }
  //如果鎖的當前持有者就是當前執行緒,那麼就直接獲取(state+1),實現了ReentrantLock的可重入
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
  //都不滿足搶佔鎖失敗,返回false
    return false;
}
複製程式碼

addWaiter將當前執行緒是封裝為Node物件,加入同步佇列(實際上就是個Node的連結串列)

private Node addWaiter(Node mode) {
				//mode用來標識是獨佔還是共享 
        Node node = new Node(Thread.currentThread(),mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
           	//先快速嘗試一遍把節點插到尾部,cas失敗的話再使用enq方法插入
            if (compareAndSetTail(pred,node)) {
                pred.next = node;
                return node;
            }
        }
  			//enq就是在自旋鎖裡使用cas插到佇列尾部
        enq(node);
        return node;
    }
複製程式碼

Node的結構以及waitstatus:

 static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;
				//在同步佇列中等待的執行緒等待超時或者被中斷,需要從同步佇列中取消等待。
        static final int CANCELLED =  1;
				//後繼節點處於等待狀態,當前節點如果釋放了同步狀態或者被取消,
				//將會通知後繼節點,使後繼節點得以執行.
        static final int SIGNAL    = -1;
        //節點在等待佇列中,等其他執行緒呼叫了condition的signal之後
        //該節點將會從等待佇列中轉移到同步佇列中
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;
				//初始狀態0
        volatile int waitStatus;
				//前節點
        volatile Node prev;
				//後節點
        volatile Node next;
      	//當前節點裝載的執行緒
        volatile Thread thread;
				//等待佇列中的後繼節點,因為只有獨佔鎖才有等待佇列所以如果當前節點是共享的
				//複用該欄位作為節點型別標記(常量SHARED). (我個人很討厭這種騷操作)
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node,or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided,but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread,Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread,int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
複製程式碼

acquireQueued是AQS中的方法,通過自旋鎖+park(停滯)的方式將一個node插入同步佇列中

這裡在tryAcquire之前會判斷前驅節點是否是頭結點,這保證了同步佇列的FIFO,如果當前節點被提前喚醒(中斷或者前驅節點cancelAcquire導致parkSuccessor),只有前驅節點是head的時候才會嘗試去獲取鎖

final boolean acquireQueued(final Node node,int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //這裡用一個死迴圈自旋
        for (;;) {
        		//p是當前節點的前驅節點
            final Node p = node.predecessor();
            //如果前置節點是頭結點(即當前節點是下一個要獲取鎖的節點),再去嘗試獲取鎖
            if (p == head && tryAcquire(arg)) {
            //將當前節點置為頭結點
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
          //短路操作,嘗試獲取鎖失敗之後判斷是否可以park
          //可以park,呼叫parkAndCheckInterrupt方法park當前執行緒
            if (shouldParkAfterFailedAcquire(p,node) &&
                parkAndCheckInterrupt())
              //騷寫法,parkAndCheckInterrupt()返回true時才能執行到這裡,說明有中斷
                interrupted = true;
        }
    } finally {
      //如果在tryAcquire成功之前異常了,取消acquire,刪除當前節點
        if (failed)
            cancelAcquire(node);
    }
}
複製程式碼

shouldParkAfterFailedAcquire判斷是否可以park當前程式,程式碼如下:

private static boolean shouldParkAfterFailedAcquire(Node pred,Node node) {
    int ws = pred.waitStatus;
  	//如果前驅節點waitStatus是signal,前驅節點釋放了後會通知當前node,可以直接park
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it,so it can safely park.
         */
        return true;
  //唯一大於0的狀態就是cancel,前驅節點狀態為cancel說明前驅節點被中斷或超時了,那麼向前繼續尋找
  //知道找到狀態>=0的節點,插在該節點後面
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
      //第一次呼叫這個方法時都會進到這個分支,因為前驅節點的狀態為0.然後回到自旋流程
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal,but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
      //cas將前驅接節點狀態設定為signal
        compareAndSetWaitStatus(pred,ws,Node.SIGNAL);
    }
  //前驅節點不為signal,繼續自旋
    return false;
}
複製程式碼

park阻塞當前執行緒,呼叫unpark或者中斷時候才會從park方法返回,返回之後通過Thread.interrupted()確認執行緒是否是被中斷.

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

複製程式碼

Unlock(release)

通過呼叫ReentrantLock的unlock方法來釋放一個鎖

public void unlock() {
    sync.release(1);
}

複製程式碼

release是AQS裡的模板方法 先呼叫tryRelease把state減到0 再呼叫unparkSuccessor unpark後繼節點.

這裡沒有對同步佇列的連結串列做操作,連結串列的頭結點替換是acquireQueue裡做的

public final boolean release(int arg) {
  //呼叫reentrantLock裡的tryRelease具體實現
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
          //unpark後繼節點
            unparkSuccessor(h);
        return true;
    }
    return false;
}
複製程式碼

java.util.concurrent.locks.ReentrantLock.Sync#tryRelease:

protected final boolean tryRelease(int releases) {
		//state release一次減1(可能重入過多次)
    int c = getState() - releases;
  	//未拿到鎖的執行緒不能釋放其他執行緒的鎖
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
  	//state減到0釋放鎖
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
複製程式碼

釋放鎖之後unpark後繼節點java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor:

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e.,possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node,0);

    /*
     * Thread to unpark is held in successor,which is normally
     * just the next node.  But if cancelled or apparently null,* traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
  //如果後繼節點為null或者被cancel了 從尾部往前找,node後面的第一個ws小於0的節點
  //然後unpark這個節點
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
複製程式碼

Condition

condition是為了提供類似Object監視器中的wait/notify方法

ConditionObject是AQS的內部類,實現Conditon介面. 每個Condition物件都包含一個等待佇列,和同步佇列複用了Node物件.當前執行緒呼叫Conditon#awai方法會以當前執行緒構造節點,並將節點插入等待佇列尾部.

一個同步器擁有一個同步佇列和多個等待佇列,等待佇列持有同步佇列的引用.

image-20190423155406586

通過ReentrantLock的newCondition方法建立一個condition

//java.util.concurrent.locks.ReentrantLock#newCondition
public Condition newCondition() {
    return sync.newCondition();
}
//java.util.concurrent.locks.ReentrantLock.Sync#newCondition
final ConditionObject newCondition() {
            return new ConditionObject();
        }
複製程式碼

看一個使用示例:

@Test
public void testAQSCondition() {
    ReentrantLock lock = new ReentrantLock();
    Condition condition = lock.newCondition();
    Thread threadA = new Thread(() -> {
      //先獲取鎖
        lock.lock();
        condition.signalAll();
        System.out.println("threadA signal all");
        try {
            Thread.sleep(5 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("threadA unlock");
          //釋放鎖
            lock.unlock();
        }
    });

    lock.lock();
    threadA.start();
    try {
      //等待並釋放鎖
        condition.await();
        System.out.println("wake from await");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlock();
    }
}

input:
threadA signal all
threadA unlock
wake from await
複製程式碼

在上面的例子中主執行緒先獲取鎖,然後再呼叫await方法阻塞當前執行緒,該方法會釋放鎖.之後執行緒A才會獲得鎖.threadA呼叫signalAll之後還要釋放鎖主執行緒才會從await處返回,可以看到和監視器鎖的wait/notify十分類似.

public final void await() throws InterruptedException {
  //如果當前執行緒被中斷了丟擲異常
    if (Thread.interrupted())
        throw new InterruptedException();
  //新建一個Node,新增到等待佇列
    Node node = addConditionWaiter();
  //釋放當前鎖
    int savedState = fullyRelease(node);
    int interruptMode = 0;
  //自旋,確定node轉移到同步佇列退出
    while (!isOnSyncQueue(node)) {
      //park阻塞
        LockSupport.park(this);
      //喚醒之後如果有中斷退出迴圈
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
  //先插入同步佇列 獲得鎖之後再判斷中斷型別
    if (acquireQueued(node,savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
  //根據中斷模式處理中斷(拋異常或者Thread.interrupt)
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
複製程式碼

SharedLock

ReentrantReadWriteLock 通過writeLock和readLock分別獲取讀寫鎖

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
複製程式碼

這玩意就是內部包了兩個靜態內部類,一個WriteLock一個ReadLock,兩個都實現了Lock介面,擁有一個實現了AQS的sync field. witeLock是獨佔鎖,readlock是共享鎖.

讀寫鎖和reentrantLock一樣,提供公平性選擇,也是可重入的.

讀寫鎖也是使用一個volatile的state來表示鎖狀態. 做了按位分割.高16位用來表示讀狀態,低16位用來表示寫狀態.

如果存在讀鎖 不能獲取寫鎖 因為寫操作應該對讀操作可見,只有所有讀鎖都是放了才能獲取寫鎖

如果其他執行緒獲取了寫鎖 當前執行緒不能獲得讀鎖 只有當前執行緒獲得了寫鎖或者寫鎖沒有被任何執行緒獲取 才能獲得讀鎖

不想寫了 煩 程式碼先不貼了 有時間再說吧