JAVA並發之ReentrantLock源碼(二)
阿新 • • 發佈:2018-07-18
pat success next ava 並且 skip eas link lease
上一篇我們講到了ReentrantLock通過使用AQS框架實現了tryAcquire、tryRelease方法,從ReentrantLock層面分析源碼,本次我們將進一步深入AQS類,查看AQS底層是如何實現線程同步的。
1.acquire()
首先自然從加鎖開始看起,從lock.lock調用AQS中的acquire方法,我們已經進入了AQS源碼層面,一個看起來很簡潔的acquire如下:
1 /** 2 * Acquires in exclusive mode, ignoring interrupts. Implemented 3 * by invoking at least once {acquire進行了如下的操作: 1、tryAcquire(arg):嘗試獲取需要的資源,如果成功返回true,這個方法就完成啦,如果失敗,將會進入下一個步驟 在這個方法是需要使用者自己實現的語意,可以查看上一篇博客中reentrantlock中的實現。 2、addWaiter(Node.EXCLUSIVE), arg):新建一個當前線程的節點(獨占),將其放入CLH隊列中等待:@link #tryAcquire}, 4 * returning on success. Otherwise the thread is queued, possibly 5 * repeatedly blocking and unblocking, invoking {@link 6 * #tryAcquire} until success. This method can be used 7 * to implement method {@link Lock#lock}. 8 * 9 * @param arg the acquire argument. This value is conveyed to10 * {@link #tryAcquire} but is otherwise uninterpreted and 11 * can represent anything you like. 12 */ 13 public final void acquire(int arg) { 14 if (!tryAcquire(arg) && 15 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 16 selfInterrupt();17 }
1 private Node addWaiter(Node mode) { 2 Node node = new Node(Thread.currentThread(), mode); 3 // Try the fast path of enq; backup to full enq on failure 4 Node pred = tail; 5 if (pred != null) {//如果原來有頭有尾 6 node.prev = pred;//設置當前節點的前置節點 7 if (compareAndSetTail(pred, node)) {//CAS嘗試將其放入末尾 8 pred.next = node; 9 return node; 10 } 11 } 12 enq(node);//當原來沒頭沒尾的時候要同時CAS設置頭尾節點為當前節點 13 return node; 14 } 15 16 private Node enq(final Node node) { 17 for (;;) {//自旋不斷的用cas設置頭尾節點 18 Node t = tail; 19 if (t == null) { // Must initialize 20 if (compareAndSetHead(new Node())) 21 tail = head; 22 } else { 23 node.prev = t; 24 if (compareAndSetTail(t, node)) { 25 t.next = node; 26 return t; 27 } 28 } 29 } 30 }
3、acquireQueued:會獲取中斷,如果中斷了,就通過返回值true使acquire方法執行selfInterrupt()再將線程變成中斷狀態。讓我們來看看acquireQueued內部的實現:
1 final boolean acquireQueued(final Node node, int arg) { 2 boolean failed = true; 3 try { 4 boolean interrupted = false; 5 for (;;) {//不斷自旋 6 final Node p = node.predecessor(); 7 if (p == head && tryAcquire(arg)) {//如果當前節點的前一個節點排隊排到了頭的位置並且可以獲取到資源 8 setHead(node); 9 p.next = null; // help GC 10 failed = false; 11 return interrupted; 12 } 13 if (shouldParkAfterFailedAcquire(p, node) && 14 parkAndCheckInterrupt())//判斷是否需要去休息(如果是的話),就執行parkAndCheckInterrupt()方法去休息 15 interrupted = true; 16 } 17 } finally { 18 if (failed) 19 cancelAcquire(node); 20 } 21 } 22 23 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 24 int ws = pred.waitStatus; 25 if (ws == Node.SIGNAL) 26 /* 27 * This node has already set status asking a release 28 * to signal it, so it can safely park. 29 * 如果前一個節點已經變成 SIGNAL(會提醒下一個節點)狀態,當前節點的線程就可以去“睡覺”了 30 */ 31 return true; 32 if (ws > 0) { 33 /* 34 * Predecessor was cancelled. Skip over predecessors and 35 * indicate retry. 36 * 如果前一個線程已經被取消了,那麽將他們跳過不斷重復 37 */ 38 do { 39 node.prev = pred = pred.prev; 40 } while (pred.waitStatus > 0); 41 pred.next = node; 42 } else { 43 /* 44 * waitStatus must be 0 or PROPAGATE. Indicate that we 45 * need a signal, but don‘t park yet. Caller will need to 46 * retry to make sure it cannot acquire before parking. 47 * 如果前一個節點沒有變成SIGNAL狀態,就把它置為signal狀態 48 */ 49 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 50 } 51 return false; 52 } 53 54 private final boolean parkAndCheckInterrupt() { 55 LockSupport.park(this);//通過park方法讓當前線程阻塞 56 return Thread.interrupted();//返回當前線程的中斷情況 57 }
到這裏,acquire方法已經看完了,通過AQS簡介以及ReentranLock的tryAcquire方法,我們基本理解了ReentranLock加鎖的整個流程:
嘗試獲取資源->獲取失敗則加一個獨占的節點到CLH隊列排隊->讓前一個節點變為signal狀態->把自己阻塞等待前一個節點喚醒(當然喚醒操作會在release中實現)。 相信看到這裏,都會迫不及待的點開release方法查看當前線程是怎麽被前一個線程喚醒的吧,那我們就一起來看一看!1 public final boolean release(int arg) { 2 if (tryRelease(arg)) {//1 3 Node h = head; 4 if (h != null && h.waitStatus != 0) 5 unparkSuccessor(h); 6 return true; 7 } 8 return false; 9 } 10 private void unparkSuccessor(Node node) { 11 /* 12 * If status is negative (i.e., possibly needing signal) try 13 * to clear in anticipation of signalling. It is OK if this 14 * fails or if status is changed by waiting thread. 15 */ 16 int ws = node.waitStatus; 17 if (ws < 0) 18 compareAndSetWaitStatus(node, ws, 0); 19 /* 20 * Thread to unpark is held in successor, which is normally 21 * just the next node. But if cancelled or apparently null, 22 * traverse backwards from tail to find the actual 23 * non-cancelled successor. 24 */ 25 Node s = node.next; 26 if (s == null || s.waitStatus > 0) {//找到下一個不是取消狀態的節點 27 s = null; 28 for (Node t = tail; t != null && t != node; t = t.prev) 29 if (t.waitStatus <= 0) 30 s = t; 31 } 32 if (s != null)//節點不為空就把下一個節點喚醒 33 LockSupport.unpark(s.thread); 34 }
1、嘗試釋放資源(一般來說都是會成功的,除非被中斷了)
2、找到頭節點,如果頭節點不為空,並且當前節點的等待狀態不為0(有下一個節點改變過當前節點的等待狀態) 3、unparkSuccessor方法找到下一個不是取消狀態的節點,如果不是空,就把它喚醒 看完這個,結合三篇博客,我們對ReentranLock的流程,以及AQS在其中做了哪些重要的事情都弄明白了,也對使用AQS構造同步變量需要實現哪些方法有了一定的理解,雖然不一定能立刻上手實現一個,但是給你一個實現的同步器比如CountdownLatch等,查看它的內部實現一定不會出現翻源碼繞暈的情況了。 那麽本篇ReentranLock基於AQS層面的實現就到此結束啦,也算是填了之前挖的一個坑。JAVA並發之ReentrantLock源碼(二)