Python多線程與同步機制淺析
線程實現
Python中線程有兩種方式:函數或者用類來包裝線程對象。threading模塊中包含瞭豐富的多線程支持功能:
- threading.currentThread(): 返回當前線程;
- threading.enumerate(): 返回包含正在運行的線程列表;
- threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())等價。
Thread類
通過Thread類來處理線程,類中提供的一些方法:
- run(): 用以表示線程執行的方法(可重載實現實際功能);
- start(): 啟動線程;
- join([time]): 等待線程中止(或者超時);
- isAlive(): 返回線程是否活動;
- getName(): 返回線程名;
- setName(): 設置線程名;
- setDaemon(True):設置為後臺進程(必須在start調用前設定)。
函數方式
通過Thread直接構造線程,然後通過start方法啟動線程:
threading.Thread(group=None, target=None, name=None, args=(), kwargs=None, *,daemon=None)
各參數說明:
- group:指定線程隸屬的線程組(當前忽略);
- target:指定線程要調度的目標方法(即實現功能的函數);
- args:傳遞給目標方法的參數(以元組的方式);
- kwargs:傳遞給目標方法的參數(以字典的方式);
- daemon:指定線程是否為後臺線程。
def simpleRoutine(name, delay): print(f"routine {name} starting...") time.sleep(delay) print(f"routine {name} finished") if __name__ == '__main__': thrOne = threading.Thread(target=simpleRoutine, args=("First", 1)) thrTwo = threading.Thread(target=simpleRoutine, args=("Two", 2)) thrOne.start() thrTwo.start() thrOne.join() thrTwo.join()
繼承方式
直接繼承Thread,創建一個新的子類(主要實現run方法):
class SimpleThread (threading.Thread): def __init__(self, name, delay): # threading.Thread.__init__(self) super().__init__() self.name = name self.delay = delay def run(self): print(f"thread {self.name} starting...") time.sleep(self.delay) print(f"thread {self.name} finished") if __name__ == '__main__': thrOne = SimpleThread("First", 2) thrTwo = SimpleThread("Second", 2) thrOne.start() thrTwo.start() thrOne.join() thrTwo.join()
同步機制
當多個線程同時修改同一條數據時可能會出現臟數據;所以,就需要線程鎖,即同一時刻隻允許一個線程執行操作。
同步鎖Lock
threading提供瞭Lock和RLock(可重入鎖)兩個類,它們都提供瞭如下兩個方法來加鎖和釋放鎖:
- acquire(blocking=True, timeout=-1):加鎖,其中 timeout 參數指定加鎖多少秒。
- release():釋放鎖。
兩種使用鎖的方式:
gCount = 0 def PlusOne(locker): global gCount with locker: gCount += 1、 def MinusOne(locker): global gCount if locker.acquire(): gCount -= 1 locker.release()
條件變量Condition
Condition對象內部維護瞭一個鎖(構造時可傳遞一個Lock/RLock對象,否則內部會自行創建一個RLock)和一個waiting池:
- 通過acquire獲得Condition對象;
- 當調用wait方法時,線程會釋放Condition內部的鎖並進入blocked狀態,同時在waiting池中記錄這個線程;
- 當調用notify方法時,Condition對象會從waiting池中挑選一個線程,通知其調用acquire方法嘗試取到鎖。
Condition對象:
__init__(self,lock=None)
:Condition類總是與一個鎖相關聯(若不指定lock參數,會自動創建一個與之綁定的RLock對象);
acquire(timeout)
:調用關聯鎖的acquire()方法;
release()
:調用關聯鎖的release()方法
wait(timeout)
:線程掛起,直到收到一個notify通知或超時才會被喚醒;必須在已獲得鎖的前提下調用;
notify(n=1)
:喚醒waiting池中的n個正在等待的線程並通知它:
- 收到通知的線程將自動調用acquire()方法嘗試加鎖;
- 若waiting池中有多個線程,隨機選擇n個喚醒;
- 必須在已獲得鎖的前提下調用,否則將引發錯誤。
notify_all()
:通知所有線程。
class Producer(threading.Thread): def __init__(self, cond, storage): threading.Thread.__init__(self) self.cond = cond self.storage = storage def run(self): label = 1 while True: with self.cond: if len(self.storage) < 10: self.storage.append(label) print(f"<- Produce {label} product") label += 1 self.cond.notify(2) else: print(f"<- storage full: Has Produced {label - 1} product") self.cond.notify_all() self.cond.wait() time.sleep(0.4) class Consumer(threading.Thread): def __init__(self, name, cond, storage): threading.Thread.__init__(self) self.name = name self.cond = cond self.storage = storage def run(self): while True: if self.cond.acquire(): if len(self.storage) > 1: pro = self.storage.pop(0) print(f"-> {self.name} consumed {pro}") self.cond.notify() else: print(f"-> {self.name} storage empty: no product to consume") self.cond.wait() self.cond.release() time.sleep(1)
信號量Semaphore
信號量對象內部維護一個計數器:
acquire(blocking=True,timeout=None)
時減1,當計數為0就阻塞請求的線程;release()
時加1,當計數大於0恢復被阻塞的線程;
threading中有Semaphore和BoundedSemaphore兩個信號量;BoundedSemaphore限制瞭release的次數,任何時候計數器的值,都不不能大於初始值(release時會檢測計數器的值,若大於等於初始值,則拋出ValueError異常)。
通過Semaphore維護生產(release一個)、消費(acquire一個)量:
# products = threading.Semaphore(0) def produceOne(label, sem: threading.Semaphore): sem.release() print(f"{label} produce one") def consumeOne(label, sem: threading.Semaphore): sem.acquire() print(f"{label} consume one")
通過BoundedSemaphore來控制並發數量(最多有Semaphore初始值數量的線程並發):
# runner = threading.BoundedSemaphore(3) def runBound(name, sem: threading.BoundedSemaphore): with sem: print(f"{name} is running") time.sleep(1) print(f"{name} finished")
事件Event
事件對象內部有個標志字段,用於線程等待事件的發生:
- isSet():返回event的狀態值;
- wait():狀態為False時,一直阻塞;否則立即返回;
- set(): 設置狀態值為True,激活所有被阻塞的線程;
- clear():恢復狀態值為False。
多線程等待事件發生,然後開始執行:
def waiters(name, evt: threading.Event): evt.wait() print(f"{name} is running") time.sleep(1) print(f"{name} finished") def starting(evt: threading.Event): evt.set() print("event is set")
屏障Barrier
屏障用於設定等待線程數量,當數量達到指定值時,開始執行:
threading.Barrier(parties, action=None, timeout=None)
屏障屬性與方法:
- wait(timeout=None):等待通過屏障;線程被阻塞,直到阻塞的數量達到parties時,被阻塞的線程被同時全部釋放;
- reset():重置屏障到默認的空狀態;
- abort():將障礙置為斷開狀態;導致等待的線程引發BrokenBarrierError異常;
- partier():通過障礙所需的線程數;
- n_waiting():當前在屏障中等待的線程數;
- broken():如果屏障處於斷開狀態,則返回True。
def waitBarrier(name, barr: threading.Barrier): print(f"{name} waiting for open") try: barr.wait() print(f"{name} running") time.sleep(5) except threading.BrokenBarrierError: print(f"{name} exception") print(f"{name} finished")
GIL全局解釋器鎖
GIL(Global Interpreter Lock,全局解釋器鎖);cpython中,某個線程想要執行,必須先拿到GIL(可以把GIL看作是“通行證”)。每次釋放GIL鎖,線程都要進行鎖競爭,切換線程,會消耗資源。
由於GIL鎖的存在,python裡一個進程永遠隻能同時執行一個線程(拿到GIL的線程),這就是為什麼在多核CPU上,python的多線程效率並不高:
- CPU密集型代碼:由於計算工作多,會很快用完時間片,然後觸發GIL的釋放與再競爭;
- IO密集型代碼(文件處理、網絡爬蟲等):多線程能夠有效提升效率(單線程下有IO操作會進行IO等待,造成不必要的時間浪費,而開啟多線程能在線程A等待時,自動切換到線程B,可以不浪費CPU的資源,從而能提升程序執行效率)。
python在使用多線程的時候,調用的是c語言的原生線程:
- 拿到公共數據
- 申請GIL
- python解釋器調用os原生線程
- os操作cpu執行運算
- 當線程執行時間到後,就進行切換(context switch)
到此這篇關於Python多線程與同步機制淺析的文章就介紹到這瞭,更多相關Python多線程內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- Python中threading庫實現線程鎖與釋放鎖
- python基礎之並發編程(一)
- Python學習筆記之線程
- 一篇文章帶你瞭解Python的進程,線程和協程
- Python線程編程之Thread詳解