1. 程式人生 > >Java ExecutorService 多執行緒實踐(一)

Java ExecutorService 多執行緒實踐(一)

需要實現一個多執行緒併發的業務場景,啟動若干子執行緒,最後要所有子執行緒執行結束才結束。(類似 .NET 裡的 Task WaitAll )

Java 中的 ExecutorService 多執行緒程式設計模型提供這樣一個機制,通過程式碼來介紹一下。

方法一:ExecutorService#awaitTermination

/**
     * Blocks until all tasks have completed execution after a shutdown
     * request, or the timeout occurs, or the current thread is
     * interrupted, whichever happens first.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return <tt>true</tt> if this executor terminated and
     *         <tt>false</tt> if the timeout elapsed before termination
     * @throws InterruptedException if interrupted while waiting
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

通過 ExecutorService#submit(或者execute) 加入子執行緒並啟動,awaitTermination 阻塞等待所有子執行緒執行完畢。

sample:

ExecutorService es = Executors.newCachedThreadPool();
		List<Future<String>> futureList = Lists.newArrayList();

		futureList.add(es.submit(new Callable<String>() {
			@Override
			public String call() throws Exception {
				System.out.println("Threading1 is running");
				Thread.sleep(1000);
				return "1";
			}
		}));

		futureList.add(es.submit(new Callable<String>() {
			@Override
			public String call() throws Exception {
				System.out.println("Threading2 is running");
				Thread.sleep(3000);
				return "2";
			}
		}));

		futureList.add(es.submit(new Callable<String>() {
			@Override
			public String call() throws Exception {
				System.out.println("Threading3 is running");
				Thread.sleep(2000);
				return "3";
			}
		}));

		es.shutdown();

		try {
			boolean completed = es.awaitTermination(2000, TimeUnit.MILLISECONDS);
			System.out.println("all tasks completed: " + completed);

			//es.wait(3000);

			for(Future future : futureList) {
				try {
					if (future.isDone()) {
						System.out.println("result: " + future.get());
					}
				} catch (CancellationException ex) {
					ex.printStackTrace();
				} finally {
					future.cancel(true);
				}
			}


		} catch (Exception e) {
			e.printStackTrace();
		}

輸出結果:

Threading 1 is running
Threading 2 is running
Threading 3 is running
all tasks completed: false
result: 1
result: 3


只有1和3輸出了結果,因為 awaitTermination 超時設定了 2000 ms,Threading2 模擬了 3000 ms 因此被超時取消了。通過 Future#isDone() 可以判斷對應執行緒是否處理完畢,
這個場景裡如果 isDone() == false 那麼就可以認為是被超時幹掉了。

需要注意的是,在新增完所有的子執行緒並啟動後呼叫了 ExecutorService#shutdown 方法:一方面是不再接受新的子執行緒"提交",另一方面ExecutorService 其實自己也是一個work執行緒,如果不shutdown 其實當前執行緒並不會結束。呼叫shutdown 和不呼叫shutdown 從main執行後控制檯狀態就可以看出差異。

最後在 finally 裡呼叫了 Future#cancel() 主要是當await之後,被超時處理的執行緒可能還在執行,直接取消掉。

方法二:ExecutorService#invokeAll

invokeAll 很像上述實現的整體包裝,但細節略有不同。首先將 Callable 統一建立好放在List裡交給 invokeAll 方法執行並設定超時時間。

ExecutorService es = Executors.newCachedThreadPool();
		List<Callable<String>> tasks = Lists.newArrayList();

		tasks.add(new Callable<String>() {
			@Override
			public String call() throws Exception {
				System.out.println("Threading 1 is running");
				Thread.sleep(1000);
				return "1";
			}
		});

		tasks.add(new Callable<String>() {
			@Override
			public String call() throws Exception {
				System.out.println("Threading 2 is running");
				Thread.sleep(3000);
				return "2";
			}
		});

		tasks.add(new Callable<String>() {
			@Override
			public String call() throws Exception {
				System.out.println("Threading 3 is running");
				Thread.sleep(2000);
				return "3";
			}
		});


		try {
			List<Future<String>> futureList = es.invokeAll(tasks, 2000, TimeUnit.MILLISECONDS);

			es.shutdown();

			for(Future future : futureList) {
				try {
					if (future.isDone()) {
						System.out.println("result: " + future.get());
					}
				} catch (CancellationException ex) {
					ex.printStackTrace();
				}
			}
			
		} catch (Exception e) {
			e.printStackTrace();
		}
輸出結果:
Threading 1 is running
Threading 2 is running
Threading 3 is running
result: 1
result: 3
java.util.concurrent.CancellationException

ExecutorService#invokeAll() 執行後所有 future.isDone() 都是 true,在 future.get() 拿結果的時候,被超時的Future會丟擲 CancellationException 。因為在 invokeAll 內部呼叫了Future#cancel() 方法。

原始碼如下:

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null || unit == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            long lastTime = System.nanoTime();

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            Iterator<Future<T>> it = futures.iterator();
            while (it.hasNext()) {
                execute((Runnable)(it.next()));
                long now = System.nanoTime();
                nanos -= now - lastTime;
                lastTime = now;
                if (nanos <= 0)
                    return futures;
            }

            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    if (nanos <= 0)
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }

當然在 ExecutorService 程式設計模型外,自己定義Threading,通過CountDownLatch 控制也是可以實現的。