詳解python之異步編程

一、異步編程概述

異步編程是一種並發編程的模式,其關註點是通過調度不同任務之間的執行和等待時間,通過減少處理器的閑置時間來達到減少整個程序的執行時間;異步編程跟同步編程模型最大的不同就是其任務的切換,當遇到一個需要等待長時間執行的任務的時候,我們可以切換到其他的任務執行;

與多線程和多進程編程模型相比,異步編程隻是在同一個線程之內的的任務調度,無法充分利用多核CPU的優勢,所以特別適合IO阻塞性任務;

python 版本 3.9.5

二、python的異步框架模型

python提供瞭asyncio模塊來支持異步編程,其中涉及到coroutinesevent loopsfutures三個重要概念;

event loops主要負責跟蹤和調度所有異步任務,編排具體的某個時間點執行的任務;

coroutines是對具體執行任務的封裝,是一個可以在執行中暫停並切換到event loops執行流程的特殊類型的函數;其一般還需要創建task才能被event loops調度;

futures負責承載coroutines的執行結果,其隨著任務在event loops中的初始化而創建,並隨著任務的執行來記錄任務的執行狀態;

異步編程框架的整個執行過程涉及三者的緊密協作;

首先事件循環啟動之後,會從任務隊列獲取第一個要執行的coroutine,並隨之創建對應task和future;

然後隨著task的執行,當遇到coroutine內部需要切換任務的地方,task的執行就會暫停並釋放執行線程給event loop,event loop接著會獲取下一個待執行的coroutine,並進行相關的初始化之後,執行這個task;

隨著event loop執行完隊列中的最後一個coroutine才會切換到第一個coroutine;

隨著task的執行結束,event loops會將task清除出隊列,對應的執行結果會同步到future中,這個過程會持續到所有的task執行結束;

三、順序執行多個可重疊的任務

每個任務執行中間會暫停給定的時間,循序執行的時間就是每個任務執行的時間加和;

import time
def count_down(name, delay):
    indents = (ord(name) - ord('A')) * '\t'
    n = 3
    while n:
        time.sleep(delay)
        duration = time.perf_counter() - start
        print('-' * 40)
        print(f'{duration:.4f} \t{indents}{name} = {n}')
        n -= 1
start = time.perf_counter()
count_down('A', 1)
count_down('B', 0.8)
count_down('C', 0.5)
print('-' * 40)
print('Done')
# ----------------------------------------
# 1.0010 	A = 3
# ----------------------------------------
# 2.0019 	A = 2
# ----------------------------------------
# 3.0030 	A = 1
# ----------------------------------------
# 3.8040 		B = 3
# ----------------------------------------
# 4.6050 		B = 2
# ----------------------------------------
# 5.4059 		B = 1
# ----------------------------------------
# 5.9065 			C = 3
# ----------------------------------------
# 6.4072 			C = 2
# ----------------------------------------
# 6.9078 			C = 1
# ----------------------------------------
# Done

四、異步化同步代碼

python在語法上提供瞭asyncawait兩個關鍵字來簡化將同步代碼修改為異步;

async使用在函數的def關鍵字前邊,標記這是一個coroutine函數;

await用在conroutine裡邊,用於標記需要暫停釋放執行流程給event loops;

await 後邊的表達式需要返回waitable的對象,例如conroutine、task、future等;

asyncio模塊主要提供瞭操作event loop的方式;

我們可以通過async將count_down標記為coroutine,然後使用await和asyncio.sleep來實現異步的暫停,從而將控制權交給event loop;

async def count_down(name, delay, start):
    indents = (ord(name) - ord('A')) * '\t'
    n = 3
    while n:
        await asyncio.sleep(delay)
        duration = time.perf_counter() - start
        print('-' * 40)
        print(f'{duration:.4f} \t{indents}{name} = {n}')
        n -= 1

我們定義一個異步的main方法,主要完成task的創建和等待任務執行結束;

async def main():
    start = time.perf_counter()
    tasks = [asyncio.create_task(count_down(name,delay,start)) for name, delay in [('A', 1),('B', 0.8),('C', 0.5)]]
    await asyncio.wait(tasks)
    print('-' * 40)
    print('Done')

執行我們可以看到時間已經變為瞭執行時間最長的任務的時間瞭;

asyncio.run(main())
# ----------------------------------------
# 0.5010 			C = 3
# ----------------------------------------
# 0.8016 		B = 3
# ----------------------------------------
# 1.0011 	A = 3
# ----------------------------------------
# 1.0013 			C = 2
# ----------------------------------------
# 1.5021 			C = 1
# ----------------------------------------
# 1.6026 		B = 2
# ----------------------------------------
# 2.0025 	A = 2
# ----------------------------------------
# 2.4042 		B = 1
# ----------------------------------------
# 3.0038 	A = 1
# ----------------------------------------
# Done

五、使用多線程克服具體任務的異步限制

異步編程要求具體的任務必須是coroutine,也就是要求方法是異步的,否則隻有任務執行完瞭,才能將控制權釋放給event loop;

python中的concurent.futures提供瞭ThreadPoolExecutor和ProcessPoolExecutor,可以直接在異步編程中使用,從而可以在單獨的線程或者進程至今任務;

import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
def count_down(name, delay, start):
    indents = (ord(name) - ord('A')) * '\t'
    n = 3
    while n:
        time.sleep(delay)
        duration = time.perf_counter() - start
        print('-'*40)
        print(f'{duration:.4f} \t{indents}{name} = {n}')
        n -=1
async def main():
    start = time.perf_counter()
    loop = asyncio.get_running_loop()
    executor = ThreadPoolExecutor(max_workers=3)
    fs = [
       loop.run_in_executor(executor, count_down, *args)  for args in [('A', 1, start), ('B', 0.8, start), ('C', 0.5, start)]
    ]
    await asyncio.wait(fs)
    print('-'*40)
    print('Done.')
asyncio.run(main())
# ----------------------------------------
# 0.5087 			C = 3
# ----------------------------------------
# 0.8196 		B = 3
# ----------------------------------------
# 1.0073 	A = 3
# ----------------------------------------
# 1.0234 			C = 2
# ----------------------------------------
# 1.5350 			C = 1
# ----------------------------------------
# 1.6303 		B = 2
# ----------------------------------------
# 2.0193 	A = 2
# ----------------------------------------
# 2.4406 		B = 1
# ----------------------------------------
# 3.0210 	A = 1
# ----------------------------------------
# Done.
 

總結

本篇文章就到這裡瞭,希望能夠給你帶來幫助,也希望您能夠多多關註WalkonNet的更多內容!

推薦閱讀: