java並發基礎(三)
第6章開始是第二部分,講解結構化並發應用程序,大多數並發應用程序都是圍繞“任務執行”構造的,任務通常是一些抽象的且離散的工作單元。
一、線程池
大多數服務器應用程序都提供了一種自然的任務邊界:以獨立的客戶請求為邊界。現在我們要實現自己的web服務器,你一定見過這樣的代碼:
class SingleThreadWebServer { public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(80); while (true) { Socket connection = socket.accept(); //處理請求 handleRequest(connection); } } }
這種串行的執行任務的方法當然不可行,它使程序失去可伸縮性。我們將它改進:
class ThreadPerTaskWebServer{ public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(80);while (true) { final Socket connection = socket.accept(); Runnable task = new Runnable() { @Override public void run() { //處理請求 handleRequest(connection); } };new Thread(task).start(); } } }
我們為每個任務分配一個線程,但它仍存在很多問題,如,線程生命周期開銷非常高、消耗過多的資源,尤其是內存、可創建線程的數量上有一個上限,如果超出,可能會拋出OutOfMemoryError異常。我在讀這本書之前,最多也就是理解到這裏。但其實java對任務執行提供了支持,也就是Executor。
public interface Executor{ void execute(Runnable command); }
雖然Executor是一個簡單的接口,但它卻為靈活且強大的異步任務執行框架提供了基礎。現在把上例修改為基於線程池的web服務器:
class TaskExecutionWebServer{ private static final int NTHREADS = 100; private static final Executor exec = Executors.newFixedThreadPool(NTHREADS); public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(80); while (true) { final Socket connection = socket.accept(); Runnable task = new Runnable() { @Override public void run() { //處理請求 handleRequest(connection); } }; exec.execute(task); } } }
每當看到下面這種形式的代碼時:
new Thread(task).start();
並且你希望獲得一種更靈活的執行策略時,請考慮使用Executor來代替Thread。
Executor基於生產者-消費者模式。提交任務的線程相當於生產者,執行任務的線程相當於消費者。生產者將任務提交給隊列,消費者從隊列中獲得任務執行。這樣將任務的提交過程與執行過程解耦。
java提供了一個靈活的線程池以及一些有用的默認配置。可以通過調用Executors中的靜態工廠方法來創建:
1、newFixedThreadPool newFixedThreadPool將創建一個固定長度的線程池,每當提交一個任務時就創建一個線程,直到達到線程池的最大數量,這時線程池的規模將不再變化。(如果某個線程由於發生了未預期的Exception而結束,那麽線程池將補充一個新的線程) 。
2、newCachedThreadPool newCachedThreadPool將創建一個可緩存的線程池,如果線程池的當前規模超過了處理需求時,那麽將回收空閑線程,當需求增加時,可以添加新的線程,線程池的規模不存在任何限制。
3、newSingleThreadExecutor newSingleThreadExecutor是一個單線程的Executor,它創建單個工作者線程來執行任務,如果這個線程異常結束,會創建另一個線程來代替。newSingleTheadExecutor能確保依照任務在隊列中的順序來串行執行。(例如:FIFO、LIFO、優先級)
4、newScheduledThreadPool newScheduledThreadPool創建了一個固定長度的線程池,而且以延遲或者定時的方式執行任務。
二、Executor的生命周期
我們已經知道如何創建一個Executor,但如何關閉它呢?Executor的實現通常會創建線程來執行任務。但JVM只有在所有(非守護)線程全部終止後才會退出。因此,如果無法正確關閉Executor,那麽JVM將無法結束。
為了解決執行服務的生命周期問題,Executor擴展了ExecutorService接口,添加了一些用於生命周期管理的方法,如下:
public interface ExecutorServce implements Executor{ void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout,TimeUnit unit) throws InterruptedException; }
ExecutorService的生命周期有3中狀態:運行、關閉、已終止。ExecutorService在初始創建時處於運行狀態。shutdown方法將執行平緩的關閉過程:不再接受新的任務,同時等待已經提交的任務執行完成。shutdownNow方法將執行粗暴的關閉過程:它將嘗試取消所有運行中的任務,並且不再啟動隊列中尚未開始執行的任務。
我們再將web服務器改進成支持關閉的形式:
class LifecycleWebServer{ private final ExecutorService exec = Executors.newCachedThreadPool(); public void start() throws IOException{ ServerSocket socket = new ServerSocket(80); while (!exec.isShutdown()) { try { final Socket conn = socket.accept(); exec.execute(new Runnable() { @Override public void run() { handleRequest(conn); } }); } catch (RejectedExecutionException e) { if (!exec.isShutdown()) { log("task submission rejected",e); } } } } public void stop(){ exec.shutdown(); } void handleRequest(Socket connection){ Request req = readRequest(connection); if (isShutdownRequest(req)) { stop(); }else { dispatchRequest(req); } } }
三、攜帶結果的任務Callable與Future
Executor框架使用Runnable作為其基本的任務表示形式。Runnable是一種有很大局限的抽象,它不能返回一個值或者拋出一個受檢查的異常。Callable是一種更好的抽象:它認為主入口點將返回一個值,並可能拋出一個異常。Future表示一個任務的生命周期,並提供了相應的方法來判斷是否已經完成或取消,以及獲得任務的結果或者取消任務等。
public interface Callable<V>{ V call() throws Exception; } public interface Future<V>{ boolean cancel(boolean mayInterruptIFRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException,ExecutionException,CancellationException; V get(long timeout,TimeUnit unit) throws InterruptedException,ExecutionException,CancellationException,TimeoutException; }
如果任務已經完成,那麽get會立即返回或者拋出一個Exception,如果任務沒有完成,那麽get將阻塞並直到任務完成。如果任務拋出了異常,那麽get將該異常封裝為ExecutionException並重新拋出。如果任務被取消,那麽get將拋出CannellationException。如果get拋出了ExecutionException,那麽可以通過getCause來獲得被封裝的初始異常。ExecutorService中的所有方法都將返回一個Future。
例子:使用Future實現頁面渲染器
為了使頁面渲染器實現更高的並發性,首先將渲染過程分解為兩個任務,一個是渲染所有的文本,另一個是下載所有的圖像。(因為其中一個任務是CPU密集型,另一個任務是I/O密集型,因此這種方法即使在單CPU系統上也能提升性能。)
偽代碼:
public class FutureRenderer{ private final ExecutorService executor = Executors.newCachedThreadPool(); void renderPage(CharSequence source){ final List<ImageInfo> imageInfos = scanForImageInfo(source); Callable<List<ImageData>> task = new Callable<List<ImageData>>() { @Override public List<ImageData> call() throws Exception { List<ImageData> result = new ArrayList<ImageData>(); for (ImageInfo imageInfo:imageInfos) { result.add(imageInfo.downloadImage()); } return result; } }; Future<List<ImageData>> future = executor.submit(task); renderText(source); try { List<ImageData> imageData = future.get(); for (ImageData data : imageData) { renderImage(data); } } catch (InterruptedException e) { //重新設置線程的中斷狀態 Thread.currentThread().interrupt(); //由於不需要結果,因此取消任務 future.cancel(true); }catch (ExecutionException e) { throw launderThrowable(e.getCause()); } } }
FutureRenderer中創建了一個Callable來下載所有的圖像,並將其提交到一個ExecutorService。這將返回一個描述任務執行情況的Future。當主任務需要圖像時,它會等待Future.get的調用結果。如果幸運的話,當請求開始時所有的圖像都已經下載完成了,即使沒有,至少圖像的下載任務也已經提前開始了。
問題:如果渲染文本的速度遠遠高於下載圖像的速度,那麽程序的最終性能與串行執行時的性能差別不大,而代碼卻變得更復雜了。所以,只有當大量相互獨立且同構的任務可以並發進行處理時,才能體現出將程序的工作負載分配到多個任務中帶來的真正性能提升。
解決:為每一幅圖像的下載都創建一個獨立任務,並在線程池中執行它們。如果向線程池提交一組任務,並希望在計算完成後獲得結果,可以保留與每個計算結果相關聯的Future,然後反復的使用get方法,然後通過輪詢判斷任務是否已經完成,這種方法可行,但很繁瑣。幸運的是,還有一種更好的方法:完成服務(CompletionService)。
CompletionService將Executor和BlockingQueue的功能融合在一起。你可以將Callable任務交給它來執行,然後使用類似於隊列操作的take和poll方法來獲得已經完成的結果。ExecutorCompletionService實現了CompletionService。
public class Render { private final ExecutorService executor; public Render(ExecutorService executor) { super(); this.executor = executor; } void renderPage(CharSequence source){ List<ImageInfo> info = scanForImageInfo(source); CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageData>(executor); //提交每張圖片的下載任務 for (final ImageInfo imageInfo: info) { completionService.submit(new Callable<ImageData>(){ public ImageData call(){ return imageInfo.downloadImage(); } }); } //加載文本 renderText(source); try { for (int t = 0 t = info.size(); t < n ;t++) { //獲得已經完成的任務,每次獲得一個 Future<ImageData> f = completionService.take(); ImageData imageData = f.get(); renderImage(imageData); } } catch (InterruptedException e) { //線程中斷會執行兩個操作:1.清除線程的中斷狀態 2.拋出InterruptedException //所以捕獲異常後使該線程仍處於中斷狀態 Thread.currentThread().interrupt(); }catch (ExecutionException e) { throw launderThrowable(e.getCause()); } } }
java並發基礎(三)