1. 程式人生 > >java實現多個執行緒達到一個闕伐值後一起執行

java實現多個執行緒達到一個闕伐值後一起執行

給大家推薦個靠譜的公眾號程式設計師探索之路,大家一起加油

1. CountDownLatch
1.1 簡介
CountDownLatch是一個同步輔助類,通過它可以完成類似於阻塞當前執行緒的功能,即:一個執行緒或多個執行緒一直等待,直到其他執行緒執行的操作完成。CountDownLatch用一個給定的計數器來初始化,該計數器的操作是原子操作,即同時只能有一個執行緒去操作該計數器。呼叫該類await方法的執行緒會一直處於阻塞狀態,直到其他執行緒呼叫countDown方法使當前計數器的值變為零,每次呼叫countDown計數器的值減1。當計數器值減至零時,所有因呼叫await()方法而處於等待狀態的執行緒就會繼續往下執行。這種現象只會出現一次,因為計數器不能被重置,如果業務上需要一個可以重置計數次數的版本,可以考慮使用CycliBarrier。
1.2 API
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
用指定的值初始化計數器。

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

呼叫該方法的執行緒進入等待狀態,直到計數器的值減至0或者該執行緒被其他執行緒Interrupted。

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

呼叫該方法的執行緒進入等待狀態,直到計數器的值減至0或者該執行緒被其他執行緒Interrupted或者等待時間超過指定的時間。

    public void countDown() {
        sync.releaseShared(1);
    }

減少計數器當前的值,每次呼叫值減少1。

    public long getCount() {
        return sync.getCount();
    }

獲取計數器當前的值
1.3 使用場景
在某些業務場景中,程式執行需要等待某個條件完成後才能繼續執行後續的操作;典型的應用如平行計算,當某個處理的運算量很大時,可以將該運算任務拆分成多個子任務,等待所有的子任務都完成之後,父任務再拿到所有子任務的運算結果進行彙總。

public static void main(String[] args) throws InterruptedException {
        Runnable taskTemp = new Runnable() {
            private int num = 0;
            @Override
            public void run() {
                for (int i = 0;i < 10;i++){
                    HttpUtil.getURLContent("https://www.baidu.com/");
                        num++;
                        System.out.println(System.nanoTime() + " [" + Thread.currentThread().getName() + "] iCounter = " + num);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
//        Thread t = new Thread(){
//            public void run(){
//                taskTemp.run();
//            }
//        };
//        t.start();
        Test test = new Test();
        test.startTaskAllInOnce(5, taskTemp);
//        test.startNThreadsByBarrier(5, taskTemp);
    }

    public long startTaskAllInOnce(int threadNums, final Runnable task) throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(threadNums);
        for(int i = 0; i < threadNums; i++) {
            Thread t = new Thread() {
                public void run() {
                    try {
                        // 使執行緒在此等待,當開始門開啟時,一起湧入門中
                        startGate.await();
                        try {
                            task.run();
                        } finally {
                            // 將結束門減1,減到0時,就可以開啟結束門了
//                            endGate.countDown();
                        }
                    } catch (InterruptedException ie) {
                        ie.printStackTrace();
                    }
                }
            };
            t.start();
        }
        long startTime = System.nanoTime();
        System.out.println(startTime + " [" + Thread.currentThread() + "] All thread is ready, concurrent going...");
        // 因開啟門只需一個開關,所以立馬就開啟開始門
        startGate.countDown();
        // 等等結束門開啟
        endGate.await();
        long endTime = System.nanoTime();
        System.out.println(endTime + " [" + Thread.currentThread() + "] All thread is completed.");
        return endTime - startTime;
    }

2. CyclicBarrier
2.1 簡介
CyclicBarrier也是一個同步輔助類,它允許一組執行緒相互等待,直到到達某個公共屏障點(common barrier point)。通過它可以完成多個執行緒之間相互等待,只有當每個執行緒都準備就緒後,才能各自繼續往下執行後面的操作。類似於CountDownLatch,它也是通過計數器來實現的。當某個執行緒呼叫await方法時,該執行緒進入等待狀態,且計數器加1,當計數器的值達到設定的初始值時,所有因呼叫await進入等待狀態的執行緒被喚醒,繼續執行後續操作。因為CycliBarrier在釋放等待執行緒後可以重用,所以稱為迴圈barrier。CycliBarrier支援一個可選的Runnable,在計數器的值到達設定值後(但在釋放所有執行緒之前),該Runnable執行一次,注,Runnable在每個屏障點只執行一個。
2.2 API
public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

用指定值和Runnable初始化CyclicBarrier

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

用指定值初始化CyclicBarrier

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen;
        }
    }

呼叫該方法的執行緒進入等待狀態,並且計數器加1,直到呼叫該方法的執行緒數達到設定值後或該執行緒被其他Interrputed

    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

呼叫該方法的執行緒進入等待狀態,並且計數器加1,直到呼叫該方法的執行緒數達到設定值後或該執行緒被其他Interrputed或者等待時間超過指定時間。

    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

判斷該Barrier是否處於broker狀態

    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

重置barrier進入初始狀態。
2.3  使用場景
使用場景類似於CountDownLatch
2.4  與CountDownLatch的區別
CountDownLatch主要是實現了1個或N個執行緒需要等待其他執行緒完成某項操作之後才能繼續往下執行操作,描述的是1個執行緒或N個執行緒等待其他執行緒的關係。CyclicBarrier主要是實現了多個執行緒之間相互等待,直到所有的執行緒都滿足了條件之後各自才能繼續執行後續的操作,描述的多個執行緒內部相互等待的關係。
CountDownLatch是一次性的,而CyclicBarrier則可以被重置而重複使用。

public static void main(String[] args) throws InterruptedException {
        Runnable taskTemp = new Runnable() {
            private int num = 0;
            @Override
            public void run() {
                for (int i = 0;i < 10;i++){
                    HttpUtil.getURLContent("https://www.baidu.com/");
                        num++;
                        System.out.println(System.nanoTime() + " [" + Thread.currentThread().getName() + "] iCounter = " + num);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
//        Thread t = new Thread(){
//            public void run(){
//                taskTemp.run();
//            }
//        };
//        t.start();
        Test test = new Test();
//        test.startTaskAllInOnce(5, taskTemp);
        test.startNThreadsByBarrier(5, taskTemp);
    }
public void startNThreadsByBarrier(int threadNums, Runnable finishTask) throws InterruptedException {
        // 設定柵欄解除時的動作,比如初始化某些值  注意 這裡的finishTask執行的 時間是 該CyclicBarrier達到闕伐值後的回撥的一個任務
        CyclicBarrier barrier = new CyclicBarrier(threadNums, finishTask);
        // 啟動 n 個執行緒,與柵欄閥值一致,即當執行緒準備數達到要求時,柵欄剛好開啟,從而達到統一控制效果
        for (int i = 0; i < threadNums; i++) {
//            Thread.sleep(100);
            new Thread(new CounterTask(barrier)).start();
        }
        System.out.println(Thread.currentThread().getName() + " out over...");
    }
    class CounterTask implements Runnable {

        private CyclicBarrier barrier;

        public CounterTask(final CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        public void run() {
            System.out.println(Thread.currentThread().getName() + " - " + System.currentTimeMillis() + " is ready...");
            try {
                // 設定柵欄,使在此等待,到達位置的執行緒達到要求即可開啟大門
                barrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " - " + System.currentTimeMillis() + " started...");
        }
    }