Python進階之協程詳解
協程
協程(co-routine,又稱微線程)是一種多方協同的工作方式。當前執行者在某個時刻主動讓出(yield)控制流,並記住自身當前的狀態,以便在控制流返回時能從上次讓出的位置恢復(resume)執行。
簡而言之,協程的核心思想就在於執行者對控制流的 “主動讓出” 和 “恢復”。相對於,線程此類的 “搶占式調度” 而言,協程是一種 “協作式調度” 方式。
協程的應用場景
搶占式調度的缺點
在 I/O 密集型場景中,搶占式調度的解決方案是 “異步 + 回調” 機制。
其存在的問題是,在某些場景中會使得整個程序的可讀性非常差。以圖片下載為例,圖片服務中臺提供瞭異步接口,發起者請求之後立即返回,圖片服務此時給瞭發起者一個唯一標識 ID,等圖片服務完成下載後把結果放到一個消息隊列,此時需要發起者不斷消費這個 MQ 才能拿到下載是否完成的結果。
可見,整體的邏輯被拆分為瞭好幾個部分,各個子部分都會存在狀態的遷移,日後必然是 BUG 的高發地。
用戶態協同調度的優勢
而隨著網絡技術的發展和高並發要求,協程所能夠提供的用戶態協同調度機制的優勢,在網絡操作、文件操作、數據庫操作、消息隊列操作等重 I/O 操作場景中逐漸被挖掘。
協程將 I/O 的處理權從內核態的操作系統交還給用戶態的程序自身。用戶態程序在執行 I/O 時,主動的通過 yield(讓出)CPU 的執行權給其他協程,多個協程之間處於平等、對稱、合作的關系。
協程的運行原理
當程序運行時,操作系統會為每個程序分配一塊同等大小的虛擬內存空間,並將程序的代碼和所有靜態數據加載到其中。然後,創建和初始化 Stack 存儲,用於儲存程序的局部變量,函數參數和返回地址;創建和初始化 Heap 內存;創建和初始化 I/O 相關的任務。當前期準備工作完成後,操作系統將 CPU 的控制權移交給新創建的進程,進程開始運行。
一個進程可以有一個或多個線程,同一進程中的多個線程將共享該進程中的全部系統資源,如:虛擬地址空間,文件描述符和信號處理等等。但同一進程中的多個線程有各自的調用棧和線程本地存儲。
協程是一種比線程更加輕量級的存在,協程不是被操作系統內核所管理,而完全是由用戶態程序所控制。協程與線程以及進程的關系如下圖所示。可見,協程自身無法利用多核,需要配合進程來使用才可以在多核平臺上發揮作用。
- 協程之間的切換不需要涉及任何 System Call(系統調用)或任何阻塞調用。
- 協程隻在一個線程中執行,切換由用戶態控制,而線程的阻塞狀態是由操作系統內核來完成的,因此協程相比線程節省線程創建和切換的開銷。
- 協程中不存在同時寫變量的沖突,因此,也就不需要用來守衛關鍵區塊的同步性原語,比如:互斥鎖、信號量等,並且不需要來自操作系統的支持。
協程通過 “掛起點” 來主動 yield(讓出)CPU,並保存自身的狀態,等候恢復。例如:首先在 funcA 函數中執行,運行一段時間後調用協程,協程開始執行,直到第一個掛起點,此後就像普通函數一樣返回 funcA 函數。 funcA 函數執行一些代碼後再次調用該協程,註意,協程這時就和普通函數不一樣瞭。協程並不是從第一條指令開始執行而是從上一次的掛起點開始執行,執行一段時間後遇到第二個掛起點,這時協程再次像普通函數一樣返回 funcA 函數,funcA 函數執行一段時間後整個程序結束。
可見,協程之所可以能夠 “主動讓出” 和 “被恢復”,是解析器在函數運行時堆棧中保存瞭其運行的 Context(上下文)。
Python 中的協程
Python 對協程的支持經歷瞭多個版本:
- Python2.x 對協程的支持比較有限,通過 yield 關鍵字支持的生成器實現瞭一部分協程的功能但不完全。
- 第三方庫 gevent 對協程有更好的支持。
- Python3.4 中提供瞭 asyncio 模塊。
- Python3.5 中引入瞭 async/await 關鍵字。
- Python3.6 中 asyncio 模塊更加完善和穩定。
- Python3.7 中內置瞭 async/await 關鍵字。
async/await 的示例程序:
import asyncio from pathlib import Path import logging from urllib.request import urlopen, Request import os from time import time import aiohttp logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) CODEFLEX_IMAGES_URLS = ['https://codeflex.co/wp-content/uploads/2021/01/pandas-dataframe-python-1024x512.png', 'https://codeflex.co/wp-content/uploads/2021/02/github-actions-deployment-to-eks-with-kustomize-1024x536.jpg', 'https://codeflex.co/wp-content/uploads/2021/02/boto3-s3-multipart-upload-1024x536.jpg', 'https://codeflex.co/wp-content/uploads/2018/02/kafka-cluster-architecture.jpg', 'https://codeflex.co/wp-content/uploads/2016/09/redis-cluster-topology.png'] async def download_image_async(session, dir, img_url): download_path = dir / os.path.basename(img_url) async with session.get(img_url) as response: with download_path.open('wb') as f: while True: # 在 async 函數中使用 await 關鍵字表示等待 task 執行完成,也就是等待 yeild 讓出控制權。 # 同時,asyncio 使用事件循環 event_loop 來實現整個過程。 chunk = await response.content.read(512) if not chunk: break f.write(chunk) logger.info('Downloaded: ' + img_url) # 使用 async 關鍵字聲明一個異步/協程函數。 # 調用該函數時,並不會立即運行,而是返回一個協程對象,後續在 event_loop 中執行。 async def main(): images_dir = Path("codeflex_images") Path("codeflex_images").mkdir(parents=False, exist_ok=True) async with aiohttp.ClientSession() as session: tasks = [(download_image_async(session, images_dir, img_url)) for img_url in CODEFLEX_IMAGES_URLS] await asyncio.gather(*tasks, return_exceptions=True) if __name__ == '__main__': start = time() # event_loop 事件循環充當管理者的角色,將控制權在幾個協程函數之間切換。 event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete(main()) finally: event_loop.close() logger.info('Download time: %s seconds', time() - start)
總結
本篇文章就到這裡瞭,希望能夠給你帶來幫助,也希望您能夠多多關註WalkonNet的更多內容!
推薦閱讀:
- Python使用Asyncio進行web編程方法詳解
- 淺談Python協程asyncio
- python協程與 asyncio 庫詳情
- python基礎之並發編程(三)
- 詳解Python生成器和基於生成器的協程