Python通過隊列實現進程間通信詳情
一、前言
在多進程中,每個進程之間是什麼關系呢?其實每個進程都有自己的地址空間、內存、數據棧以及其他記錄其運行狀態的輔助數據。下面通過一個例子,驗證一下進程之間能否直接共享信息。
定義一個全局變量g_num,分別創建2個子進程對g_num執行不同的操作,並輸出操作後的結果。
代碼如下:
# _*_ coding:utf-8 _*_ from multiprocessing import Process def plus(): print("-------子進程1開始----------") global g_num g_num += 50 print("g_num is %d" % g_num) print("-------子進程1結束----------") def minus(): print("-------子進程2開始----------") global g_num g_num -= 50 print("g_num is %d" % g_num) print("-------子進程2結束----------") g_num = 100 # 定義一個全局變量 if __name__ == "__main__": print("-------主進程開始----------") print("g_num is %d" % g_num) p1 = Process(target=plus) # 實例化進程p1 p2 = Process(target=minus) # 實例化進程p2 p1.start() # 開啟p1進程 p2.start() # 開啟p2進程 p1.join() # 等待p1進程結束 p2.join() # 等待p2進程結束 print("-------主進程結束----------")
運行結果如圖所示:
上述代碼中,分別創建瞭2個子進程,一個子進程中令g_num加上50,另一個子進程令g_num減去50。但是從運行結果可以看出來,g_num在父進程和2個子進程中的初始值都是100。也就是全局變量g_num在一個進程中的結果,沒有傳到下一個進程中,即進程之間沒有共享信息。
進程間示意圖如圖所示:
要如何才能實現進程間的通信呢?Python的multiprocessing模塊包裝瞭底層的機制,提供瞭Queue(隊列)、Pipes(管道)等多種方式來交換數據。本文將講解通過隊列(Queue)來實現進程間的通信。
二、隊列簡介
隊列(Queue)就是模型仿現實中的排隊。例如學生在食堂排隊買飯。新來的學生排隊到隊伍最後,最前面的學生買完飯走開,後面的學生跟上。
可以看出隊列有兩個特點:
- 新來的學生都排在隊尾。
- 最前的學生完成後離隊,後面一個跟上。
根據以上特點,可以歸納出隊列的結構如圖所示:
三、多進程隊列的使用
進程之間有時需要通信,操作系統提供瞭很多機制來實現進程間的通信。可以使用multiprocessing模塊的Queue實現多進程之間的數據傳遞。Queue本身是一個消息隊列程序,下面介紹一下Queue的使用。
初始化Queue()對象時(例如:q=Queue(num)),若括號中沒有指定最大可接收的消息數量,或數量為負值,那麼就代表可接收的消息數量沒有上限(直到內存的盡頭)。
Queue的常用方法如下:
Queue.qsize():返回當前隊列包含的消息數量。Queue.empty():如果隊列為空,返回True;返之返回False。Queue.full():如果隊列滿瞭,返回True;反之返回False。Queue.get(block[,timeout]):獲取隊列中的一條信息,然後將其從隊列中移除,block默認值為True。
如果block使用默認值,且沒有設置timeout(單位秒),消息隊列為空,此時程序將被阻塞(停在讀取狀態),直到從消息隊列讀到消息為止。如果設置瞭timeout,則會等待timeout秒,若還沒有讀取任何消息,則拋出“Queue.Empty”異常。
如果block值為False,消息隊列為空,則會立刻拋出“Queue.Empty”異常。
Queue.get_nowait():相當於Queue.get(False)。Queue.put(item,[block[,timeout]]):將item消息寫入隊列,block默認值為True。
如果block使用默認值,且沒有設置timeout(單位秒),消息隊列如果已經沒有空間可以寫入,此時程序將被阻塞(停在寫入狀態),直到從消息隊列騰出空間為止,如果設置瞭timeout,則會等待timeout秒,若還沒有空間,則拋出“Queue.Full”異常。
如果block值為False,消息隊列沒有空間可寫入,則會立刻拋出“Queue.Full”異常
Queue.put_nowait(item):相當Queue.put(item,False)。
下面,通過一個例子學習一下如何使用processing.Queue。
代碼如下:
# _*_ coding:utf-8 _*_ from multiprocessing import Queue if __name__ == "__main__": q = Queue(3) q.put("消息1") q.put("消息2") print(q.full()) # 返回False q.put("消息3") print(q.full()) # 返回True # 因為消息隊列已滿,下面的try都會拋出異常 # 第一個try會等待2秒再拋出異常,第二個try會立刻拋出異常 try: q.put("消息4", True, 2) except: print("消息隊列已滿,現有消息數量:%s" % q.qsize()) try: q.put_nowait("消息4") except: print("消息隊列已滿,現有消息數量:%s" % q.qsize()) # 讀取消息時,先判斷消息隊列是否為空,再讀取 if not q.empty(): print("-----從隊列中獲取消息-------") for i in range(q.qsize()): print(q.get_nowait()) # 先判讀消息隊列是否已滿,再寫入: if not q.full(): q.put_nowait("消息4")
運行結果如圖所示:
四、使用隊列在進程間通信
我們知道使用multiprocessing.Process可以創建多進程,使用multiprocessing.Queue可以實現隊列的操作。接下來,通過一個示例結合Process和Queue實現進程間的通信。
創建2個子進程,一個子進程負責向隊列中寫入數據,另外一個子進程負責從隊列中讀取數據。為瞭保證能夠正確從隊列中讀取數據,設置讀取數據的進程等待時間為2秒。如果2秒後乃然無法讀取數據,則拋出異常。
代碼如下:
# _*_ coding:utf-8 _*_ from multiprocessing import Process, Queue import time # 向隊列中寫入數據 def write_task(q): if not q.full(): for i in range(5): message = "消息" + str(i) q.put(message) print("寫入:%s" % message) # 從隊列中讀取數據 def read_task(q): time.sleep(1) # 休眠1秒 while not q.empty(): print("讀取:%s" % q.get(True, 2)) # 等待2秒中,如果沒有讀取到任何信息,則拋出異常 if __name__ == "__main__": print("--------父進程開始---------") q = Queue() # 父進程創建Queue,並傳給各個子進程 pw = Process(target=write_task, args=(q,)) # 實例化寫入隊列的子進程,並傳遞給隊列 pr = Process(target=read_task, args=(q,)) # 實例化讀取隊列的子進程,並傳遞給隊列 pw.start() # 啟動子進程pw,寫入 pr.start() # 啟動子進程pr,讀取 pw.join() # 等待pw結束 pr.join() # 等待pr結束 print("-------父進程結束-----------")
運行結果如下:
到此這篇關於Python通過隊列實現進程間通信詳情的文章就介紹到這瞭,更多相關Python進程間通信 內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!