1. 程式人生 > >《Java原始碼分析》:CyclicBarrier(part one)

《Java原始碼分析》:CyclicBarrier(part one)

《Java原始碼分析》:CyclicBarrier (part one)

CyclicBarrier字面意思迴環柵欄,通過它可以實現讓一組執行緒等待至某個狀態之後再全部同時執行。叫做迴環是因為當所有等待執行緒都被釋放以後,CyclicBarrier可以被重用。我們暫且把這個狀態就叫做barrier,當呼叫await()方法之後,執行緒就處於barrier了。

舉個例子哈:當我們10個人相約包車去成都玩,車早上就在學校門口等著我們10個人上車(規定只有10個人全部上車後才能開車出發),現在陸陸續續的一個一個同學上車了,在第10個人到來之前,其它人都必須等待。在第10個人到來之後車就出發去成都了每個人就開始該幹嘛就幹嘛去了。

以上就是CyclicBarrier.

當然了,上面說的是正常不出意外的情況。CyclicBarrier和等車的例子一樣,會出現各種情況(意外):

例如:在車上等待的同學,突然有事(例如:老闆打電話有事)不去玩了,則就會告訴所有人,我不去了哈,於是大家就都不需要等待了,各自下車去忙自己的事去。

還有一些情況,這裡就不用例子一一來說明情況了。

CyclicBarrier常見的方法

有兩個建構函式

CyclicBarrier(int parties)
建立一個新的 CyclicBarrier,它將在給定數量的參與者(執行緒)處於等待狀態時啟動,但它不會在啟動 barrier 時執行預定義的操作。

CyclicBarrier(int parties, Runnable barrierAction)
建立一個新的 CyclicBarrier,它將在給定數量的參與者(執行緒)處於等待狀態時啟動,並在啟動 barrier 時執行給定的屏障操作,該操作由最後一個進入 barrier 的執行緒執行。

int await()
在所有參與者都已經在此 barrier 上呼叫 await 方法之前,將一直等待。 所有執行緒都到達barrier狀態再同時執行後續任務;

int await(long timeout, TimeUnit unit)
在所有參與者都已經在此屏障上呼叫 await 方法之前將一直等待,或者超出了指定的等待時間。

下面先看幾個關於CyclicBarrier的應用,然後再來從原始碼的角度來分析。

例子1

假設有3個執行緒,3個執行緒各自進行初始化,但是隻有3個執行緒都初始化完成之後才使得3個執行緒同時繼續執行幹其它的工作。

先看利用CyclicBarrier(int parties) 構造物件並呼叫await() 進行等待的簡單應用。

    public class CyclicBarrierDemo {
        private final static int NUM = 3;
        public static void main(String[] args) {
            final CyclicBarrier barrier = new CyclicBarrier(NUM);

            for(int i=0;i<NUM;i++){
                new Task(barrier).start();
            }

        }

        static class Task extends Thread{

            private CyclicBarrier barrier;
            public Task(CyclicBarrier barrier){
                this.barrier = barrier;
            }

            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + "  初始化開始。。。");
                try {
                    Thread.sleep(2000);//模擬初始化
                    System.out.println(Thread.currentThread().getName() + "  初始化結束,等待其他task初始化結束,然後再繼續執行!");
                    barrier.await();//在所有執行緒均到達此barrier前,等待
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " 其他task初始化結束,開始執行!");

            }

        }

    }

執行結果如下:

Thread-0  初始化開始。。。
Thread-1  初始化開始。。。
Thread-2  初始化開始。。。
Thread-0  初始化結束,等待其他task初始化結束,然後再繼續執行!
Thread-2  初始化結束,等待其他task初始化結束,然後再繼續執行!
Thread-1  初始化結束,等待其他task初始化結束,然後再繼續執行!
Thread-2 其他task初始化結束,開始執行!
Thread-1 其他task初始化結束,開始執行!
Thread-0 其他task初始化結束,開始執行!

看了這個例子,CyclicBarrier是不是比較容易理解和應用。

例子2

在API文件中我們知道CyclicBarrier是可以重新利用的,通過下面這個例子就清楚了。

    public class CyclicBarrierDemo2 {
        private final static int NUM = 3;
        public static void main(String[] args) {
            final CyclicBarrier barrier = new CyclicBarrier(NUM);

            for(int i=0;i<NUM;i++){
                new Task(barrier).start();
            }

            try {
                Thread.sleep(20000);//等待其他執行緒執行完畢,當然我們這裡可以選擇使用CountDownLatch來實現。
                System.out.println("main執行緒休息結束!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //觀察下CyclicBarrier是否可以複用
            for(int i=0;i<NUM;i++){
                new Task(barrier).start();
            }


        }

        static class Task extends Thread{

            private CyclicBarrier barrier;
            public Task(CyclicBarrier barrier){
                this.barrier = barrier;
            }

            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + "  初始化開始。。。");
                try {
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + "  初始化結束,等待其他task初始化結束,然後再繼續執行!");
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " 其他task初始化結束,開始執行!");

            }

        }

    }

執行結果如下:

Thread-1  初始化開始。。。
Thread-0  初始化開始。。。
Thread-2  初始化開始。。。
Thread-1  初始化結束,等待其他task初始化結束,然後再繼續執行!
Thread-0  初始化結束,等待其他task初始化結束,然後再繼續執行!
Thread-2  初始化結束,等待其他task初始化結束,然後再繼續執行!
Thread-2 其他task初始化結束,開始執行!
Thread-1 其他task初始化結束,開始執行!
Thread-0 其他task初始化結束,開始執行!
main執行緒休息結束!
Thread-3  初始化開始。。。
Thread-4  初始化開始。。。
Thread-5  初始化開始。。。
Thread-4  初始化結束,等待其他task初始化結束,然後再繼續執行!
Thread-3  初始化結束,等待其他task初始化結束,然後再繼續執行!
Thread-5  初始化結束,等待其他task初始化結束,然後再繼續執行!
Thread-5 其他task初始化結束,開始執行!
Thread-4 其他task初始化結束,開始執行!
Thread-3 其他task初始化結束,開始執行!

結論:從結果可以看出,CyclicBarrier確實是可以重用的。而CountDownLatch是不可以重用的

例子3

這個例子來看下 int await(long timeout, TimeUnit unit) 的應用。

讓這些執行緒等待至一定的時間,如果還有執行緒沒有到達barrier狀態直接拋異常讓到達barrier的執行緒執行後續任務。

    public class CyclicBarrierDemo3 {
        private final static int NUM = 3;
        public static void main(String[] args){
            final CyclicBarrier barrier = new CyclicBarrier(NUM);

            for(int i=0;i<NUM;i++){
                if(i==NUM-1){
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //先休息再開啟執行緒
                    new Task(barrier).start();

                }
                else{
                    new Task(barrier).start();
                }

            }

        }

        static class Task extends Thread{

            private CyclicBarrier barrier;
            public Task(CyclicBarrier barrier){
                this.barrier = barrier;
            }

            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + "  初始化開始。。。");
                try {
                    Thread.sleep(2000);//模擬初始化
                    System.out.println(Thread.currentThread().getName() + "  初始化結束,等待其他task初始化結束,然後再繼續執行!");
                    barrier.await(3, TimeUnit.SECONDS);//等待3秒,如果所有執行緒3秒後還沒有到達barrier,則會拋異常然後繼續往下面執行
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " 其他task初始化結束,開始執行!");

            }

        }

    }

執行結果如下:

Thread-1  初始化開始。。。
Thread-0  初始化開始。。。
Thread-1  初始化結束,等待其他task初始化結束,然後再繼續執行!
Thread-0  初始化結束,等待其他task初始化結束,然後再繼續執行!
Thread-2  初始化開始。。。
java.util.concurrent.TimeoutException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:257)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
    at com.wrh.readwritelock.CyclicBarrierDemo3$Task.run(CyclicBarrierDemo3.java:45)
Thread-0 其他task初始化結束,開始執行!
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
    at com.wrh.readwritelock.CyclicBarrierDemo3$Task.run(CyclicBarrierDemo3.java:45)
Thread-1 其他task初始化結束,開始執行!
Thread-2  初始化結束,等待其他task初始化結束,然後再繼續執行!
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
    at com.wrh.readwritelock.CyclicBarrierDemo3$Task.run(CyclicBarrierDemo3.java:45)
Thread-2 其他task初始化結束,開始執行!

結論:從結果可以看出,如果使用await(time,unit);當到達時間後,還存在其他執行緒沒有到達等待點,則已經到達等待點的執行緒就會直接拋異常繼續往下面執行

例子4

下面當建立CyclicBarrier物件時,指定一個Runnable物件。即利用CyclicBarrier(int parties, Runnable barrierAction) 建構函式來構造一個物件。

CyclicBarrier(int parties, Runnable barrierAction)
建立一個新的 CyclicBarrier,它將在給定數量的參與者(執行緒)處於等待狀態時啟動,並在啟動 barrier 時執行給定的屏障操作,該操作由最後一個進入 barrier 的執行緒執行。

    public class CyclicBarrierDemo4 {
        private final static int NUM = 3;
        public static void main(String[] args) {
            final CyclicBarrier barrier = new CyclicBarrier(NUM,new Runnable(){

                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(20);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "  執行barrier自帶的任務!");
                }

            });

            for(int i=0;i<NUM;i++){
                new Task(barrier).start();
            }

        }

        static class Task extends Thread{

            private CyclicBarrier barrier;
            public Task(CyclicBarrier barrier){
                this.barrier = barrier;
            }

            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + "  初始化開始。。。");
                try {
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + "  初始化結束,等待其他task初始化結束,然後再繼續執行!");
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " 其他task初始化結束,開始執行!");

            }

        }

    }

執行結果如下:

Thread-1  初始化開始。。。
Thread-0  初始化開始。。。
Thread-2  初始化開始。。。
Thread-1  初始化結束,等待其他task初始化結束,然後再繼續執行!
Thread-0  初始化結束,等待其他task初始化結束,然後再繼續執行!
Thread-2  初始化結束,等待其他task初始化結束,然後再繼續執行!
Thread-0  執行barrier自帶的任務!
Thread-0 其他task初始化結束,開始執行!
Thread-1 其他task初始化結束,開始執行!
Thread-2 其他task初始化結束,開始執行!

線上程中我故意將CyclicBarrier 中的sleep時間設定的相當長,發現結果是這樣的。

當所有執行緒都到達barrier之後,然後呼叫CyclicBarrier自帶的Runnable裡面的run方法。
最後才是所有執行緒等待結束開始繼續執行。

至於是哪個執行緒來執行CyclicBarrier物件中的任務就需要我們看下原始碼的實現了,在API文件上看到的是:最後一個進入到barrier的執行緒將執行。在原始碼中我們可以看到確實是最後一個執行緒來執行的,可以看後面的原始碼分析。

例子5

這個例子來看下,當建立CyclicBarrier物件時,指定一個Runnable物件。即利用CyclicBarrier(int parties, Runnable barrierAction) 建構函式來構造一個物件。

當最後一個進行barrier處的執行緒執行barrierAction時耗時特別長,且每個執行緒呼叫的是await(time,TimeUnit);會不會拋異常

    public class CyclicBarrierDemo5 {
        private final static int NUM = 3;
        public static void main(String[] args) {
            final CyclicBarrier barrier = new CyclicBarrier(NUM,new Runnable(){

                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(20);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "  執行barrier自帶的任務!");
                }

            });

            for(int i=0;i<NUM;i++){
                new Task(barrier).start();
            }

        }

        static class Task extends Thread{

            private CyclicBarrier barrier;
            public Task(CyclicBarrier barrier){
                this.barrier = barrier;
            }

            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + "  初始化開始。。。");
                try {
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + "  初始化結束,等待其他task初始化結束,然後再繼續執行!");
                    barrier.await(2,TimeUnit.SECONDS);//等待兩秒
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " 其他task初始化結束,開始執行!");

            }

        }

    }

執行結果:

Thread-1  初始化開始。。。
Thread-0  初始化開始。。。
Thread-2  初始化開始。。。
Thread-1  初始化結束,等待其他task初始化結束,然後再繼續執行!
Thread-0  初始化結束,等待其他task初始化結束,然後再繼續執行!
Thread-2  初始化結束,等待其他task初始化結束,然後再繼續執行!
Thread-0  執行barrier自帶的任務! 
Thread-0 其他task初始化結束,開始執行!
Thread-1 其他task初始化結束,開始執行!
Thread-2 其他task初始化結束,開始執行!

當代碼換成barrier.await(2,TimeUnit.SECONDS);雖然此時CyclicBarrier物件中的Runnable的執行時間為20s,但是不會拋任何異常。這說明barrier.await(time,TimeUnit)方法等待的其它執行緒到達barrier的時間,而不包括執行CyclicBarrier物件中的Runnable的時間,在原始碼分析中,會介紹到。

例子6:CyclicBarrier指定的任務被執行時發生了異常

可能有人會考慮這樣一個問題。

當我們利用CyclicBarrier(int parties, Runnable barrierAction) 建構函式指定barrierAction構造CyclicBarrier物件時,如果當最後一個到達barrier的執行緒執行barrierAction發生異常了會出現怎麼樣的結果呢??

看下面這個例子你就明白了

    public class CyclicBarrierDemo6 {
        private final static int NUM = 3;
        public static void main(String[] args) {
            final CyclicBarrier barrier = new CyclicBarrier(NUM,new Runnable(){

                @Override
                public void run() {

                    System.out.println(Thread.currentThread().getName() + "  執行barrier自帶的任務!此任務會拋異常");
                    int a = 1/0;//故意拋異常
                }

            });

            for(int i=0;i<NUM;i++){
                new Task(barrier).start();
            }

        }

        static class Task extends Thread{

            private CyclicBarrier barrier;
            public Task(CyclicBarrier barrier){
                this.barrier = barrier;
            }

            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + "  初始化開始。。。");
                try {
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + "  初始化結束,等待其他task初始化結束,然後再繼續執行!");
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " 其他task初始化結束,開始執行!");

            }

        }

    }

執行結果:

Thread-1  初始化開始。。。
Thread-0  初始化開始。。。
Thread-2  初始化開始。。。
Thread-1  初始化結束,等待其他task初始化結束,然後再繼續執行!
Thread-0  初始化結束,等待其他task初始化結束,然後再繼續執行!
Thread-2  初始化結束,等待其他task初始化結束,然後再繼續執行!
Thread-0  執行barrier自帶的任務!此任務會拋異常
Thread-2 其他task初始化結束,開始執行!
Thread-1 其他task初始化結束,開始執行!
Exception in thread "Thread-0" java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
    at com.wrh.readwritelock.CyclicBarrierDemo6$Task.run(CyclicBarrierDemo6.java:42)
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
    at com.wrh.readwritelock.CyclicBarrierDemo6$Task.run(CyclicBarrierDemo6.java:42)
java.lang.ArithmeticException: / by zero
    at com.wrh.readwritelock.CyclicBarrierDemo6$1.run(CyclicBarrierDemo6.java:16)
    at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:220)
    at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
    at com.wrh.readwritelock.CyclicBarrierDemo6$Task.run(CyclicBarrierDemo6.java:42)

結論:當所有執行緒都到達barrier之後,然後呼叫CyclicBarrier自帶的Runnable裡面的run方法。
最後才是所有執行緒等待結束開始繼續執行。當barrier 的任務拋異常,則只會在當前執行緒中傳播,其它執行緒沒有影響繼續正常執行。

小結

需要我們瞭解的知識點有以下幾點:

1、CyclicBarrier的用途是讓一組執行緒互相等待,直到到達某個公共屏障點才開始繼續工作。

2、CyclicBarrier是可以重複利用的,CountDownLatch不可以重複利用

3、在等待的只要有一個執行緒發生中斷,則其它執行緒就會被喚醒繼續正常執行。

4、CyclicBarrier指定的任務是進行barrier處最後一個執行緒來呼叫的,如果在執行這個任務發生異常時,則會傳播到此執行緒,其它執行緒不受影響繼續正常執行。