1. 程式人生 > >執行緒的建立與執行緒池ThreadPoolExecutor,Executors

執行緒的建立與執行緒池ThreadPoolExecutor,Executors

                    執行緒的建立與執行緒池及執行緒池工具類

1.執行緒的建立方式

1.1繼承Thread類重寫run方法

public class Test {

	public static void main(String[] args) {
	Thread thread = new MyThread();
	thread.setName("mythread001");
	thread.start();  
	}

	
}
class MyThread extends Thread{
	@Override
	public void run() {
		// TODO Auto-generated method stub
		System.out.println(Thread.currentThread().getName() + ",running ....");
	}
}

1.2實現Runnable介面

public class Test {

	public static void main(String[] args) {
	Thread thread = new Thread(new MyRunnable());
	thread.setName("myRunnable001");
	thread.start();  
	}
}
class MyRunnable implements Runnable{

	@Override
	public void run() {
		System.out.println("my runnable...");
	}
}

    其實以上兩種方式本質上是一樣的,都是重寫了介面Runnable的run方法,因為Thread類實現了Runnable介面。Runnable介面是JDK1.0提供的,在java.lang包下,但是Runnable介面的run方法有的場景下並不合適,比如不能返回運算結果和丟擲檢查異常,這時我們可以使用JDK1.5提供的在java.util.concurrent包下的Callable介面:

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

1.3Callable介面型別的執行緒建立

因為該介面可以返回一個結果,所以我們需要獲取返回結果,這時系統提供了一個Future介面用於獲取返回的結果,但是該介面代表的是一個非同步計算的結果,所以我們需要等到計算完成時才能獲取結果,否則就會一直阻塞直到結果返回。Future介面內的方法如下:

 /**
     * Attempts to cancel execution of this task.  This attempt will
     * fail if the task has already completed, has already been cancelled,
     * or could not be cancelled for some other reason. If successful,
     * and this task has not started when {@code cancel} is called,
     * this task should never run.  If the task has already started,
     * then the {@code mayInterruptIfRunning} parameter determines
     * whether the thread executing this task should be interrupted in
     * an attempt to stop the task.
     *
     * <p>After this method returns, subsequent calls to {@link #isDone} will
     * always return {@code true}.  Subsequent calls to {@link #isCancelled}
     * will always return {@code true} if this method returned {@code true}.
     *
     * @param mayInterruptIfRunning {@code true} if the thread executing this
     * task should be interrupted; otherwise, in-progress tasks are allowed
     * to complete
     * @return {@code false} if the task could not be cancelled,
     * typically because it has already completed normally;
     * {@code true} otherwise
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * Returns {@code true} if this task was cancelled before it completed
     * normally.
     *
     * @return {@code true} if this task was cancelled before it completed
     */
    boolean isCancelled();

    /**
     * Returns {@code true} if this task completed.
     *
     * Completion may be due to normal termination, an exception, or
     * cancellation -- in all of these cases, this method will return
     * {@code true}.
     *
     * @return {@code true} if this task completed
     */
    boolean isDone();

    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

所以我們獲取結果要在呼叫isDone()方法返回true時才能呼叫get()方法。該介面有一個系統提供的實現類FutureTask,FutureTask類實現了RunnableFuture<V>,RunnableFuture<V>介面繼承了Runnable, Future<V>兩個介面。FutureTask中實現的Runnable中的run方法呼叫了Callable介面中的call方法。所以建立Callable介面的執行緒的方法與Runnable類似:

public class Test {

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		FutureTask<String> future = new FutureTask<String>(new Callable<String>() {

			@Override
			public String call() throws Exception {
				return "hello";
			}
		});

		Thread thread = new Thread(future);
		thread.setName("myCallable001");
		thread.start();
		while (!future.isDone()) {
			try {
				System.out.println("myCallable001 is not done");
				//等待執行緒執行結束
				Thread.currentThread().sleep(1000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		String result = future.get();
		System.out.println("result : " + result);
	}
}

執行結果如圖:

2.執行緒池

        執行緒的建立我們已經知道了,但是需要讓執行緒啟動執行我們需要呼叫start()方法,該方法會為執行緒準備必要的系統資源,比如執行緒的棧記憶體,程式計數器和本地棧等,所以一個執行緒是需要消耗系統資源的,也是耗時的。為了降低系統資源的消耗和提高系統的執行效率我們需要對執行緒複用,就像資料庫連線池一樣,使用執行緒池技術,來達到這個目的。JDK為了我們提供了ThreadPoolExecutor類,接下來我們學習下這個執行緒池類:

2.1Executor

    該介面原始碼如下:

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

該介面聲明瞭一個方法,void execute(Runnable command);用來執行一個執行緒。

2.2ExecutorService

    該介面繼承自Executor,並聲明瞭以下方法:

該介面種的方法更加實用,shutdown()方法與shutdownNow()方法用於停掉執行緒池,原始碼中寫了如下的例子:

    首先呼叫shutdown()阻止新的任務提交到執行緒池,呼叫awaitTermination等待正在執行的執行緒執行完畢,或者超時或者被打斷執行,然後再呼叫shutdownNow()取消當前正在執行的任務。其他的方法見名知意。

2.3AbstractExecutorService

    該抽象類實現了ExecutorService介面,其方法如圖:

這些方法中要說的就是<T> RunnableFuture<T> newTaskFor(Runnable runnable, T value),該方法將Runnable介面的任務轉換為Callable相關的FutureTask<T>。其程式碼實現如圖:

在FutureTask<T>中的構造方法原始碼如下:

再到Executors.callable(runnable, result)檢視原始碼如下:

2.4ThreadPoolExecutor

    建構函式

    執行緒池實現類,我們先看該類的核心建構函式:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

引數:corePoolSize。the number of threads to keep in the pool, even if they are idle, unless {@code allowCoreThreadTimeOut} is set。執行緒池中保持的執行緒數量,儘管這些執行緒是空閒的,除非設定allowCoreThreadTimeOut引數為true,則在空閒時間超過keepAliveTime時,會被終止掉。allowCoreThreadTimeOut預設為false。

引數:maximumPoolSize。執行緒池中允許的最大執行緒數量。

引數:keepAliveTime。保持活躍的時間,也就是當執行緒池中的執行緒數量超過corePoolSize時,這些超量的執行緒在等待被新任務使用前的最大等待時間,超過找個時間就要被終止掉了。

引數:unit。保持活躍時間的單位。可選為:NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS等。

引數:workQueue。工作佇列。這佇列用來保持那些execute()方法提交的還沒有執行的任務。常用的佇列有SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue。一般我們需要根據自己的實際業務需求選擇合適的工作佇列。

SynchronousQueue:直接傳遞。對於一個好的預設的工作佇列選擇是SynchronousQueue,該佇列傳遞任務到執行緒而不持有它們。在這一點上,試圖向該佇列壓入一個任務,如果沒有可用的執行緒立刻執行任務,那麼就會入列失敗,所以一個新的執行緒就會被建立。當處理那些內部依賴的任務集合時,這個選擇可以避免鎖住。直接接傳遞通常需要無邊界的最大執行緒數來避免新提交任務被拒絕處理。當任務以平均快於被處理的速度提交到執行緒池時,它依次地確認無邊界執行緒增長的可能性;

LinkedBlockingDeque:無界佇列。沒有預先定義容量的無界佇列,在核心執行緒數都繁忙的時候會使新提交的任務在佇列中等待被執行,所以將不會建立更多的執行緒,因此,最大執行緒數的值將不起作用。當每個任務之間是相互獨立的時比較適合該佇列,所以任務之間不能互相影響執行。例如,在一個WEB頁面伺服器,當平滑的出現短暫的請求爆發時這個型別的佇列是非常有用的,當任務以快於平均處理速度被提交時該佇列會確認無邊界佇列增長的可能性。

ArrayBlockingQueue:有界阻塞佇列,遵循FIFO原則,一旦建立容量不能改變,當向一個已經滿了的該佇列中新增元素和向一個已經為空的該佇列取出元素都會導致阻塞;當執行緒池使用有限的最大執行緒數時該佇列可以幫助保護資源枯竭,但它更難協調和控制。佇列大小和最大執行緒數在效能上可以互相交換:使用大佇列和小執行緒池會降低CPU使用和OS資源與上下文切換開銷,但會導致人為降低吞吐量,如果任務頻繁阻塞,系統的執行緒排程時間會超過我們的允許值;如果使用小佇列大池,這將會使CPU較為繁忙但會出現難以接受的排程開銷,這也會導致降低吞吐量。

引數:threadFactory。執行緒工廠。當執行緒池需要建立執行緒的時候用來建立執行緒。預設是Executors類的靜態內部類DefaultThreadFactory。原始碼如下:

static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

主要是設定了一些守護性和優先順序等屬性。

引數:handler。拒絕執行處理器。當執行緒的數量已經到了邊界值,並且workQueue中任務也達到最大值,此時需要使用處理器處理多餘的任務。該引數有四個值,分別是CallerRunsPolicy,AbortPolicy,DiscardPolicy,DiscardOldestPolicy。這四個處理器是執行緒池的內部類,都實現了RejectedExecutionHandler介面。

CallerRunsPolicy:在呼叫execut方法的呼叫執行緒中直接執行執行緒池拒絕的任務;

AbortPolicy:以丟擲一個RejectedExecutionException的方式處理拒絕執行的任務;

DiscardPolicy:以直接忽略的方式處理拒絕執行的任務;

DiscardOldestPolicy:忽略掉最老的沒有處理的拒絕任務,然後繼續嘗試執行execute方法,知道執行緒池關閉,那麼放棄找個任務。

執行緒的管理

    首先看看該類中的註釋中是如何解釋執行緒池怎麼管理執行緒的:

When a new task is submitted in method {@link #execute(Runnable)},
 * and fewer than corePoolSize threads are running, a new thread is
 * created to handle the request, even if other worker threads are
 * idle.  If there are more than corePoolSize but less than
 * maximumPoolSize threads running, a new thread will be created only
 * if the queue is full.  By setting corePoolSize and maximumPoolSize
 * the same, you create a fixed-size thread pool. By setting
 * maximumPoolSize to an essentially unbounded value such as {@code
 * Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
 * number of concurrent tasks. Most typically, core and maximum pool
 * sizes are set only upon construction, but they may also be changed
 * dynamically using {@link #setCorePoolSize} and {@link
 * #setMaximumPoolSize}. 

       當一個任務通過execute(Runnable)方法被提交時,並且有少於核心執行緒數的執行緒正在執行,那麼一個新的執行緒會被建立去處理請求,即使其它的執行緒正在空閒;如果有多餘核心執行緒數但小於最大執行緒數的執行緒在執行,如果任務佇列沒有滿,則將任務壓入任務佇列等待執行完任務的執行緒去處理,如果滿了則建立新的執行緒處理該任務;如果正在執行的執行緒數大於最大執行緒數,則需要根據拒絕執行處理器的不同進行處理。通過將核心執行緒數與最大執行緒數設定為相同的值,你可以建立一個固定大小的執行緒池。通過設定最大執行緒數為一個基本上是無邊界的值例如Integer.MAX_VALUE,你允許執行緒池容納任意數量的併發任務。通常,核心執行緒數和最大執行緒數是通過建構函式設定的,但是他們也可以通過使用setCorePoolSize和setMaximumPoolSize動態的改變。

    驗證如下:

    新建一個執行緒池,程式碼如下:

public class Main {

	public static void main(String[] args) throws Exception {

		// 有界佇列
		BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(5);
		// 放棄拒絕的任務並丟擲異常
		RejectedExecutionHandler abortPolicyHandler = new ThreadPoolExecutor.AbortPolicy();
		RejectedExecutionHandler discardPolicyHandler = new ThreadPoolExecutor.DiscardPolicy();
		ThreadPoolExecutor threadPool = 
				new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS, workQueue, discardPolicyHandler);
		long start = System.currentTimeMillis();
		for (int i = 0; i < 40; i++) {
			threadPool.execute(new MyTask());
			System.out.println("核心執行緒數" + threadPool.getCorePoolSize());
			System.out.println("最大執行緒數" + threadPool.getMaximumPoolSize());
			System.out.println("執行緒池數" + threadPool.getPoolSize());
			System.out.println("佇列任務數" + threadPool.getQueue().size());
			System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
		}
		
		System.out.println(System.currentTimeMillis()-start);
		threadPool.shutdown();
		if (threadPool.awaitTermination(6, TimeUnit.SECONDS)) {
			threadPool.shutdownNow();
		}
	}

}

class MyTask implements Runnable {

	@Override
	public void run() {
		try {
			Thread.currentThread().sleep(5000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		System.out.println(Thread.currentThread().getName() + ", hello");
	}

}

執行部分日誌如下: 

核心執行緒數5
最大執行緒數10
執行緒池數1
佇列任務數0
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數2
佇列任務數0
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數3
佇列任務數0
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數4
佇列任務數0
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數5
佇列任務數0
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數5
佇列任務數1
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數5
佇列任務數2
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數5
佇列任務數3
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數5
佇列任務數4
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數5
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數6
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數7
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數8
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數9
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
核心執行緒數5
最大執行緒數10
執行緒池數10
佇列任務數5
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

通過以上日誌分析可以驗證執行緒的建立過程是正確的。

初始化核心執行緒數

       預設的核心執行緒只有在任務被提交的時候才會建立和啟動,但是當我們的工作佇列是一個非空佇列的時候我們需要線上程池啟動的時候就有核心執行緒可用,這時候我們可以使用prestartCoreThread()方法和prestartAllCoreThreads()方法。我們只需要線上程池被建立後根據自己的需求呼叫其中的方法即可。

Executors

    因為ThreadPoolExecutor執行緒池類建立執行緒較為麻煩,需要設定的引數比較多。執行緒池工具類,提供了一些工廠方法或者實用方法用來建立或者使用執行緒池,使我們更加方便快捷的建立和使用執行緒池。

  1.     建立固定執行緒數的執行緒池

2.建立單執行緒的執行緒池是固定執行緒的執行緒池的特例。

3.建立快取執行緒池,該執行緒池中的執行緒會隨著任務的提交不斷被建立,也就是as needed。但是先前建立的可用的執行緒仍然可以被複用。當執行緒空閒時間超過60秒後會被終止和從快取中移除。這種執行緒池適合那些有大量的短執行時間的非同步任務。

當然還可以建立一些定時任務執行緒池。