1. 程式人生 > >Java併發程式設計:Timer和TimerTask(轉載)

Java併發程式設計:Timer和TimerTask(轉載)

public Timer(boolean isDaemon) {
    this("Timer-" + serialNumber(), isDaemon);
}

   另外兩個構造方法負責傳入名稱和將timer啟動:

  public Timer(String name, boolean isDaemon) {
        thread.setName(name);
        thread.setDaemon(isDaemon);
        thread.start();
    }

   這裡有一個thread,這個thread很明顯是一個執行緒,被包裝在了Timer類中,我們看下這個thread的定義是:

private TimerThread thread = new TimerThread(queue);

  而定義TimerThread部分的是:

class TimerThread extends Thread {

   看到這裡知道了,Timer內部包裝了一個執行緒,用來做獨立於外部執行緒的排程,而TimerThread是一個default型別的,預設情況下是引用不到的,是被Timer自己所使用的。

  接下來看下有那些屬性

  除了上面提到的thread,還有一個很重要的屬性是:

private TaskQueue queue = new TaskQueue();

   看名字就知道是一個佇列,佇列裡面可以先猜猜看是什麼,那麼大概應該是我要排程的任務吧,先記錄下了,接下來繼續向下看:

裡面還有一個屬性是:threadReaper,它是Object型別,只是重寫了finalize方法而已,是為了垃圾回收的時候,將相應的資訊回收掉,做GC的回補,也就是當timer執行緒由於某種原因死掉了,而未被cancel,裡面的佇列中的資訊需要清空掉,不過我們通常是不會考慮這個方法的,所以知道java寫這個方法是幹什麼的就行了。

  接下來看排程方法的實現:

  對於上面6個排程方法,我們不做一一列舉,為什麼等下你就知道了:

  來看下方法:

public void schedule(TimerTask task, long delay)

  的原始碼如下:

 public void schedule(TimerTask task, long delay) {
        if (delay < 0)
            throw new IllegalArgumentException("Negative delay.");
        sched(task, System.currentTimeMillis()+delay, 0);
    }

  這裡呼叫了另一個方法,將task傳入,第一個引數傳入System.currentTimeMillis()+delay可見為第一次需要執行的時間的時間點了(如果傳入Date,就是物件.getTime()即可,所以傳入Date的幾個方法就不用多說了),而第三個引數傳入了0,這裡可以猜下要麼是時間片,要麼是次數啥的,不過等會就知道是什麼了;另外關於方法:sched的內容我們不著急去看他,先看下過載的方法中是如何做的

  再看看方法:

public void schedule(TimerTask task, long delay,long period)

  原始碼為:

public void schedule(TimerTask task, long delay, long period) {
        if (delay < 0)
            throw new IllegalArgumentException("Negative delay.");
        if (period <= 0)
            throw new IllegalArgumentException("Non-positive period.");
        sched(task, System.currentTimeMillis()+delay, -period);
    }

  看來也呼叫了方法sched來完成排程,和上面的方法唯一的排程時候的區別是增加了傳入的period,而第一個傳入的是0,所以確定這個引數為時間片,而不是次數,注意這個裡的period加了一個負數,也就是取反,也就是我們開始傳入1000,在呼叫sched的時候會變成-1000,其實最終閱讀完原始碼後你會發現這個算是老外對於一種數字的理解,而並非有什麼特殊的意義,所以閱讀原始碼的時候也有這些困難所在。

  最後再看個方法是:

public void scheduleAtFixedRate(TimerTasktask,long delay,long period)

  原始碼為:

 public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
        if (delay < 0)
            throw new IllegalArgumentException("Negative delay.");
        if (period <= 0)
            throw new IllegalArgumentException("Non-positive period.");
        sched(task, System.currentTimeMillis()+delay, period);
    }

   唯一的區別就是在period沒有取反,其實你最終閱讀完原始碼,上面的取反沒有什麼特殊的意義,老外不想增加一個引數來表示scheduleAtFixedRate,而scheduleAtFixedRate和schedule的大部分邏輯程式碼一致,因此用了引數的範圍來作為區分方法,也就是當你傳入的引數不是正數的時候,你呼叫schedule方法正好是得到scheduleAtFixedRate的功能,而呼叫scheduleAtFixedRate方法的時候得到的正好是schedule方法的功能,呵呵,這些討論沒什麼意義,討論實質和重點:

   來看sched方法的實現體:

private void sched(TimerTask task, long time, long period) {
        if (time < 0)
            throw new IllegalArgumentException("Illegal execution time.");

        synchronized(queue) {
            if (!thread.newTasksMayBeScheduled)
                throw new IllegalStateException("Timer already cancelled.");

            synchronized(task.lock) {
                if (task.state != TimerTask.VIRGIN)
                    throw new IllegalStateException(
                        "Task already scheduled or cancelled");
                task.nextExecutionTime = time;
                task.period = period;
                task.state = TimerTask.SCHEDULED;
            }

            queue.add(task);
            if (queue.getMin() == task)
                queue.notify();
        }
    }

  queue為一個佇列,我們先不看他資料結構,看到他在做這個操作的時候,發生了同步,所以在timer級別,這個是執行緒安全的,最後將task相關的引數賦值,主要包含nextExecutionTime(下一次執行時間),period(時間片),state(狀態),然後將它放入queue佇列中,做一次notify操作,為什麼要做notify操作呢?看了後面的程式碼你就知道了。

  簡言之,這裡就是講task放入佇列queue的過程,此時,你可能對queue的結構有些興趣,那麼我們先來看看queue屬性的結構TaskQueue:

class TaskQueue {

    private TimerTask[] queue = new TimerTask[128];

    private int size = 0;

   可見,TaskQueue的結構很簡單,為一個數組,加一個size,有點像ArrayList,是不是長度就128呢,當然不是,ArrayList可以擴容,它可以,只是會造成記憶體拷貝而已,所以一個Timer來講,只要內部的task個數不超過128是不會造成擴容的;內部提供了add(TimerTask)、size()、getMin()、get(int)、removeMin()、quickRemove(int)、rescheduleMin(long newTime)、isEmpty()、clear()、fixUp()、fixDown()、heapify();

  這裡面的方法大概意思是:

  add(TimerTaskt)為增加一個任務

  size()任務佇列的長度

  getMin()獲取當前排序後最近需要執行的一個任務,下標為1,佇列頭部0是不做任何操作的。

  get(inti)獲取指定下標的資料,當然包括下標0.

  removeMin()為刪除當前最近執行的任務,也就是第一個元素,通常只調度一次的任務,在執行完後,呼叫此方法,就可以將TimerTask從佇列中移除。

  quickRmove(inti)刪除指定的元素,一般來說是不會呼叫這個方法的,這個方法只有在Timer發生purge的時候,並且當對應的TimerTask呼叫了cancel方法的時候,才會被呼叫這個方法,也就是取消某個TimerTask,然後就會從佇列中移除(注意如果任務在執行中是,還是仍然在執行中的,雖然在佇列中被移除了),還有就是這個cancel方法並不是Timer的cancel方法而是TimerTask,一個是排程器的,一個是單個任務的,最後注意,這個quickRmove完成後,是將佇列最後一個元素補充到這個位置,所以此時會造成順序不一致的問題,後面會有方法進行回補。

  rescheduleMin(long newTime)是重新設定當前執行的任務的下一次執行時間,並在佇列中將其從新排序到合適的位置,而呼叫的是後面說的fixDown方法。

  對於fixUpfixDown方法來講,前者是當新增一個task的時候,首先將元素放在佇列的尾部,然後向前找是否有比自己還要晚執行的任務,如果有,就將兩個任務的順序進行交換一下。而fixDown正好相反,執行完第一個任務後,需要加上一個時間片得到下一次執行時間,從而需要將其順序與後面的任務進行對比下。

  其次可以看下fixDown的細節為:

 private void fixDown(int k) {
        int j;
        while ((j = k << 1) <= size && j > 0) {
            if (j < size &&
                queue[j].nextExecutionTime > queue[j+1].nextExecutionTime)
                j++; // j indexes smallest kid
            if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)
                break;
            TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;
            k = j;
        }
    }

   這種方式並非排序,而是找到一個合適的位置來交換,因為並不是通過佇列逐個找的,而是每次移動一個二進位制為,例如傳入1的時候,接下來就是2、4、8、16這些位置,找到合適的位置放下即可,順序未必是完全有序的,它只需要看到距離排程部分的越近的是有序性越強的時候就可以了,這樣即可以保證一定的順序性,達到較好的效能。

  最後一個方法是heapify,其實就是將佇列的後半截,全部做一次fixeDown的操作,這個操作主要是為了回補quickRemove方法,當大量的quickRmove後,順序被打亂後,此時將一半的區域做一次非常簡單的排序即可。

  這些方法我們不在說原始碼了,只需要知道它提供了類似於ArrayList的東西來管理,內部有很多排序之類的處理,我們繼續回到Timer,裡面還有兩個方法是:cancel()和方法purge()方法,其實就cancel方法來講,一個取消操作,在測試中你會發現,如果一旦執行了這個方法timer就會結束掉,看下原始碼是什麼呢:

public void cancel() {
        synchronized(queue) {
            thread.newTasksMayBeScheduled = false;
            queue.clear();
            queue.notify();  // In case queue was already empty.
        }
    }

   貌似僅僅將佇列清空掉,然後設定了newTasksMayBeScheduled狀態為false,最後讓佇列也呼叫了下notify操作,但是沒有任何地方讓執行緒結束掉,那麼就要回到我們開始說的Timer中包含的thread為:TimerThread類了,在看這個類之前,再看下Timer中最後一個purge()類,當你對很多Task做了cancel操作後,此時通過呼叫purge方法實現對這些cancel掉的類空間的回收,上面已經提到,此時會造成順序混亂,所以需要呼叫隊裡的heapify方法來完成順序的重排,原始碼如下:

public int purge() {
         int result = 0;

         synchronized(queue) {
             for (int i = queue.size(); i > 0; i--) {
                 if (queue.get(i).state == TimerTask.CANCELLED) {
                     queue.quickRemove(i);
                     result++;
                 }
             }

             if (result != 0)
                 queue.heapify();
         }
         return result;
     }

  那麼排程呢,是如何排程的呢,那些notify,和清空佇列是如何做到的呢?我們就要看看TimerThread類了,內部有一個屬性是:newTasksMayBeScheduled,也就是我們開始所提及的那個引數在cancel的時候會被設定為false。

  另一個屬性定義了

private TaskQueue queue;

   也就是我們所呼叫的queue了,這下聯通了吧,不過這裡是queue是通過構造方法傳入的,傳入後賦值用以操作,很明顯是Timer傳遞給這個執行緒的,我們知道它是一個執行緒,所以執行的中心自然是run方法了,所以看下run方法的body部分是:

public void run() {
        try {
            mainLoop();
        } finally {
            synchronized(queue) {
                newTasksMayBeScheduled = false;
                queue.clear();  // Eliminate obsolete references
            }
        }
    }

  try很簡單,就一個mainLoop,看名字知道是主迴圈程式,finally中也就是必然執行的程式為將引數為為false,並將佇列清空掉。

那麼最核心的就是mainLoop了,是的,看懂了mainLoop一切都懂了:

private void mainLoop() {
        while (true) {
            try {
                TimerTask task;
                boolean taskFired;
                synchronized(queue) {
                    // Wait for queue to become non-empty
                    while (queue.isEmpty() && newTasksMayBeScheduled)
                        queue.wait();
                    if (queue.isEmpty())
                        break; // Queue is empty and will forever remain; die

                    // Queue nonempty; look at first evt and do the right thing
                    long currentTime, executionTime;
                    task = queue.getMin();
                    synchronized(task.lock) {
                        if (task.state == TimerTask.CANCELLED) {
                            queue.removeMin();
                            continue;  // No action required, poll queue again
                        }
                        currentTime = System.currentTimeMillis();
                        executionTime = task.nextExecutionTime;
                        if (taskFired = (executionTime<=currentTime)) {
                            if (task.period == 0) { // Non-repeating, remove
                                queue.removeMin();
                                task.state = TimerTask.EXECUTED;
                            } else { // Repeating task, reschedule
                                queue.rescheduleMin(
                                  task.period<0 ? currentTime   - task.period
                                                : executionTime + task.period);
                            }
                        }
                    }
                    if (!taskFired) // Task hasn't yet fired; wait
                        queue.wait(executionTime - currentTime);
                }
                if (taskFired)  // Task fired; run it, holding no locks
                    task.run();
            } catch(InterruptedException e) {
            }
        }
    }

   可以發現這個timer是一個死迴圈程式,除非遇到不能捕獲的異常或break才會跳出,首先注意這段程式碼:

while (queue.isEmpty() &&newTasksMayBeScheduled)
                        queue.wait();

  迴圈體為迴圈過程中,條件為queue為空且newTasksMayBeScheduled狀態為true,可以看到這個狀態其關鍵作用,也就是跳出迴圈的條件就是要麼佇列不為空,要麼是newTasksMayBeScheduled狀態設定為false才會跳出,而wait就是在等待其他地方對queue發生notify操作,從上面的程式碼中可以發現,當發生add、cancel以及在threadReaper呼叫finalize方法的時候會被呼叫,第三個我們基本可以不考慮其實發生add的時候也就是當佇列還是空的時候,發生add使得佇列不為空就跳出迴圈,而cancel是設定了狀態,否則不會進入這個迴圈,那麼看下面的程式碼:

if (queue.isEmpty())
    break;

   當跳出上面的迴圈後,如果是設定了newTasksMayBeScheduled狀態為false跳出,也就是呼叫了cancel,那麼queue就是空的,此時就直接跳出外部的死迴圈,所以cancel就是這樣實現的,如果下面的任務還在跑還沒執行到這裡來,cancel是不起作用的。

  接下來是獲取一個當前系統時間和上次預計的執行時間,如果預計執行的時間小於當前系統時間,那麼就需要執行,此時判定時間片是否為0,如果為0,則呼叫removeMin方法將其移除,否則將task通過rescheduleMin設定最新時間並排序:

currentTime = System.currentTimeMillis();
executionTime = task.nextExecutionTime;
if (taskFired = (executionTime<=currentTime)) {
      if (task.period == 0) { // Non-repeating, remove
           queue.removeMin();
           task.state = TimerTask.EXECUTED;
      } else { // Repeating task, reschedule
           queue.rescheduleMin(
           task.period<0 ? currentTime   - task.period
                              : executionTime + task.period);
     }
}

   這裡可以看到,period為負數的時候,就會被認為是按照按照當前系統時間+一個時間片來計算下一次時間,就是前面說的schedule和scheduleAtFixedRate的區別了,其實內部是通過正負數來判定的,也許java是不想增加引數,而又想增加程式的可讀性,才這樣做,其實通過正負判定是有些詭異的,也就是你如果在schedule方法傳入負數達到的功能和scheduleAtFixedRate的功能是一樣的,相反在scheduleAtFixedRate方法中傳入負數功能和schedule方法是一樣的。

  同時你可以看到period為0,就是隻執行一次,所以時間片正負0都用上了,呵呵,然後再看看mainLoop接下來的部分:

if (!taskFired)// Taskhasn't yet fired; wait
    queue.wait(executionTime- currentTime);

   這裡是如果任務執行時間還未到,就等待一段時間,當然這個等待很可能會被其他的執行緒操作add和cancel的時候被喚醒,因為內部有notify方法,所以這個時間並不是完全準確,在這裡大多數情況下是考慮Timer內部的task資訊是穩定的,cancel方法喚醒的話是另一回事。

  最後:

if (taskFired) // Task fired; run it, holding no locks
    task.run();

   如果執行緒需要執行,那麼呼叫它的run方法,而並非啟動一個新的執行緒或從執行緒池中獲取一個執行緒來執行,所以TimerTask的run方法並不是多執行緒的run方法,雖然實現了Runnable,但是僅僅是為了表示它是可執行的,並不代表它必須通過執行緒的方式來執行的。

  回過頭來再看看:

  TimerTimerTask的簡單組合是多執行緒的嘛?不是,一個Timer內部包裝了“一個Thread”和“一個Task”佇列,這個佇列按照一定的方式將任務排隊處理,包含的執行緒在Timer的構造方法呼叫時被啟動,這個Thread的run方法無限迴圈這個Task佇列,若佇列為空且沒發生cancel操作,此時會一直等待,如果等待完成後,佇列還是為空,則認為發生了cancel從而跳出死迴圈,結束任務;迴圈中如果發現任務需要執行的時間小於系統時間,則需要執行,那麼根據任務的時間片從新計算下次執行時間,若時間片為0代表只執行一次,則直接移除佇列即可。

  但是是否能實現多執行緒呢?可以,任何東西是否是多執行緒完全看個人意願,多個Timer自然就是多執行緒的,每個Timer都有自己的執行緒處理邏輯,當然Timer從這裡來看並不是很適合很多工在短時間內的快速排程,至少不是很適合同一個timer上掛很多工,在多執行緒的領域中我們更多是使用多執行緒中的:

Executors.newScheduledThreadPool

 來完成對排程佇列中的執行緒池的處理,內部通過new ScheduledThreadPoolExecutor來建立執行緒池的Executor的建立,當然也可以呼叫:

Executors.unconfigurableScheduledExecutorService

   方法來建立一個DelegatedScheduledExecutorService其實這個類就是包裝了下下scheduleExecutor,也就是這只是一個殼,英文理解就是被委派的意思,被託管的意思。