一文詳解Python中的重試機制
介紹
為瞭避免由於一些網絡或等其他不可控因素,而引起的功能性問題。比如在發送請求時,會因為網絡不穩定,往往會有請求超時的問題。
這種情況下,我們通常會在代碼中加入重試的代碼。重試的代碼本身不難實現,但如何寫得優雅、易用,是我們要考慮的問題。
這裡要給大傢介紹的是一個第三方庫 – Tenacity (標題中的重試機制並並不準確,它不是 Python 的內置模塊,因此並不能稱之為機制),它實現瞭幾乎我們可以使用到的所有重試場景,喜歡記得收藏、關註、點贊。
比如:
在什麼情況下才進行重試?
重試幾次呢?
重試多久後結束?
每次重試的間隔多長呢?
重試失敗後的回調?
在使用它之前 ,先要安裝它
$ pip install tenacity
1. 最基本的重試
無條件重試,重試之間無間隔
from tenacity import retry @retry def test_retry(): print("等待重試,重試無間隔執行...") raise Exception test_retry()
無條件重試,但是在重試之前要等待 2 秒
from tenacity import retry, wait_fixed @retry(wait=wait_fixed(2)) def test_retry(): print("等待重試...") raise Exception test_retry()
2. 設置停止基本條件
隻重試7 次
from tenacity import retry, stop_after_attempt @retry(stop=stop_after_attempt(7)) def test_retry(): print("等待重試...") raise Exception test_retry()
重試 10 秒後不再重試
from tenacity import retry, stop_after_delay @retry(stop=stop_after_delay(10)) def test_retry(): print("等待重試...") raise Exception test_retry()
或者上面兩個條件滿足一個就結束重試
from tenacity import retry, stop_after_delay, stop_after_attempt @retry(stop=(stop_after_delay(10) | stop_after_attempt(7))) def test_retry(): print("等待重試...") raise Exception test_retry()
3. 設置何時進行重試
在出現特定錯誤/異常(比如請求超時)的情況下,再進行重試
from requests import exceptions from tenacity import retry, retry_if_exception_type @retry(retry=retry_if_exception_type(exceptions.Timeout)) def test_retry(): print("等待重試...") raise exceptions.Timeout test_retry()
在滿足自定義條件時,再進行重試。
如下示例,當 test_retry 函數返回值為 False 時,再進行重試
from tenacity import retry, stop_after_attempt, retry_if_result def is_false(value): return value is False @retry(stop=stop_after_attempt(3), retry=retry_if_result(is_false)) def test_retry(): return False test_retry()
4. 重試後錯誤重新拋出
當出現異常後,tenacity 會進行重試,若重試後還是失敗,默認情況下,往上拋出的異常會變成 RetryError,而不是最根本的原因。
因此可以加一個參數(reraise=True),使得當重試失敗後,往外拋出的異常還是原來的那個。
from tenacity import retry, stop_after_attempt @retry(stop=stop_after_attempt(7), reraise=True) def test_retry(): print("等待重試...") raise Exception test_retry()
5. 設置回調函數
當最後一次重試失敗後,可以執行一個回調函數
from tenacity import * def return_last_value(retry_state): print("執行回調函數") return retry_state.outcome.result() # 表示返回原函數的返回值 def is_false(value): return value is False @retry(stop=stop_after_attempt(3), retry_error_callback=return_last_value, retry=retry_if_result(is_false)) def test_retry(): print("等待重試中...") return False print(test_retry())
輸出如下
等待重試中…
等待重試中…
等待重試中…
執行回調函數
False
到此這篇關於一文詳解Python中的重試機制的文章就介紹到這瞭,更多相關Python重試機制內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!