JAVA多線程提高十:同步工具CyclicBarrier與CountDownLatch
今天繼續學習其它的同步工具:CyclicBarrier與CountDownLatch
一、CyclicBarrier
CyclicBarrier是一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 很有用。因為該 barrier 在釋放等待線程後可以重用,所以稱它為循環 的 barrier。
CyclicBarrier類似於CountDownLatch也是個計數器, 不同的是CyclicBarrier數的是調用了CyclicBarrier.await()進入等待的線程數, 當線程數達到了CyclicBarrier初始時規定的數目時,所有進入等待狀態的線程被喚醒並繼續。 CyclicBarrier就象它名字的意思一樣,可看成是個障礙, 所有的線程必須到齊後才能一起通過這個障礙。 CyclicBarrier初始時還可帶一個Runnable的參數,此Runnable任務在CyclicBarrier的數目達到後,所有其它線程被喚醒前被執行。
構造方法摘要:
造方法摘要 |
---|
CyclicBarrier(int parties) 創建一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啟動,但它不會在每個 barrier 上執行預定義的操作。 |
CyclicBarrier(int parties, Runnable barrierAction) 創建一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啟動,並在啟動 barrier 時執行給定的屏障操作,該操作由最後一個進入 barrier 的線程執行 |
方法摘要
返回值 | 方法 |
---|---|
int | await() 在所有參與者都已經在此 barrier 上調用 await 方法之前,將一直等待。 |
int | await(long timeout, TimeUnit unit) 在所有參與者都已經在此屏障上調用 await 方法之前,將一直等待。 |
int | getNumberWaiting() 返回當前在屏障處等待的參與者數目。 |
int | getParties() 返回要求啟動此 barrier 的參與者數目。 |
boolean | isBroken() 查詢此屏障是否處於損壞狀態。 |
void | reset() 將屏障重置為其初始狀態。 |
代碼示例 :
示例一:
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class CyclicBarrierTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final CyclicBarrier cb = new CyclicBarrier(3);//創建CyclicBarrier對象並設置3個公共屏障點 for(int i=0;i<3;i++){ Runnable runnable = new Runnable(){ public void run(){ try { Thread.sleep((long)(Math.random()*10000)); System.out.println("線程" + Thread.currentThread().getName() + "即將到達集合地點1,當前已有" + cb.getNumberWaiting() + "個已經到達,正在等候"); cb.await();//到此如果沒有達到公共屏障點,則該線程處於等待狀態,如果達到公共屏障點則所有處於等待的線程都繼續往下運行 Thread.sleep((long)(Math.random()*10000)); System.out.println("線程" + Thread.currentThread().getName() + "即將到達集合地點2,當前已有" + cb.getNumberWaiting() + "個已經到達,正在等候"); cb.await(); Thread.sleep((long)(Math.random()*10000)); System.out.println("線程" + Thread.currentThread().getName() + "即將到達集合地點3,當前已有" + cb.getNumberWaiting() + "個已經到達,正在等候"); cb.await(); } catch (Exception e) { e.printStackTrace(); } } }; service.execute(runnable); } service.shutdown(); } }
輸出:
線程pool-1-thread-2即將到達集合地點1,當前已有0個已經到達,正在等候 線程pool-1-thread-1即將到達集合地點1,當前已有1個已經到達,正在等候 線程pool-1-thread-3即將到達集合地點1,當前已有2個已經到達,正在等候 線程pool-1-thread-3即將到達集合地點2,當前已有0個已經到達,正在等候 線程pool-1-thread-1即將到達集合地點2,當前已有1個已經到達,正在等候 線程pool-1-thread-2即將到達集合地點2,當前已有2個已經到達,正在等候 線程pool-1-thread-3即將到達集合地點3,當前已有0個已經到達,正在等候 線程pool-1-thread-2即將到達集合地點3,當前已有1個已經到達,正在等候 線程pool-1-thread-1即將到達集合地點3,當前已有2個已經到達,正在等候
示例二:
如果在構造CyclicBarrier對象的時候傳了一個Runnable對象進去,則每次到達公共屏障點的時候都最先執行這個傳進去的Runnable,然後再執行處於等待的Runnable。如果把上面的例子改成下面這樣:
package com.thread; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class CyclicBarrierTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); //final CyclicBarrier cb = new CyclicBarrier(3);//創建CyclicBarrier對象並設置3個公共屏障點 final CyclicBarrier cb = new CyclicBarrier(3,new Runnable(){ @Override public void run() { System.out.println("********我最先執行***********"); } }); for(int i=0;i<3;i++){ Runnable runnable = new Runnable(){ public void run(){ try { Thread.sleep((long)(Math.random()*10000)); System.out.println("線程" + Thread.currentThread().getName() + "即將到達集合地點1,當前已有" + cb.getNumberWaiting() + "個已經到達,正在等候"); cb.await();//到此如果沒有達到公共屏障點,則該線程處於等待狀態,如果達到公共屏障點則所有處於等待的線程都繼續往下運行 Thread.sleep((long)(Math.random()*10000)); System.out.println("線程" + Thread.currentThread().getName() + "即將到達集合地點2,當前已有" + cb.getNumberWaiting() + "個已經到達,正在等候"); cb.await(); //這裏CyclicBarrier對象又可以重用 Thread.sleep((long)(Math.random()*10000)); System.out.println("線程" + Thread.currentThread().getName() + "即將到達集合地點3,當前已有" + cb.getNumberWaiting() + "個已經到達,正在等候"); cb.await(); } catch (Exception e) { e.printStackTrace(); } } }; service.execute(runnable); } service.shutdown(); } }
結果
線程pool-1-thread-1即將到達集合地點1,當前已有0個已經到達,正在等候 線程pool-1-thread-3即將到達集合地點1,當前已有1個已經到達,正在等候 線程pool-1-thread-2即將到達集合地點1,當前已有2個已經到達,正在等候 ********我最先執行*********** 線程pool-1-thread-1即將到達集合地點2,當前已有0個已經到達,正在等候 線程pool-1-thread-3即將到達集合地點2,當前已有1個已經到達,正在等候 線程pool-1-thread-2即將到達集合地點2,當前已有2個已經到達,正在等候 ********我最先執行*********** 線程pool-1-thread-1即將到達集合地點3,當前已有0個已經到達,正在等候 線程pool-1-thread-3即將到達集合地點3,當前已有1個已經到達,正在等候 線程pool-1-thread-2即將到達集合地點3,當前已有2個已經到達,正在等候 ********我最先執行***********
二、CountDownLatch
一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。
用給定的計數 初始化 CountDownLatch。由於調用了 countDown() 方法,所以在當前計數到達零之前,await 方法會一直受阻塞。之後,會釋放所有等待的線程,await 的所有後續調用都將立即返回。這種現象只出現一次——計數無法被重置。如果需要重置計數,請考慮使用 CyclicBarrier。
CountDownLatch 是一個通用同步工具,它有很多用途。將計數 1 初始化的 CountDownLatch 用作一個簡單的開/關鎖存器,或入口:在通過調用 countDown() 的線程打開入口前,所有調用 await 的線程都一直在入口處等待。用 N 初始化的 CountDownLatch 可以使一個線程在 N 個線程完成某項操作之前一直等待,或者使其在某項操作完成 N 次之前一直等待。
CountDownLatch 的一個有用特性是,它不要求調用 countDown 方法的線程等到計數到達零時才繼續,而在所有線程都能通過之前,它只是阻止任何線程繼續通過一個 await。
構造方法摘要
CountDownLatch(int count) 構造一個用給定計數初始化的 CountDownLatch。
方法摘要
返回值 | 方法 |
---|---|
void | await() 使當前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷。 |
boolean | await(long timeout, TimeUnit unit) 使當前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷或超出了指定的等待時間。 |
void | countDown() 遞減鎖存器的計數,如果計數到達零,則釋放所有等待的線程。 |
long | getCount() 返回當前計數。 |
String | toString() 返回標識此鎖存器及其狀態的字符串。 |
代碼示例
一種典型用法是,將一個問題分成 N 個部分,用執行每個部分並讓鎖存器倒計數的 Runnable 來描述每個部分,然後將所有 Runnable 加入到 Executor 隊列。當所有的子部分完成後,協調線程就能夠通過 await。(當線程必須用這種方法反復倒計數時,可改為使用 CyclicBarrier。)
示例一:
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CountdownLatchTest1 { public static void main(String[] args) { ExecutorService service = Executors. newFixedThreadPool(3); final CountDownLatch latch = new CountDownLatch(3); for (int i = 0; i < 3; i++) { Runnable runnable = new Runnable() { @Override public void run() { try { System. out.println("子線程" + Thread.currentThread().getName() + "開始執行"); Thread. sleep((long) (Math. random() * 10000)); System. out.println("子線程" + Thread.currentThread().getName() + "執行完成"); latch.countDown(); // 當前線程調用此方法,則計數減一 } catch (InterruptedException e) { e.printStackTrace(); } } }; service.execute(runnable); } try { System. out.println("主線程" + Thread.currentThread().getName() + "等待子線程執行完成..." ); latch.await(); // 阻塞當前線程,直到計時器的值為0 System. out.println("主線程" + Thread.currentThread().getName() + "開始執行..."); } catch (InterruptedException e) { e.printStackTrace(); } } }
示例二:百米賽跑,4名運動員選手到達場地等待裁判口令,裁判一聲口令,選手聽到後同時起跑,當所有選手到達終點,裁判進行匯總匯總排名。
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CountdownLatchTest2 { public static void main(String[] args) { ExecutorService service = Executors. newCachedThreadPool(); final CountDownLatch cdOrder = new CountDownLatch(1); final CountDownLatch cdAnswer = new CountDownLatch(4); for (int i = 0; i < 4; i++) { Runnable runnable = new Runnable() { public void run() { try { System. out.println("選手" + Thread.currentThread().getName() + "正等待裁判發布口令"); cdOrder.await(); System. out.println("選手" + Thread.currentThread().getName() + "已接受裁判口令"); Thread. sleep((long) (Math. random() * 10000)); System. out.println("選手" + Thread.currentThread().getName() + "到達終點"); cdAnswer.countDown(); } catch (Exception e) { e.printStackTrace(); } } }; service.execute(runnable); } try { Thread. sleep((long) (Math. random() * 10000)); System. out.println("裁判" + Thread.currentThread ().getName() + "即將發布口令" ); cdOrder.countDown(); System. out.println("裁判" + Thread.currentThread ().getName() + "已發送口令,正在等待所有選手到達終點" ); cdAnswer.await(); System. out.println("所有選手都到達終點" ); System. out.println("裁判" + Thread.currentThread ().getName() + "匯總成績排名" ); } catch (Exception e) { e.printStackTrace(); } service.shutdown(); } }
三、CountDownLatch與CyclicBarrier對比
CountDownLatch | CyclicBarrier |
---|---|
減計數方式 | 加計數方式 |
計算為0時釋放所有等待的線程 | 計數達到指定值時釋放所有等待線程 |
計數為0時,無法重置 | 計數達到指定值時,計數置為0重新開始 |
調用countDown()方法計數減一,調用await()方法只進行阻塞,對計數沒任何影響 | 調用await()方法計數加1,若加1後的值不等於構造方法的值,則線程阻塞 |
不可重復利用 | 可重復利用 |
其它學習資料:
CyclicBarrier的用法
CountDownLatch(倒計時計數器)使用說明
CyclicBarrier和CountDownLatch區別
參考:
《Java多線程與並發庫高級應用》張孝祥
JAVA多線程提高十:同步工具CyclicBarrier與CountDownLatch