分析Python感知線程狀態的解決方案之Event與信號量
一、停止線程
利用Threading庫我們可以很方便地創建線程,讓它按照我們的想法執行我們想讓它執行的事情,從而加快程序運行的效率。然而有一點坑爹的是,線程創建之後,就交給瞭操作系統執行,我們無法直接結束一個線程,也無法給它發送信號,無法調整它的調度,也沒有其他高級操作。如果想要相關的功能,隻能自己開發。
怎麼開發呢?
我們創建線程的時候指定瞭target等於一個我們想讓它執行的函數,這個函數並不一定是全局函數,實際上也可以是一個對象中的函數。如果是對象中的函數,那麼我們就可以在這個函數當中獲取到對象中的其他信息,我們可以利用這一點來實現手動控制線程的停止。
說起來好像不太好理解,但是看下代碼真的非常簡單:
import time from threading import Thread class TaskWithSwitch: def __init__(self): self._running = True def terminate(self): self._running = False def run(self, n): while self._running and n > 0: print('Running {}'.format(n)) n -= 1 time.sleep(1) c = TaskWithSwitch() t = Thread(target=c.run, args=(10, )) t.start() c.terminate() t.join()
如果你運行這段代碼,會發現屏幕上隻輸出瞭10,因為我們將_running這個字段置為False之後,下次循環的時候不再滿足循環條件,它就會自己退出瞭。
如果我們想要用多線程來讀取IO,由於IO可能存在堵塞,所以可能會出現線程一直無法返回的情況。也就是說我們在循環內部卡死瞭,這個時候單純用_running來判斷還是不夠的,我們需要在線程內部設置計時器,防止循環內部的卡死。
class IOTask: def __init__(self): self._running = True def terminate(self): self._running = False def run(self, sock): # 在socket中設置計時器 sock.settimeout(10) while self._running: try: # 由於設置瞭計時器,所以這裡不會永久等待 data = sock.recv(1024) break except socket.timeout: continue return
二、線程信號的傳遞
我們之所以如此費勁才能控制線程的運行,主要原因是線程的狀態是不可知的,並且我們無法直接操作它,因為它是被操作系統管理的。我們運行的主線程和創建出來的線程是獨立的,兩者之間並沒有從屬關系,所以想要實現對線程的狀態進行控制,往往需要我們通過其他手段來實現。
我們來思考一個場景,假設我們有一個任務,需要在另外一個線程運行結束之後才能開始執行。要想要實現這一點,就必須對線程的狀態有所感知,需要其他線程傳遞出信號來才行。我們可以使用threading中的Event工具來實現這一點。Event工具就是可以用來傳遞信號的,就好像是一個開關,當一個線程執行完成之後,會去啟動這個開關。而這個開關控制著另外一段邏輯的運行。
我們來看下樣例代碼:
import time from threading import Thread, Event def run_in_thread(): time.sleep(1) print('Thread is running') t = Thread(target=run_in_thread) t.start() print('Main thread print')
我們在線程裡面就隻做瞭輸出一行提示符,沒有其他任何邏輯。由於我們在run_in_thread函數當中沉睡瞭1s,所以一定是先輸出Main thread print再輸出的Thread is running。假設這個線程是一個很重要的任務,我們希望主線程能夠等待它運行到一個階段再往下執行,我們應該怎麼辦呢?
註意,這裡說的是運行到一個階段,並不是運行結束。運行結束我們很好處理,可以通過join來完成。但如果不是運行結束,而是運行完成瞭某一個階段,當然通過join也可以,但是會損害整體的效率。這個時候我們就必須要用上Event瞭。加上Event之後,我們再來看下代碼:
import time from threading import Thread, Event def run_in_thread(event): time.sleep(1) print('Thread is running') # set一下event,這樣外面wait的部分就會被啟動 event.set() # 初始化Event event = Event() t = Thread(target=run_in_thread, args=(event, )) t.start() # event等待set event.wait() print('Main thread print')
整體的邏輯沒有太多的修改,主要的是增加瞭幾行關於Event的使用代碼。
我們如果要用到Event,最好在代碼當中隻使用一次。當然通過Event中的clear方法我們可以重置Event的值,但問題是我們沒辦法保證重置的這個邏輯會在wait之前執行。如果是在之後執行的,那麼就會問題,並且在debug的時候會異常痛苦,因為bug不是必現的,而是有時候會出現有時候不會出現。這種情況往往都是因為多線程的使用問題。
所以如果要多次使用開關和信號的話,不要使用Event,可以使用信號量。
三、信號量
Event的問題在於如果多個線程在等待Event的發生,當它一旦被set的時候,那麼這些線程都會同時執行。但有時候我們並不希望這樣,我們希望可以控制這些線程一個一個地運行。如果想要做到這一點,Event就無法滿足瞭,而需要使用信號量。
信號量和Event的使用方法類似,不同的是,信號量可以保證每次隻會啟動一個線程。因為這兩者的底層邏輯不太一致,對於Event來說,它更像是一個開關。一旦開關啟動,所有和這個開關關聯的邏輯都會同時執行。而信號量則像是許可證,隻有拿到許可證的線程才能執行工作,並且許可證一次隻發一張。
想要使用信號量並不需要自己開發,thread庫當中為我們提供瞭現成的工具——Semaphore,我們來看它的使用代碼:
# 工作線程 def worker(n, sema): # 等待信號量 sema.acquire() print('Working', n) # 初始化 sema = threading.Semaphore(0) nworkers = 10 for n in range(nworkers): t = threading.Thread(target=worker, args=(n, sema,)) t.start()
在上面的代碼當中我們創建瞭10個線程,雖然這些線程都被啟動瞭,但是都不會執行邏輯,因為sema.acquire是一個阻塞方法,沒有監聽到信號量是會一直掛起等待。
當我們釋放信號量之後,線程被啟動,才開始瞭執行。我們每釋放一個信號,則會多啟動一個線程。這裡面的邏輯應該不難理解。
四、總結
在並發場景當中,多線程的使用絕不是多啟動幾個線程做不同的任務而已,我們需要線程間協作,需要同步、獲取它們的狀態,這是非常不容易的。一不小心就會出現幽靈bug,時顯時隱,這也是並發問題讓人頭疼的主要原因。
以上就是分析Python感知線程狀態的解決方案之Event與信號量的詳細內容,更多關於Python 感知線程狀態 Event與信號量的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- python基礎之並發編程(一)
- Python線程編程之Thread詳解
- Python多線程與同步機制淺析
- Python threading和Thread模塊及線程的實現
- Python多線程即相關理念詳解