Flask實現異步執行任務
Flask 是 Python 中有名的輕量級同步 web 框架,在一些開發中,可能會遇到需要長時間處理的任務,此時就需要使用異步的方式來實現,讓長時間任務在後臺運行,先將本次請求的響應狀態返回給前端,不讓前端界面「卡頓」,當異步任務處理好後,如果需要返回狀態,再將狀態返回。
怎麼實現呢?
使用線程的方式
當要執行耗時任務時,直接開啟一個新的線程來執行任務,這種方式最為簡單快速。
通過 ThreadPoolExecutor 來實現
from flask import Flask from time import sleep from concurrent.futures import ThreadPoolExecutor # DOCS https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor # 創建線程池執行器 executor = ThreadPoolExecutor(2) app = Flask(__name__) @app.route('/jobs') def run_jobs(): # 交由線程去執行耗時任務 executor.submit(long_task, 'hello', 123) return 'long task running.' # 耗時任務 def long_task(arg1, arg2): print("args: %s %s!" % (arg1, arg2)) sleep(5) print("Task is done!") if __name__ == '__main__': app.run()
當要執行一些比較簡單的耗時任務時就可以使用這種方式,如發郵件、發短信驗證碼等。
但這種方式有個問題,就是前端無法得知任務執行狀態。
如果想要前端知道,就需要設計一些邏輯,比如將任務執行狀態存儲到 redis 中,通過唯一的任務 id 進行標識,然後再寫一個接口,通過任務 id 去獲取任務的狀態,然後讓前端定時去請求該接口,從而獲得任務狀態信息。
全部自己實現就顯得有些麻煩瞭,而 Celery 剛好實現瞭這樣的邏輯,來使用一下。
使用 Celery
為瞭滿足前端可以獲得任務狀態的需求,可以使用 Celery。
Celery 是實時任務處理與調度的分佈式任務隊列,它常用於 web 異步任務、定時任務等,後面單獨寫一篇文章描述 Celery 的架構,這裡不深入討論。
現在我想讓前端可以通過一個進度條來判斷後端任務的執行情況。使用 Celery 就很容易實現,首先通過 pip 安裝 Celery 與 redis,之所以要安裝 redis,是因為讓 Celery 選擇 redis 作為「消息代理 / 消息中間件」。
pip install celery pip install redis
在 Flask 中使用 Celery 其實很簡單,這裡先簡單的過一下 Flask 中使用 Celery 的整體流程,然後再去實現具體的項目
1.在 Flask 中初始化 Celery
from flask import Flask from celery import Celery app = Flask(__name__) # 配置 # 配置消息代理的路徑,如果是在遠程服務器上,則配置遠程服務器中redis的URL app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0' # 要存儲 Celery 任務的狀態或運行結果時就必須要配置 app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0' # 初始化Celery celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL']) # 將Flask中的配置直接傳遞給Celery celery.conf.update(app.config)
上述代碼中,通過 Celery 類初始化 celery 對象,傳入的應用名稱與消息代理的連接 URL。
2.通過 celery.task 裝飾器裝飾耗時任務對應的函數
@celery.task def long_task(arg1, arg2): # 耗時任務的邏輯 return result
3.Flask 中定義接口通過異步的方式執行耗時任務
@app.route('/', methods=['GET', 'POST']) def index(): task = long_task.delay(1, 2) delay () 方法是 applyasync () 方法的快捷方式,applyasync () 參數更多,可以更加細致的控制耗時任務,比如想要 long_task () 在一分鐘後再執行 @app.route('/', methods=['GET', 'POST']) def index(): task = long_task.apply_async(args=[1, 2], countdown=60)
delay () 與 apply_async () 會返回一個任務對象,該對象可以獲取任務的狀態與各種相關信息。
通過這 3 步就可以使用 Celery 瞭。
接著就具體來實現「讓前端可以通過一個進度條來判斷後端任務的執行情況」的需求。
# bind為True,會傳入self給被裝飾的方法 @celery.task(bind=True) def long_task(self): verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking'] adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast'] noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit'] message = '' total = random.randint(10, 50) for i in range(total): if not message or random.random() < 0.25: # 隨機的獲取一些信息 message = '{0} {1} {2}...'.format(random.choice(verb), random.choice(adjective), random.choice(noun)) # 更新Celery任務狀態 self.update_state(state='PROGRESS', meta={'current': i, 'total': total, 'status': message}) time.sleep(1) # 返回字典 return {'current': 100, 'total': 100, 'status': 'Task completed!', 'result': 42}
上述代碼中,celery.task () 裝飾器使用瞭 bind=True 參數,這個參數會讓 Celery 將 Celery 本身傳入,可以用於記錄與更新任務狀態。
然後就是一個 for 迭代,迭代的邏輯沒什麼意義,就是隨機從 list 中抽取一些詞匯來模擬一些邏輯的運行,為瞭表示這是耗時邏輯,通過 time.sleep (1) 休眠一秒。
每次獲取一次詞匯,就通過 self.update_state () 更新 Celery 任務的狀態,Celery 包含一些內置狀態,如 SUCCESS、STARTED 等等,這裡使用瞭自定義狀態「PROGRESS」,除瞭狀態外,還將本次循環的一些信息通過 meta 參數 (元數據) 以字典的形式存儲起來。有瞭這些數據,前端就可以顯示進度條瞭。
定義好耗時方法後,再定義一個 Flask 接口方法來調用該耗時方法
@app.route('/longtask', methods=['POST']) def longtask(): # 異步調用 task = long_task.apply_async() # 返回 202,與Location頭 return jsonify({}), 202, {'Location': url_for('taskstatus', task_id=task.id)}
簡單而言,前端通過 POST 請求到 /longtask,讓後端開始去執行耗時任務。
返回的狀態碼為 202,202 通常表示一個請求正在進行中,然後還在返回數據包的包頭 (Header) 中添加瞭 Location 頭信息,前端可以通過讀取數據包中 Header 中的 Location 的信息來獲取任務 id 對應的完整 url。
前端有瞭任務 id 對應的 url 後,還需要提供一個接口給前端,讓前端可以通過任務 id 去獲取當前時刻任務的具體狀態。
@app.route('/status/<task_id>') def taskstatus(task_id): task = long_task.AsyncResult(task_id) if task.state == 'PENDING': # 在等待 response = { 'state': task.state, 'current': 0, 'total': 1, 'status': 'Pending...' } elif task.state != 'FAILURE': # 沒有失敗 response = { 'state': task.state, # 狀態 # meta中的數據,通過task.info.get()可以獲得 'current': task.info.get('current', 0), # 當前循環進度 'total': task.info.get('total', 1), # 總循環進度 'status': task.info.get('status', '') } if 'result' in task.info: response['result'] = task.info['result'] else: # 後端執行任務出現瞭一些問題 response = { 'state': task.state, 'current': 1, 'total': 1, 'status': str(task.info), # 報錯的具體異常 } return jsonify(response)
為瞭可以獲得任務對象中的信息,使用任務 id 初始化 AsyncResult 類,獲得任務對象,然後就可以從任務對象中獲得當前任務的信息。
該方法會返回一個 JSON,其中包含瞭任務狀態以及 meta 中指定的信息,前端可以利用這些信息構建一個進度條。
如果任務在 PENDING 狀態,表示該任務還沒有開始,在這種狀態下,任務中是沒有什麼信息的,這裡人為的返回一些數據。如果任務執行失敗,就返回 task.info 中包含的異常信息,此外就是正常執行瞭,正常執行可以通 task.info 獲得任務中具體的信息。
這樣,後端的邏輯就處理完成瞭,接著就來實現前端的邏輯,要實現圖形進度條,可以直接使用 nanobar.js,簡單兩句話就可以實現一個進度條,其官網例子如下:
var options = { classname: 'my-class', id: 'my-id', // 進度條要出現的位置 target: document.getElementById('myDivId') }; // 初始化進度條對象 var nanobar = new Nanobar( options ); nanobar.go( 30 ); // 30% 進度條 nanobar.go( 76 ); // 76% 進度條 // 100% 進度條,進度條結束 nanobar.go(100);
有瞭 nanobar.js 就非常簡單瞭。
先定義一個簡單的 HTML 界面
<h2>Long running task with progress updates</h2> <button id="start-bg-job">Start Long Calculation</button><br><br> <div id="progress"></div>
通過 JavaScript 實現對後臺的請求
// 按鈕點擊事件 $(function() { $('#start-bg-job').click(start_long_task); }); // 請求 longtask 接口 function start_long_task() { // 添加元素在html中 div = $('<div class="progress"><div></div><div>0%</div><div>...</div><div> </div></div><hr>'); $('#progress').append(div); // 創建進度條對象 var nanobar = new Nanobar({ bg: '#44f', target: div[0].childNodes[0] }); // ajax請求longtask $.ajax({ type: 'POST', url: '/longtask', // 獲得數據,從響應頭中獲取Location success: function(data, status, request) { status_url = request.getResponseHeader('Location'); // 調用 update_progress() 方法更新進度條 update_progress(status_url, nanobar, div[0]); }, error: function() { alert('Unexpected error'); } }); } // 更新進度條 function update_progress(status_url, nanobar, status_div) { // getJSON()方法是JQuery內置方法,這裡向Location中對應的url發起請求,即請求「/status/<task_id>」 $.getJSON(status_url, function(data) { // 計算進度 percent = parseInt(data['current'] * 100 / data['total']); // 更新進度條 nanobar.go(percent); // 更新文字 $(status_div.childNodes[1]).text(percent + '%'); $(status_div.childNodes[2]).text(data['status']); if (data['state'] != 'PENDING' && data['state'] != 'PROGRESS') { if ('result' in data) { // 展示結果 $(status_div.childNodes[3]).text('Result: ' + data['result']); } else { // 意料之外的事情發生 $(status_div.childNodes[3]).text('Result: ' + data['state']); } } else { // 2秒後再次運行 setTimeout(function() { update_progress(status_url, nanobar, status_div); }, 2000); } }); }
可以通過註釋閱讀代碼整體邏輯。
至此,需求實現完瞭,運行一下。
首先運行 Redis
redis-server
然後運行 celery
celery worker -A app.celery --loglevel=info
最後運行 Flask 項目
python app.py
效果如下:
到此這篇關於Flask實現異步執行任務的文章就介紹到這瞭,更多相關Flask 異步內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- Python異步處理返回進度——使用Flask實現進度條
- Django使用celery異步發送短信驗證碼代碼示例
- Django中如何使用celery異步發送短信驗證碼詳解
- Django中celery的使用項目實例
- 使用 Celery Once 來防止 Celery 重復執行同一個任務