1. 程式人生 > >JAVA多線程提高十:同步工具CyclicBarrier與CountDownLatch

JAVA多線程提高十:同步工具CyclicBarrier與CountDownLatch

將在 con 構造方法 interrupt getc bool 區別 成績 tco

今天繼續學習其它的同步工具:CyclicBarrier與CountDownLatch

一、CyclicBarrier

CyclicBarrier是一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 很有用。因為該 barrier 在釋放等待線程後可以重用,所以稱它為循環 的 barrier。

CyclicBarrier類似於CountDownLatch也是個計數器, 不同的是CyclicBarrier數的是調用了CyclicBarrier.await()進入等待的線程數, 當線程數達到了CyclicBarrier初始時規定的數目時,所有進入等待狀態的線程被喚醒並繼續。 CyclicBarrier就象它名字的意思一樣,可看成是個障礙, 所有的線程必須到齊後才能一起通過這個障礙。 CyclicBarrier初始時還可帶一個Runnable的參數,此Runnable任務在CyclicBarrier的數目達到後,所有其它線程被喚醒前被執行。

構造方法摘要:

造方法摘要
CyclicBarrier(int parties) 創建一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啟動,但它不會在每個 barrier 上執行預定義的操作。
CyclicBarrier(int parties, Runnable barrierAction) 創建一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啟動,並在啟動 barrier 時執行給定的屏障操作,該操作由最後一個進入 barrier 的線程執行

方法摘要

返回值方法
int await() 在所有參與者都已經在此 barrier 上調用 await 方法之前,將一直等待。
int await(long timeout, TimeUnit unit) 在所有參與者都已經在此屏障上調用 await 方法之前,將一直等待。
int getNumberWaiting() 返回當前在屏障處等待的參與者數目。
int getParties() 返回要求啟動此 barrier 的參與者數目。
boolean isBroken() 查詢此屏障是否處於損壞狀態。
void reset() 將屏障重置為其初始狀態。

代碼示例 :

示例一:

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class CyclicBarrierTest {

    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        final  CyclicBarrier cb = new CyclicBarrier(3);//創建CyclicBarrier對象並設置3個公共屏障點
        for(int i=0;i<3;i++){
            Runnable runnable = new Runnable(){
                    public void run(){
                    try {
                        Thread.sleep((long)(Math.random()*10000));    
                        System.out.println("線程" + Thread.currentThread().getName() + 
                                "即將到達集合地點1,當前已有" + cb.getNumberWaiting() + "個已經到達,正在等候");                        
                        cb.await();//到此如果沒有達到公共屏障點,則該線程處於等待狀態,如果達到公共屏障點則所有處於等待的線程都繼續往下運行

                        Thread.sleep((long)(Math.random()*10000));    
                        System.out.println("線程" + Thread.currentThread().getName() + 
                                "即將到達集合地點2,當前已有" + cb.getNumberWaiting() + "個已經到達,正在等候");                        
                        cb.await();    
                        Thread.sleep((long)(Math.random()*10000));    
                        System.out.println("線程" + Thread.currentThread().getName() + 
                                "即將到達集合地點3,當前已有" + cb.getNumberWaiting() + "個已經到達,正在等候");                        
                        cb.await();                        
                    } catch (Exception e) {
                        e.printStackTrace();
                    }                
                }
            };
            service.execute(runnable);
        }
        service.shutdown();
    }
}

輸出:

線程pool-1-thread-2即將到達集合地點1,當前已有0個已經到達,正在等候
線程pool-1-thread-1即將到達集合地點1,當前已有1個已經到達,正在等候
線程pool-1-thread-3即將到達集合地點1,當前已有2個已經到達,正在等候
線程pool-1-thread-3即將到達集合地點2,當前已有0個已經到達,正在等候
線程pool-1-thread-1即將到達集合地點2,當前已有1個已經到達,正在等候
線程pool-1-thread-2即將到達集合地點2,當前已有2個已經到達,正在等候
線程pool-1-thread-3即將到達集合地點3,當前已有0個已經到達,正在等候
線程pool-1-thread-2即將到達集合地點3,當前已有1個已經到達,正在等候
線程pool-1-thread-1即將到達集合地點3,當前已有2個已經到達,正在等候

示例二:

如果在構造CyclicBarrier對象的時候傳了一個Runnable對象進去,則每次到達公共屏障點的時候都最先執行這個傳進去的Runnable,然後再執行處於等待的Runnable。如果把上面的例子改成下面這樣:

package com.thread;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class CyclicBarrierTest {

    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        //final  CyclicBarrier cb = new CyclicBarrier(3);//創建CyclicBarrier對象並設置3個公共屏障點
        final  CyclicBarrier cb = new CyclicBarrier(3,new Runnable(){
            @Override
            public void run() {
                System.out.println("********我最先執行***********");
            }
        });
        for(int i=0;i<3;i++){
            Runnable runnable = new Runnable(){
                    public void run(){
                    try {
                        Thread.sleep((long)(Math.random()*10000));    
                        System.out.println("線程" + Thread.currentThread().getName() + 
                                "即將到達集合地點1,當前已有" + cb.getNumberWaiting() + "個已經到達,正在等候");                        
                        cb.await();//到此如果沒有達到公共屏障點,則該線程處於等待狀態,如果達到公共屏障點則所有處於等待的線程都繼續往下運行

                        Thread.sleep((long)(Math.random()*10000));    
                        System.out.println("線程" + Thread.currentThread().getName() + 
                                "即將到達集合地點2,當前已有" + cb.getNumberWaiting() + "個已經到達,正在等候");                        
                        cb.await();    //這裏CyclicBarrier對象又可以重用
                        Thread.sleep((long)(Math.random()*10000));    
                        System.out.println("線程" + Thread.currentThread().getName() + 
                                "即將到達集合地點3,當前已有" + cb.getNumberWaiting() + "個已經到達,正在等候");                        
                        cb.await();                        
                    } catch (Exception e) {
                        e.printStackTrace();
                    }                
                }
            };
            service.execute(runnable);
        }
        service.shutdown();
    }
}

結果

線程pool-1-thread-1即將到達集合地點1,當前已有0個已經到達,正在等候
線程pool-1-thread-3即將到達集合地點1,當前已有1個已經到達,正在等候
線程pool-1-thread-2即將到達集合地點1,當前已有2個已經到達,正在等候
********我最先執行***********
線程pool-1-thread-1即將到達集合地點2,當前已有0個已經到達,正在等候
線程pool-1-thread-3即將到達集合地點2,當前已有1個已經到達,正在等候
線程pool-1-thread-2即將到達集合地點2,當前已有2個已經到達,正在等候
********我最先執行***********
線程pool-1-thread-1即將到達集合地點3,當前已有0個已經到達,正在等候
線程pool-1-thread-3即將到達集合地點3,當前已有1個已經到達,正在等候
線程pool-1-thread-2即將到達集合地點3,當前已有2個已經到達,正在等候
********我最先執行***********

二、CountDownLatch

一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。

用給定的計數 初始化 CountDownLatch。由於調用了 countDown() 方法,所以在當前計數到達零之前,await 方法會一直受阻塞。之後,會釋放所有等待的線程,await 的所有後續調用都將立即返回。這種現象只出現一次——計數無法被重置。如果需要重置計數,請考慮使用 CyclicBarrier。

CountDownLatch 是一個通用同步工具,它有很多用途。將計數 1 初始化的 CountDownLatch 用作一個簡單的開/關鎖存器,或入口:在通過調用 countDown() 的線程打開入口前,所有調用 await 的線程都一直在入口處等待。用 N 初始化的 CountDownLatch 可以使一個線程在 N 個線程完成某項操作之前一直等待,或者使其在某項操作完成 N 次之前一直等待。

CountDownLatch 的一個有用特性是,它不要求調用 countDown 方法的線程等到計數到達零時才繼續,而在所有線程都能通過之前,它只是阻止任何線程繼續通過一個 await。

構造方法摘要
CountDownLatch(int count) 構造一個用給定計數初始化的 CountDownLatch。

方法摘要

返回值方法
void await() 使當前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷。
boolean await(long timeout, TimeUnit unit) 使當前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷或超出了指定的等待時間。
void countDown() 遞減鎖存器的計數,如果計數到達零,則釋放所有等待的線程。
long getCount() 返回當前計數。
String toString() 返回標識此鎖存器及其狀態的字符串。

代碼示例

一種典型用法是,將一個問題分成 N 個部分,用執行每個部分並讓鎖存器倒計數的 Runnable 來描述每個部分,然後將所有 Runnable 加入到 Executor 隊列。當所有的子部分完成後,協調線程就能夠通過 await。(當線程必須用這種方法反復倒計數時,可改為使用 CyclicBarrier。)

示例一:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountdownLatchTest1 {

        public static void main(String[] args) {
              ExecutorService service = Executors. newFixedThreadPool(3);
               final CountDownLatch latch = new CountDownLatch(3);
               for (int i = 0; i < 3; i++) {
                     Runnable runnable = new Runnable() {

                            @Override
                            public void run() {
                                   try {
                                         System. out.println("子線程" + Thread.currentThread().getName() + "開始執行");
                                         Thread. sleep((long) (Math. random() * 10000));
                                         System. out.println("子線程" + Thread.currentThread().getName() + "執行完成");
                                         latch.countDown(); // 當前線程調用此方法,則計數減一
                                  } catch (InterruptedException e) {
                                         e.printStackTrace();
                                  }
                           }
                     };
                     service.execute(runnable);
              }

               try {
                     System. out.println("主線程" + Thread.currentThread().getName() + "等待子線程執行完成..." );
                     latch.await(); // 阻塞當前線程,直到計時器的值為0
                     System. out.println("主線程" + Thread.currentThread().getName() + "開始執行...");
              } catch (InterruptedException e) {
                     e.printStackTrace();
              }
       }
}

示例二:百米賽跑,4名運動員選手到達場地等待裁判口令,裁判一聲口令,選手聽到後同時起跑,當所有選手到達終點,裁判進行匯總匯總排名。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountdownLatchTest2 {

        public static void main(String[] args) {
              ExecutorService service = Executors. newCachedThreadPool();
               final CountDownLatch cdOrder = new CountDownLatch(1);
               final CountDownLatch cdAnswer = new CountDownLatch(4);
               for (int i = 0; i < 4; i++) {
                     Runnable runnable = new Runnable() {
                            public void run() {
                                   try {
                                         System. out.println("選手" + Thread.currentThread().getName() + "正等待裁判發布口令");
                                         cdOrder.await();
                                         System. out.println("選手" + Thread.currentThread().getName() + "已接受裁判口令");
                                         Thread. sleep((long) (Math. random() * 10000));
                                         System. out.println("選手" + Thread.currentThread().getName() + "到達終點");
                                         cdAnswer.countDown();
                                  } catch (Exception e) {
                                         e.printStackTrace();
                                  }
                           }
                     };
                     service.execute(runnable);
              }
               try {
                     Thread. sleep((long) (Math. random() * 10000));

                     System. out.println("裁判" + Thread.currentThread ().getName() + "即將發布口令" );
                     cdOrder.countDown();
                     System. out.println("裁判" + Thread.currentThread ().getName() + "已發送口令,正在等待所有選手到達終點" );
                     cdAnswer.await();
                     System. out.println("所有選手都到達終點" );
                     System. out.println("裁判" + Thread.currentThread ().getName() + "匯總成績排名" );
              } catch (Exception e) {
                     e.printStackTrace();
              }
              service.shutdown();

       }
}

三、CountDownLatch與CyclicBarrier對比

CountDownLatchCyclicBarrier
減計數方式 加計數方式
計算為0時釋放所有等待的線程 計數達到指定值時釋放所有等待線程
計數為0時,無法重置 計數達到指定值時,計數置為0重新開始
調用countDown()方法計數減一,調用await()方法只進行阻塞,對計數沒任何影響 調用await()方法計數加1,若加1後的值不等於構造方法的值,則線程阻塞
不可重復利用 可重復利用

其它學習資料:

CyclicBarrier的用法
CountDownLatch(倒計時計數器)使用說明

CyclicBarrier和CountDownLatch區別

參考:

《Java多線程與並發庫高級應用》張孝祥

JAVA多線程提高十:同步工具CyclicBarrier與CountDownLatch