1. 程式人生 > 實用技巧 >SpringBoot 非同步與多執行緒

SpringBoot 非同步與多執行緒

1. @Async可以開啟非同步,但是要在 main 中EnableAsync

2.@Async既可以註解在方法上,也可以註解到類上

3.使用@Async時,請注意一定要對應bean name,否則或呼叫系統預設的SampleTaskExecutor,容易造成OOM

4.本人使用的SpringBoot 2.3.4 ,預設值maxPoolSize = 2147483647,queueCapacity = 2147483647, 建議在初始化時設定corePoolSize即可(百度到的例子中,大多數沒有講這一塊)

5.執行緒池對拒絕任務的處理策略處理,預設為new ThreadPoolExecutor.CallerRunsPolicy(),建議使用executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

6.如果Executor後臺執行緒池還沒有完成Callable的計算,這時呼叫返回Future物件的get()方法,會阻塞直到計算完成。

我為什麼要在這裡重點提第四點和第五點,目前百度到的大多文章都是相互抄的,在定義executor主動定義了queueCapacity ,maxPoolSize並沒有去看原始碼中對於queueCapacity ,maxPoolSize 的處理。

我的建議是,這倆值無需自定義,為了提高多執行緒的併發效率,可以考慮直接放大corePoolSize。

關於executort的使用程式碼我就不在此處多講,各位可以用此程式碼,測試系統中指定bean的taskExecutor中到底有多少任務在執行。

getBean見https://www.jianshu.com/p/3cd2d4e73eb7

使用方式如下

@Component
@Slf4j
public class TaskSchedule {

    @Autowired
    ApplicationContextProvider applicationContextProvider;

//    @Scheduled(fixedRate = 2000L, initialDelay = 5)
    public void getTaskExecutorState(){
        Class<ThreadPoolTaskExecutor> clas = ThreadPoolTaskExecutor.class
; ThreadPoolTaskExecutor threadPoolTaskExecutor = applicationContextProvider.getBean("taskExecutor", clas); ThreadPoolExecutor threadPoolExecutor = threadPoolTaskExecutor.getThreadPoolExecutor(); log.info("{}, taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}], MaximumPoolSize[{}], largestPoolSize[{}]", threadPoolTaskExecutor.getThreadNamePrefix(), threadPoolExecutor.getTaskCount(), threadPoolExecutor.getCompletedTaskCount(), threadPoolExecutor.getActiveCount(), threadPoolExecutor.getQueue().size(), threadPoolExecutor.getMaximumPoolSize(), threadPoolExecutor.getLargestPoolSize()); } }

controller

@Autowired
private AsyncTask task;

@Autowired
private TaskSchedule taskSchedule;

@PostMapping("/consume") @ResponseBody public JSONObject consume(@RequestBody JSONObject params) throws InterruptedException, ExecutionException { count ++; JSONObject jsonObject = new JSONObject(); log.info("params flag {} ",params.getString("flag")); log.info("名稱 {}", params.getString("loginid")); jsonObject.put("loginidis",params.getString("loginid")); jsonObject.put("count", count); Future<String> task4 = task.task4(count); taskSchedule.getTaskExecutorState(); // task.task4(); // log.info("Future<String> {}", task4.get()); //呼叫返回Future物件的get()方法,會阻塞直到計算完成 // task.getTest1(); return jsonObject; }

import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * 功能描述:非同步任務業務類(@Async也可新增在方法上)
 */
@Component
@Async("taskExecutor")
@Slf4j
public class AsyncTask {

    //獲取非同步結果
    public Future<String> task4(int index) throws InterruptedException {
        log.info("開始執行任務 task4 index:{}",index);
        long begin = System.currentTimeMillis();
//        Thread.sleep(1000L*60*2);
//        int sleepTime = RandomUtil.randomInt(1000*60*3, 1000*60*5);
        int sleepTime = RandomUtil.randomInt(1000*30, 1000*60);
        log.info(" sleepTime is {}",sleepTime);
        Thread.sleep(sleepTime);
        long end = System.currentTimeMillis();
        log.info("任務4執行完畢 index:"+index+" 耗時=" + (end - begin));
        return new AsyncResult<String>("任務4");
    }

}

各位可以在程式碼中註釋掉

        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
或者使用不同的拒絕策略測試效果。
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@Slf4j
public class ThreadPoolTaskConfig {

    private static final int corePoolSize = 2;               // 核心執行緒數(預設執行緒數)執行緒池建立時候初始化的執行緒數
    private static final int maxPoolSize = 5;                // 最大執行緒數 執行緒池最大的執行緒數,只有在緩衝佇列滿了之後才會申請超過核心執行緒數的執行緒
    private static final int keepAliveTime = 10;            // 允許執行緒空閒時間(單位:預設為秒)當超過了核心執行緒之外的執行緒在空閒時間到達之後會被銷燬
    private static final int queueCapacity = 10;            // 緩衝佇列數 用來緩衝執行任務的佇列
    private static final String threadNamePrefix = "Async-Service-"; // 執行緒池名字首 方便我們定位處理任務所在的執行緒池

    @Bean("taskExecutor") // bean的名稱,預設為首字母小寫的方法名
//    public ThreadPoolTaskExecutor taskExecutor(){
    public ThreadPoolTaskExecutor taskExecutor(){
//    public AsyncTaskExecutor taskExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
//        executor.setKeepAliveSeconds(keepAliveTime);
        executor.setThreadNamePrefix(threadNamePrefix);

        // 執行緒池對拒絕任務的處理策略 採用了CallerRunsPolicy策略,當執行緒池沒有處理能力的時候,該策略會直接在 execute 方法的呼叫執行緒中執行被拒絕的任務;如果執行程式已關閉,則會丟棄該任務
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

//        executor.setRejectedExecutionHandler(
//                new RejectedExecutionHandler(){
//                    @Override
//                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//                        try {
//                            //繼續加入阻塞佇列執行,可自定義
//                            log.info("繼續加入阻塞佇列執行,可自定義");
//                            executor.getQueue().put(r);
//                        } catch (InterruptedException e) {
//                            e.printStackTrace();
//                        }
//                    }
//                }
//
//        );
        // 初始化
        executor.initialize();
        return executor;
    }

}