Java ExecutorService 多執行緒實踐(一)
需要實現一個多執行緒併發的業務場景,啟動若干子執行緒,最後要所有子執行緒執行結束才結束。(類似 .NET 裡的 Task WaitAll )
Java 中的 ExecutorService 多執行緒程式設計模型提供這樣一個機制,通過程式碼來介紹一下。
方法一:ExecutorService#awaitTermination
/** * Blocks until all tasks have completed execution after a shutdown * request, or the timeout occurs, or the current thread is * interrupted, whichever happens first. * * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument * @return <tt>true</tt> if this executor terminated and * <tt>false</tt> if the timeout elapsed before termination * @throws InterruptedException if interrupted while waiting */ boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
通過 ExecutorService#submit(或者execute) 加入子執行緒並啟動,awaitTermination 阻塞等待所有子執行緒執行完畢。
sample:
ExecutorService es = Executors.newCachedThreadPool(); List<Future<String>> futureList = Lists.newArrayList(); futureList.add(es.submit(new Callable<String>() { @Override public String call() throws Exception { System.out.println("Threading1 is running"); Thread.sleep(1000); return "1"; } })); futureList.add(es.submit(new Callable<String>() { @Override public String call() throws Exception { System.out.println("Threading2 is running"); Thread.sleep(3000); return "2"; } })); futureList.add(es.submit(new Callable<String>() { @Override public String call() throws Exception { System.out.println("Threading3 is running"); Thread.sleep(2000); return "3"; } })); es.shutdown(); try { boolean completed = es.awaitTermination(2000, TimeUnit.MILLISECONDS); System.out.println("all tasks completed: " + completed); //es.wait(3000); for(Future future : futureList) { try { if (future.isDone()) { System.out.println("result: " + future.get()); } } catch (CancellationException ex) { ex.printStackTrace(); } finally { future.cancel(true); } } } catch (Exception e) { e.printStackTrace(); }
輸出結果:
Threading 1 is running
Threading 2 is running
Threading 3 is running
all tasks completed: false
result: 1
result: 3
只有1和3輸出了結果,因為 awaitTermination 超時設定了 2000 ms,Threading2 模擬了 3000 ms 因此被超時取消了。通過 Future#isDone() 可以判斷對應執行緒是否處理完畢,
這個場景裡如果 isDone() == false 那麼就可以認為是被超時幹掉了。
需要注意的是,在新增完所有的子執行緒並啟動後呼叫了 ExecutorService#shutdown 方法:一方面是不再接受新的子執行緒"提交",另一方面ExecutorService 其實自己也是一個work執行緒,如果不shutdown 其實當前執行緒並不會結束。呼叫shutdown 和不呼叫shutdown 從main執行後控制檯狀態就可以看出差異。
最後在 finally 裡呼叫了 Future#cancel() 主要是當await之後,被超時處理的執行緒可能還在執行,直接取消掉。
方法二:ExecutorService#invokeAll
invokeAll 很像上述實現的整體包裝,但細節略有不同。首先將 Callable 統一建立好放在List裡交給 invokeAll 方法執行並設定超時時間。
ExecutorService es = Executors.newCachedThreadPool();
List<Callable<String>> tasks = Lists.newArrayList();
tasks.add(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("Threading 1 is running");
Thread.sleep(1000);
return "1";
}
});
tasks.add(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("Threading 2 is running");
Thread.sleep(3000);
return "2";
}
});
tasks.add(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("Threading 3 is running");
Thread.sleep(2000);
return "3";
}
});
try {
List<Future<String>> futureList = es.invokeAll(tasks, 2000, TimeUnit.MILLISECONDS);
es.shutdown();
for(Future future : futureList) {
try {
if (future.isDone()) {
System.out.println("result: " + future.get());
}
} catch (CancellationException ex) {
ex.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
輸出結果:Threading 1 is running
Threading 2 is running
Threading 3 is running
result: 1
result: 3
java.util.concurrent.CancellationException
ExecutorService#invokeAll() 執行後所有 future.isDone() 都是 true,在 future.get() 拿結果的時候,被超時的Future會丟擲 CancellationException 。因為在 invokeAll 內部呼叫了Future#cancel() 方法。
原始碼如下:
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null || unit == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
long lastTime = System.nanoTime();
// Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
Iterator<Future<T>> it = futures.iterator();
while (it.hasNext()) {
execute((Runnable)(it.next()));
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
if (nanos <= 0)
return futures;
}
for (Future<T> f : futures) {
if (!f.isDone()) {
if (nanos <= 0)
return futures;
try {
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
}
}
done = true;
return futures;
} finally {
if (!done)
for (Future<T> f : futures)
f.cancel(true);
}
}
當然在 ExecutorService 程式設計模型外,自己定義Threading,通過CountDownLatch 控制也是可以實現的。