1. 程式人生 > 程式設計 >java多執行緒之Phaser的使用詳解

java多執行緒之Phaser的使用詳解

前面的文章中我們講到了CyclicBarrier、CountDownLatch的使用,這裡再回顧一下CountDownLatch主要用在一個執行緒等待多個執行緒執行完畢的情況,而CyclicBarrier用在多個執行緒互相等待執行完畢的情況。

Phaser是java 7 引入的新的併發API。他引入了新的Phaser的概念,我們可以將其看成一個一個的階段,每個階段都有需要執行的執行緒任務,任務執行完畢就進入下一個階段。所以Phaser特別適合使用在重複執行或者重用的情況。

基本使用

在CyclicBarrier、CountDownLatch中,我們使用計數器來控制程式的順序執行,同樣的在Phaser中也是通過計數器來控制。在Phaser中計數器叫做parties, 我們可以通過Phaser的建構函式或者register()方法來註冊。

通過呼叫register()方法,我們可以動態的控制phaser的個數。如果我們需要取消註冊,則可以呼叫arriveAndDeregister()方法。

我們看下arrive:

 public int arrive() {
  return doArrive(ONE_ARRIVAL);
 }

Phaser中arrive實際上呼叫了doArrive方法,doArrive接收一個adjust引數,ONE_ARRIVAL表示arrive,ONE_DEREGISTER表示arriveAndDeregister。

Phaser中的arrive()、arriveAndDeregister()方法,這兩個方法不會阻塞,但是會返回相應的phase數字,當此phase中最後一個party也arrive以後,phase數字將會增加,即phase進入下一個週期,同時觸發(onAdvance)那些阻塞在上一phase的執行緒。這一點類似於CyclicBarrier的barrier到達機制;更靈活的是,我們可以通過重寫onAdvance方法來實現更多的觸發行為。

下面看一個基本的使用:

 void runTasks(List<Runnable> tasks) {
  final Phaser phaser = new Phaser(1); // "1" to register self
  // create and start threads
  for (final Runnable task : tasks) {
   phaser.register();
   new Thread() {
    public void run() {
     phaser.arriveAndAwaitAdvance(); // await all creation
     task.run();
    }
   }.start();
  }

  // allow threads to start and deregister self
  phaser.arriveAndDeregister();
 }

上面的例子中,我們在執行每個Runnable之前呼叫register()來註冊, 然後呼叫arriveAndAwaitAdvance()來等待這一個Phaser週期結束。最後我們呼叫 phaser.arriveAndDeregister();來取消註冊主執行緒。

多個Phaser週期

Phaser的值是從0到Integer.MAX_VALUE,每個週期過後該值就會加一,如果到達Integer.MAX_VALUE則會繼續從0開始。

如果我們執行多個Phaser週期,則可以重寫onAdvance方法:

 protected boolean onAdvance(int phase,int registeredParties) {
  return registeredParties == 0;
 }

onAdvance將會在最後一個arrive()呼叫的時候被呼叫,如果這個時候registeredParties為0的話,該Phaser將會呼叫isTerminated方法結束該Phaser。

如果要實現多週期的情況,我們可以重寫這個方法:

protected boolean onAdvance(int phase,int registeredParties) {
    return phase >= iterations || registeredParties == 0;
   }

上面的例子中,如果phase次數超過了指定的iterations次數則就會自動終止。

我們看下實際的例子:

 void startTasks(List<Runnable> tasks,final int iterations) {
  final Phaser phaser = new Phaser() {
   protected boolean onAdvance(int phase,int registeredParties) {
    return phase >= iterations || registeredParties == 0;
   }
  };
  phaser.register();
  for (final Runnable task : tasks) {
   phaser.register();
   new Thread() {
    public void run() {
     do {
      task.run();
      phaser.arriveAndAwaitAdvance();
     } while (!phaser.isTerminated());
    }
   }.start();
  }
  phaser.arriveAndDeregister(); // deregister self,don't wait
 }

上面的例子將會執行iterations次。

本文的例子請參考https://github.com/ddean2009/learn-java-concurrency/tree/master/Phaser

到此這篇關於java多執行緒之Phaser的使用的文章就介紹到這了,更多相關java多執行緒Phaser內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!