Python利用contextvars實現管理上下文變量

Python 在 3.7 的時候引入瞭一個模塊:contextvars,從名字上很容易看出它指的是上下文變量(Context Variables),所以在介紹 contextvars 之前我們需要先瞭解一下什麼是上下文(Context)。

Context 是一個包含瞭相關信息內容的對象,舉個例子:"比如一部 13 集的動漫,你直接點進第八集,看到女主角在男主角面前流淚瞭"。相信此時你是不知道為什麼女主角會流淚的,因為你沒有看前面幾集的內容,缺失瞭相關的上下文信息。

所以 Context 並不是什麼神奇的東西,它的作用就是攜帶一些指定的信息。

web 框架中的 request

我們以 fastapi 和 sanic 為例,看看當一個請求過來的時候,它們是如何解析的。

# fastapi
from fastapi import FastAPI, Request
import uvicorn

app = FastAPI()


@app.get("/index")
async def index(request: Request):
    name = request.query_params.get("name")
    return {"name": name}


uvicorn.run("__main__:app", host="127.0.0.1", port=5555)

# -------------------------------------------------------

# sanic
from sanic import Sanic
from sanic.request import Request
from sanic import response

app = Sanic("sanic")


@app.get("/index")
async def index(request: Request):
    name = request.args.get("name")
    return response.json({"name": name})


app.run(host="127.0.0.1", port=6666)

發請求測試一下,看看結果是否正確。

可以看到請求都是成功的,並且對於 fastapi 和 sanic 而言,其 request 和 視圖函數是綁定在一起的。也就是在請求到來的時候,會被封裝成一個 Request 對象、然後傳遞到視圖函數中。

但對於 flask 而言則不是這樣子的,我們看一下 flask 是如何接收請求參數的。

from flask import Flask, request

app = Flask("flask")


@app.route("/index")
def index():
    name = request.args.get("name")
    return {"name": name}


app.run(host="127.0.0.1", port=7777)

我們看到對於 flask 而言則是通過 import request 的方式,如果不需要的話就不用 import,當然我這裡並不是在比較哪種方式好,主要是為瞭引出我們今天的主題。首先對於 flask 而言,如果我再定義一個視圖函數的話,那麼獲取請求參數依舊是相同的方式,但是這樣問題就來瞭,不同的視圖函數內部使用同一個 request,難道不會發生沖突嗎?

顯然根據我們使用 flask 的經驗來說,答案是不會的,至於原因就是 ThreadLocal。

ThreadLocal

ThreadLocal,從名字上看可以得出它肯定是和線程相關的。沒錯,它專門用來創建局部變量,並且創建的局部變量是和線程綁定的。

import threading

# 創建一個 local 對象
local = threading.local()

def get():
    name = threading.current_thread().name
    # 獲取綁定在 local 上的 value
    value = local.value
    print(f"線程: {name}, value: {value}")

def set_():
    name = threading.current_thread().name
    # 為不同的線程設置不同的值
    if name == "one":
        local.value = "ONE"
    elif name == "two":
        local.value = "TWO"
    # 執行 get 函數
    get()

t1 = threading.Thread(target=set_, name="one")
t2 = threading.Thread(target=set_, name="two")
t1.start()
t2.start()
"""
線程 one, value: ONE
線程 two, value: TWO
"""

可以看到兩個線程之間是互不影響的,因為每個線程都有自己唯一的 id,在綁定值的時候會綁定在當前的線程中,獲取也會從當前的線程中獲取。可以把 ThreadLocal 想象成一個字典:

{
    "one": {"value": "ONE"},
    "two": {"value": "TWO"}
}

更準確的說 key 應該是線程的 id,為瞭直觀我們就用線程的 name 代替瞭,但總之在獲取的時候隻會獲取綁定在該線程上的變量的值。

而 flask 內部也是這麼設計的,隻不過它沒有直接用 threading.local,而是自己實現瞭一個 Local 類,除瞭支持線程之外還支持 greenlet 的協程,那麼它是怎麼實現的呢?首先我們知道 flask 內部存在 "請求 context" 和 "應用 context",它們都是通過棧來維護的(兩個不同的棧)。

# flask/globals.py
_request_ctx_stack = LocalStack()
_app_ctx_stack = LocalStack()
current_app = LocalProxy(_find_app)
request = LocalProxy(partial(_lookup_req_object, "request"))
session = LocalProxy(partial(_lookup_req_object, "session"))

每個請求都會綁定在當前的 Context 中,等到請求結束之後再銷毀,這個過程由框架完成,開發者隻需要直接使用 request 即可。所以請求的具體細節流程可以點進源碼中查看,這裡我們重點關註一個對象:werkzeug.local.Local,也就是上面說的 Local 類,它是變量的設置和獲取的關鍵。直接看部分源碼:

# werkzeug/local.py

class Local(object):
    __slots__ = ("__storage__", "__ident_func__")

    def __init__(self):
        # 內部有兩個成員:__storage__ 是一個字典,值就存在這裡面
        # __ident_func__ 隻需要知道它是用來獲取線程 id 的即可
        object.__setattr__(self, "__storage__", {})
        object.__setattr__(self, "__ident_func__", get_ident)

    def __call__(self, proxy):
        """Create a proxy for a name."""
        return LocalProxy(self, proxy)

    def __release_local__(self):
        self.__storage__.pop(self.__ident_func__(), None)

    def __getattr__(self, name):
        try:
            # 根據線程 id 得到 value(一個字典)
            # 然後再根據 name 獲取對應的值
            # 所以隻會獲取綁定在當前線程上的值
            return self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        ident = self.__ident_func__()
        storage = self.__storage__
        try:
            # 將線程 id 作為 key,然後將值設置在對應的字典中
            # 所以隻會將值設置在當前的線程中
            storage[ident][name] = value
        except KeyError:
            storage[ident] = {name: value}

    def __delattr__(self, name):
        # 刪除邏輯也很簡單
        try:
            del self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)

所以我們看到 flask 內部的邏輯其實很簡單,通過 ThreadLocal 實現瞭線程之間的隔離。每個請求都會綁定在各自的 Context 中,獲取值的時候也會從各自的 Context 中獲取,因為它就是用來保存相關信息的(重要的是同時也實現瞭隔離)。

相應此刻你已經理解瞭上下文,但是問題來瞭,不管是 threading.local 也好、還是類似於 flask 自己實現的 Local 也罷,它們都是針對線程的。如果是使用 async def 定義的協程該怎麼辦呢?如何實現每個協程的上下文隔離呢?所以終於引出瞭我們的主角:contextvars。

contextvars

該模塊提供瞭一組接口,可用於在協程中管理、設置、訪問局部 Context 的狀態。

import asyncio
import contextvars

c = contextvars.ContextVar("隻是一個標識, 用於調試")

async def get():
    # 獲取值
    return c.get() + "~~~"

async def set_(val):
    # 設置值
    c.set(val)
    print(await get())

async def main():
    coro1 = set_("協程1")
    coro2 = set_("協程2")
    await asyncio.gather(coro1, coro2)


asyncio.run(main())
"""
協程1~~~
協程2~~~
"""

ContextVar 提供瞭兩個方法,分別是 get 和 set,用於獲取值和設置值。我們看到效果和 ThreadingLocal 類似,數據在協程之間是隔離的,不會受到彼此的影響。

但我們再仔細觀察一下,我們是在 set_ 函數中設置的值,然後在 get 函數中獲取值。可 await get() 相當於是開啟瞭一個新的協程,那麼意味著設置值和獲取值不是在同一個協程當中。但即便如此,我們依舊可以獲取到希望的結果。因為 Python 的協程是無棧協程,通過 await 可以實現級聯調用。

我們不妨再套一層:

import asyncio
import contextvars

c = contextvars.ContextVar("隻是一個標識, 用於調試")

async def get1():
    return await get2()

async def get2():
    return c.get() + "~~~"

async def set_(val):
    # 設置值
    c.set(val)
    print(await get1())
    print(await get2())

async def main():
    coro1 = set_("協程1")
    coro2 = set_("協程2")
    await asyncio.gather(coro1, coro2)


asyncio.run(main())
"""
協程1~~~
協程1~~~
協程2~~~
協程2~~~
"""

我們看到不管是 await get1() 還是 await get2(),得到的都是 set_ 中設置的結果,說明它是可以嵌套的。

並且在這個過程當中,可以重新設置值。

import asyncio
import contextvars

c = contextvars.ContextVar("隻是一個標識, 用於調試")

async def get1():
    c.set("重新設置")
    return await get2()

async def get2():
    return c.get() + "~~~"

async def set_(val):
    # 設置值
    c.set(val)
    print("------------")
    print(await get2())
    print(await get1())
    print(await get2())
    print("------------")

async def main():
    coro1 = set_("協程1")
    coro2 = set_("協程2")
    await asyncio.gather(coro1, coro2)


asyncio.run(main())
"""
------------
協程1~~~
重新設置~~~
重新設置~~~
------------
------------
協程2~~~
重新設置~~~
重新設置~~~
------------
"""

先 await get2() 得到的就是 set_ 函數中設置的值,這是符合預期的。但是我們在 get1 中將值重新設置瞭,那麼之後不管是 await get1() 還是直接 await get2(),得到的都是新設置的值。

這也說明瞭,一個協程內部 await 另一個協程,另一個協程內部 await 另另一個協程,不管套娃(await)多少次,它們獲取的值都是一樣的。並且在任意一個協程內部都可以重新設置值,然後獲取會得到最後一次設置的值。再舉個栗子:

import asyncio
import contextvars

c = contextvars.ContextVar("隻是一個標識, 用於調試")

async def get1():
    return await get2()

async def get2():
    val = c.get() + "~~~"
    c.set("重新設置啦")
    return val

async def set_(val):
    # 設置值
    c.set(val)
    print(await get1())
    print(c.get())

async def main():
    coro = set_("古明地覺")
    await coro

asyncio.run(main())
"""
古明地覺~~~
重新設置啦
"""

await get1() 的時候會執行 await get2(),然後在裡面拿到 c.set 設置的值,打印 "古明地覺~~~"。但是在 get2 裡面,又將值重新設置瞭,所以第二個 print 打印的就是新設置的值。\

如果在 get 之前沒有先 set,那麼會拋出一個 LookupError,所以 ContextVar 支持默認值:

import asyncio
import contextvars

c = contextvars.ContextVar("隻是一個標識, 用於調試",
                           default="哼哼")

async def set_(val):
    print(c.get())
    c.set(val)
    print(c.get())

async def main():
    coro = set_("古明地覺")
    await coro

asyncio.run(main())
"""
哼哼
古明地覺
"""

除瞭在 ContextVar 中指定默認值之外,也可以在 get 中指定:

import asyncio
import contextvars

c = contextvars.ContextVar("隻是一個標識, 用於調試",
                           default="哼哼")

async def set_(val):
    print(c.get("古明地戀"))
    c.set(val)
    print(c.get())

async def main():
    coro = set_("古明地覺")
    await coro

asyncio.run(main())
"""
古明地戀
古明地覺
"""

所以結論如下,如果在 c.set 之前使用 c.get:

  • 當 ContextVar 和 get 中都沒有指定默認值,會拋出 LookupError;
  • 隻要有一方設置瞭,那麼會得到默認值;
  • 如果都設置瞭,那麼以 get 為準;

如果 c.get 之前執行瞭 c.set,那麼無論 ContextVar 和 get 有沒有指定默認值,獲取到的都是 c.set 設置的值。

所以總的來說還是比較好理解的,並且 ContextVar 除瞭可以作用在協程上面,它也可以用在線程上面。沒錯,它可以替代 threading.local,我們來試一下:

import threading
import contextvars

c = contextvars.ContextVar("context_var")

def get():
    name = threading.current_thread().name
    value = c.get()
    print(f"線程 {name}, value: {value}")

def set_():
    name = threading.current_thread().name
    if name == "one":
        c.set("ONE")
    elif name == "two":
        c.set("TWO")
    get()

t1 = threading.Thread(target=set_, name="one")
t2 = threading.Thread(target=set_, name="two")
t1.start()
t2.start()
"""
線程 one, value: ONE
線程 two, value: TWO
"""

和 threading.local 的表現是一樣的,但是更建議使用 ContextVars。不過前者可以綁定任意多個值,而後者隻能綁定一個值(可以通過傳遞字典的方式解決這一點)。

c.Token

當我們調用 c.set 的時候,其實會返回一個 Token 對象:

import contextvars

c = contextvars.ContextVar("context_var")
token = c.set("val")
print(token)
"""
<Token var=<ContextVar name='context_var' at 0x00..> at 0x00...>
"""

Token 對象有一個 var 屬性,它是隻讀的,會返回指向此 token 的 ContextVar 對象。

import contextvars

c = contextvars.ContextVar("context_var")
token = c.set("val")

print(token.var is c)  # True
print(token.var.get())  # val

print(
    token.var.set("val2").var.set("val3").var is c
)  # True
print(c.get())  # val3

Token 對象還有一個 old_value 屬性,它會返回上一次 set 設置的值,如果是第一次 set,那麼會返回一個 <Token.MISSING>。

import contextvars

c = contextvars.ContextVar("context_var")
token = c.set("val")

# 該 token 是第一次 c.set 所返回的
# 在此之前沒有 set,所以 old_value 是 <Token.MISSING>
print(token.old_value)  # <Token.MISSING>

token = c.set("val2")
print(c.get())  # val2
# 返回上一次 set 的值
print(token.old_value)  # val

那麼這個 Token 對象有什麼作用呢?從目前來看貌似沒太大用處啊,其實它最大的用處就是和 reset 搭配使用,可以對狀態進行重置。

import contextvars
#### 
c = contextvars.ContextVar("context_var")
token = c.set("val")
# 顯然是可以獲取的
print(c.get())  # val

# 將其重置為 token 之前的狀態
# 但這個 token 是第一次 set 返回的
# 那麼之前就相當於沒有 set 瞭
c.reset(token)
try:
    c.get()  # 此時就會報錯
except LookupError:
    print("報錯啦")  # 報錯啦

# 但是我們可以指定默認值
print(c.get("默認值"))  # 默認值

contextvars.Context

它負責保存 ContextVars 對象和設置的值之間的映射,但是我們不會直接通過 contextvars.Context 來創建,而是通過 contentvars.copy_context 函數來創建。

import contextvars

c1 = contextvars.ContextVar("context_var1")
c1.set("val1")
c2 = contextvars.ContextVar("context_var2")
c2.set("val2")

# 此時得到的是所有 ContextVar 對象和設置的值之間的映射
# 它實現瞭 collections.abc.Mapping 接口
# 因此我們可以像操作字典一樣操作它
context = contextvars.copy_context()
# key 就是對應的 ContextVar 對象,value 就是設置的值
print(context[c1])  # val1
print(context[c2])  # val2
for ctx, value in context.items():
    print(ctx.get(), ctx.name, value)
    """
    val1 context_var1 val1
    val2 context_var2 val2
    """

print(len(context))  # 2

除此之外,context 還有一個 run 方法:

import contextvars

c1 = contextvars.ContextVar("context_var1")
c1.set("val1")
c2 = contextvars.ContextVar("context_var2")
c2.set("val2")

context = contextvars.copy_context()

def change(val1, val2):
    c1.set(val1)
    c2.set(val2)
    print(c1.get(), context[c1])
    print(c2.get(), context[c2])

# 在 change 函數內部,重新設置值
# 然後裡面打印的也是新設置的值
context.run(change, "VAL1", "VAL2")
"""
VAL1 VAL1
VAL2 VAL2
"""

print(c1.get(), context[c1])
print(c2.get(), context[c2])
"""
val1 VAL1
val2 VAL2
"""

我們看到 run 方法接收一個 callable,如果在裡面修改瞭 ContextVar 實例設置的值,那麼對於 ContextVar 而言隻會在函數內部生效,一旦出瞭函數,那麼還是原來的值。但是對於 Context 而言,它是會受到影響的,即便出瞭函數,也是新設置的值,因為它直接把內部的字典給修改瞭。

小結

以上就是 contextvars 模塊的用法,在多個協程之間傳遞數據是非常方便的,並且也是並發安全的。如果你用過 Go 的話,你應該會發現和 Go 在 1.7 版本引入的 context 模塊比較相似,當然 Go 的 context 模塊功能要更強大一些,除瞭可以傳遞數據之外,對多個 goroutine 的級聯管理也提供瞭非常清蒸的解決方案。

總之對於 contextvars 而言,它傳遞的數據應該是多個協程之間需要共享的數據,像 cookie, session, token 之類的,比如上遊接收瞭一個 token,然後不斷地向下透傳。但是不要把本應該作為函數參數的數據,也通過 contextvars 來傳遞,這樣就有點本末倒置瞭。

到此這篇關於Python利用contextvars實現管理上下文變量的文章就介紹到這瞭,更多相關Python contextvars管理變量內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: