spring schedule配置多任務動態cron(增刪啟停)

一、背景

之前公司經常會遇到配置定時任務,簡單的任務可以直接依賴spring。
簡單任務直接使用 @scheduled 註解配合@EnableScheduling。
但是如何實現簡單的動態cron呢?

開發原則:
盡可能在項目本身去實現,少依賴第三方框架,避免項目過於臃腫和復雜。

倆種任務調度方式:

在這裡插入圖片描述

二、本篇說明

springBoot 基礎模塊 spring-boot-starter-web 已經內置 schedule ,無需引入額外依賴。
先思考幾個問題:

1、動態 cron 實現的原理

任務的 【 停止】是基於 future接口 的cancel() 方法。
任務的 【增加、刪除、啟動】是基於 註冊到 類ScheduledTaskRegistrar 的 ScheduledFuture的數量。
涉及核心類:

  • ScheduledFuture
  • SchedulingConfigurer
  • ScheduledTaskRegistrar

2、多任務並行執行配置
spring默認機制對schedule是單線程,需要配置多線程並行執行。

3、如何配置多個任務
好多博文,都是配置一個cron,這讓初學者很難受。

4、如何配置任務分組
根據自己業務背景,可根據步驟三,進行改造。

5、如何配置服務啟動自啟任務。
想要程序啟動時首次去加我們設置的task,隻需實現 CommandLineRunner 即可。

6、如何從數據庫讀取配置
這個其實很簡單,在實現 ScheduledTaskRegistrar 時,先直接查詢我們需要的數據即可。

7、如何優雅的實現我們的代碼
這裡為瞭我們多個task實現時,去除臃腫的if else ,使用策略模式去實現我們的task,這裡代碼裡面會具體介紹。

參考類圖:

在這裡插入圖片描述

8、如何去觸發我們的schedule 【增刪啟停】
配置好 task任務類,註入到 controller ,通過接口直接調用即可。

三、代碼實現

先貼出我的github 代碼,下面代碼描述不全。

  • 普通多任務動態cron
  • 分組多任務動態cron

1. 普通多任務動態cron 實現

1.1 對應數據庫的實體類 TaskEntity

@Data
@AllArgsConstructor
@NoArgsConstructor
public class TaskEntity {
  /**
   * 任務id
   */
  private int taskId;
  /**
   * 任務說明
   */
  private String desc;
  /**
   * cron 表達式
   */
  private String expression;
}

1.2 配置每個任務實現

配置任務接口 TaskService

public interface TaskService {

  void HandlerJob();

  Integer jobId();

}

配置任務接口實現 TaskServiceJob1Impl、TaskServiceJob2Impl …

@Service
public class TaskServiceJob1Impl implements TaskService {
  @Override
  public void HandlerJob() {
    System.out.println("------job1 開始執行---------:"+new Date());

    System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + "  " + Thread.currentThread().getName() + "  任務一啟動");
    try {
      Thread.sleep(10000);//任務耗時10秒
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + "  " + Thread.currentThread().getName() + "  結束");

  }

  @Override
  public Integer jobId() {
    return 1;
  }
}

1.3 配置任務解析器 TaskSolverChooser

註:
這裡引入策略模式
為啥要配置 任務解析器選擇器:
因為我們實現多個任務時,一個任務對應一個 CronTask,需要在 MyScheduledTask 裡面去實現我們每一個方法。
譬如,我們有100個任務就要自定義100個任務實現方法,代碼會很臃腫,明顯不符合,【開閉原則】,於是這裡采用策略模式,解耦我們多個任務業務實現邏輯。

@Slf4j
@Component
public class TaskSolverChooser implements ApplicationContextAware {

  private ApplicationContext applicationContext;

  private Map<Integer, TaskService> chooseMap = new HashMap<>(16);

  /**
   * 拿到spring context 上下文
   */
  @Override
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    this.applicationContext = applicationContext;
  }

  @PostConstruct
  private void registerToTaskSolver(){
    Map<String, TaskService> taskServiceMap = applicationContext.getBeansOfType(TaskService.class);
    for (TaskService value : taskServiceMap.values()) {
      chooseMap.put(value.jobId(), value);
      log.info("task {} 處理器: {} 註冊成功",new Object[]{value.jobId(),value});
    }
  }

  /**
   * 獲取需要的job
   */
  public TaskService getTask(Integer jobId){
    return chooseMap.get(jobId);
  }
}

1.4 配置MyScheduledTask (動態cron核心配置)

說明:
1、配置多線程執行任務
2、配置 刷新 task
3、配置 停止 task
4、配置 執行task 業務邏輯

@Component
public class MyScheduledTask implements SchedulingConfigurer {

  private volatile ScheduledTaskRegistrar registrar;

  private final ConcurrentHashMap<Integer, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<Integer, CronTask> cronTasks = new ConcurrentHashMap<>();

  @Autowired
  private TaskSolverChooser taskSolverChooser;

  @Override
  public void configureTasks(ScheduledTaskRegistrar registrar) {

    //設置20個線程,默認單線程,如果不設置的話,不能同時並發執行任務
    registrar.setScheduler(Executors.newScheduledThreadPool(10));
    this.registrar = registrar;
  }

  /**
   * 修改 cron 需要 調用該方法
   */
  public void refresh(List<TaskEntity> tasks){
    //取消已經刪除的策略任務
    Set<Integer> sids = scheduledFutures.keySet();
    for (Integer sid : sids) {
      if(!exists(tasks, sid)){
        scheduledFutures.get(sid).cancel(false);
      }
    }
    for (TaskEntity TaskEntity : tasks) {
      String expression = TaskEntity.getExpression();
      //計劃任務表達式為空則跳過
      if(!StringUtils.hasLength(expression)){
        continue;
      }
      //計劃任務已存在並且表達式未發生變化則跳過
      if (scheduledFutures.containsKey(TaskEntity.getTaskId())
          && cronTasks.get(TaskEntity.getTaskId()).getExpression().equals(expression)) {
        continue;
      }
      //如果策略執行時間發生瞭變化,則取消當前策略的任務
      if(scheduledFutures.containsKey(TaskEntity.getTaskId())){
        scheduledFutures.get(TaskEntity.getTaskId()).cancel(false);
        scheduledFutures.remove(TaskEntity.getTaskId());
        cronTasks.remove(TaskEntity.getTaskId());
      }
      //業務邏輯處理
      CronTask task = cronTask(TaskEntity, expression);


      //執行業務
      ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
      cronTasks.put(TaskEntity.getTaskId(), task);
      scheduledFutures.put(TaskEntity.getTaskId(), future);
    }
  }

  /**
   * 停止 cron 運行
   */
  public void stop(List<TaskEntity> tasks){
    tasks.forEach(item->{
      if (scheduledFutures.containsKey(item.getTaskId())) {
        // mayInterruptIfRunning設成false話,不允許在線程運行時中斷,設成true的話就允許。
        scheduledFutures.get(item.getTaskId()).cancel(false);
        scheduledFutures.remove(item.getTaskId());
      }
    });
  }

  /**
   * 業務邏輯處理
   */
  public CronTask cronTask(TaskEntity TaskEntity, String expression) {
    return new CronTask(() -> {
          //每個計劃任務實際需要執行的具體業務邏輯
          //采用策略,模式 ,執行我們的job
          taskSolverChooser.getTask(TaskEntity.getTaskId()).HandlerJob();
        }, expression);
  }

  private boolean exists(List<TaskEntity> tasks, Integer tid){
    for(TaskEntity TaskEntity:tasks){
      if(TaskEntity.getTaskId() == tid){
        return true;
      }
    }
    return false;
  }

  @PreDestroy
  public void destroy() {
    this.registrar.destroy();
  }

}

1.5 配置程序啟動時首次去加我們設置的task

@Component
public class StartInitTask implements CommandLineRunner {

  @Autowired
  private MyScheduledTask myScheduledTask;

  @Override
  public void run(String... args) throws Exception {
    List<TaskEntity> list = Arrays.asList(
        new TaskEntity(1, "測試1", "0/1 * * * * ?"),
        new TaskEntity(2, "測試2", "0/1 * * * * ?")
    );
    myScheduledTask.refresh(list);
  }
}

1.6 配置web接口去觸發,增刪啟停

@RestController
public class StartController {

  @Autowired
  private MyScheduledTask scheduledTask;

  @PostMapping(value = "/startOrChangeCron")
  public String changeCron(@RequestBody List<TaskEntity> list){
    if (CollectionUtils.isEmpty(list)) {
      // 這裡模擬存在數據庫的數據
      list = Arrays.asList(
          new TaskEntity(1, "測試1","0/1 * * * * ?") ,
          new TaskEntity(2, "測試2","0/1 * * * * ?")
      );
    }
    scheduledTask.refresh(list);
    return "task任務:" + list.toString() + "已經開始運行";
  }

  @PostMapping(value = "/stopCron")
  public String stopCron(@RequestBody List<TaskEntity> list){
    if (CollectionUtils.isEmpty(list)) {
      // 這裡模擬將要停止的cron可通過前端傳來
      list = Arrays.asList(
          new TaskEntity(1, "測試1","0/1 * * * * ?") ,
          new TaskEntity(2, "測試2","0/1 * * * * ?")
      );
    }
    scheduledTask.stop(list);
    List<Integer> collect = list.stream().map(TaskEntity::getTaskId).collect(Collectors.toList());
    return "task任務:" + collect.toString() + "已經停止啟動";
  }

}

2. 分組多任務動態cron 實現

實現原理:
基於反射實現,根據方法全類名,去動態執行方法。多任務分組配置,根據任務類型進行分組。
eg:
定時任務人員的相關操作,有檢測人員離職狀態,人員業績達標,人員考勤…等,
作用:
對人員定時任務做一個分類,在同一個類裡面去實現不同的task,
比較
《1. 普通多任務動態cron 實現》,是一個類可以實現一個task
《2. 分組多任務動態cron 實現》,是一個類可以實現多個task
詳細可參考: 分組多任務動態cron

3 測試記錄

測試1 項目啟動自啟
TaskServiceJob1Impl和TaskServiceJob1Impl … 設置 阻塞10s
觀察日志時間可發現,已經同時並發執行倆個任務。

在這裡插入圖片描述

測試2 觸發 刷新【增、刪、啟】我們的task,。
其實這裡沒這麼智能,如果需要觸發刷新接口,實際上是重新加載我們的task,就是對應觸發我們,增加任務任務,刪除任務,啟動任務。
使用idea插件測試接口

在這裡插入圖片描述

觀察日志

在這裡插入圖片描述

測試3 觸發 停止接口,停止一個接口。
這裡測試略過…

四、總結

其實實現簡單的動態配置,以上代碼可用,比較簡單。

到此這篇關於spring schedule配置多任務動態cron(增刪啟停)的文章就介紹到這瞭,更多相關spring schedule 多任務動態cron內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: