Python並發編程之未來模塊Futures
不論是哪一種語言,並發編程都是一項非常重要的技巧。比如我們上一章用的爬蟲,就被廣泛用在工業的各個領域。我們每天在各個網站、App上獲取的新聞信息,很大一部分都是通過並發編程版本的爬蟲獲得的。
正確並合理的使用並發編程,無疑會給我們的程序帶來極大性能上的提升。今天我們就一起學習Python中的並發編程——Futures。
區分並發和並行
我們在學習並發編程時,常常會聽到兩個詞:並發(Concurrency)和並行(Parallelism)這兩個術語。這兩者經常一起使用,導致很多人以為他們是一個意思,其實是不對的。
首先要辨別一個誤區,在Python中,並發並不是隻同一時刻上右多個操作(thread或者task)同時進行。相反,在某個特定的時刻上它隻允許有一個操作的發生,隻不過線程或任務之間會相互切換直到完成,就像下面的圖裡表達的
在上圖中出現瞭task和thread兩種切換順序的不同方式。分別對應瞭Python中並發兩種形式——threading和asyncio。
對於線程,操作系統知道每個線程的所有信息,因此他會做主在適當的時候做線程切換,這樣的好處就是代碼容易編寫,因為程序員不需要做任何切換操作的處理;但是切換線程的操作,有可能出現在一個語句的執行過程中( 比如X+=1),這樣比較容易出現race condiiton的情況。
而對於asyncio,主程序想要切換任務的時候必須得到此任務可以被切換的通知,這樣一來就可以避免出現上面的race condition的情況。
至於所謂的並行,隻在同一時刻、同時發生。Python中的multi-Processing便是這個意思對應多進程,我們可以這麼簡單的理解,如果我們的電腦是8核的CPU,那麼在運行程序時,我們可以強制Python開啟8個進程,同時執行,用以加快程序的運行速度。大概是下面這個圖的思路
對比看來,並發通常用於I/O操作頻繁的場景。比方我們要從網站上下載多個文件,由於I/O操作的時間要比CPU操作的時長多的多,這時並發就比較適合。而在CPU使用比較heavy的場景中,為瞭加快運行速度,我們會多用幾臺機器,讓多個處理器來運算。
還記得以前寫瞭個博客總結過:在Python中的多線程是依靠CPU切換上下文實現的一種“偽多線程”,在進行大量線程切換過程中會占用比較多的CPU資源,而在進行IO操作時候(不論是在網絡上進行數據交互還是從內存、硬盤上讀寫數據)是不需要CPU進行計算的。所以多線程隻適用於IO操作密集的環境,不適用於計算密集型操作。
並發編程之Futures
單線程於多線程性能比較
我們下面通過一個實例,從代碼的角度來理解並發編程中的Futures,並進一步比較其於單線程的性能區別
假設我們有個任務,從網站上下載一些內容然後打印出來,如果用單線程的方式是這樣實現的
import requests import time def download_one(url): resp = requests.get(url) print('Read {} from {}'.format(len(resp.content),url)) def download_all(urls): for url in urls: download_one(url) def main(): sites = [ 'https://en.wikipedia.org/wiki/Portal:Arts', 'https://en.wikipedia.org/wiki/Portal:History', 'https://en.wikipedia.org/wiki/Portal:Society', 'https://en.wikipedia.org/wiki/Portal:Biography', 'https://en.wikipedia.org/wiki/Portal:Mathematics', 'https://en.wikipedia.org/wiki/Portal:Technology', 'https://en.wikipedia.org/wiki/Portal:Geography', 'https://en.wikipedia.org/wiki/Portal:Science', 'https://en.wikipedia.org/wiki/Computer_science', 'https://en.wikipedia.org/wiki/Python_(programming_language)', 'https://en.wikipedia.org/wiki/Java_(programming_language)', 'https://en.wikipedia.org/wiki/PHP', 'https://en.wikipedia.org/wiki/Node.js', 'https://en.wikipedia.org/wiki/The_C_Programming_Language', 'https://en.wikipedia.org/wiki/Go_(programming_language)' ] start_time = time.perf_counter() download_all(sites) end_time = time.perf_counter() print('Download {} sites in {} seconds'.format(len(sites),end_time-start_time)) if __name__ == '__main__': main()
這是種最簡單暴力最直接的方式:
先遍歷存儲網站的列表
對當前的網站進行下載操作
當前操作完成後,再對下一個網站進行同樣的操作,一直到結束。
可以試出來總耗時大概是2s多,單線程的方式簡單明瞭,但是最大的問題是效率低下,程序最大的時間都消耗在I/O等待上(這還是用的print,如果是寫在硬盤上的話時間會更多)。如果在實際生產環境中,我們需要訪問的網站至少是以萬為單位的,所以這個方案根本行不通。
接著我們看看多線程版本的代碼
import concurrent.futures import requests import threading import time def download_one(url): resp = requests.get(url).content print('Read {} from {}'.format(len(resp),url)) def download_all(sites): with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: executor.map(download_one,sites) def main(): sites = [ 'https://en.wikipedia.org/wiki/Portal:Arts', 'https://en.wikipedia.org/wiki/Portal:History', 'https://en.wikipedia.org/wiki/Portal:Society', 'https://en.wikipedia.org/wiki/Portal:Biography', 'https://en.wikipedia.org/wiki/Portal:Mathematics', 'https://en.wikipedia.org/wiki/Portal:Technology', 'https://en.wikipedia.org/wiki/Portal:Geography', 'https://en.wikipedia.org/wiki/Portal:Science', 'https://en.wikipedia.org/wiki/Computer_science', 'https://en.wikipedia.org/wiki/Python_(programming_language)', 'https://en.wikipedia.org/wiki/Java_(programming_language)', 'https://en.wikipedia.org/wiki/PHP', 'https://en.wikipedia.org/wiki/Node.js', 'https://en.wikipedia.org/wiki/The_C_Programming_Language', 'https://en.wikipedia.org/wiki/Go_(programming_language)' ] start_time = time.perf_counter() download_all(sites) # for i in sites: end_time = time.perf_counter() # print('Down {} sites in {} seconds'.format(len(sites),end_time-start_time)) if __name__ == '__main__': main()
這段代碼的運行時長大概是0.2s,效率一下提升瞭10倍多,可以註意到這個版本和單線程的區別主要在下面:
def download_all(sites): with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: executor.map(download_one,sites)
在上面的代碼中我們創建瞭一個線程池,有5個線程可以分配使用。executer.map()與以前將的Python內置的map()函數,表示對sites中的每一個元素並發的調用函數download_one()函數。
順便提一下,在download_one()函數中,我們使用的requests.get()方法是線程安全的(thread-safe),因此在多線程的環境下,它也可以安全使用,並不會出現race condition(條件競爭)的情況。
另外,雖然線程的數量可以自己定義,但是線程數並不是越多越好,以為線程的創建、維護和刪除也需要一定的開銷。所以如果設置的很大,反而會導致速度變慢,我們往往要根據實際的需求做一些測試,來尋找最優的線程數量。
當然,我們也可以用並行的方式去提高運行效率,隻需要在download_all()函數中做出下面的變化即可
def download_all(sites): with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: to_do = [] for site in sites: future = executor.submit(download_one,site) to_do.append(site) for future in concurrent.futures.as_completed(to_do): future.result()
在需要改的這部分代碼中,函數ProcessPoolExecutor()表示創建進程池,使用多個進程並行的執行程序。不過,這裡 通常省略參數workers,因為系統會自動返回CPU的數量作為可以調用的進程數。
就像上面說的,並行方式一般用在CPU密集型的場景中,因為對於I/O密集型操作多數時間會用於等待,相比於多線程,使用多進程並不會提升效率,反而很多時候,因為CPU數量的限制,會導致執行效率不如多線程版本。
到底什麼是Futures?
Python中的Futures,位於concurrent.futures和asyncio中,他們都表示帶有延遲的操作,Futures會將處於等待狀態的操作包裹起來放到隊列中,這些操作的狀態可以隨時查詢。而他們的結果或是異常,也能在操作後被獲取。
通常,作為用戶,我們不用考慮如何去創建Futures,這些Futures底層會幫我們處理好,我們要做的就是去schedule這些Futures的執行。比方說,Futures中的Executor類,當我們中的方法done(),表示相對應的操作是否完成——用True表示已完成,ongFalse表示未完成。不過,要註意的是done()是non-blocking的,會立刻返回結果,相對應的add_done_callback(fn),則表示Futures完成後,相對應的參數fn,會被通知並執行調用。
Futures裡還有一個非常重要的函數result(),用來表示future完成後,返回器對應的結果或異常。而as_completed(fs),則是針對給定的future迭代器fs,在其完成後,返回完成後的迭代器。
所以也可以把上面的例子寫成下面的形式:
def download_all(sites): with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: to_do = [] for site in sites: future = executor.submit(download_one,site) to_do.append(site) for future in concurrent.futures.as_completed(to_do): future.result()
這裡,我們首先用executor.submit(),將下載每個網站的內容都放進future隊列to_do裡等待執行。然後是as_completed()函數,在future完成後輸出結果
不過這裡有個事情要註意一下:future列表中每個future完成的順序和他在列表中的順序不一定一致,至於哪個先完成,取決於系統的調度和每個future的執行時間。
為什麼多線程每次隻有一個線程執行?
前面我們講過,在一個時刻下,Python主程序隻允許有一個線程執行,所以Python的並發,是通過多線程的切換完成的,這是為什麼呢?
這就又和以前講的知識串聯到一起瞭——GIL(全局解釋器鎖),這裡在復習下:
事實上,Python的解釋器並不是線程安全的,為瞭解決由此帶來的race condition等問題,Python就引入瞭GIL,也就是在同一個時刻,隻允許一個線程執行。當然,在進行I/O操作是,如果一個線程被block瞭,GIL就會被釋放,從而讓另一個線程能夠繼續執行。
總結
這節課裡我們先學習瞭Python中並發和並行的概念
並發——通過線程(thread)和任務(task)之間相互切換的方式實現,但是同一時刻,隻允許有一個線程或任務執行
並行——多個進程同時進行。
並發通常用於I/O頻繁操作的場景,而並行則適用於CPU heavy的場景
隨後我們通過一個下載網站內容的例子,比較瞭單線程和運用FUtures的多線程版本的性能差異,顯而易見,合理的運用多線程,能夠極大的提高程序運行效率。
我們還大致瞭解瞭Futures的方式,介紹瞭一些常用的函數,並輔以實例加以理解。
要註意,Python中之所以同一時刻隻允許一個線程運行,其實是由於GIL的存在。但是對於I/O操作而言,當其被block的時候,GIL會被釋放,使其他線程繼續執行。
以上就是Python並發編程之未來模塊Futures的詳細內容,更多關於Python並發未來模塊Futures的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- python使用期物處理並發教程
- python concurrent.futures模塊的使用測試
- python中ThreadPoolExecutor線程池和ProcessPoolExecutor進程池
- python進階之協程你瞭解嗎
- 一文詳解Python中多進程和進程池的使用方法