Python進階之協程詳解

協程

協程(co-routine,又稱微線程)是一種多方協同的工作方式。當前執行者在某個時刻主動讓出(yield)控制流,並記住自身當前的狀態,以便在控制流返回時能從上次讓出的位置恢復(resume)執行。

簡而言之,協程的核心思想就在於執行者對控制流的 “主動讓出” 和 “恢復”。相對於,線程此類的 “搶占式調度” 而言,協程是一種 “協作式調度” 方式。

在這裡插入圖片描述

協程的應用場景

搶占式調度的缺點

在 I/O 密集型場景中,搶占式調度的解決方案是 “異步 + 回調” 機制。

在這裡插入圖片描述

其存在的問題是,在某些場景中會使得整個程序的可讀性非常差。以圖片下載為例,圖片服務中臺提供瞭異步接口,發起者請求之後立即返回,圖片服務此時給瞭發起者一個唯一標識 ID,等圖片服務完成下載後把結果放到一個消息隊列,此時需要發起者不斷消費這個 MQ 才能拿到下載是否完成的結果。

在這裡插入圖片描述

可見,整體的邏輯被拆分為瞭好幾個部分,各個子部分都會存在狀態的遷移,日後必然是 BUG 的高發地。

在這裡插入圖片描述

用戶態協同調度的優勢

而隨著網絡技術的發展和高並發要求,協程所能夠提供的用戶態協同調度機制的優勢,在網絡操作、文件操作、數據庫操作、消息隊列操作等重 I/O 操作場景中逐漸被挖掘。

在這裡插入圖片描述

協程將 I/O 的處理權從內核態的操作系統交還給用戶態的程序自身。用戶態程序在執行 I/O 時,主動的通過 yield(讓出)CPU 的執行權給其他協程,多個協程之間處於平等、對稱、合作的關系。

協程的運行原理

當程序運行時,操作系統會為每個程序分配一塊同等大小的虛擬內存空間,並將程序的代碼和所有靜態數據加載到其中。然後,創建和初始化 Stack 存儲,用於儲存程序的局部變量,函數參數和返回地址;創建和初始化 Heap 內存;創建和初始化 I/O 相關的任務。當前期準備工作完成後,操作系統將 CPU 的控制權移交給新創建的進程,進程開始運行。

在這裡插入圖片描述

一個進程可以有一個或多個線程,同一進程中的多個線程將共享該進程中的全部系統資源,如:虛擬地址空間,文件描述符和信號處理等等。但同一進程中的多個線程有各自的調用棧和線程本地存儲。

在這裡插入圖片描述

協程是一種比線程更加輕量級的存在,協程不是被操作系統內核所管理,而完全是由用戶態程序所控制。協程與線程以及進程的關系如下圖所示。可見,協程自身無法利用多核,需要配合進程來使用才可以在多核平臺上發揮作用。

在這裡插入圖片描述

  • 協程之間的切換不需要涉及任何 System Call(系統調用)或任何阻塞調用。
  • 協程隻在一個線程中執行,切換由用戶態控制,而線程的阻塞狀態是由操作系統內核來完成的,因此協程相比線程節省線程創建和切換的開銷。
  • 協程中不存在同時寫變量的沖突,因此,也就不需要用來守衛關鍵區塊的同步性原語,比如:互斥鎖、信號量等,並且不需要來自操作系統的支持。

協程通過 “掛起點” 來主動 yield(讓出)CPU,並保存自身的狀態,等候恢復。例如:首先在 funcA 函數中執行,運行一段時間後調用協程,協程開始執行,直到第一個掛起點,此後就像普通函數一樣返回 funcA 函數。 funcA 函數執行一些代碼後再次調用該協程,註意,協程這時就和普通函數不一樣瞭。協程並不是從第一條指令開始執行而是從上一次的掛起點開始執行,執行一段時間後遇到第二個掛起點,這時協程再次像普通函數一樣返回 funcA 函數,funcA 函數執行一段時間後整個程序結束。

在這裡插入圖片描述

可見,協程之所可以能夠 “主動讓出” 和 “被恢復”,是解析器在函數運行時堆棧中保存瞭其運行的 Context(上下文)。

在這裡插入圖片描述

Python 中的協程

Python 對協程的支持經歷瞭多個版本:

  • Python2.x 對協程的支持比較有限,通過 yield 關鍵字支持的生成器實現瞭一部分協程的功能但不完全。
  • 第三方庫 gevent 對協程有更好的支持。
  • Python3.4 中提供瞭 asyncio 模塊。
  • Python3.5 中引入瞭 async/await 關鍵字。
  • Python3.6 中 asyncio 模塊更加完善和穩定。
  • Python3.7 中內置瞭 async/await 關鍵字。

async/await 的示例程序:

import asyncio
from pathlib import Path
import logging
from urllib.request import urlopen, Request
import os
from time import time
import aiohttp
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
CODEFLEX_IMAGES_URLS = ['https://codeflex.co/wp-content/uploads/2021/01/pandas-dataframe-python-1024x512.png',
                        'https://codeflex.co/wp-content/uploads/2021/02/github-actions-deployment-to-eks-with-kustomize-1024x536.jpg',
                        'https://codeflex.co/wp-content/uploads/2021/02/boto3-s3-multipart-upload-1024x536.jpg',
                        'https://codeflex.co/wp-content/uploads/2018/02/kafka-cluster-architecture.jpg',
                        'https://codeflex.co/wp-content/uploads/2016/09/redis-cluster-topology.png']
async def download_image_async(session, dir, img_url):
    download_path = dir / os.path.basename(img_url)
    async with session.get(img_url) as response:
        with download_path.open('wb') as f:
            while True:
                # 在 async 函數中使用 await 關鍵字表示等待 task 執行完成,也就是等待 yeild 讓出控制權。
                # 同時,asyncio 使用事件循環 event_loop 來實現整個過程。
                chunk = await response.content.read(512)
                if not chunk:
                    break
                f.write(chunk)
    logger.info('Downloaded: ' + img_url)
# 使用 async 關鍵字聲明一個異步/協程函數。
# 調用該函數時,並不會立即運行,而是返回一個協程對象,後續在 event_loop 中執行。
async def main():
    images_dir = Path("codeflex_images")
    Path("codeflex_images").mkdir(parents=False, exist_ok=True)
    async with aiohttp.ClientSession() as session:
        tasks = [(download_image_async(session, images_dir, img_url)) for img_url in CODEFLEX_IMAGES_URLS]
        await asyncio.gather(*tasks, return_exceptions=True)
if __name__ == '__main__':
    start = time()
    # event_loop 事件循環充當管理者的角色,將控制權在幾個協程函數之間切換。
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(main())
    finally:
        event_loop.close()
    logger.info('Download time: %s seconds', time() - start)

總結

本篇文章就到這裡瞭,希望能夠給你帶來幫助,也希望您能夠多多關註WalkonNet的更多內容!

推薦閱讀: