1. 程式人生 > 實用技巧 >執行緒池工具類幾種實現

執行緒池工具類幾種實現

一 執行緒池工具類
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**

  • @Description 執行緒池工具類
    */
    public class ThreadPoolUtil {

    /**

    • 核心執行緒數,會一直存活,即使沒有任務,執行緒池也會維護執行緒的最少數量
      /
      private static final int SIZE_CORE_POOL = 5;
      /
      *
    • 執行緒池維護執行緒的最大數量
      /
      private static final int SIZE_MAX_POOL = 10;
      /
      *
    • 執行緒池維護執行緒所允許的空閒時間
      /
      private static final long ALIVE_TIME = 2000;
      /
      *
    • 執行緒緩衝佇列
      */
      private static BlockingQueue bqueue = new ArrayBlockingQueue(100);
      private static ThreadPoolExecutor pool = new ThreadPoolExecutor(SIZE_CORE_POOL, SIZE_MAX_POOL, ALIVE_TIME, TimeUnit.MILLISECONDS, bqueue, new ThreadPoolExecutor.CallerRunsPolicy());

    static {
    pool.prestartAllCoreThreads();
    }

    public static ThreadPoolExecutor getPool() {
    return pool;
    }
    }

測試類
import com.dashuai.cloud.consulconsumer.util.ThreadPoolUtil;

public class TestUtil {
public static void main(String[] args) {
ThreadPoolUtil.getPool().execute(new Runnable() {
@Override
public void run() {
System.out.println("執行緒池呼叫");
}
});
}
}

二 執行緒池支援多執行緒返回結果
import org.springframework.stereotype.Service;

/**

  • ClassName:CommenThreadPoolUtil
  • Function:執行緒池公共入口處理類.

*/
@Service
public class CommonThreadPoolUtil {

// 核心執行緒數(預設初始化為10)
private int cacheCorePoolSize = 8;

// 核心執行緒控制的最大數目
private int maxCorePoolSize = 160;

// 佇列等待執行緒數閾值
private int blockingQueueWaitSize = 16;

// 核心執行緒數自動調整的增量幅度
private int incrementCorePoolSize = 4;

// 初始化執行緒物件ThreadLocal,重寫initialValue(),保證ThreadLocal首次執行get方法時不會null異常
private ThreadLocal<List<Future<?>>> threadlocal = new ThreadLocal<List<Future<?>>>() {

    protected List<Future<?>> initialValue() {

        return new ArrayList<Future<?>>();
    }
};

// 初始化執行緒池
private MyselfThreadPoolExecutor ThreadPool = new MyselfThreadPoolExecutor(cacheCorePoolSize, cacheCorePoolSize, 0L,
        TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>());

/**
 *
 * dealTask:(執行緒池執行操作-包含每個程序返回結果). <br/>
 * 1、運用場景:例如,需要同時校驗很多不同的邏輯,依賴於獲取校驗結果響應給使用者; 2、具體實現java類:implements
 * 的Callable介面,重寫call方法即可,支援返回值
 *
 * @author
 * @param callable
 * @return
 */
public Map<String, Object> dealTask(Callable<?> callable) {

    try {
        // 動態更改核心執行緒數大小
        dynamicTuningPoolSize();
        // 執行執行緒業務邏輯及獲取返回結果
        Future<?> result = ThreadPool.submit(callable);
        // 獲取當前程序的區域性變數
        List<Future<?>> threadLocalResult = threadlocal.get();
        // 疊加主程序對應的多個程序處理結果
        threadLocalResult.add(result);
        // 設定最新的threadLocal變數到當前主程序
        threadlocal.set(threadLocalResult);
    } catch (Exception e) {
        e.printStackTrace();
        return errorResp("執行緒池發生異常-Future", null);
    }
    return successResp(null);
}

/**
 *
 * dealTask:(執行緒池執行操作-不包含每個程序返回結果). <br/>
 * 1、運用場景:例如,不依賴於響應給使用者執行結果的業務邏輯 ; 2、具體實現java類:implements
 * 的Runnable介面,重寫run方法,沒有返回值
 *
 * @author
 * @param runnable
 * @return
 */
public Map<String, Object> dealTask(Runnable runnable) {

    try {
        // 動態更改核心執行緒數大小
        dynamicTuningPoolSize();
        // 執行執行緒業務邏輯
        ThreadPool.execute(runnable);
    } catch (Exception e) {
        e.printStackTrace();
        return errorResp("執行緒池發生異常", null);
    }
    return successResp(null);
}

/**
 * obtainTaskFuture:(獲取執行緒池執行結果:此為阻塞執行緒,即所有執行緒都執行完成才能獲取結果,故應將執行時間稍長的業務邏輯先執行,
 * 減少等待時間). <br/>
 * 此方法只能呼叫一次,即呼叫之後清除ThreadLocal變數,以便於同一程序再次呼叫執行緒池獲取最新的執行結果以及釋放記憶體, 防止記憶體洩露
 *
 * @author
 * @return
 */
public Map<String, Object> obtainTaskFuture() {

    List<Future<?>> threadLocalResult = null;
    try {
        // 獲取當前程序變數
        threadLocalResult = threadlocal.get();
        if (threadLocalResult == null || threadLocalResult.size() == 0) {
            return errorResp("獲取執行緒池執行結果為空", null);
        } else {
            return successResp(threadLocalResult);
        }
    } catch (Exception e) {
        return errorResp("獲取執行緒池執行結果發生異常:" + e.getMessage(), null);
    } finally {
        // 1、釋放記憶體;2、防止主程序再次呼叫執行緒池方法時對結果互有影響。
        threadlocal.remove();
    }

}

/**
 *
 * dynamicTuningPoolSize:(動態改變核心執行緒數). <br/>
 *
 * @author
 * @return
 */
private void dynamicTuningPoolSize() {

    // 佇列等待任務數(此為近似值,故採用>=判斷)
    int queueSize = ThreadPool.getQueueSize();
    // 動態更改核心執行緒數大小
    if (queueSize >= blockingQueueWaitSize) {
        // 核心執行緒數小於設定的最大執行緒數才會自動擴充套件執行緒數
        if (cacheCorePoolSize <= maxCorePoolSize) {
            // 原有核心執行緒數
            int corePoolSize = ThreadPool.getCorePoolSize();
            // 將要累積的核心執行緒數
            int currentcorePoolSize = corePoolSize + incrementCorePoolSize;
            ThreadPool.setCorePoolSize(currentcorePoolSize);
            ThreadPool.setMaximumPoolSize(currentcorePoolSize);
            cacheCorePoolSize = currentcorePoolSize;
            System.out.println("動態改變執行緒池大小====原核心執行緒池數目為:" + corePoolSize + ";現累加為:" + currentcorePoolSize);
        } else {
            System.out.println("動態改變執行緒池大小====核心執行緒池數目已累加為:" + cacheCorePoolSize + ";不會繼續無限增加");
        }
    }
}

/**
 * 獲取核心執行緒數 getCacheCorePoolSize:(). <br/>
 *
 * @author
 * @return
 */
public int getCacheCorePoolSize() {

    return ThreadPool.getCorePoolSize();
}

/**
 * 設定核心執行緒數 setCacheCorePoolSize:(). <br/>
 *
 * @author
 * @param cacheCorePoolSize
 */
public void setCacheCorePoolSize(int cacheCorePoolSize) {

    ThreadPool.setCorePoolSize(cacheCorePoolSize);
    ThreadPool.setMaximumPoolSize(cacheCorePoolSize);
    this.cacheCorePoolSize = cacheCorePoolSize;
}

/**
 *
 * successResp:(正確響應資訊). <br/>
 *
 * @author
 * @param data
 * @return
 */
private Map<String, Object> successResp(Object data) {

    Map<String, Object> result = new HashMap<String, Object>();
    result.put("status", "0");
    result.put("data", data);
    return result;

}

/**
 *
 * errorResp:(錯誤響應資訊). <br/>
 *
 * @author
 * @param errorMsg
 * @param data
 * @return
 */
public Map<String, Object> errorResp(String errorMsg, Object data) {

    Map<String, Object> result = new HashMap<String, Object>();
    result.put("status", "1");
    result.put("msg", errorMsg);
    result.put("data", data);
    return result;

}

}

建立執行緒池類

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyselfThreadPoolExecutor extends ThreadPoolExecutor {

// 初始化父類建構函式及startTime
public MyselfThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
		long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {

	super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

// 按過去執行已提交任務的順序發起一個有序的關閉,但是不接受新任務(已執行的任務不會停止)
@Override
public void shutdown() {

	super.shutdown();

}

// 嘗試停止所有的活動執行任務、暫停等待任務的處理,並返回等待執行的任務列表。在從此方法返回的任務佇列中排空(移除)這些任務。並不保證能夠停止正在處理的活動執行任務,但是會盡力嘗試。
@Override
public List<Runnable> shutdownNow() {

	return super.shutdownNow();

}

// 在執行給定執行緒中的給定 Runnable 之前呼叫的方法.可用於重新初始化ThreadLocals或者執行日誌記錄。
@Override
protected void beforeExecute(Thread t, Runnable r) {

	super.beforeExecute(t, r);
}

// 基於完成執行給定 Runnable 所呼叫的方法
@Override
protected void afterExecute(Runnable r, Throwable t) {

	super.afterExecute(r, t);

	try {
		// Future<?> result = (Future<?>) r;
		// "任務結果:" result.get();
	} catch (Exception e) {
	}
}

/**
 * 
 * getQueueSize:(已執行的任務數). <br/>
 *
 * @author
 * @return
 */
@Override
public long getCompletedTaskCount() {

	return super.getCompletedTaskCount();
}

/**
 * 
 * getQueueSize:(正在執行的任務數). <br/>
 *
 * @author
 * @return
 */
@Override
public int getActiveCount() {

	return super.getActiveCount();
}

/**
 * 
 * getQueueSize:(佇列等待任務數). <br/>
 *
 * @author
 * @return
 */
public int getQueueSize() {

	return getQueue().size();
}

}
測試類
public class TestUtil {
public static void main(String[] args) {

    CommonThreadPoolUtil poolUtil = new CommonThreadPoolUtil();
    poolUtil.dealTask(new Runnable() {
        @Override
        public void run() {
            System.out.println("執行緒池呼叫");
        }
    });
}

}

三 jdk1.5之後提供工具類 Executors
工具類Executors面提供了一些靜態工廠方法,生成一些常用的執行緒池,如下所示:

  • newCachedThreadPool:建立一個可快取的執行緒池。如果執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分空閒(60秒不執行任務)的執行緒,當任務數增加時,此執行緒池又可以智慧的新增新執行緒來處理任務。此執行緒池不會對執行緒池大小做限制(Interger. MAX_VALUE),執行緒池大小完全依賴於作業系統(或者說JVM)能夠建立的最大執行緒大小。

  • newFixedThreadPool:建立固定大小的執行緒池。每次提交一個任務就建立一個執行緒,直到執行緒達到執行緒池的最大大小。執行緒池的大小一旦達到最大值就會保持不變,如果某個執行緒因為執行異常而結束,那麼執行緒池會補充一個新執行緒。

  • newSingleThreadExecutor:建立一個單執行緒的執行緒池。這個執行緒池只有一個執行緒在工作,也就是相當於單執行緒序列執行所有任務。如果這個唯一的執行緒因為異常結束,那麼會有一個新的執行緒來替代它。此執行緒池保證所有任務的執行順序按照任務的提交順序執行。

  • newScheduledThreadPool:建立一個大小無限的執行緒池。此執行緒池支援定時以及週期性執行任務的需求。

總結:除了newScheduledThreadPool的內部實現特殊一點之外,其它執行緒池內部都是基於 ThreadPoolExecutor 類(Executor的子類)實現的。
實現:
ScheduledExecutorService scheduExec = Executors.newScheduledThreadPool(10);
scheduExec.schedule(new Runnable() {

  @SuppressWarnings("static-access")
  @Override
  public void run() {
  }

}, 5, TimeUnit.MINUTES);
週期性定時任務5分鐘後執行