juc 之CyclicBarrier
CyclicBarrier是java.util.concurrent包下面的一個工具類,字面意思是可迴圈使用(Cyclic)的屏障(Barrier),通過它可以實現讓一組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個執行緒到達屏障時,所有被屏障攔截的執行緒才會繼續執行。
講解CyclicBarrier之前先說明CyclicBarrier能解決的實際問題,和CountDownLatch區別
CountDownLatch | CyclicBarrier |
---|---|
計數為0時,無法重置 | 計數達到0時,計數置為傳入的值重新開始 |
呼叫countDown()方法計數減一,呼叫await()方法只進行阻塞,對計數沒任何影響 | 呼叫await()方法計數減一,若減一後的值不等於0,則執行緒阻塞 |
不可重複使用 | 可重複使用 |
很顯然CyclicBarrier作用主要是計數的可重複使用,比如遊戲房間需要在個人進入才可以開始遊戲,可能會有三個屏障,1.進入遊戲 2.選擇角色 3.載入遊戲 這三個屏障都需要房間內的玩家都完成上一步之後才可以進行下一步. 類似這種需求就非常適合CyclicBarrier進行處理
CyclicBarrier構造方法:
// parties表示執行緒數,即遊戲中的人數 barrierAction表示越過屏障之後所要執行的任務 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } // 沒有任務的構造 public CyclicBarrier(int parties) { this(parties, null); } |
CyclicBarrier內建屬性
// 通過ReentrantLock(可重入鎖)實現同步,同步實現機制可先了解一下AQS (AbstractQueuedSynchronizer) private final ReentrantLock lock = new ReentrantLock(); // Condition 通過Condition實現執行緒等待,執行緒通訊 private final Condition trip = lock.newCondition(); // 柵欄數量 (遊戲房間內的人數) private final int parties; // 越過屏障之後需要執行的任務 private final Runnable barrierCommand; // 設定屏障狀態 如果執行中出現故障則狀態broken設定為true 然後重置barrier private Generation generation = new Generation(); // 當前某個屏障之前剩餘的等待數量 private int count; |
CyclicBarrier的核心方法為dowait方法,下面對該方法進行詳解
dowait 方法為private 方法通過await方法進行呼叫,兩個方式只是設定超時等待的區別 public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { //把超時時間轉換為納秒單位 return dowait(true, unit.toNanos(timeout)); } |
//timed 是否有設定超時 nanos 納秒數 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //每執行一次dowait 維護的當前的count數量-- 如果等於0 表示所有執行緒都準備完畢, 可以進行下一步 int index = --count; if (index == 0) { boolean ranAction = false; try { final Runnable command = barrierCommand; // 判斷barrier 是否有任務 if (command != null) command.run(); ranAction = true; // 重置barrier 進行下一步 nextGeneration(); return 0; } finally { // 如果任務沒有執行成功,則設定broken if (!ranAction) breakBarrier(); } } // 迴圈等待 直到喚醒 故障 中斷 或者超時 for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } |