1. 程式人生 > >Java多執行緒程式設計模式實戰指南:Two-phase Termination模式

Java多執行緒程式設計模式實戰指南:Two-phase Termination模式

文章來源:

文章程式碼地址:

停止執行緒是一個目標簡單而實現卻不那麼簡單的任務。首先,Java沒有提供直接的API用於停止執行緒。此外,停止執行緒時還有一些額外的細節需要考慮,如待停止的執行緒處於阻塞(等待鎖)或者等待狀態(等待其它執行緒)、尚有未處理完的任務等。本文介紹的Two-phase Termination模式提供了一種通用的用於優雅地停止執行緒的方法。

一、Two-phase Termination模式簡介

Java並沒有提供直接的API用於停止執行緒。Two-phase Termination模式通過將停止執行緒這個動作分解為準備階段和執行階段這兩個階段,以應對停止執行緒過程中可能存在的問題。

準備階段。該階段主要動作是“通知”目標執行緒(欲停止的執行緒)準備進行停止。這一步會設定一個標誌變數用於指示目標執行緒可以準備停止了。但是,由於目標執行緒可能正處於阻塞狀態(等待鎖的獲得)、等待狀態(如呼叫Object.wait)或者I/O(如InputStream.read)等待等狀態,即便設定了這個標誌,目標執行緒也無法立即“看到”這個標誌而做出相應動作。因此,這一階段還需要通過呼叫目標執行緒的interrupt方法,以期望目標執行緒能夠通過捕獲相關的異常偵測到該方法呼叫,從而中斷其阻塞狀態、等待狀態。對於能夠對interrupt方法呼叫作出響應的方法(參見表1),目標執行緒程式碼可以通過捕獲這些方法丟擲的InterruptedException來偵測執行緒停止訊號。但也有一些方法(如InputStream.read)並不對interrupt呼叫作出響應,此時需要我們手工處理,如同步的Socket I/O操作中通過關閉socket,使處於I/O等待的socket丟擲java.net.SocketException。

這裡寫圖片描述
表 1. 能夠對Thread.interrupt作出響應的一些方法

執行階段。該階段的主要動作是檢查準備階段所設定的執行緒停止標誌和訊號,在此基礎上決定執行緒停止的時機,並進行適當的“清理”操作。清理操作很重要,否則會引起記憶體溢位或者洩露等等難以排查的問題。

二、Two-phase Termination模式的架構

Two-phase Termination模式的主要參與者有以下幾種。其類圖如圖1所示。

這裡寫圖片描述
圖 1. Two-phase Termination模式的類圖
  • ThreadOwner:目標執行緒的擁有者。Java語言中,並沒有執行緒的擁有者的概念,但是執行緒的背後是其要處理的任務或者其所提供的服務,因此我們不能在不清楚某個執行緒具體是做什麼的情況下貿然將其停止。一般地,我們可以將目標執行緒的建立者視為該執行緒的擁有者,並假定其“知道”目標執行緒的工作內容,可以安全地停止目標執行緒。
  • TerminatableThread:可停止的執行緒。其主要方法及職責如下:

    1、terminate:設定執行緒停止標誌,併發送停止“訊號”給目標執行緒。
    2、doTerminate:留給子類實現執行緒停止時所需的一些額外操作,如目標執行緒程式碼中包含Socket I/O,子類可以在該方法中關閉Socket以達到快速停止執行緒,而不會使目標執行緒等待I/O完成才能偵測到執行緒停止標記。
    3、doRun:留給子類實現執行緒的處理邏輯。相當於Thread.run,只不過該方法中無需關心停止執行緒的邏輯,因為這個邏輯已經被封裝在TerminatableThread的run方法中了。
    4、doCleanup:留給子類實現執行緒停止後可能需要的一些清理動作。
    
  • TerminationToken:執行緒停止標誌。toShutdown用於指示目標執行緒可以停止了。reservations可用於反映目標執行緒還有多少數量未完成的任務,以支援等目標執行緒處理完其任務後再行停止。

準備階段的序列圖如圖2所示:

這裡寫圖片描述

1、客戶端程式碼呼叫執行緒擁有者的shutdown方法。

2、shutdown方法呼叫目標執行緒的terminate方法。

3~4、terminate方法將terminationToken的toShutdown標誌設定為true。

5、terminate方法呼叫由TerminatableThread子類實現的doTerminate方法,使得子類可以為停止目標執行緒做一些其它必要的操作。

6、若terminationToken的reservations屬性值為0,則表示目標執行緒沒有未處理完的任務或者ThreadOwner在停止執行緒時不關心其是否有未處理的任務。此時,terminate方法會呼叫目標執行緒的interrupt方法。

7、terminate方法呼叫結束。

8、shutdown呼叫返回,此時目標執行緒可能還仍然在執行。

執行階段由目標執行緒的程式碼去檢查terminationToken的toShutdown屬性、reservations屬性的值,並捕獲由interrupt方法呼叫丟擲的相關異常以決定是否停止執行緒。線上程停止前由TerminatableThread子類實現的doCleanup方法會被呼叫。

三、Two-phase Termination模式的評價與實現考量

Two-phase Termination模式使得我們可以對各種形式的目標執行緒進行優雅的停止。如目標執行緒呼叫了能夠對interrupt方法呼叫作出響應的阻塞方法、目標執行緒呼叫了不能對interrupt方法呼叫作出響應的阻塞方法、目標執行緒作為消費者處理其它執行緒生產的“產品”在其停止前需要處理完現有“產品”等。Two-phase Termination模式實現的執行緒停止可能出現延遲,即客戶端程式碼呼叫完ThreadOwner.shutdown後,該執行緒可能仍在執行。

讀者若要自行實現Two-phase Termination模式,可能需要注意以下幾個問題。

1、執行緒停止標誌

本文案例使用了TerminationToken作為目標執行緒可以準備停止的標誌。從清單3的程式碼我們可以看到,TerminationToken使用了toShutdown這個boolean變數作為主要的停止標誌,而非使用Thread.isInterrupted()。這是因為,呼叫目標執行緒的interrupt方法無法保證目標執行緒的isInterrupted()方法返回值為true:目標執行緒可能呼叫一些能夠捕獲InterruptedException而不保留執行緒中斷狀態的程式碼。另外,toShutdown這個變數為了保證記憶體可見性而又能避免使用顯式鎖的開銷,採用了volatile修飾。這點也很重要,筆者曾經見過一些採用boolean變數作為執行緒停止標誌的程式碼,只是這些變數沒有用volatile修飾,對其訪問也沒有加鎖,這就可能無法停止目標執行緒。

2、生產者——消費者問題中的執行緒停止

在多執行緒程式設計中,許多問題和一些多執行緒程式設計模式都可以看作生產者——消費者問題。停止處於生產者——消費者問題中的執行緒,需要考慮更多的問題:需要注意執行緒的停止順序,如果消費者執行緒比生產者執行緒先停止則會導致生產者生產的新”產品“無法被處理,而如果先停止生產者執行緒又可能使消費者執行緒處於空等待(如生產者消費者採用阻塞佇列中轉”產品“)。並且,停止消費者執行緒前是否考慮要等待其處理完所有待處理的任務或者將這些任務做個備份也是個問題。本文案例部分地展示生產者——消費者問題中執行緒停止的處理,其核心就是通過使用TerminationToken的reservations變數:生產者每”生產“一個產品,Two-phase Termination模式的呼叫方程式碼要使reservations變數值增加1(terminationToken.reservations.incrementAndGet());消費者執行緒每處理一個產品,Two-phase Termination模式的呼叫方程式碼要使reservations變數值減少1(terminationToken.reservations.decrementAndGet())。當然,在停止消費者執行緒時如果我們不關心其待處理的任務,Two-phase Termination模式的呼叫方程式碼可以忽略對reservations變數的操作。清單4展示了一個完整的停止生產者——消費者問題中的執行緒的例子:

清單 4. 停止生產者——消費者問題中的執行緒的例子

public class ProducerConsumerStop {
    class SampleConsumer<P> {
        private final BlockingQueue<P> queue = new LinkedBlockingQueue<P>();

        private AbstractTerminatableThread workThread 
                = new AbstractTerminatableThread() {
            @Override
            protected void doRun() throws Exception {
                terminationToken.reservations.decrementAndGet();
                P product = queue.take();
                // ...
                System.out.println(product);
            }

        };

        public void placeProduct(P product) {
            if (workThread.terminationToken.isToShutdown()) {
                throw new IllegalStateException("Thread shutdown");
            }
            try {
                queue.put(product);
                workThread.terminationToken.reservations.incrementAndGet();
            } catch (InterruptedException e) {

            }
        }

        public void shutdown() {
            workThread.terminate();
        }

        public void start() {
            workThread.start();
        }
    }

    public void test() {
        final SampleConsumer<String> aConsumer = new SampleConsumer<String>();

        AbstractTerminatableThread aProducer = new AbstractTerminatableThread() {
            private int i = 0;

            @Override
            protected void doRun() throws Exception {
                aConsumer.placeProduct(String.valueOf(i));
            }

            @Override
            protected void doCleanup(Exception cause) {
                // 生產者執行緒停止完畢後再請求停止消費者執行緒
                aConsumer.shutdown();
            }

        };

        aProducer.start();
        aConsumer.start();
    }
}

3、隱藏而非暴露可停止的執行緒

為了保證可停止的執行緒不被其它程式碼誤停止,一般我們將可停止執行緒隱藏線上程擁有者背後,而使系統中其它程式碼無法直接訪問該執行緒,正如本案例程式碼(見清單1)所展示:AlarmMgr定義了一個private欄位alarmSendingThread用於引用告警傳送執行緒(可停止的執行緒),系統中的其它程式碼只能通過呼叫AlarmMgr的shutdown方法來請求該執行緒停止,而非通過引用該執行緒物件自身來停止它。