SpringBoot 非同步與多執行緒
1. @Async可以開啟非同步,但是要在 main 中EnableAsync
2.@Async既可以註解在方法上,也可以註解到類上
3.使用@Async時,請注意一定要對應bean name,否則或呼叫系統預設的SampleTaskExecutor,容易造成OOM
4.本人使用的SpringBoot 2.3.4 ,預設值maxPoolSize = 2147483647,queueCapacity = 2147483647, 建議在初始化時設定corePoolSize即可(百度到的例子中,大多數沒有講這一塊)
5.執行緒池對拒絕任務的處理策略處理,預設為new ThreadPoolExecutor.CallerRunsPolicy(),建議使用executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
6.如果Executor後臺執行緒池還沒有完成Callable的計算,這時呼叫返回Future物件的get()方法,會阻塞直到計算完成。
我為什麼要在這裡重點提第四點和第五點,目前百度到的大多文章都是相互抄的,在定義executor主動定義了queueCapacity ,maxPoolSize並沒有去看原始碼中對於queueCapacity ,maxPoolSize 的處理。
我的建議是,這倆值無需自定義,為了提高多執行緒的併發效率,可以考慮直接放大corePoolSize。
關於executort的使用程式碼我就不在此處多講,各位可以用此程式碼,測試系統中指定bean的taskExecutor中到底有多少任務在執行。
getBean見https://www.jianshu.com/p/3cd2d4e73eb7
使用方式如下
@Component @Slf4j public class TaskSchedule { @Autowired ApplicationContextProvider applicationContextProvider; // @Scheduled(fixedRate = 2000L, initialDelay = 5) public void getTaskExecutorState(){ Class<ThreadPoolTaskExecutor> clas = ThreadPoolTaskExecutor.class; ThreadPoolTaskExecutor threadPoolTaskExecutor = applicationContextProvider.getBean("taskExecutor", clas); ThreadPoolExecutor threadPoolExecutor = threadPoolTaskExecutor.getThreadPoolExecutor(); log.info("{}, taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}], MaximumPoolSize[{}], largestPoolSize[{}]", threadPoolTaskExecutor.getThreadNamePrefix(), threadPoolExecutor.getTaskCount(), threadPoolExecutor.getCompletedTaskCount(), threadPoolExecutor.getActiveCount(), threadPoolExecutor.getQueue().size(), threadPoolExecutor.getMaximumPoolSize(), threadPoolExecutor.getLargestPoolSize()); } }
controller
@Autowired
private AsyncTask task;
@Autowired
private TaskSchedule taskSchedule;
@PostMapping("/consume") @ResponseBody public JSONObject consume(@RequestBody JSONObject params) throws InterruptedException, ExecutionException { count ++; JSONObject jsonObject = new JSONObject(); log.info("params flag {} ",params.getString("flag")); log.info("名稱 {}", params.getString("loginid")); jsonObject.put("loginidis",params.getString("loginid")); jsonObject.put("count", count); Future<String> task4 = task.task4(count); taskSchedule.getTaskExecutorState(); // task.task4(); // log.info("Future<String> {}", task4.get()); //呼叫返回Future物件的get()方法,會阻塞直到計算完成 // task.getTest1(); return jsonObject; }
import cn.hutool.core.util.RandomUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Component; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** * 功能描述:非同步任務業務類(@Async也可新增在方法上) */ @Component @Async("taskExecutor") @Slf4j public class AsyncTask { //獲取非同步結果 public Future<String> task4(int index) throws InterruptedException { log.info("開始執行任務 task4 index:{}",index); long begin = System.currentTimeMillis(); // Thread.sleep(1000L*60*2); // int sleepTime = RandomUtil.randomInt(1000*60*3, 1000*60*5); int sleepTime = RandomUtil.randomInt(1000*30, 1000*60); log.info(" sleepTime is {}",sleepTime); Thread.sleep(sleepTime); long end = System.currentTimeMillis(); log.info("任務4執行完畢 index:"+index+" 耗時=" + (end - begin)); return new AsyncResult<String>("任務4"); } }
各位可以在程式碼中註釋掉
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
或者使用不同的拒絕策略測試效果。
import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor; @Configuration @Slf4j public class ThreadPoolTaskConfig { private static final int corePoolSize = 2; // 核心執行緒數(預設執行緒數)執行緒池建立時候初始化的執行緒數 private static final int maxPoolSize = 5; // 最大執行緒數 執行緒池最大的執行緒數,只有在緩衝佇列滿了之後才會申請超過核心執行緒數的執行緒 private static final int keepAliveTime = 10; // 允許執行緒空閒時間(單位:預設為秒)當超過了核心執行緒之外的執行緒在空閒時間到達之後會被銷燬 private static final int queueCapacity = 10; // 緩衝佇列數 用來緩衝執行任務的佇列 private static final String threadNamePrefix = "Async-Service-"; // 執行緒池名字首 方便我們定位處理任務所在的執行緒池 @Bean("taskExecutor") // bean的名稱,預設為首字母小寫的方法名 // public ThreadPoolTaskExecutor taskExecutor(){ public ThreadPoolTaskExecutor taskExecutor(){ // public AsyncTaskExecutor taskExecutor(){ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); // executor.setKeepAliveSeconds(keepAliveTime); executor.setThreadNamePrefix(threadNamePrefix); // 執行緒池對拒絕任務的處理策略 採用了CallerRunsPolicy策略,當執行緒池沒有處理能力的時候,該策略會直接在 execute 方法的呼叫執行緒中執行被拒絕的任務;如果執行程式已關閉,則會丟棄該任務 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // executor.setRejectedExecutionHandler( // new RejectedExecutionHandler(){ // @Override // public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // try { // //繼續加入阻塞佇列執行,可自定義 // log.info("繼續加入阻塞佇列執行,可自定義"); // executor.getQueue().put(r); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } // } // // ); // 初始化 executor.initialize(); return executor; } }