# java多執行緒重要知識點整理一
目錄
java多執行緒重要知識點整理一
本系列文是對自己學習多執行緒和平時使用過程中的知識梳理,不適合基礎比較差的閱讀,適合看過java程式設計實戰做整體回顧的,想到了會不斷補充。
1. 執行緒池的使用
執行緒池其實在實際工作中有用到的話理解其實是非常簡單的,合理的利用執行緒池能極大的提高效率。主要說明下程池的使用和引數的意義(暫時不考慮定時執行緒池):
1. corePoolSize 執行緒池的最小大小 2. maximumPoolSize 執行緒池的最大大小 3. keepAliveTime 大於執行緒池最小大小的空餘執行緒的包活時間 4. workQueue 工作佇列用於存放任務 5. threadFactory 建立執行緒的執行緒工廠 6. handler 用於處理任務被拒絕的情況
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
執行緒池的任務投遞過程:
- 向執行緒池投遞一個任務時,首先看工作者執行緒有沒有小於corePoolSize,如果是,利用threadFactory建立一個執行緒將任務投遞
- 第一步如果大於corePoolSize,則將任務投遞到workQueue,這裡需要考慮workQueue是無界還是有界的情況,如果是無界肯定投遞成功返回。如果是有界,投遞成功則返回,否則看執行緒數有沒有小於maximumPoolSize,如果是則再開一個執行緒返回,不是的話則呼叫handler的拒絕邏輯。
需要注意的點:
- 如果放的workQueue是無界佇列,那麼maximumPoolSize這個引數其實就無效了,永遠不會建立超過corePoolSize的執行緒數量,所以任務永遠不會因為容量問題被拒絕,如果生產者速度一直大於消費者,很可能造成記憶體溢位
- 第1條說workQueue是無界佇列那麼任務永遠不會因為容量問題被拒絕,但是handler還是有用的,當你關閉了執行緒池池,繼續提交任務會用到這個來拒絕
- 使用者程式碼在提交任務時底層使用的阻塞佇列的offer方法,所有一般是不會阻塞的,要麼成功,要麼被阻絕。
關於執行緒池引數的設定:
corePoolSize,maximumPoolSize,workQueue核心引數: 根據你的業務場景,如果是cpu密集型,可以設定執行緒池固定為ncpu+1,佇列採用一個有界的,防止記憶體溢位。如果還是出現任務被拒絕是不是應該考慮加機器了。 如果是io密集型,需要根據io和cpu的比例來做相應的估算,這種也不是十分精確的,畢竟網路情況也會發生變化。這裡推薦書中的公式: ncpuucpu(1+w/c) ncpu:cpu個數 ucpu:每個cpu利用率 w/c: io/cpu 另外不同型別的任務最好採用不同的執行緒池有利於執行緒池的設定,混雜的任務很難設定。
threadFactory:我主要用於設定執行緒的名字,這樣在日誌區分度更高
handler:拒絕執行器,這個得根據業務場景類
關於Executors的工具方法:
alibaba編碼規約中建議是手動建立執行緒池,這樣根據自己的業務做更好的定製,而且自己也能更加的清理裡面的邏輯。如果一時圖快,可能在系統的負載不斷升高時出現問題,反而更加不好排查。
關於執行緒池的優雅停機:
在提高效能的同時不要忘記資料的安全性,因為執行緒池的特點,任務會被快取在佇列中,如果是不重要的應用可以直接將執行緒設定成守護執行緒,這樣在應用停機的時候直接停機。 對於重要的應用,如果應用重啟這些資料都是要考慮到的。這裡就需要十分清楚中斷機制,因為這裡涉及任務取消的邏輯,這些邏輯是要對自己的任務程式碼自己進行相應的處理。執行緒池shutdown方法執行之後後續的任務都會被拒絕,已經有的任務會繼續執行完,這個比較好理解。shutdownNow方法返回佇列中的所有任務,然後發中斷給正在執行的任務,這裡返回的任務你可以進行持久化,主要就是正在執行的任務的處理,對於短任務你可以不響應中斷,耗時任務必須得考慮程序退出時間過長被強殺。
實際應用場景舉例:
- 應用日誌的記錄:我們對於一些業務日誌可能寫到mysql中,如果每個操作插入一條日誌必然會很耗時,這是我們可以單獨開一個日誌執行緒,將日誌投遞到日誌執行緒物件的queue中,然後他定時掃,批量如庫,既提高吞吐量,有降低延遲
- 第三方介面對接:在對接第三方時候,很多時候是http,這種操作相當耗時,可以利用執行緒池來進行非同步化,如果需要得到返回介面可以利用Feature和CompletableFuture(後續講解)
- 耗時的執行緒不安全操作:這種場景比較少,但公司確實遇到了,具體就是後臺提交一個任務,這種任務可能會執行幾十分鐘,任務不能同時執行。這裡思路就是採用單執行緒池,然後利用提交時返回的Future來實現任務的取消功能。
關於異常:
只有execute提交的任務才會將異常交給未捕獲異常處理器,預設的未捕獲異常處理器會列印異常。但是如果是submit會將異常封裝到飯返回的Future中在get的時候才會丟擲,可以通過改寫執行緒池的afterExecute方法。 原始碼中的例子:
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Object result = ((Future<?>) r).get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null)
System.out.println(t);
}
2. java中斷機制
中斷機制對於javq多執行緒程式設計是一個十分基礎的東西,很多時候都是在使用類庫所以沒有注意到,對於自己更好的使用類庫和自己封裝類庫,中斷是十分重要的。對於中斷處理的好壞直接影響了編寫的api合理性和使用類庫時正確性。
在多執行緒中,很多時候會遇到需要停止一個執行緒中的任務這樣的需求。實現這樣的需求很容易想到在物件中放置一個標誌位,然後執行緒在執行的過程中去不停的檢測這個標誌位,如果標誌位被設定成false就退出。另外標誌位需要採用volatile來修飾,可以保證記憶體的可見性。例子如下:
public class Main {
public static void main(String[] args) throws InterruptedException {
//建立一個任務
Task task = new Task();
new Thread(task).start();
Thread.sleep(3_000);
task.stop = true;
}
}
@Slf4j
class Task implements Runnable {
/**
* 是否停止的標誌位
*/
public volatile boolean stop = false;
/**
* 執行次數計數器
*/
AtomicInteger adder = new AtomicInteger();
@Override
public void run() {
while (!stop) {
log.info("執行次數:{}", adder.incrementAndGet());
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.warn("退出執行!");
}
}
上面的程式碼是很多人一開始就能想到的方案,但是,仔細想想就會發現問題,上面的程式碼中的sleep函式是一個類庫封裝好的,所以如果設定了停止標誌位,那麼每次檢測執行都得等到while迴圈才行。這就是引入中斷的意義,jdk中很多函式都能響應中斷的操作。
先說一下java中斷的含義:java中斷是一種執行緒間協作機制,用來告訴一個執行緒可以停止了,但是具體那個執行緒是否響應以及採取什麼樣的動作,java語言沒有做任何強制規定。這裡就需要和作業系統的中的中斷明確區別,這兩種雖然中文名一樣,但是實際的意義卻差以千里。ava的中斷是協作的,相當於只是告訴一個執行緒,但是那個執行緒可以選擇不響應或者需要中斷。
第二個版本的程式碼如下:
public class MainInterrupt {
public static void main(String[] args) throws InterruptedException {
//建立一個任務
Thread thread = new Thread(new TaskInterrupt());
thread.start();
Thread.sleep(3_000);
thread.interrupt();
}
}
@Slf4j
class TaskInterrupt implements Runnable {
/**
* 執行次數計數器
*/
AtomicInteger adder = new AtomicInteger();
@Override
public void run() {
while (!Thread.interrupted()) {
log.info("執行次數:{}", adder.incrementAndGet());
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
log.warn("隨眠過程打斷退出!");
break;
}
}
log.warn("退出執行!");
}
}
中斷的api:
- public void interrupt() 給一個執行緒發起中斷
- public static boolean interrupted() 檢測執行緒是否處於中斷位,並清除標誌位,這也是唯一清除標誌位的 方法!
- public boolean isInterrupted() 檢測執行緒是否中斷
中斷的處理
封裝的自己類庫程式碼的時候一定要考慮清楚對於中斷的處理,例如BlockingQueue的E poll(long timeout, TimeUnit unit)throws InterruptedException
這個api,他的實現都是選擇了將異常丟擲,底層實現一般是Lock的lockInterruptibly()
方法阻塞丟擲。你站在使用者程式碼的角度去想,如果你實現了這個方法是阻塞的,然後又把異常吃了,怎麼去實現被取消後的邏輯,使用者程式碼又怎麼去針對取消做相應的動作。所以封裝類庫的時候最好還是重新丟擲。
另外還有一種就是重置中斷位,有些操作不能直接丟擲,像Runnable介面,還有丟擲異常前執行一些動作的情況。在處理完之後重置下中斷位,這樣就能讓使用者程式碼感知並做相應的處理。
3. 執行緒間通訊機制總結
- synchronize關鍵字
- 物件的wait,notify,notifyall
- Thread的join
- Semaphore,ReentrantLock,CyclicBarrier,CountDownLatch,(這些都是基於AQS的工具類)BlockingQueue
- FutureTask相關的