一分鐘掌握Java Quartz定時任務
前言
前幾篇介紹瞭單體架構的定時任務解決方式,但是現代軟件架構由於業務復雜度高,業務的耦合性太強,已經由單體架構拆分成瞭分佈式架構。因此,定時任務的架構也隨之修改。而Quartz是分佈式定時任務解決方案中使用簡單,結構清晰,且不依賴第三方分佈式調度中間件的。上車,mars醬帶你車裡細說~
角色介紹
Quartz入門使用的角色不多,三個角色足夠,分別是:
Scheduler
:調度器。用來負責任務的調度;
Job
:任務。這是一個接口,業務代碼繼承Job接口並實現它的execute
方法,是業務執行的主體部分;
Trigger
: 觸發器。也是個接口,有兩個觸發器比較關鍵,一個是SimpleTrigger
,另一個是CronTrigger
。前者支持簡單的定時,比如:按時、按秒等;後者直接支持cron表達式。下面我們從官方的源代碼入手,看看Quartz如何做到分佈式的。
官方例子
官方源代碼down下來之後,有個examples文件夾:
example1是入門級中最簡單的。就兩個java文件,一個HelloJob:
package org.quartz.examples.example1; import java.util.Date; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; /** * <p> * This is just a simple job that says "Hello" to the world. * </p> * * @author Bill Kratzer */ public class HelloJob implements Job { private static Logger _log = LoggerFactory.getLogger(HelloJob.class); /** * <p> * Empty constructor for job initilization * </p> * <p> * Quartz requires a public empty constructor so that the * scheduler can instantiate the class whenever it needs. * </p> */ public HelloJob() { } /** * <p> * Called by the <code>{@link org.quartz.Scheduler}</code> when a * <code>{@link org.quartz.Trigger}</code> fires that is associated with * the <code>Job</code>. * </p> * * @throws JobExecutionException * if there is an exception while executing the job. */ public void execute(JobExecutionContext context) throws JobExecutionException { // Say Hello to the World and display the date/time _log.info("Hello World! - " + new Date()); } }
另一個SimpleExample:
package org.quartz.examples.example1; import org.quartz.JobDetail; import org.quartz.Scheduler; import org.quartz.SchedulerFactory; import org.quartz.Trigger; import org.quartz.impl.StdSchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Date; import static org.quartz.DateBuilder.evenMinuteDate; import static org.quartz.JobBuilder.newJob; import static org.quartz.TriggerBuilder.newTrigger; /** * This Example will demonstrate how to start and shutdown the Quartz scheduler and how to schedule a job to run in * Quartz. * * @author Bill Kratzer */ public class SimpleExample { public void run() throws Exception { Logger log = LoggerFactory.getLogger(SimpleExample.class); log.info("------- Initializing ----------------------"); // 1. 創建一個scheduler SchedulerFactory sf = new StdSchedulerFactory(); Scheduler sched = sf.getScheduler(); log.info("------- Initialization Complete -----------"); // computer a time that is on the next round minute Date runTime = evenMinuteDate(new Date()); log.info("------- Scheduling Job -------------------"); // 2. 指定一個job JobDetail job = newJob(HelloJob.class).withIdentity("job1", "group1").build(); // 3. 指定一個trigger Trigger trigger = newTrigger().withIdentity("trigger1", "group1").startAt(runTime).build(); // 4. 綁定job和trigger sched.scheduleJob(job, trigger); log.info(job.getKey() + " will run at: " + runTime); // 5. 執行 sched.start(); log.info("------- Started Scheduler -----------------"); // wait long enough so that the scheduler as an opportunity to // run the job! log.info("------- Waiting 65 seconds... -------------"); try { // wait 65 seconds to show job Thread.sleep(65L * 1000L); // executing... } catch (Exception e) { // } // shut down the scheduler log.info("------- Shutting Down ---------------------"); sched.shutdown(true); log.info("------- Shutdown Complete -----------------"); } public static void main(String[] args) throws Exception { SimpleExample example = new SimpleExample(); example.run(); } }
整個SimpleExample隻有五個步驟:
- 創建Scheduler,這是一個調度器,例子中使用調度器工廠來創建一個調度器;
- 創建一個Job。實際上Job就是那個HelloJob,但是這裡把HelloJob丟給瞭JobDetail對象,Job接口本身隻有一個execute函數,沒有其他的屬性瞭,如果需要附加其他屬性,JobDetail就支持,比如我們需要往Job中傳遞參數,JobDetail中提供瞭一個JobDataMap。當Job在運行的時候,execute函數裡面的就能獲取到JobDetail對象,並將設置的數據傳遞給Job接口的實現;
- 創建一個Trigger。Trigger對象主責是任務的執行時間,比如官方例子中的startAt函數,就指定瞭具體的運行時間,還有startNow(立即執行);
- 用scheduler綁定Job和Trigger;
- 執行scheduler。
Quartz的使用是不是簡單又清晰?Job是任務,單一職責,不做任何其他事情。Trigger負責執行的頻率等等屬性。Scheduler負責按照Trigger的規則去執行Job的內容。各自部分的功能符合單一原則。
但是,到這裡都不是分佈式的方式,依然是單體架構的。那麼,Quartz如何做到分佈式的呢?
Quartz如何分佈式?
Quartz的分佈式實現方式並不依賴其他分佈式協調管理中間件完成,而是使用數據鎖來實現。使用數據做協調管理中間件的唯一的前提是:需要把集群的每臺機器時間校對一致。
Quartz數據庫核心表如下:
表名 | 功能描述 |
---|---|
QRTZ_CALENDARS | 存儲Quartz的Calendar信息 |
QRTZ_CRON_TRIGGERS | 存儲CronTrigger,包括Cron表達式和時區信息 |
QRTZ_FIRED_TRIGGERS | 存儲與已觸發的Trigger相關的狀態信息,以及相聯Job的執行信息 |
QRTZ_PAUSED_TRIGGER_GRPS | 存儲已暫停的Trigger組的信息 |
QRTZ_SCHEDULER_STATE | 存儲少量的有關Scheduler的狀態信息,和別的Scheduler實例 |
QRTZ_LOCKS | 存儲程序的悲觀鎖的信息 |
QRTZ_JOB_DETAILS | 存儲每一個已配置的Job的詳細信息 |
QRTZ_JOB_LISTENERS | 存儲有關已配置的JobListener的信息 |
QRTZ_SIMPLE_TRIGGERS | 存儲簡單的Trigger,包括重復次數、間隔、以及已觸的次數 |
QRTZ_BLOG_TRIGGERS | Trigger作為Blob類型存儲 |
QRTZ_TRIGGER_LISTENERS | 存儲已配置的TriggerListener的信息 |
QRTZ_TRIGGERS | 存儲已配置的Trigger的信息 |
字體加粗的QRTZ_LOCKS表是一個悲觀鎖的存儲實現,Quartz認為每條記錄都可能會產生並發沖突。以上表的SQL可以在quartz目錄中找到:
找到自己喜歡的數據庫品牌,並創建好表即可。
跟著官方例子看源碼
我們從Hello的execute方法開始,反著找,繼續看看分佈式的方式如何實現。為什麼反著找呢?因為這裡是我們業務實現的主體內容,Quartz框架最終必須要調用到這個execute的具體實現的。我們找到調用execute的地方有很多處:
從類名來分析,DirectoryScanJob看著是目錄掃描任務,FileScanJob直譯是文件掃描任務,SendMailJob是發送郵件任務,最後隻剩那個JobRunShell,畢竟翻譯過來叫“任務運行の核心”啊。進入JobRunShell,找到調用execute函數的部分,execute函數被包裹在一個一百三十多行長又長的run函數中:
public void run() { qs.addInternalSchedulerListener(this); try { // ...省略很多源代碼 do { // ...省略很多源代碼 try { begin(); } catch (SchedulerException se) { // ... 省略源代碼 } // ... 省略源代碼 try { log.debug("Calling execute on job " + jobDetail.getKey()); // 這裡負責執行job的execute函數 job.execute(jec); endTime = System.currentTimeMillis(); } catch (JobExecutionException jee) { // ... 省略源代碼 } catch (Throwable e) { // ... 省略源代碼 } // ...省略很多源代碼 try { complete(true); } catch (SchedulerException se) { // ... 省略源代碼 } // ...省略很多源代碼 } while (true); } finally { qs.removeInternalSchedulerListener(this); } }
可以看到run中間的execute被夾在一個begin函數和comlete函數中,而begin和complete的實現是一個基於JTA事務的JTAJobRunShell的實現來完成的。JobRunShell是一個Runnable接口的實現,那麼剛剛的run方法,必定在某處啟用瞭線程(池)的start方法。
mars醬繼續跟蹤查找源代碼,在QuartzSchedulerThread中的run函數中,找到JobRunShell的調用部分:
@Override public void run() { int acquiresFailed = 0; while (!halted.get()) { // ...省略很多源代碼 // 源代碼279行 int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); // ...省略很多源代碼 if(availThreadCount > 0) { // ...省略很多源代碼 // 取下一個trigger,周期是30秒取一次 triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); // ...省略很多源代碼 // 觸發trigger List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers); // ...省略很多源代碼 // 釋放trigger,當bndle的結果是null就釋放trigger if (bndle == null) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); continue; } // ...省略很多源代碼 JobRunShell shell = null; try { shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); shell.initialize(qs); } catch (SchedulerException se) { qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); continue; } // 這裡調用JobRunShell if (qsRsrcs.getThreadPool().runInThread(shell) == false) { // ...省略很多源代碼 } } } }
QuartzSchedulerThread的run函數就是核心處理流程瞭,qsRsrcs.getThreadPool().runInThread(shell)
內部就根據具體的SimpleThreadPool或者ZeroSizeThreadPool來執行run函數,while循環基本就是不停的在輪詢不斷的去拿trigger,然後判斷trigger的時間是不是到瞭,再按照時間trigger的時間規則執行Job,最後再標記為完成、釋放trigger。
Trigger的處理
上面的邏輯都是通過qsRsrcs.getJobStore()
得到的對象去處理Trigger的,返回對象是JobStore。任意查看qsRsrcs.getJobStore()
調用的函數,比如:releaseAcquiredTriggerJobStore,它的實現有兩個是比較重要的:一個是RAMJobStore,一個是JobStoreSupport。前者是RAM作為存儲介質,作者還特意寫上瞭這樣一段註釋:
This class implements a JobStore that utilizes RAM as its storage device.
As you should know, the ramification of this is that access is extrememly fast, but the data is completely volatile – therefore this JobStore should not be used if true persistence between program shutdowns is required.
這段英文的央視翻譯:
這個類實現瞭一個使用RAM作為存儲設備的JobStore。
您應該知道,這樣做的後果是訪問速度非常快,但是數據是完全不穩定的——因此,如果需要在程序關閉之間實現真正的持久性,則不應該使用這個JobStore。
而且內存存儲也無法分佈式處理吧?所以,mars醬選擇瞭觀看JobStoreSupport:
從import可以知道,這個玩意是連接瞭數據庫的,所以呢,acquireNextTriggers、triggersFired、releaseAcquiredTrigger這些方法負責具體trigger的相關操作,都最終會執行到JobStoreSupport的第3844行的executeInNonManagedTXLock函數:
/** * Execute the given callback having optionally acquired the given lock. * This uses the non-managed transaction connection. * * @param lockName The name of the lock to acquire, for example * "TRIGGER_ACCESS". If null, then no lock is acquired, but the * lockCallback is still executed in a non-managed transaction. */ protected <T> T executeInNonManagedTXLock( String lockName, TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException { boolean transOwner = false; Connection conn = null; try { if (lockName != null) { // If we aren't using db locks, then delay getting DB connection // until after acquiring the lock since it isn't needed. if (getLockHandler().requiresConnection()) { conn = getNonManagedTXConnection(); } transOwner = getLockHandler().obtainLock(conn, lockName); } if (conn == null) { conn = getNonManagedTXConnection(); } final T result = txCallback.execute(conn); try { commitConnection(conn); } catch (JobPersistenceException e) { rollbackConnection(conn); if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() { @Override public Boolean execute(Connection conn) throws JobPersistenceException { return txValidator.validate(conn, result); } })) { throw e; } } Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion(); if(sigTime != null && sigTime >= 0) { signalSchedulingChangeImmediately(sigTime); } return result; } catch (JobPersistenceException e) { rollbackConnection(conn); throw e; } catch (RuntimeException e) { rollbackConnection(conn); throw new JobPersistenceException("Unexpected runtime exception: " + e.getMessage(), e); } finally { try { releaseLock(lockName, transOwner); } finally { cleanupConnection(conn); } } }
整體的過程簡要說明就是:獲取數據庫連接,給需要執行的trigger加鎖,處理完之後再釋放鎖。
結合起來
結合前面的流程來看,一個調度器在執行前如果涉及到分佈式的情況,流程如下:
- 首先要獲取QUARTZ_LOCKS表中對應的鎖(在
executeInNonManagedTXLock
函數的getLockHandler().obtainLock(conn, lockName)
中); - 獲取鎖後執行QuartzSchedulerThread中的JobRunShell,完成任務的執行;
- 最後QuartzSchedulerThread中調用
triggeredJobComplete
函數,鎖被釋放,在executeInNonManagedTXLock
函數的releaseLock(lockName, transOwner)
中執行;
集群中的每一個調度器實例都遵循這樣的操作流程。
總結
Quartz 是一款用於分佈式系統的高性能調度框架,它采用瞭數據庫作為分佈式鎖機制來保證同一時刻隻有一個 Scheduler 實例訪問數據庫中的 Trigger。
在 Quartz 中,調度器線程會爭搶訪問數據庫中的 Trigger,以確保在同一時刻隻有一個調度器線程執行某個 Trigger 的操作。如果有多個調度器線程同時嘗試訪問同一個 Trigger,它們會相互等待對方釋放鎖。當一個調度器線程獲得瞭鎖,它就可以訪問數據庫並執行相應的操作。
另外,Quartz 還采用瞭悲觀鎖的策略來避免死鎖的發生。當一個調度器線程嘗試獲取鎖時,如果鎖已經被其他線程占用,那麼這個線程會等待,直到有線程釋放瞭鎖。如果在等待過程中沒有其他線程釋放鎖,那麼這個線程就會一直等待下去,直到調度器重新分配瞭鎖。
總之,Quartz 的分佈式調度原理是通過數據庫鎖和悲觀鎖來實現的,以保證同一時刻隻有一個調度器線程訪問數據庫中的 Trigger,從而提高系統的性能和可靠性。
以上就是一分鐘掌握Java Quartz定時任務的詳細內容,更多關於Java Quartz定時任務的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- springboot整合quartz項目使用案例
- Java 任務調度框架 Quartz實操
- SpringBoot2.6.3集成quartz的方式
- Spring Boot 配置 Quartz 定時任務的方法
- Java使用quartz實現定時任務示例詳解