Quartz source code: analysis of scheduler.start() startup (Part 2)

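scheduler.start() is how Quartz is started. The analysis below is written down mainly for my own reference.
Everything here is analyzed for a SimpleTrigger with the jobStore set to JDBC. The RAM store can be analyzed in a similar way.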

Notation:

{0}: the table prefix. For example, for the table qrtz_triggers, {0} == qrtz_

{1}: the value of org.quartz.scheduler.instanceName in quartz.properties. With org.quartz.scheduler.instanceName: myInstanceName, {1} == myInstanceName
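
To make the two placeholders concrete, here is a minimal sketch that fills them in with java.text.MessageFormat. The class name SqlPlaceholderDemo and the helper fill are made up for illustration, and quoting the scheduler name as a SQL string literal is my assumption about how the final statement should look, not a copy of Quartz's own code.

```java
import java.text.MessageFormat;

class SqlPlaceholderDemo {
    // A query template in the same style as the SQL strings quoted in this article.
    static final String SELECT_TRIGGERS_IN_STATE =
            "SELECT TRIGGER_NAME, TRIGGER_GROUP FROM {0}TRIGGERS"
            + " WHERE SCHED_NAME = {1} AND TRIGGER_STATE = ?";

    // Hypothetical helper: replaces {0} with the table prefix and {1} with the
    // scheduler name wrapped in single quotes (assumed SQL literal form).
    static String fill(String template, String tablePrefix, String schedName) {
        String schedNameLiteral = "'" + schedName + "'";
        return MessageFormat.format(template, tablePrefix, schedNameLiteral);
    }

    public static void main(String[] args) {
        System.out.println(fill(SELECT_TRIGGERS_IN_STATE, "qrtz_", "myInstanceName"));
    }
}
```

The remaining ? marks are ordinary JDBC parameters and are bound later on the PreparedStatement.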

scheduler.start() calls QuartzScheduler.start();

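Starting Quartz requires calling start(), which starts the threads and executes the Triggers that need to fire. The start method performs these operations:

  1. Startup initialization
  2. Check whether the scheduler is clustered and act accordingly
  3. If it is not clustered, run the recovery mechanism first: recover any failed or misfired jobs and clean up the data store as needed.
  4. Initialize thread management and wake up all waiting threads.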

A simple walk-through of the source code follows:

1.QuartzScheduler.start();

public void start() throws SchedulerException {

        if (shuttingDown || closed) {
            throw new SchedulerException(
                    "The Scheduler cannot be restarted after shutdown() has been called.");
        }

        // QTZ-212 : calling new schedulerStarting() method on the listeners
        // right after entering start()
        notifySchedulerListenersStarting();

        if (initialStart == null) { // the initialization marker is null, so do the first-time initialization
            initialStart = new Date();
            this.resources.getJobStore().schedulerStarted(); // 1.1 the main part analyzed here
            startPlugins();
        } else {
            resources.getJobStore().schedulerResumed(); // 1.2 already initialized, so resume the jobStore
        }

        schedThread.togglePause(false); // 3.1 wake up all waiting threads

        getLog().info(
                "Scheduler " + resources.getUniqueIdentifier() + " started.");

        notifySchedulerListenersStarted();
    }
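
The branching in start() is easier to see in a stripped-down model. The sketch below is not Quartz code: MiniScheduler is a hypothetical class that only records which branch ran, and it throws IllegalStateException where Quartz throws SchedulerException.

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

class MiniScheduler {
    private Date initialStart;                      // stays null until the first start()
    private boolean closed;                         // set by shutdown()
    final List<String> calls = new ArrayList<>();   // records the steps that were taken

    void start() {
        if (closed) {
            throw new IllegalStateException(
                    "The Scheduler cannot be restarted after shutdown() has been called.");
        }
        if (initialStart == null) {
            initialStart = new Date();
            calls.add("schedulerStarted");          // first start: full job store startup
        } else {
            calls.add("schedulerResumed");          // later starts: only resume
        }
        calls.add("togglePause(false)");            // in both cases, un-pause the main loop
    }

    void standby() {
        calls.add("togglePause(true)");             // pause without shutting down
    }

    void shutdown() {
        closed = true;
    }

    public static void main(String[] args) {
        MiniScheduler s = new MiniScheduler();
        s.start();
        s.standby();
        s.start();
        System.out.println(s.calls);
    }
}
```

Calling start(), standby(), start() in sequence takes the schedulerStarted branch once and the schedulerResumed branch on the second start, which is why recovery only happens on the very first start of a scheduler instance.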

1.1 this.resources.getJobStore().schedulerStarted(); // the main part analyzed here

public void schedulerStarted() throws SchedulerException {
        // clustered
        if (isClustered()) {
            clusterManagementThread = new ClusterManager();
            if(initializersLoader != null)
                clusterManagementThread.setContextClassLoader(initializersLoader);
            clusterManagementThread.initialize();
        } else { // not clustered
            try {
                recoverJobs(); // 2.1 recover jobs
            } catch (SchedulerException se) {
                throw new SchedulerConfigException(
                        "Failure occured during job recovery.", se);
            }
        }

        misfireHandler = new MisfireHandler();
        if(initializersLoader != null)
            misfireHandler.setContextClassLoader(initializersLoader);
        misfireHandler.initialize(); // 2.2 obtain the ThreadExecutor (thread management)
        schedulerRunning = true;

        getLog().debug("JobStore background threads started (as scheduler was started).");
    }   

1.2 If already initialized, resume the scheduler: resources.getJobStore().schedulerResumed();

 private volatile boolean schedulerRunning = false; // schedulerRunning defaults to false
 public void schedulerResumed() {
        schedulerRunning = true;
    }

2.1 Recover jobs: recoverJobs();

// There is a recovery mechanism at startup:
    // recoverJobs: recovers any failed or misfired jobs and cleans up the data store as needed.
     protected void recoverJobs() throws JobPersistenceException {
        executeInNonManagedTXLock(
            LOCK_TRIGGER_ACCESS,
            new VoidTransactionCallback() {
                public void executeVoid(Connection conn) throws JobPersistenceException {
                    recoverJobs(conn); // recover jobs
                }
            }, null);
    }

    protected void recoverJobs(Connection conn) throws JobPersistenceException {
        try {
            // (1) Fix inconsistent job states: first change the state, ACQUIRED and BLOCKED ---> WAITING
            int rows = getDelegate().updateTriggerStatesFromOtherStates(conn,
                    STATE_WAITING, STATE_ACQUIRED, STATE_BLOCKED);

            rows += getDelegate().updateTriggerStatesFromOtherStates(conn,
                        STATE_PAUSED, STATE_PAUSED_BLOCKED, STATE_PAUSED_BLOCKED);

            // ---- update SQL ----
            //"UPDATE {0}TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = {1} AND (TRIGGER_STATE = ? OR TRIGGER_STATE = ?)"

            getLog().info(
                    "Freed " + rows
                            + " triggers from 'acquired' / 'blocked' state.");

            // 2.1.1 clean up misfired jobs
            recoverMisfiredJobs(conn, true);

            // 2.1.2 recover jobs marked for recovery that were not fully executed -- queries qrtz_fired_triggers
            List<OperableTrigger> recoveringJobTriggers = getDelegate()
                    .selectTriggersForRecoveringJobs(conn);
            getLog()
                    .info(
                            "Recovering "
                                    + recoveringJobTriggers.size()
                                    + " jobs that were in-progress at the time of the last shut-down.");

            for (OperableTrigger recoveringJobTrigger: recoveringJobTriggers) {
                if (jobExists(conn, recoveringJobTrigger.getJobKey())) {
                    recoveringJobTrigger.computeFirstFireTime(null);
                    storeTrigger(conn, recoveringJobTrigger, null, false,
                            STATE_WAITING, false, true);
                }
            }
            getLog().info("Recovery complete.");

            // 2.1.3 remove lingering 'complete' triggers (state == COMPLETE)
            List<TriggerKey> cts = getDelegate().selectTriggersInState(conn, STATE_COMPLETE);
            for(TriggerKey ct: cts) {
                removeTrigger(conn, ct);
            }
            getLog().info(
                "Removed " + cts.size() + " 'complete' triggers.");

            // 2.1.4 clean up any fired trigger entries
            int n = getDelegate().deleteFiredTriggers(conn);
            getLog().info("Removed " + n + " stale fired job entries.");
        } catch (JobPersistenceException e) {
            throw e;
        } catch (Exception e) {
            throw new JobPersistenceException("Couldn't recover jobs: "
                    + e.getMessage(), e);
        }
    }
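
The first step of recoverJobs(conn), resetting inconsistent states, can be pictured with an in-memory map instead of the TRIGGERS table. This is only a sketch under that assumption: TriggerStateReset and its methods are hypothetical names, and the state strings are written out as plain literals.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class TriggerStateReset {
    // In-memory counterpart of the UPDATE statement: every trigger that is in
    // oldState1 or oldState2 moves to newState. Returns the number of changed rows.
    static int updateStatesFromOtherStates(Map<String, String> triggers,
            String newState, String oldState1, String oldState2) {
        int rows = 0;
        for (Map.Entry<String, String> entry : triggers.entrySet()) {
            String state = entry.getValue();
            if (state.equals(oldState1) || state.equals(oldState2)) {
                entry.setValue(newState);
                rows++;
            }
        }
        return rows;
    }

    // The two resets performed at the start of recovery.
    static int recover(Map<String, String> triggers) {
        int rows = updateStatesFromOtherStates(triggers, "WAITING", "ACQUIRED", "BLOCKED");
        rows += updateStatesFromOtherStates(triggers, "PAUSED", "PAUSED_BLOCKED", "PAUSED_BLOCKED");
        return rows;
    }

    public static void main(String[] args) {
        Map<String, String> triggers = new LinkedHashMap<>();
        triggers.put("t1", "ACQUIRED");
        triggers.put("t2", "BLOCKED");
        triggers.put("t3", "PAUSED_BLOCKED");
        triggers.put("t4", "COMPLETE");
        System.out.println("Freed " + recover(triggers) + " triggers: " + triggers);
    }
}
```

Triggers in any other state, such as COMPLETE, are left alone by this step and are dealt with later in the method.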

2.1.1 Clean up misfired jobs: recoverMisfiredJobs(conn, true);

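    // Are there any misfired Triggers?
        // We must still look for the MISFIRED state in case triggers were left
        // in this state when upgrading to this version that does not support it.
    (a1) hasMisfiredTriggersInState(conn, STATE_WAITING, getMisfireTime(),
        maxMisfiresToHandleAtATime, misfiredTriggers);
        // getMisfireTime() == current time minus the misfire threshold (one minute here), maxMisfiresToHandleAtATime == -1 (because recovering == true), misfiredTriggers == an empty list that this call fills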

        "SELECT TRIGGER_NAME, TRIGGER_GROUP FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND NOT (MISFIRE_INSTR = -1) AND NEXT_FIRE_TIME < ? AND TRIGGER_STATE = ? ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC"

        The SQL above returns a list of rows:
                (aa1) If resultList.size() == count, return true; otherwise return false.
                (aa2) While the size is not equal to count, wrap each row's trigger name and trigger group and add it to resultList.

        // Misfired Triggers were found
    (b2) misfiredTriggers.size() > 0
         (bb1) Log a message: getLog().info(
        "Handling " + misfiredTriggers.size() + 
            " trigger(s) that missed their scheduled fire-time.");

        (bb2) Loop over the misfiredTriggers list
            for (TriggerKey triggerKey: misfiredTriggers) {
                // retrieveTrigger: look up the Trigger and, if it is found, wrap its data
                OperableTrigger trig = 
                    retrieveTrigger(conn, triggerKey);
                     // What retrieveTrigger does:
                     // (1) "SELECT * FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
                     // (2) load the data for the Trigger's concrete type, such as SimpleTrigger

                if (trig == null) {
                    continue;
                }

                // update the misfired trigger
                doUpdateOfMisfiredTrigger(conn, trig, false, STATE_WAITING, recovering); // recovering == true
                    (1) cal = retrieveCalendar(conn, trig.getCalendarName()); // reads the qrtz_calendars table
                    (2) trig.updateAfterMisfire(cal); // SimpleTrigger's default misfire handling
                         setNextFireTime(new Date()); // sets the next fire time (next_fire_time) to the current time. This is the important part!
                    (3) check whether getNextFireTime() is null
                         if (trig.getNextFireTime() == null) {
                            storeTrigger(conn, trig,
                                null, true, STATE_COMPLETE, forceState, recovering);
                            schedSignaler.notifySchedulerListenersFinalized(trig);
                        } else {
                            storeTrigger(conn, trig, null, true, newStateIfNotComplete,
                                    forceState, false);

                            // here: job == null, replaceExisting == true, state == WAITING, forceState == false, recovering == false
                            storeTrigger(Connection conn,
                                OperableTrigger newTrigger, JobDetail job, boolean replaceExisting, String state,
                                boolean forceState, boolean recovering)

                                //Insert or update a trigger.
                               boolean existingTrigger = triggerExists(conn, newTrigger.getKey());
                                if (existingTrigger) {

                                    //state == WAITING
                                    getDelegate().updateTrigger(conn, newTrigger, state, job);

                                    // update SQL
                                    /*
                                    "UPDATE {0}TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"*/


                                } else {
                                    getDelegate().insertTrigger(conn, newTrigger, state, job);
                                    // insert SQL
                                    /*"INSERT INTO {0}TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, JOB_NAME, JOB_GROUP, DESCRIPTION, NEXT_FIRE_TIME, PREV_FIRE_TIME, TRIGGER_STATE, TRIGGER_TYPE, START_TIME, END_TIME, CALENDAR_NAME, MISFIRE_INSTR, JOB_DATA, PRIORITY)  VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"*/
                                }
                        }

    (c3) long earliestNewTime = Long.MAX_VALUE; // Long.MAX_VALUE == 9223372036854775807
          if(trig.getNextFireTime() != null && trig.getNextFireTime().getTime() < earliestNewTime){
                earliestNewTime = trig.getNextFireTime().getTime();
          }
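
The scan in step (a1) can be sketched without a database. In the sketch below, Row is a hypothetical stand-in for a TRIGGERS row, the input list is assumed to be already sorted by next fire time, and the method names are mine. It shows why passing count == -1 during recovery collects every misfired trigger: the result list's size can never equal -1, so the limit is never reached.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MisfireScanSketch {
    // Hypothetical stand-in for one row of the TRIGGERS table.
    static final class Row {
        final String name;
        final long nextFireTime;

        Row(String name, long nextFireTime) {
            this.name = name;
            this.nextFireTime = nextFireTime;
        }
    }

    // A trigger counts as misfired when its next fire time is older than this cutoff.
    static long misfireTime(long nowMillis, long thresholdMillis) {
        return nowMillis - thresholdMillis;
    }

    // Adds misfired trigger names to resultList. Returns true only when the
    // limit (count) was reached while more misfired rows were still available.
    static boolean collectMisfired(List<Row> rowsSortedByFireTime, long misfireTime,
            int count, List<String> resultList) {
        for (Row row : rowsSortedByFireTime) {
            if (row.nextFireTime >= misfireTime) {
                continue;               // corresponds to WHERE NEXT_FIRE_TIME < ?
            }
            if (resultList.size() == count) {
                return true;            // limit reached, leave the rest for later
            }
            resultList.add(row.name);
        }
        return false;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row("a", 1000L), new Row("b", 2000L), new Row("c", 9000L));
        long cutoff = misfireTime(10000L, 5000L);

        List<String> all = new ArrayList<>();
        System.out.println(collectMisfired(rows, cutoff, -1, all) + " " + all);

        List<String> limited = new ArrayList<>();
        System.out.println(collectMisfired(rows, cutoff, 1, limited) + " " + limited);
    }
}
```

With a positive limit the method stops early and reports that more misfired triggers remain, which is the case the periodic misfire handler has to deal with after startup.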

2.1.2 Recover jobs marked for recovery that were not fully executed

List<OperableTrigger> recoveringJobTriggers = getDelegate()
                    .selectTriggersForRecoveringJobs(conn);                 
        // INSTANCE_NAME == dufy_test, REQUESTS_RECOVERY == true; the value actually bound in the database query is REQUESTS_RECOVERY == 1
        "SELECT * FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1} AND INSTANCE_NAME = ? AND REQUESTS_RECOVERY = ?"
        // For how true gets converted to 1, see the image in Appendix 1.

    Recovery complete.

2.1.3 Remove triggers whose state == COMPLETE

List<TriggerKey> cts = getDelegate().selectTriggersInState(conn, STATE_COMPLETE);
-----------------------------------------------------------------------------
"SELECT TRIGGER_NAME, TRIGGER_GROUP FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_STATE = ?"
-----------------------------------------------------------------------------
for(TriggerKey ct: cts) {
    removeTrigger(conn, ct);
    ---------------------------------------------------------------------
        (a) Before deleting, look up the JobDetail first
              JobDetail job = getDelegate().selectJobForTrigger(conn,getClassLoadHelper(), key, false);
                "SELECT J.JOB_NAME, J.JOB_GROUP, J.IS_DURABLE, J.JOB_CLASS_NAME, J.REQUESTS_RECOVERY FROM {0}TRIGGERS T, {0}JOB_DETAILS J WHERE T.SCHED_NAME = {1} AND J.SCHED_NAME = {1} AND T.TRIGGER_NAME = ? AND T.TRIGGER_GROUP = ? AND T.JOB_NAME = J.JOB_NAME AND T.JOB_GROUP = J.JOB_GROUP"

        (b) Delete the trigger, its listeners, and its Simple / Cron / BLOB sub-table entries.
          boolean removedTrigger = deleteTriggerAndChildren(conn, key);
             deleteTrigger(Connection conn, TriggerKey triggerKey)
            (b1)deleteTriggerExtension
                   "DELETE FROM {0}SIMPLE_TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
                   "DELETE FROM {0}BLOB_TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"

            (b2)"DELETE FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"

        (c) Decide whether to delete the JobDetail by checking isDurable, which defaults to false
            if (null != job && !job.isDurable()) {
                int numTriggers = getDelegate().selectNumTriggersForJob(conn,
                        job.getKey());
                ---------------------------------------------------------
                "SELECT COUNT(TRIGGER_NAME) FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"
                ---------------------------------------------------------
                if (numTriggers == 0) {
                    // Don't call removeJob() because we don't want to check for
                    // triggers again.
                    deleteJobAndChildren(conn, job.getKey()); // deletes the job and its listeners
                    -----------------------------------------------------
                    //deleteJobDetail(Connection conn, JobKey jobKey): deletes the job detail record for the given job.
                    "DELETE FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"
                    -----------------------------------------------------
                }
            }
    }
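
Steps (a) to (c) boil down to one rule: a non-durable job is deleted together with its last trigger. The following in-memory sketch shows that rule. RemoveTriggerSketch and its two maps are hypothetical stand-ins for the TRIGGERS and JOB_DETAILS tables, not Quartz classes.

```java
import java.util.HashMap;
import java.util.Map;

class RemoveTriggerSketch {
    final Map<String, String> triggerToJob = new HashMap<>();  // trigger key -> job key
    final Map<String, Boolean> jobDurable = new HashMap<>();   // job key -> isDurable

    boolean removeTrigger(String triggerKey) {
        // (a) look up the job before the trigger row disappears
        String job = triggerToJob.get(triggerKey);

        // (b) delete the trigger itself
        boolean removedTrigger = triggerToJob.remove(triggerKey) != null;

        // (c) delete a non-durable job once no trigger references it any more
        if (job != null && Boolean.FALSE.equals(jobDurable.get(job))) {
            long numTriggers = triggerToJob.values().stream().filter(job::equals).count();
            if (numTriggers == 0) {
                jobDurable.remove(job);
            }
        }
        return removedTrigger;
    }

    public static void main(String[] args) {
        RemoveTriggerSketch store = new RemoveTriggerSketch();
        store.jobDurable.put("jobA", false);
        store.triggerToJob.put("t1", "jobA");
        store.triggerToJob.put("t2", "jobA");

        store.removeTrigger("t1");
        System.out.println("jobA after removing t1: " + store.jobDurable.containsKey("jobA"));
        store.removeTrigger("t2");
        System.out.println("jobA after removing t2: " + store.jobDurable.containsKey("jobA"));
    }
}
```

A durable job would survive the removal of its last trigger, because the check in step (c) skips it.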

2.1.4 Clean up any fired trigger entries

 int n = getDelegate().deleteFiredTriggers(conn);
 ----------------------------------------------------------------------------
 "DELETE FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1}"
 ----------------------------------------------------------------------------

2.2 Obtain the ThreadExecutor (thread management): misfireHandler.initialize();

public void initialize() {
        ThreadExecutor executor = getThreadExecutor();
        // getThreadExecutor() returns the field: private ThreadExecutor threadExecutor = new DefaultThreadExecutor();
        executor.execute(MisfireHandler.this); // starts the thread, which then runs MisfireHandler's run() method
        // MisfireHandler is declared as: class MisfireHandler extends Thread
}
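
The pattern here is a Thread subclass that hands itself to an executor, which starts it. The sketch below reproduces that pattern with made-up types: the nested ThreadExecutor interface, StartingExecutor and Handler are simplified stand-ins that I wrote for illustration, not the Quartz classes of similar names.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class ThreadExecutorSketch {
    // Hypothetical, minimal executor abstraction that accepts a Thread.
    interface ThreadExecutor {
        void execute(Thread thread);
    }

    // Simplest possible implementation: just start the thread it is given.
    static class StartingExecutor implements ThreadExecutor {
        @Override
        public void execute(Thread thread) {
            thread.start();
        }
    }

    // Stand-in for a handler thread: the work lives in run().
    static class Handler extends Thread {
        final AtomicBoolean ran = new AtomicBoolean(false);

        @Override
        public void run() {
            ran.set(true);
        }

        void initialize() {
            new StartingExecutor().execute(this);   // the handler passes itself in
        }
    }

    // Starts a handler, waits for it to finish, and reports whether run() was executed.
    static boolean runAndWait() {
        Handler handler = new Handler();
        handler.initialize();
        try {
            handler.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return handler.ran.get();
    }

    public static void main(String[] args) {
        System.out.println("run() executed: " + runAndWait());
    }
}
```

Going through an executor interface instead of calling start() directly leaves room to swap in a different way of launching the thread, for example in a managed environment.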

3.1 Wake up all waiting threads: schedThread.togglePause(false);

 schedThread.togglePause(false);
    // Signals the main processing loop to pause at the next possible point.
    void togglePause(boolean pause) {
        synchronized (sigLock) {
            paused = pause;

            if (paused) {
                signalSchedulingChange(0);
                ------------------------------------------
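                // Signals the main processing loop that a change in scheduling has been made, in order to interrupt any sleeping that may be occurring while waiting for the fire time to arrive.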
                public void signalSchedulingChange(long candidateNewNextFireTime) {
                    synchronized(sigLock) {
                        signaled = true;
                        signaledNextFireTime = candidateNewNextFireTime;
                        sigLock.notifyAll();  // private final Object sigLock = new Object();
                    }
                }
                ------------------------------------------
            } else {
                sigLock.notifyAll(); // wake up all waiting threads
            }
        }
    }   
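
The wake-up relies on plain wait() and notifyAll() on a shared lock object. Here is a small sketch of that mechanism, assuming a worker that simply waits while a paused flag is set. PauseToggleSketch, awaitUnpaused and demo are names I made up, and the real scheduler loop does much more than this.

```java
class PauseToggleSketch {
    private final Object sigLock = new Object();
    private boolean paused = true;              // start paused, like a scheduler before start()
    private volatile boolean resumed = false;   // set by the worker once it gets past the pause

    // Change the pause flag and wake every thread waiting on sigLock.
    void togglePause(boolean pause) {
        synchronized (sigLock) {
            paused = pause;
            sigLock.notifyAll();
        }
    }

    // Worker side: block while paused, re-checking the flag after each wake-up.
    void awaitUnpaused() {
        synchronized (sigLock) {
            while (paused) {
                try {
                    sigLock.wait(1000L);        // timed wait, so the flag is re-checked regularly
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            resumed = true;
        }
    }

    // Starts a paused worker, un-pauses it, and reports whether it got through.
    static boolean demo() {
        PauseToggleSketch sketch = new PauseToggleSketch();
        Thread worker = new Thread(sketch::awaitUnpaused);
        worker.start();
        sketch.togglePause(false);
        try {
            worker.join(5000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return sketch.resumed && !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker resumed: " + demo());
    }
}
```

The while loop around wait() matters: the flag is checked again after every wake-up, so the worker behaves correctly whether togglePause(false) runs before or after it starts waiting.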

Appendix 1: how true is converted to 1:
[Image not available]

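Feel free to visit my CSDN blog, and let's grow together!

Whatever you do, if you keep at it you will see something different. On the road, neither humble nor arrogant!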