關於Django使用 django-celery-beat動態添加定時任務的方法

版本信息

# 插件安裝 
Django==2.2.2
django-celery-beat==2.1.0
django-redis==4.8.0
mysqlclient==2.0.0
django-mysql==3.2.0
redis==3.2.1
uWSGI==2.0.17.1
django-redis-cache==2.1.0

安裝與配置

  1. 安裝上面的對應的celery版本
  2. 配置settings.py
# django時區配置
TIME_ZONE = 'Asia/Shanghai'
# 如果USE_TZ設置為True時,Django會使用系統默認設置的時區,此時的TIME_ZONE不管有沒有設置都不起作用
# 如果USE_TZ 設置為False,TIME_ZONE = 'Asia/Shanghai', 則使用上海的UTC時間。
USE_TZ = False

INSTALLED_APPS = (
    ...,
    'django_celery_beat',
)

# celery beat配置
# CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = 'django-celery-beat.schedulers.DatabaseScheduler'
# celery 的啟動工作數量設置
CELERY_WORKER_CONCURRENCY = 10
# 任務預取功能,會盡量多拿 n 個,以保證獲取的通訊成本可以壓縮。
CELERYD_PREFETCH_MULTIPLIER = 20
# 有些情況下可以防止死鎖
CELERYD_FORCE_EXECV = True
# celery 的 worker 執行多少個任務後進行重啟操作
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
# 禁用所有速度限制,如果網絡資源有限,不建議開足馬力。
CELERY_DISABLE_RATE_LIMITS = True
# 設置代理人broker
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/2'
# 指定 Backend
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'

生成數據庫

python manage.py migrate

# 遷移之後生成的表結構
django_celery_beat.models.PeriodicTask 此模型定義要運行的單個周期性任務。
django_celery_beat.models.IntervalSchedule 以特定間隔(例如,每5秒)運行的計劃。
django_celery_beat.models.CrontabSchedule 與像在cron項領域的時間表 分鐘小時日的一周 DAY_OF_MONTH month_of_year
django_celery_beat.models.PeriodicTasks 此模型僅用作索引以跟蹤計劃何時更改

在工作目錄下配置celery.py

# -*- coding: utf-8 -*-
# @File:    celeryc.py
# @Content: celery定時任務配置


import os
from celery import Celery, platforms
from celery.schedules import crontab
from django.conf import settings

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_server.settings")
app = Celery("django_server")
app.config_from_object("django.conf:settings", namespace="CELERY")
# 定時任務的存放位置
app.autodiscover_tasks(["monitoring.tasks", "wechat.tasks"])

創建tasks任務

from celery import shared_task

@shared_task
def alarm_monitor_task(**kwargs):
  print("定時任務!!!")

創建定時任務

from django_celery_beat.models import PeriodicTask, IntervalSchedule

-----周期性任務
#  創建10分鐘的間隔 interval 對象
schedule, _ = IntervalSchedule.objects.update_or_create(every=10, period=IntervalSchedule.MINUTES)
# 如果任務存在就更新,不存在則創建
PeriodicTask.objects.update_or_create(
  defaults={
    "interval": schedule,  # 上面創建10分鐘的間隔 interval 對象
    "task": "monitoring.tasks.alarm_monitor_task",  # 指定需要周期性執行的任務
    "args"=json.dumps(['arg1', 'arg2']),
    "kwargs": json.dumps({"a": 1, "b": 2}, ensure_ascii=False)  # 傳入的參數
  },
  name="定時任務-task",
)
# 周期性任務可選參數
IntervalSchedule.DAYS 固定間隔天數
IntervalSchedule.HOURS 固定間隔小時數
IntervalSchedule.MINUTES 固定間隔分鐘數
IntervalSchedule.SECONDS 固定間隔秒數
IntervalSchedule.MICROSECONDS 固定間隔微秒


----Crontab 周期性任務
from django_celery_beat.models import CrontabSchedule, PeriodicTask
# 創建間隔30分鐘執行的任務
crontab, _ = CrontabSchedule.objects.update_or_create(
  minute="*/30",
  hour="*",
  day_of_week="*",
  day_of_month='*',
  month_of_year='*',
  timezone=pytz.timezone("Asia/Shanghai"),
)
# 任務存在則更新,不存在創建
PeriodicTask.objects.update_or_create(
  name=task_name,
  defaults={
    "kwargs": json.dumps(kwargs, ensure_ascii=False),
    "task": "wechat.tasks.subscribe_task",
    "crontab": crontab,
  },
)

# 刪除任務
task = PeriodicTask.objects.filter(name__startswith=sub_id)
if task:
    task.update(enabled=False)
    task.delete()
    
    
# 暫停當前任務
tasks = PeriodicTask.objects.filter(name__startswith=sub_id)
if tasks:
    tasks.update(enabled=True if status else False)

運行任務

# 啟動任務 work
celery -A django_server worker -l INFO --logfile=/var/log/dec_server/worker.log

# 啟動定時器觸發 beat
celery -A django_server beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler --logfile=/var/log/dec_server/beat.log

Tips:

參考鏈接:

https://github.com/celery/django-celery-beat

https://pypi.org/project/django-celery-beat/

到此這篇關於Django使用 django-celery-beat動態添加定時任務的文章就介紹到這瞭,更多相關django celery beat動態添加定時任務內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: