Python多進程之進程同步及通信詳解
上篇文章介紹瞭什麼是進程、進程與程序的關系、進程的創建與使用、創建進程池等,接下來就來介紹一下進程同步及進程通信。
進程同步
當多個進程使用同一份數據資源的時候,因為進程的運行沒有順序,運行起來也無法控制,如果不加以幹預,往往會引發數據安全或順序混亂的問題,所以要在多個進程讀寫共享數據資源的時候加以適當的策略,來保證數據的一致性問題。
Lock(鎖)
一個Lock對象有兩個方法:acquire()和release()來控制共享數據的讀寫權限, 看下面這張圖片,使用多進程的時候會經常出現這種情況,這是因為多個進程都在搶占輸出資源,共享同一打印終端,從而造成瞭輸出信息的錯亂。
那麼就可以使用Lock機制:
import multiprocessing import random import time def work(lock, i): lock.acquire() print("work'{}'執行中......".format(i), multiprocessing.current_process().name, multiprocessing.current_process().pid) time.sleep(random.randint(0, 2)) print("work'{}'執行完畢......".format(i)) lock.release() if __name__ == '__main__': lock = multiprocessing.Lock() for i in range(5): p = multiprocessing.Process(target=work, args=(lock, i)) p.start()
由於引入瞭Lock機制,同一時間隻能有一個進程搶占到輸出資源,其他進程等待該進程結束,鎖釋放到,才可以搶占,這樣會解決多進程間資源競爭導致數據錯亂的問題,但是由並發執行變成瞭串行執行,會犧牲運行效率。
進程通信
上篇文章說過,進程之間互相隔離,數據是獨立的,默認情況下互不影響,那要如何實現進程間通信呢?Python提供瞭多種進程通信的方式,下面就來說一下。
Queue(隊列)
multiprocessing
模塊提供的Queue多進程安全的消息隊列,可以實現多進程之間的數據傳遞。
說明
- 初始化Queue()對象時(例如:q=Queue()),若括號中沒有指定最⼤可接收的消息數量,或數量為負值,那麼就代表可接受的消息數量沒有上限(直到內存的盡頭)。
Queue.qsize()
:返回當前隊列包含的消息數量。Queue.empty()
:如果隊列為空,返回True,反之False。Queue.full()
:如果隊列滿瞭,返回True,反之False。Queue.get(block, timeout)
:獲取隊列中的⼀條消息,然後將其從列隊中移除,block默認值為True。如果block使⽤默認值,且沒有設置timeout(單位秒),消息列隊如果為空,此時程序將被阻塞(停在讀取狀態),直到從消息列隊讀到消息為⽌,如果設置瞭timeout,則會等待timeout秒,若還沒讀取到任何消息,則拋出Queue.Empty異常;如果block值為False,消息列隊如果為空,則會⽴刻拋出Queue.Empty異常。Queue.get_nowait()
:相當Queue.get(False)。Queue.put(item, block, timeout)
:將item消息寫⼊隊列,block默認值為True,如果block使⽤默認值,且沒有設置timeout(單位秒),消息列隊如果已經沒有空間可寫⼊,此時程序將被阻塞(停在寫⼊狀態),直到消息列隊騰出空間為⽌,如果設置瞭timeout,則會等待timeout秒,若還沒空間,則拋出Queue.Full異常;如果block值為False,消息列隊如果沒有空間可寫⼊,則會⽴刻拋出Queue.Full異常。Queue.put_nowait(item)
:相當於Queue.put(item, False)。
from multiprocessing import Process, Queue import time def write_task(queue): """ 向隊列中寫入數據 :param queue: 隊列 :return: """ for i in range(5): if queue.full(): print("隊列已滿!") message = "消息{}".format(str(i)) queue.put(message) print("消息{}寫入隊列".format(str(i))) def read_task(queue): """ 從隊列讀取數據 :param queue: 隊列 :return: """ while True: print("從隊列讀取:{}".format(queue.get(True))) if __name__ == '__main__': print("主進程執行......") # 主進程創建Queue,最大消息數量為3 queue = Queue(3) pw = Process(target=write_task, args=(queue, )) pr = Process(target=read_task, args=(queue, )) pw.start() pr.start()
運行結果為:
從結果我們可以看出,隊列最大可以放入3條消息,後面再來消息,要等read_task從隊列裡取出後才行。
Pipe(管道)
Pipe常用於兩個進程,兩個進程分別位於管道的兩端,Pipe(duplex)方法返回(conn1,conn2)代表一個管道的兩端,duplex參數默認為True,即全雙工模式,若為False,conn1隻負責接收信息,conn2負責發送。
send()和recv()方法分別是發送和接受消息的方法。
import multiprocessing import time import random def proc_send(pipe): """ 發送消息 :param pipe:管道一端 :return: """ for i in range(10): print("process send:{}".format(str(i))) pipe.send(i) time.sleep(random.random()) def proc_recv(pipe): """ 接收消息 :param pipe:管道一端 :return: """ while True: print("Process recv:{}".format(pipe.recv())) time.sleep(random.random()) if __name__ == '__main__': # 主進程創建pipe pipe = multiprocessing.Pipe() p1 = multiprocessing.Process(target=proc_send,args=(pipe[0], )) p2 = multiprocessing.Process(target=proc_recv,args=(pipe[1], )) p1.start() p2.start() p1.join() p2.terminate()
執行結果為:
Semaphore(信號量)
Semaphore用來控制對共享資源的訪問數量,和進程池的最大連接數類似。
import multiprocessing import random import time def work(s, i): s.acquire() print("work'{}'執行中......".format(i), multiprocessing.current_process().name, multiprocessing.current_process().pid) time.sleep(i*2) print("work'{}'執行完畢......".format(i)) s.release() if __name__ == '__main__': s = multiprocessing.Semaphore(2) for i in range(1, 7): p = multiprocessing.Process(target=work, args=(s, i)) p.start()
上面的代碼中使用Semaphore限制瞭最多有2個進程同時執行,那麼來一個進程獲得一把鎖,計數加1,當計數等於2時,後面再來的進程均需要等待,等前面的進程釋放掉,才可以獲得鎖。
信號量與進程池的概念上類似,但是要區分開來,信號量涉及到加鎖的概念。
Event(事件)
Event用來實現進程間同步通信的。運行的機制是:全局定義瞭一個flag,如果flag值為False,當程序執行event.wait()方法時就會阻塞,如果flag值為True時,程序執行event.wait()方法時不會阻塞繼續執行。
Event常⽤函數:
event.wait()
:在進程中插入一個標記(flag),默認為False,可以設置timeout。event.set()
:使flag為Ture。event.clear()
:使flag為False。event.is_set()
:判斷flag是否為True。
import multiprocessing import time def wait_for_event(e): print("wait_for_event執行") e.wait() print("wait_for_event: e.is_set():{}".format(e.is_set())) def wait_for_event_timeout(e, t): print("wait_for_event_timeout執行") # 隻會阻塞2s e.wait(t) print("wait_for_event_timeout:e.is_set:{}".format(e.is_set())) if __name__ == "__main__": e = multiprocessing.Event() p1 = multiprocessing.Process(target=wait_for_event, args=(e,)) p1.start() p2 = multiprocessing.Process(target=wait_for_event_timeout, args=(e, 2)) p2.start() time.sleep(4) # 4s之後使用e.set()將flag設為Ture e.set() print("主進程:flag設置為True")
執行結果如下:
總結
本篇文章就到這裡瞭,希望能夠給你帶來幫助,也希望您能夠多多關註WalkonNet的更多內容!