Python並發編程之未來模塊Futures

不論是哪一種語言,並發編程都是一項非常重要的技巧。比如我們上一章用的爬蟲,就被廣泛用在工業的各個領域。我們每天在各個網站、App上獲取的新聞信息,很大一部分都是通過並發編程版本的爬蟲獲得的。

正確並合理的使用並發編程,無疑會給我們的程序帶來極大性能上的提升。今天我們就一起學習Python中的並發編程——Futures。

區分並發和並行

我們在學習並發編程時,常常會聽到兩個詞:並發(Concurrency)和並行(Parallelism)這兩個術語。這兩者經常一起使用,導致很多人以為他們是一個意思,其實是不對的。

首先要辨別一個誤區,在Python中,並發並不是隻同一時刻上右多個操作(thread或者task)同時進行。相反,在某個特定的時刻上它隻允許有一個操作的發生,隻不過線程或任務之間會相互切換直到完成,就像下面的圖裡表達的

在上圖中出現瞭task和thread兩種切換順序的不同方式。分別對應瞭Python中並發兩種形式——threading和asyncio。

對於線程,操作系統知道每個線程的所有信息,因此他會做主在適當的時候做線程切換,這樣的好處就是代碼容易編寫,因為程序員不需要做任何切換操作的處理;但是切換線程的操作,有可能出現在一個語句的執行過程中( 比如X+=1),這樣比較容易出現race condiiton的情況。

而對於asyncio,主程序想要切換任務的時候必須得到此任務可以被切換的通知,這樣一來就可以避免出現上面的race condition的情況。

至於所謂的並行,隻在同一時刻、同時發生。Python中的multi-Processing便是這個意思對應多進程,我們可以這麼簡單的理解,如果我們的電腦是8核的CPU,那麼在運行程序時,我們可以強制Python開啟8個進程,同時執行,用以加快程序的運行速度。大概是下面這個圖的思路

對比看來,並發通常用於I/O操作頻繁的場景。比方我們要從網站上下載多個文件,由於I/O操作的時間要比CPU操作的時長多的多,這時並發就比較適合。而在CPU使用比較heavy的場景中,為瞭加快運行速度,我們會多用幾臺機器,讓多個處理器來運算。

還記得以前寫瞭個博客總結過:在Python中的多線程是依靠CPU切換上下文實現的一種“偽多線程”,在進行大量線程切換過程中會占用比較多的CPU資源,而在進行IO操作時候(不論是在網絡上進行數據交互還是從內存、硬盤上讀寫數據)是不需要CPU進行計算的。所以多線程隻適用於IO操作密集的環境,不適用於計算密集型操作。

並發編程之Futures

單線程於多線程性能比較

我們下面通過一個實例,從代碼的角度來理解並發編程中的Futures,並進一步比較其於單線程的性能區別

假設我們有個任務,從網站上下載一些內容然後打印出來,如果用單線程的方式是這樣實現的

import requests
import time
def download_one(url):
    resp = requests.get(url)
    print('Read {} from {}'.format(len(resp.content),url))
def download_all(urls):
    for url in urls:
        download_one(url)
def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society', 
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)' 
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites),end_time-start_time))
if __name__ == '__main__':
    main()

這是種最簡單暴力最直接的方式:

先遍歷存儲網站的列表

對當前的網站進行下載操作

當前操作完成後,再對下一個網站進行同樣的操作,一直到結束。

可以試出來總耗時大概是2s多,單線程的方式簡單明瞭,但是最大的問題是效率低下,程序最大的時間都消耗在I/O等待上(這還是用的print,如果是寫在硬盤上的話時間會更多)。如果在實際生產環境中,我們需要訪問的網站至少是以萬為單位的,所以這個方案根本行不通。

接著我們看看多線程版本的代碼

import concurrent.futures
import requests
import threading
import time
def download_one(url):
    resp = requests.get(url).content
    print('Read {} from {}'.format(len(resp),url))
def download_all(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_one,sites)
def main():
    sites = [
    'https://en.wikipedia.org/wiki/Portal:Arts',
    'https://en.wikipedia.org/wiki/Portal:History',
    'https://en.wikipedia.org/wiki/Portal:Society', 
    'https://en.wikipedia.org/wiki/Portal:Biography',
    'https://en.wikipedia.org/wiki/Portal:Mathematics',
    'https://en.wikipedia.org/wiki/Portal:Technology',
    'https://en.wikipedia.org/wiki/Portal:Geography',
    'https://en.wikipedia.org/wiki/Portal:Science',
    'https://en.wikipedia.org/wiki/Computer_science',
    'https://en.wikipedia.org/wiki/Python_(programming_language)',
    'https://en.wikipedia.org/wiki/Java_(programming_language)',
    'https://en.wikipedia.org/wiki/PHP',
    'https://en.wikipedia.org/wiki/Node.js',
    'https://en.wikipedia.org/wiki/The_C_Programming_Language',
    'https://en.wikipedia.org/wiki/Go_(programming_language)' 
    ]
    start_time = time.perf_counter()
    download_all(sites)
    # for i in sites:
    end_time = time.perf_counter()
    # print('Down {} sites in {} seconds'.format(len(sites),end_time-start_time))
if __name__ == '__main__':
    main()

這段代碼的運行時長大概是0.2s,效率一下提升瞭10倍多,可以註意到這個版本和單線程的區別主要在下面:

def download_all(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_one,sites)

在上面的代碼中我們創建瞭一個線程池,有5個線程可以分配使用。executer.map()與以前將的Python內置的map()函數,表示對sites中的每一個元素並發的調用函數download_one()函數。

順便提一下,在download_one()函數中,我們使用的requests.get()方法是線程安全的(thread-safe),因此在多線程的環境下,它也可以安全使用,並不會出現race condition(條件競爭)的情況。

另外,雖然線程的數量可以自己定義,但是線程數並不是越多越好,以為線程的創建、維護和刪除也需要一定的開銷。所以如果設置的很大,反而會導致速度變慢,我們往往要根據實際的需求做一些測試,來尋找最優的線程數量。

當然,我們也可以用並行的方式去提高運行效率,隻需要在download_all()函數中做出下面的變化即可

def download_all(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        to_do = []
        for site in sites:
            future = executor.submit(download_one,site)
            to_do.append(site)

        for future in concurrent.futures.as_completed(to_do):
            future.result()

在需要改的這部分代碼中,函數ProcessPoolExecutor()表示創建進程池,使用多個進程並行的執行程序。不過,這裡 通常省略參數workers,因為系統會自動返回CPU的數量作為可以調用的進程數。

就像上面說的,並行方式一般用在CPU密集型的場景中,因為對於I/O密集型操作多數時間會用於等待,相比於多線程,使用多進程並不會提升效率,反而很多時候,因為CPU數量的限制,會導致執行效率不如多線程版本。

到底什麼是Futures?

Python中的Futures,位於concurrent.futures和asyncio中,他們都表示帶有延遲的操作,Futures會將處於等待狀態的操作包裹起來放到隊列中,這些操作的狀態可以隨時查詢。而他們的結果或是異常,也能在操作後被獲取。

通常,作為用戶,我們不用考慮如何去創建Futures,這些Futures底層會幫我們處理好,我們要做的就是去schedule這些Futures的執行。比方說,Futures中的Executor類,當我們中的方法done(),表示相對應的操作是否完成——用True表示已完成,ongFalse表示未完成。不過,要註意的是done()是non-blocking的,會立刻返回結果,相對應的add_done_callback(fn),則表示Futures完成後,相對應的參數fn,會被通知並執行調用。

Futures裡還有一個非常重要的函數result(),用來表示future完成後,返回器對應的結果或異常。而as_completed(fs),則是針對給定的future迭代器fs,在其完成後,返回完成後的迭代器。

所以也可以把上面的例子寫成下面的形式:

def download_all(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        to_do = []
        for site in sites:
            future = executor.submit(download_one,site)
            to_do.append(site)
        for future in concurrent.futures.as_completed(to_do):
            future.result()

這裡,我們首先用executor.submit(),將下載每個網站的內容都放進future隊列to_do裡等待執行。然後是as_completed()函數,在future完成後輸出結果

不過這裡有個事情要註意一下:future列表中每個future完成的順序和他在列表中的順序不一定一致,至於哪個先完成,取決於系統的調度和每個future的執行時間。

為什麼多線程每次隻有一個線程執行?

前面我們講過,在一個時刻下,Python主程序隻允許有一個線程執行,所以Python的並發,是通過多線程的切換完成的,這是為什麼呢?

這就又和以前講的知識串聯到一起瞭——GIL(全局解釋器鎖),這裡在復習下:

事實上,Python的解釋器並不是線程安全的,為瞭解決由此帶來的race condition等問題,Python就引入瞭GIL,也就是在同一個時刻,隻允許一個線程執行。當然,在進行I/O操作是,如果一個線程被block瞭,GIL就會被釋放,從而讓另一個線程能夠繼續執行。

總結

這節課裡我們先學習瞭Python中並發和並行的概念

並發——通過線程(thread)和任務(task)之間相互切換的方式實現,但是同一時刻,隻允許有一個線程或任務執行

並行——多個進程同時進行。

並發通常用於I/O頻繁操作的場景,而並行則適用於CPU heavy的場景

隨後我們通過一個下載網站內容的例子,比較瞭單線程和運用FUtures的多線程版本的性能差異,顯而易見,合理的運用多線程,能夠極大的提高程序運行效率。

我們還大致瞭解瞭Futures的方式,介紹瞭一些常用的函數,並輔以實例加以理解。

要註意,Python中之所以同一時刻隻允許一個線程運行,其實是由於GIL的存在。但是對於I/O操作而言,當其被block的時候,GIL會被釋放,使其他線程繼續執行。

以上就是Python並發編程之未來模塊Futures的詳細內容,更多關於Python並發未來模塊Futures的資料請關註WalkonNet其它相關文章!

推薦閱讀: