Django中celery的使用項目實例

1、django應用Celery

django框架請求/響應的過程是同步的,框架本身無法實現異步響應。

但是我們在項目過程中會經常會遇到一些耗時的任務, 比如:發送郵件、發送短信、大數據統計等等,這些操作耗時長,同步執行對用戶體驗非常不友好,那麼在這種情況下就需要實現異步執行。

異步執行前端一般使用ajax,後端使用Celery。

2 、項目應用

django項目應用celery,主要有兩種任務方式,一是異步任務(發佈者任務),一般是web請求,二是定時任務。

celery組成

Celery是由Python開發、簡單、靈活、可靠的分佈式任務隊列,是一個處理異步任務的框架,其本質是生產者消費者模型,生產者發送任務到消息隊列,消費者負責處理任務。Celery側重於實時操作,但對調度支持也很好,其每天可以處理數以百萬計的任務。特點:

簡單:熟悉celery的工作流程後,配置使用簡單

高可用:當任務執行失敗或執行過程中發生連接中斷,celery會自動嘗試重新執行任務

快速:一個單進程的celery每分鐘可處理上百萬個任務

靈活:幾乎celery的各個組件都可以被擴展及自定制

Celery由三部分構成:

消息中間件(Broker):官方提供瞭很多備選方案,支持RabbitMQ、Redis、Amazon SQS、MongoDB、Memcached 等,官方推薦RabbitMQ

任務執行單元(Worker):任務執行單元,負責從消息隊列中取出任務執行,它可以啟動一個或者多個,也可以啟動在不同的機器節點,這就是其實現分佈式的核心

結果存儲(Backend):官方提供瞭諸多的存儲方式支持:RabbitMQ、 Redis、Memcached,SQLAlchemy, Django ORM、Apache Cassandra、Elasticsearch等

架構如下:

工作原理:

任務模塊Task包含異步任務和定時任務。其中,異步任務通常在業務邏輯中被觸發並發往消息隊列,而定時任務由Celery Beat進程周期性地將任務發往消息隊列;

任務執行單元Worker實時監視消息隊列獲取隊列中的任務執行;

Woker執行完任務後將結果保存在Backend中;

本文使用的是redis數據庫作為消息中間件和結果存儲數據庫

1.異步任務redis

1.安裝庫

pip install celery
pip install redis

2.celery.py

在主項目目錄下,新建 celery.py 文件:

import os
import django
from celery import Celery
from django.conf import settings
 
# 設置系統環境變量,安裝django,必須設置,否則在啟動celery時會報錯
# celery_study 是當前項目名
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_study.settings')
django.setup()
 
celery_app = Celery('celery_study')
celery_app.config_from_object('django.conf:settings')
celery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

註意:是和settings.py文件同目錄,一定不能建立在項目根目錄,不然會引起 celery 這個模塊名的命名沖突  

同時,在主項目的init.py中,添加如下代碼:

from .celery import celery_app
 
__all__ = ['celery_app']

3.settings.py

在配置文件中配置對應的redis配置:

# Broker配置,使用Redis作為消息中間件
BROKER_URL = 'redis://127.0.0.1:6379/0' 
 
# BACKEND配置,這裡使用redis
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' 
 
# 結果序列化方案
CELERY_RESULT_SERIALIZER = 'json' 
 
# 任務結果過期時間,秒
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 
 
# 時區配置
CELERY_TIMEZONE='Asia/Shanghai'   
 
# 指定導入的任務模塊,可以指定多個
#CELERY_IMPORTS = (     
#    'other_dir.tasks',
#)

註意:所有配置的官方文檔:Configuration and defaults — Celery 5.2.0b3 documentation  

4.tasks.py

在子應用下建立各自對應的任務文件tasks.py(必須是tasks.py這個名字,不允許修改)

from celery import shared_task
 
@shared_task
def add(x, y):
    return x + y
 
@shared_task
def mul(x, y):
    return x * y
 
@shared_task
def xsum(numbers):
    return sum(numbers)

5.調用任務

from .tasks import *
# Create your views here.
 
def task_add_view(request):
    add.delay(100,200)
    return HttpResponse(f'調用函數結果')

6.啟動celery

pip install eventlet
celery  -A celery_study worker  -l debug -P eventlet

註意 :celery_study是項目名

使用redis時,有可能會出現如下類似的異常

AttributeError: 'str' object has no attribute 'items'

這是由於版本差異,需要卸載已經安裝的python環境中的 redis 庫,重新指定安裝特定版本(celery4.x以下適用 redis2.10.6, celery4.3以上使用redis3.2.0以上):

xxxxxxxxxx pip install redis==2.10.6

7.獲取任務結果

在 views.py 中,通過 AsyncResult.get() 獲取結果

from celery import result
def get_result_by_taskid(request):
    task_id = request.GET.get('task_id')
	# 異步執行
    ar = result.AsyncResult(task_id)
 
    if ar.ready():
        return JsonResponse({'status': ar.state, 'result': ar.get()})
    else:
        return JsonResponse({'status': ar.state, 'result': ''})

AsyncResult類的常用的屬性和方法:

  • state: 返回任務狀態,等同status;
  • task_id: 返回任務id;
  • result: 返回任務結果,同get()方法;
  • ready(): 判斷任務是否執行以及有結果,有結果為True,否則False;
  • info(): 獲取任務信息,默認為結果;
  • wait(t): 等待t秒後獲取結果,若任務執行完畢,則不等待直接獲取結果,若任務在執行中,則wait期間一直阻塞,直到超時報錯;
  • successful(): 判斷任務是否成功,成功為True,否則為False;

2.定時任務

在第一步的異步任務的基礎上,進行部分修改即可

1.settings.py

from celery.schedules import crontab
 
CELERYBEAT_SCHEDULE = {
    'mul_every_30_seconds': {
         # 任務路徑
        'task': 'celery_app.tasks.mul',
         # 每30秒執行一次
        'schedule': 5,
        'args': (14, 5)
    }
}

說明(更多內容見文檔:Periodic Tasks — Celery 5.2.0b3 documentation):

  • task:任務函數
  • schedule:執行頻率,可以是整型(秒數),也可以是timedelta對象,也可以是crontab對象,也可以是自定義類(繼承celery.schedules.schedule)
  • args:位置參數,列表或元組
  • kwargs:關鍵字參數,字典
  • options:可選參數,字典,任何 apply_async() 支持的參數
  • relative:默認是False,取相對於beat的開始時間;設置為True,則取設置的timedelta時間

在task.py中設置瞭日志

from celery import shared_task
import logging  
logger = logging.getLogger(__name__))
 
 
@shared_task
def mul(x, y):
    logger.info('___mul__'*10)
    return x * y

2.啟動celery

(兩個cmd)分別啟動worker和beat

celery -A worker celery_study -l debug -P eventlet
celery beat -A celery_study -l debug

3.任務綁定

Celery可通過task綁定到實例獲取到task的上下文,這樣我們可以在task運行時候獲取到task的狀態,記錄相關日志等

方法:

  • 在裝飾器中加入參數 bind=True
  • 在task函數中的第一個參數設置為self

在task.py 裡面寫

from celery import shared_task
import logging  
logger = logging.getLogger(__name__)
 
 
# 任務綁定
@shared_task(bind=True)
def add(self,x, y):
    logger.info('add__-----'*10)
    logger.info('name:',self.name)
    logger.info('dir(self)',dir(self))
    return x + y

其中:self對象是celery.app.task.Task的實例,可以用於實現重試等多種功能

from celery import shared_task
import logging  
logger = logging.getLogger(__name__)
 
 
# 任務綁定
@shared_task(bind=True)
def add(self,x, y):
    try:
        logger.info('add__-----'*10)
        logger.info('name:',self.name)
        logger.info('dir(self)',dir(self))
        raise Exception
    except Exception as e:
        # 出錯每4秒嘗試一次,總共嘗試4次
        self.retry(exc=e, countdown=4, max_retries=4)    
    return x + y

啟動celery

celery -A worker celery_study -l debug -P eventlet

4.任務鉤子

Celery在執行任務時,提供瞭鉤子方法用於在任務執行完成時候進行對應的操作,在Task源碼中提供瞭很多狀態鉤子函數如:on_success(成功後執行)、on_failure(失敗時候執行)、on_retry(任務重試時候執行)、after_return(任務返回時候執行)

方法:通過繼承Task類,重寫對應方法即可,

from celery import Task
 
class MyHookTask(Task):
 
    def on_success(self, retval, task_id, args, kwargs):
        logger.info(f'task id:{task_id} , arg:{args} , successful !')
 
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.info(f'task id:{task_id} , arg:{args} , failed ! erros: {exc}')
 
    def on_retry(self, exc, task_id, args, kwargs, einfo):
        logger.info(f'task id:{task_id} , arg:{args} , retry !  erros: {exc}')
 
# 在對應的task函數的裝飾器中,通過 base=MyHookTask 指定
@shared_task(base=MyHookTask, bind=True)
def add(self,x, y):
    logger.info('add__-----'*10)
    logger.info('name:',self.name)
    logger.info('dir(self)',dir(self))
    return x + y

啟動celery

celery -A worker celery_study -l debug -P eventlet

5.任務編排

在很多情況下,一個任務需要由多個子任務或者一個任務需要很多步驟才能完成,Celery也能實現這樣的任務,完成這類型的任務通過以下模塊完成:

  • group: 並行調度任務
  • chain: 鏈式任務調度
  • chord: 類似group,但分header和body2個部分,header可以是一個group任務,執行完成後調用body的任務
  • map: 映射調度,通過輸入多個入參來多次調度同一個任務
  • starmap: 類似map,入參類似*args
  • chunks: 將任務按照一定數量進行分組

文檔:Next Steps — Celery 5.2.0b3 documentation

1.group

urls.py:

path('primitive/', views.test_primitive),

views.py:

from .tasks import *
from celery import group
 
def test_primitive(request):
    # 創建10個並列的任務
    lazy_group = group(add.s(i, i) for i in range(10))
    promise = lazy_group()
    result = promise.get()
    return JsonResponse({'function': 'test_primitive', 'result': result})

說明:

通過task函數的 s 方法傳入參數,啟動任務

上面這種方法需要進行等待,如果依然想實現異步的方式,那麼就必須在tasks.py中新建一個task方法,調用group,示例如下:

tasks.py:

@shared_task
def group_task(num):
    return group(add.s(i, i) for i in range(num))().get()

urls.py:

path('first_group/', views.first_group),

views.py:

def first_group(request):
    ar = tasks.group_task.delay(10)
 
    return HttpResponse('返回first_group任務,task_id:' + ar.task_id)

2.chain

默認上一個任務的結果作為下一個任務的第一個參數

def test_primitive(request):
    # 等同調用  mul(add(add(2, 2), 5), 8)
    promise = chain(tasks.add.s(2, 2), tasks.add.s(5), tasks.mul.s(8))()
    #  72
    result = promise.get()  
    return JsonResponse({'function': 'test_primitive', 'result': result})

3.chord

任務分割,分為header和body兩部分,hearder任務執行完在執行body,其中hearder返回結果作為參數傳遞給body

def test_primitive(request):
    # header:  [3, 12] 
    # body: xsum([3, 12])
    promise = chord(header=[tasks.add.s(1,2),tasks.mul.s(3,4)],body=tasks.xsum.s())()
    result = promise.get()
    return JsonResponse({'function': 'test_primitive', 'result': result})

6、celery管理和監控

celery通過flower組件實現管理和監控功能 ,flower組件不僅僅提供監控功能,還提供HTTP API可實現對woker和task的管理

官網:flower · PyPI

文檔:Flower – Celery monitoring tool — Flower 1.0.1 documentation

安裝flower

pip install flower

啟動flower

flower -A celery_study --port=5555   

說明:

  • -A:項目名
  • –port: 端口號

訪問

在瀏覽器輸入:http://127.0.0.1:5555

通過api操作

curl http://127.0.0.1:5555/api/workers

總結

到此這篇關於Django中celery使用項目的文章就介紹到這瞭,更多相關Django中celery使用內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: