python基礎之並發編程(二)

一、多進程的實現

方法一

# 方法包裝   多進程
from multiprocessing import Process
from time import sleep
def func1(arg):
    print(f'{arg}開始...')
    sleep(2)
    print(f'{arg}結束...')
if __name__ == "__main__":
    p1 = Process(target=func1,args=('p1',))
    p2 = Process(target=func1,args=('p2',))
    p1.start()
    p2.start()

方法二:

二、使用進程的優缺點

1、優點

  • 可以使用計算機多核,進行任務的並發執行,提高執行效率
  • 運行不受其他進程影響,創建方便
  • 空間獨立,數據安全

2、缺點

  • 進程的創建和刪除消耗的系統資源較多

三、進程的通信

Python 提供瞭多種實現進程間通信的機制,主要有以下 2 種:

1. Python multiprocessing 模塊下的 Queue 類,提供瞭多個進程之間實現通信的諸多 方法

2. Pipe,又被稱為“管道”,常用於實現 2 個進程之間的通信,這 2 個進程分別位於管 道的兩端

Pipe 直譯過來的意思是“管”或“管道”,該種實現多進程編程的方式,和實際生活中 的管(管道)是非常類似的。通常情況下,管道有 2 個口,而 Pipe 也常用來實現 2 個進程之 間的通信,這 2 個進程分別位於管道的兩端,一端用來發送數據,另一端用來接收數據 – send(obj)

發送一個 obj 給管道的另一端,另一端使用 recv() 方法接收。需要說明的是,該 obj 必 須是可序列化的,如果該對象序列化之後超過 32MB,則很可能會引發 ValueError 異常 – recv()

接收另一端通過 send() 方法發送過來的數據 – close()

關閉連接 – poll([timeout])

返回連接中是否還有數據可以讀取 – end_bytes(buffer[, offset[, size]])

發送字節數據。如果沒有指定 offset、size 參數,則默認發送 buffer 字節串的全部數 據;如果指定瞭 offset 和 size 參數,則隻發送 buffer 字節串中從 offset 開始、長度為 size 的字節數據。通過該方法發送的數據,應該使用 recv_bytes() 或 recv_bytes_into 方法接收 – recv_bytes([maxlength])

接收通過 send_bytes() 方法發送的數據,maxlength 指定最多接收的字節數。該方法返 回接收到的字節數據 – recv_bytes_into(buffer[, offset])

功能與 recv_bytes() 方法類似,隻是該方法將接收到的數據放在 buffer 中

1、Queue 實現進程間通信

from multiprocessing import Process,current_process,Queue   # current_process 指的是當前進程
# from queue import Queue
import os
def func(name,mq):
    print('進程ID {} 獲取瞭數據:{}'.format(os.getpid(),mq.get()))
    mq.put('shiyi')
if __name__ == "__main__":
    # print('進程ID:{}'.format(current_process().pid))
    # print('進程ID:{}'.format(os.getpid()))
    mq = Queue()
    mq.put('yangyang')
    p1 = Process(target=func,args=('p1',mq))
    p1.start()
    p1.join()
    print(mq.get())

2、Pipe 實現進程間通信(一邊發送send(obj),一邊接收(obj))

from multiprocessing import Process,current_process,Pipe
import os
def func(name,con):
    print('進程ID {} 獲取瞭數據:{}'.format(os.getpid(),con.recv()))
    con.send('你好!')
if __name__ == "__main__":
    # print('進程ID:{}'.format(current_process().pid))
    con1,con2 = Pipe()
    p1 = Process(target=func,args=('p1',con1))
    p1.start()
    con2.send("hello!")
    p1.join()
    print(con2.recv())

四、Manager管理器

管理器提供瞭一種創建共享數據的方法,從而可以在不同進程中共享

from multiprocessing import Process,current_process
import os
from multiprocessing import Manager
def func(name,m_list,m_dict):
    print('子進程ID {} 獲取瞭數據:{}'.format(os.getpid(),m_list))
    print('子進程ID {} 獲取瞭數據:{}'.format(os.getpid(),m_dict))
    m_list.append('你好')
    m_dict['name'] = 'shiyi'    
if __name__ == "__main__":
    print('主進程ID:{}'.format(current_process().pid))
    with Manager() as mgr:
        m_list = mgr.list()
        m_dict = mgr.dict()
        m_list.append('Hello!!')
        p1 = Process(target=func,args=('p1',m_list,m_dict))
        p1.start()
        p1.join()
        print(m_list)
        print(m_dict)

五、進程池

Python 提供瞭更好的管理多個進程的方式,就是使用進程池。

進程池可以提供指定數量的進程給用戶使用,即當有新的請求提交到進程池中時,如果池 未滿,則會創建一個新的進程用來執行該請求;反之,如果池中的進程數已經達到規定最大 值,那麼該請求就會等待,隻要池中有進程空閑下來,該請求就能得到執行。

使用進程池的優點

1. 提高效率,節省開辟進程和開辟內存空間的時間及銷毀進程的時間

2. 節省內存空間

類/方法 功能 參數
Pool(processes)
創建進程池對象
processes 表示進程池
中有多少進程
pool.apply_async(func,a
rgs,kwds)
異步執行 ;將事件放入到進 程池隊列
func 事件函數
args 以元組形式給
func 傳參
kwds 以字典形式給
func 傳參 返回值:返
回一個代表進程池事件的對
象,通過返回值的 get 方法
可以得到事件函數的返回值
pool.apply(func,args,kw
ds)
同步執行;將事件放入到進程 池隊列
func 事件函數 args 以
元組形式給 func 傳參
kwds 以字典形式給 func
傳參
pool.close()
關閉進程池
pool.join()
回收進程池
pool.map(func,iter)
類似於 python 的 map 函
數,將要做的事件放入進程池
func 要執行的函數
iter 迭代對象
from multiprocessing import Pool
import os
from time import sleep
def func1(name):
    print(f"當前進程的ID:{os.getpid()},{name}")
    sleep(2)
    return name
def func2(args):
    print(args)
if __name__ == "__main__":
    pool = Pool(5)
    pool.apply_async(func = func1,args=('t1',),callback=func2)
    pool.apply_async(func = func1,args=('t2',),callback=func2)
    pool.apply_async(func = func1,args=('t3',),callback=func2)
    pool.apply_async(func = func1,args=('t4',))
    pool.apply_async(func = func1,args=('t5',))
    pool.apply_async(func = func1,args=('t6',))
    pool.close()
    pool.join()
from multiprocessing import Pool
import os
from time import sleep
def func1(name):
    print(f"當前進程的ID:{os.getpid()},{name}")
    sleep(2)
    return name
if __name__ == "__main__":
   with Pool(5) as pool:
        args = pool.map(func1,('t1,','t2,','t3,','t4,','t5,','t6,','t7,','t8,'))
        for a in args:
            print(a)

總結

本篇文章就到這裡瞭,希望能夠給你帶來幫助,也希望您能夠多多關註WalkonNet的更多內容!

推薦閱讀: