使用 Celery Once 來防止 Celery 重復執行同一個任務

在使用 Celery 的時候發現有的時候 Celery 會將同一個任務執行兩遍,我遇到的情況是相同的任務在不同的 worker 中被分別執行,並且時間隻相差幾毫秒。這問題我一直以為是自己哪裡處理的邏輯有問題,後來發現其他人 也有類似的問題,然後基本上出問題的都是使用 Redis 作為 Broker 的,而我這邊一方面不想將 Redis 替換掉,就隻能在 task 執行的時候加分佈式鎖瞭。

不過在 Celery 的 issue 中搜索瞭一下,有人使用 Redis 實現瞭分佈式鎖,然後也有人使用瞭 Celery Once。 大致看瞭一下 Celery Once ,發現非常符合現在的情況,就用瞭下。

Celery Once 也是利用 Redis 加鎖來實現, Celery Once 在 Task 類基礎上實現瞭 QueueOnce 類,該類提供瞭任務去重的功能,所以在使用時,我們自己實現的方法需要將 QueueOnce 設置為 base

@task(base=QueueOnce, once={‘graceful’: True})

後面的 once 參數表示,在遇到重復方法時的處理方式,默認 graceful 為 False,那樣 Celery 會拋出 AlreadyQueued 異常,手動設置為 True,則靜默處理。

另外如果要手動設置任務的 key,可以指定 keys 參數

@celery.task(base=QueueOnce, once={'keys': ['a']})
def slow_add(a, b):
    sleep(30)
    return a + b

總得來說,分為幾步

第一步,安裝

pip install -U celery_once

第二步,增加配置

from celery import Celery
from celery_once import QueueOnce
from time import sleep

celery = Celery('tasks', broker='amqp://guest@localhost//')
celery.conf.ONCE = {
  'backend': 'celery_once.backends.Redis',
  'settings': {
    'url': 'redis://localhost:6379/0',
    'default_timeout': 60 * 60
  }
}

第三步,修改 delay 方法

example.delay(10)
# 修改為
result = example.apply_async(args=(10))

第四步,修改 task 參數

@celery.task(base=QueueOnce, once={'graceful': True, keys': ['a']})
def slow_add(a, b):
    sleep(30)
    return a + b

參考鏈接 https://github.com/cameronmaske/celery-once

到此這篇關於使用 Celery Once 來防止 Celery 重復執行同一個任務的文章就介紹到這瞭,更多相關 Celery 重復執行同一個任務內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: