一分鐘掌握Java Quartz定時任務

前言

前幾篇介紹瞭單體架構的定時任務解決方式,但是現代軟件架構由於業務復雜度高,業務的耦合性太強,已經由單體架構拆分成瞭分佈式架構。因此,定時任務的架構也隨之修改。而Quartz是分佈式定時任務解決方案中使用簡單,結構清晰,且不依賴第三方分佈式調度中間件的。上車,mars醬帶你車裡細說~

角色介紹

Quartz入門使用的角色不多,三個角色足夠,分別是:

Scheduler:調度器。用來負責任務的調度;

Job:任務。這是一個接口,業務代碼繼承Job接口並實現它的execute方法,是業務執行的主體部分;

Trigger: 觸發器。也是個接口,有兩個觸發器比較關鍵,一個是SimpleTrigger,另一個是CronTrigger。前者支持簡單的定時,比如:按時、按秒等;後者直接支持cron表達式。下面我們從官方的源代碼入手,看看Quartz如何做到分佈式的。

官方例子

官方源代碼down下來之後,有個examples文件夾:

example1是入門級中最簡單的。就兩個java文件,一個HelloJob:

package org.quartz.examples.example1;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
/**
 * <p>
 * This is just a simple job that says "Hello" to the world.
 * </p>
 * 
 * @author Bill Kratzer
 */
public class HelloJob implements Job {
    private static Logger _log = LoggerFactory.getLogger(HelloJob.class);
    /**
     * <p>
     * Empty constructor for job initilization
     * </p>
     * <p>
     * Quartz requires a public empty constructor so that the
     * scheduler can instantiate the class whenever it needs.
     * </p>
     */
    public HelloJob() {
    }
    /**
     * <p>
     * Called by the <code>{@link org.quartz.Scheduler}</code> when a
     * <code>{@link org.quartz.Trigger}</code> fires that is associated with
     * the <code>Job</code>.
     * </p>
     * 
     * @throws JobExecutionException
     *             if there is an exception while executing the job.
     */
    public void execute(JobExecutionContext context)
        throws JobExecutionException {
        // Say Hello to the World and display the date/time
        _log.info("Hello World! - " + new Date());
    }
}

另一個SimpleExample:

 package org.quartz.examples.example1;
 import org.quartz.JobDetail;
 import org.quartz.Scheduler;
 import org.quartz.SchedulerFactory;
 import org.quartz.Trigger;
 import org.quartz.impl.StdSchedulerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.Date;
 import static org.quartz.DateBuilder.evenMinuteDate;
 import static org.quartz.JobBuilder.newJob;
 import static org.quartz.TriggerBuilder.newTrigger;
 /**
  * This Example will demonstrate how to start and shutdown the Quartz scheduler and how to schedule a job to run in
  * Quartz.
  *
  * @author Bill Kratzer
  */
 public class SimpleExample {
     public void run() throws Exception {
         Logger log = LoggerFactory.getLogger(SimpleExample.class);
         log.info("------- Initializing ----------------------");
         // 1. 創建一個scheduler
         SchedulerFactory sf = new StdSchedulerFactory();
         Scheduler sched = sf.getScheduler();
         log.info("------- Initialization Complete -----------");
         // computer a time that is on the next round minute
         Date runTime = evenMinuteDate(new Date());
         log.info("------- Scheduling Job  -------------------");
         // 2. 指定一個job
         JobDetail job = newJob(HelloJob.class).withIdentity("job1", "group1").build();
         // 3. 指定一個trigger
         Trigger trigger = newTrigger().withIdentity("trigger1", "group1").startAt(runTime).build();
         // 4. 綁定job和trigger
         sched.scheduleJob(job, trigger);
         log.info(job.getKey() + " will run at: " + runTime);
         // 5. 執行
         sched.start();
         log.info("------- Started Scheduler -----------------");
         // wait long enough so that the scheduler as an opportunity to
         // run the job!
         log.info("------- Waiting 65 seconds... -------------");
         try {
             // wait 65 seconds to show job
             Thread.sleep(65L * 1000L);
             // executing...
         } catch (Exception e) {
             //
         }
         // shut down the scheduler
         log.info("------- Shutting Down ---------------------");
         sched.shutdown(true);
         log.info("------- Shutdown Complete -----------------");
     }
     public static void main(String[] args) throws Exception {
         SimpleExample example = new SimpleExample();
         example.run();
     }
 }

整個SimpleExample隻有五個步驟:

  • 創建Scheduler,這是一個調度器,例子中使用調度器工廠來創建一個調度器;
  • 創建一個Job。實際上Job就是那個HelloJob,但是這裡把HelloJob丟給瞭JobDetail對象,Job接口本身隻有一個execute函數,沒有其他的屬性瞭,如果需要附加其他屬性,JobDetail就支持,比如我們需要往Job中傳遞參數,JobDetail中提供瞭一個JobDataMap。當Job在運行的時候,execute函數裡面的就能獲取到JobDetail對象,並將設置的數據傳遞給Job接口的實現;
  • 創建一個Trigger。Trigger對象主責是任務的執行時間,比如官方例子中的startAt函數,就指定瞭具體的運行時間,還有startNow(立即執行);
  • 用scheduler綁定Job和Trigger;
  • 執行scheduler。

Quartz的使用是不是簡單又清晰?Job是任務,單一職責,不做任何其他事情。Trigger負責執行的頻率等等屬性。Scheduler負責按照Trigger的規則去執行Job的內容。各自部分的功能符合單一原則。

但是,到這裡都不是分佈式的方式,依然是單體架構的。那麼,Quartz如何做到分佈式的呢?

Quartz如何分佈式?

Quartz的分佈式實現方式並不依賴其他分佈式協調管理中間件完成,而是使用數據鎖來實現。使用數據做協調管理中間件的唯一的前提是:需要把集群的每臺機器時間校對一致。

Quartz數據庫核心表如下:

表名 功能描述
QRTZ_CALENDARS 存儲Quartz的Calendar信息
QRTZ_CRON_TRIGGERS 存儲CronTrigger,包括Cron表達式和時區信息
QRTZ_FIRED_TRIGGERS 存儲與已觸發的Trigger相關的狀態信息,以及相聯Job的執行信息
QRTZ_PAUSED_TRIGGER_GRPS 存儲已暫停的Trigger組的信息
QRTZ_SCHEDULER_STATE 存儲少量的有關Scheduler的狀態信息,和別的Scheduler實例
QRTZ_LOCKS 存儲程序的悲觀鎖的信息
QRTZ_JOB_DETAILS 存儲每一個已配置的Job的詳細信息
QRTZ_JOB_LISTENERS 存儲有關已配置的JobListener的信息
QRTZ_SIMPLE_TRIGGERS 存儲簡單的Trigger,包括重復次數、間隔、以及已觸的次數
QRTZ_BLOG_TRIGGERS Trigger作為Blob類型存儲
QRTZ_TRIGGER_LISTENERS 存儲已配置的TriggerListener的信息
QRTZ_TRIGGERS 存儲已配置的Trigger的信息

字體加粗的QRTZ_LOCKS表是一個悲觀鎖的存儲實現,Quartz認為每條記錄都可能會產生並發沖突。以上表的SQL可以在quartz目錄中找到:

找到自己喜歡的數據庫品牌,並創建好表即可。

跟著官方例子看源碼

我們從Hello的execute方法開始,反著找,繼續看看分佈式的方式如何實現。為什麼反著找呢?因為這裡是我們業務實現的主體內容,Quartz框架最終必須要調用到這個execute的具體實現的。我們找到調用execute的地方有很多處:

從類名來分析,DirectoryScanJob看著是目錄掃描任務,FileScanJob直譯是文件掃描任務,SendMailJob是發送郵件任務,最後隻剩那個JobRunShell,畢竟翻譯過來叫“任務運行の核心”啊。進入JobRunShell,找到調用execute函數的部分,execute函數被包裹在一個一百三十多行長又長的run函數中:

public void run() {
    qs.addInternalSchedulerListener(this);
    try {
        // ...省略很多源代碼
        do {
            // ...省略很多源代碼
            try {
                begin();
            } catch (SchedulerException se) {
                // ... 省略源代碼
            }
            // ... 省略源代碼
            try {
                log.debug("Calling execute on job " + jobDetail.getKey());
                // 這裡負責執行job的execute函數
                job.execute(jec);
                endTime = System.currentTimeMillis();
            } catch (JobExecutionException jee) {
                // ... 省略源代碼
            } catch (Throwable e) {
                // ... 省略源代碼
            }
            // ...省略很多源代碼
            try {
                complete(true);
            } catch (SchedulerException se) {
                // ... 省略源代碼
            }
            // ...省略很多源代碼
        } while (true);
    } finally {
        qs.removeInternalSchedulerListener(this);
    }
}

可以看到run中間的execute被夾在一個begin函數和comlete函數中,而begin和complete的實現是一個基於JTA事務的JTAJobRunShell的實現來完成的。JobRunShell是一個Runnable接口的實現,那麼剛剛的run方法,必定在某處啟用瞭線程(池)的start方法。

mars醬繼續跟蹤查找源代碼,在QuartzSchedulerThread中的run函數中,找到JobRunShell的調用部分:

@Override
public void run() {
    int acquiresFailed = 0;
    while (!halted.get()) {
        // ...省略很多源代碼
        // 源代碼279行
        int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
        // ...省略很多源代碼
        if(availThreadCount > 0) { 
            // ...省略很多源代碼
            // 取下一個trigger,周期是30秒取一次
            triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
            // ...省略很多源代碼
            // 觸發trigger
            List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
            // ...省略很多源代碼
            // 釋放trigger,當bndle的結果是null就釋放trigger
            if (bndle == null) {
                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                continue;
            }
            // ...省略很多源代碼
            JobRunShell shell = null;
            try {
                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                shell.initialize(qs);
            } catch (SchedulerException se) {
                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                continue;
            }
            // 這裡調用JobRunShell
            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                // ...省略很多源代碼
            }
        }  
    }
}

QuartzSchedulerThread的run函數就是核心處理流程瞭,qsRsrcs.getThreadPool().runInThread(shell)內部就根據具體的SimpleThreadPool或者ZeroSizeThreadPool來執行run函數,while循環基本就是不停的在輪詢不斷的去拿trigger,然後判斷trigger的時間是不是到瞭,再按照時間trigger的時間規則執行Job,最後再標記為完成、釋放trigger。

Trigger的處理

上面的邏輯都是通過qsRsrcs.getJobStore()得到的對象去處理Trigger的,返回對象是JobStore。任意查看qsRsrcs.getJobStore()調用的函數,比如:releaseAcquiredTriggerJobStore,它的實現有兩個是比較重要的:一個是RAMJobStore,一個是JobStoreSupport。前者是RAM作為存儲介質,作者還特意寫上瞭這樣一段註釋:

This class implements a JobStore that utilizes RAM as its storage device.

As you should know, the ramification of this is that access is extrememly fast, but the data is completely volatile – therefore this JobStore should not be used if true persistence between program shutdowns is required.

這段英文的央視翻譯:

這個類實現瞭一個使用RAM作為存儲設備的JobStore。

您應該知道,這樣做的後果是訪問速度非常快,但是數據是完全不穩定的——因此,如果需要在程序關閉之間實現真正的持久性,則不應該使用這個JobStore。

而且內存存儲也無法分佈式處理吧?所以,mars醬選擇瞭觀看JobStoreSupport:

從import可以知道,這個玩意是連接瞭數據庫的,所以呢,acquireNextTriggers、triggersFired、releaseAcquiredTrigger這些方法負責具體trigger的相關操作,都最終會執行到JobStoreSupport的第3844行的executeInNonManagedTXLock函數:

    /**
     * Execute the given callback having optionally acquired the given lock.
     * This uses the non-managed transaction connection.
     * 
     * @param lockName The name of the lock to acquire, for example
     * "TRIGGER_ACCESS".  If null, then no lock is acquired, but the
     * lockCallback is still executed in a non-managed transaction. 
     */
    protected <T> T executeInNonManagedTXLock(
            String lockName, 
            TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
        boolean transOwner = false;
        Connection conn = null;
        try {
            if (lockName != null) {
                // If we aren't using db locks, then delay getting DB connection 
                // until after acquiring the lock since it isn't needed.
                if (getLockHandler().requiresConnection()) {
                    conn = getNonManagedTXConnection();
                }
                transOwner = getLockHandler().obtainLock(conn, lockName);
            }
            if (conn == null) {
                conn = getNonManagedTXConnection();
            }
            final T result = txCallback.execute(conn);
            try {
                commitConnection(conn);
            } catch (JobPersistenceException e) {
                rollbackConnection(conn);
                if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() {
                    @Override
                    public Boolean execute(Connection conn) throws JobPersistenceException {
                        return txValidator.validate(conn, result);
                    }
                })) {
                    throw e;
                }
            }
            Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
            if(sigTime != null && sigTime >= 0) {
                signalSchedulingChangeImmediately(sigTime);
            }
            return result;
        } catch (JobPersistenceException e) {
            rollbackConnection(conn);
            throw e;
        } catch (RuntimeException e) {
            rollbackConnection(conn);
            throw new JobPersistenceException("Unexpected runtime exception: "
                    + e.getMessage(), e);
        } finally {
            try {
                releaseLock(lockName, transOwner);
            } finally {
                cleanupConnection(conn);
            }
        }
    }

整體的過程簡要說明就是:獲取數據庫連接,給需要執行的trigger加鎖,處理完之後再釋放鎖。

結合起來

結合前面的流程來看,一個調度器在執行前如果涉及到分佈式的情況,流程如下:

  • 首先要獲取QUARTZ_LOCKS表中對應的鎖(在executeInNonManagedTXLock函數的getLockHandler().obtainLock(conn, lockName)中);
  • 獲取鎖後執行QuartzSchedulerThread中的JobRunShell,完成任務的執行;
  • 最後QuartzSchedulerThread中調用triggeredJobComplete函數,鎖被釋放,在executeInNonManagedTXLock函數的releaseLock(lockName, transOwner)中執行;

集群中的每一個調度器實例都遵循這樣的操作流程。

總結

Quartz 是一款用於分佈式系統的高性能調度框架,它采用瞭數據庫作為分佈式鎖機制來保證同一時刻隻有一個 Scheduler 實例訪問數據庫中的 Trigger。

在 Quartz 中,調度器線程會爭搶訪問數據庫中的 Trigger,以確保在同一時刻隻有一個調度器線程執行某個 Trigger 的操作。如果有多個調度器線程同時嘗試訪問同一個 Trigger,它們會相互等待對方釋放鎖。當一個調度器線程獲得瞭鎖,它就可以訪問數據庫並執行相應的操作。

另外,Quartz 還采用瞭悲觀鎖的策略來避免死鎖的發生。當一個調度器線程嘗試獲取鎖時,如果鎖已經被其他線程占用,那麼這個線程會等待,直到有線程釋放瞭鎖。如果在等待過程中沒有其他線程釋放鎖,那麼這個線程就會一直等待下去,直到調度器重新分配瞭鎖。

總之,Quartz 的分佈式調度原理是通過數據庫鎖和悲觀鎖來實現的,以保證同一時刻隻有一個調度器線程訪問數據庫中的 Trigger,從而提高系統的性能和可靠性。

以上就是一分鐘掌握Java Quartz定時任務的詳細內容,更多關於Java Quartz定時任務的資料請關註WalkonNet其它相關文章!

推薦閱讀: