python 多進程和多線程使用詳解

進程和線程

進程是系統進行資源分配的最小單位,線程是系統進行調度執行的最小單位;

一個應用程序至少包含一個進程,一個進程至少包含一個線程;

每個進程在執行過程中擁有獨立的內存空間,而一個進程中的線程之間是共享該進程的內存空間的;

  • 計算機的核心是CPU,它承擔瞭所有的計算任務。它就像一座工廠,時刻在運行。
  • 假定工廠的電力有限,一次隻能供給一個車間使用。也就是說,一個車間開工的時候,其他車間都必須停工。背後的含義就是,單個CPU一次隻能運行一個任務。編者註: 多核的CPU就像有瞭多個發電廠,使多工廠(多進程)實現可能。
  • 進程就好比工廠的車間,它代表CPU所能處理的單個任務。任一時刻,CPU總是運行一個進程,其他進程處於非運行狀態。
  • 一個車間裡,可以有很多工人。他們協同完成一個任務。
  • 線程就好比車間裡的工人。一個進程可以包括多個線程。
  • 車間的空間是工人們共享的,比如許多房間是每個工人都可以進出的。這象征一個進程的內存空間是共享的,每個線程都可以使用這些共享內存。
  • 可是,每間房間的大小不同,有些房間最多隻能容納一個人,比如廁所。裡面有人的時候,其他人就不能進去瞭。這代表一個線程使用某些共享內存時,其他線程必須等它結束,才能使用這一塊內存。
  • 一個防止他人進入的簡單方法,就是門口加一把鎖。先到的人鎖上門,後到的人看到上鎖,就在門口排隊,等鎖打開再進去。這就叫”互斥鎖”(Mutual exclusion,縮寫 Mutex),防止多個線程同時讀寫某一塊內存區域。
  • 還有些房間,可以同時容納n個人,比如廚房。也就是說,如果人數大於n,多出來的人隻能在外面等著。這好比某些內存區域,隻能供給固定數目的線程使用。
  • 這時的解決方法,就是在門口掛n把鑰匙。進去的人就取一把鑰匙,出來時再把鑰匙掛回原處。後到的人發現鑰匙架空瞭,就知道必須在門口排隊等著瞭。這種做法叫做”信號量”(Semaphore),用來保證多個線程不會互相沖突。
  • 不難看出,mutex是semaphore的一種特殊情況(n=1時)。也就是說,完全可以用後者替代前者。但是,因為mutex較為簡單,且效率高,所以在必須保證資源獨占的情況下,還是采用這種設計。

Python的多進程

Python的多進程依賴於multiprocess模塊;使用多進程可以利用多個CPU進行並行計算;

實例:

from multiprocessing import Process
import os
import time
 
def long_time_task(i):
    print('子進程: {} - 任務{}'.format(os.getpid(), i))
    time.sleep(2)
    print("結果: {}".format(8 ** 20))
 
if __name__=='__main__':
    print('當前母進程: {}'.format(os.getpid()))
    start = time.time()
    p1 = Process(target=long_time_task, args=(1,))
    p2 = Process(target=long_time_task, args=(2,))
    print('等待所有子進程完成。')
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    end = time.time()
    print("總共用時{}秒".format((end - start)))

新創建進程和進程間切換是需要消耗資源的,所以應該控制進程數量;

同時可運行的進程數量收到CPU核數限制;

進程池

使用進程池pool創建進程:

使用進程池可以避免手工進行進程的創建的麻煩,默認數量是CPU核數;

Pool類可以提供指定數量的進程供用戶使用,當有新的請求被提交到Pool中的時候,如果進程池還沒有滿,就會創建一個新的進程來執行請求;如果池已經滿瞭,請求就會等待,等到有空閑進程可以使用時,才會執行請求;

幾個方法:

1.apply_async

作用是向進程池提交需要執行的函數和參數,各個進程采用非阻塞的異步方式調用,每個進程隻管自己運行,是默認方式;

2.map

會阻塞進程直到返回結果;

3.map_sunc

非阻塞進程;

4.close

關閉進程池,不再接受任務;

5.terminate

結束進程;

6.join

主進程阻塞,直到子進程執行結束;

實例:

from multiprocessing import Pool, cpu_count
import os
import time
 
def long_time_task(i):
    print('子進程: {} - 任務{}'.format(os.getpid(), i))
    time.sleep(2)
    print("結果: {}".format(8 ** 20))
 
if __name__=='__main__':
    print("CPU內核數:{}".format(cpu_count()))
    print('當前母進程: {}'.format(os.getpid()))
    start = time.time()
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('等待所有子進程完成。')
    p.close()
    p.join()
    end = time.time()
    print("總共用時{}秒".format((end - start)))

在join之前,必須使用close或者terminate,讓進程池不再接受任務;

多進程間的數據通信與共享

通常,進程之間是相互獨立的,每個進程都有獨立的內存。通過共享內存(nmap模塊),進程之間可以共享對象,使多個進程可以訪問同一個變量(地址相同,變量名可能不同)。多進程共享資源必然會導致進程間相互競爭,所以應該盡最大可能防止使用共享狀態。還有一種方式就是使用隊列queue來實現不同進程間的通信或數據共享,這一點和多線程編程類似。

下例這段代碼中中創建瞭2個獨立進程,一個負責寫(pw), 一個負責讀(pr), 實現瞭共享一個隊列queue。

from multiprocessing import Process, Queue
import os, time, random
 
# 寫數據進程執行的代碼:
def write(q):
    print('Process to write: {}'.format(os.getpid()))
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())
 
# 讀數據進程執行的代碼:
def read(q):
    print('Process to read:{}'.format(os.getpid()))
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)
 
if __name__=='__main__':
    # 父進程創建Queue,並傳給各個子進程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 啟動子進程pw,寫入:
    pw.start()
    # 啟動子進程pr,讀取:
    pr.start()
    # 等待pw結束:
    pw.join()
    # pr進程裡是死循環,無法等待其結束,隻能強行終止:
    pr.terminate()

Python的多線程

python 3中的多進程編程主要依靠threading模塊。創建新線程與創建新進程的方法非常類似。threading.Thread方法可以接收兩個參數, 第一個是target,一般指向函數名,第二個時args,需要向函數傳遞的參數。對於創建的新線程,調用start()方法即可讓其開始。我們還可以使用current_thread().name打印出當前線程的名字。 

import threading
import time
 
def long_time_task(i):
    print('當前子線程: {} 任務{}'.format(threading.current_thread().name, i))
    time.sleep(2)
    print("結果: {}".format(8 ** 20))
 
if __name__=='__main__':
    start = time.time()
    print('這是主線程:{}'.format(threading.current_thread().name))
    thread_list = []
    for i in range(1, 3):
        t = threading.Thread(target=long_time_task, args=(i, ))
        thread_list.append(t)
    for t in thread_list:
        t.start()
    for t in thread_list:
        t.join()
    end = time.time()
    print("總共用時{}秒".format((end - start)))

多線程間的數據共享

一個進程所含的不同線程間共享內存,這就意味著任何一個變量都可以被任何一個線程修改,因此線程之間共享數據最大的危險在於多個線程同時改一個變量,把內容給改亂瞭。如果不同線程間有共享的變量,其中一個方法就是在修改前給其上一把鎖lock,確保一次隻有一個線程能修改它。threading.lock()方法可以輕易實現對一個共享變量的鎖定,修改完後release供其它線程使用。

import threading
 
class Account:
    def __init__(self):
        self.balance = 0
 
    def add(self, lock):
        # 獲得鎖
        lock.acquire()
        for i in range(0, 100000):
            self.balance += 1
        # 釋放鎖
        lock.release()
 
    def delete(self, lock):
        # 獲得鎖
        lock.acquire()
        for i in range(0, 100000):
            self.balance -= 1
            # 釋放鎖
        lock.release()
 
if __name__ == "__main__":
    account = Account()
    lock = threading.Lock()
    # 創建線程
   thread_add = threading.Thread(target=account.add, args=(lock,), name='Add')
    thread_delete = threading.Thread(target=account.delete, args=(lock,), name='Delete')
 
    # 啟動線程
   thread_add.start()
    thread_delete.start()
 
    # 等待線程結束
   thread_add.join()
    thread_delete.join()
 
    print('The final balance is: {}'.format(account.balance))

使用queue隊列通信-經典的生產者和消費者模型

from queue import Queue
import random, threading, time
 
# 生產者類
class Producer(threading.Thread):
    def __init__(self, name, queue):
        threading.Thread.__init__(self, name=name)
        self.queue = queue
 
    def run(self):
        for i in range(1, 5):
            print("{} is producing {} to the queue!".format(self.getName(), i))
            self.queue.put(i)
            time.sleep(random.randrange(10) / 5)
        print("%s finished!" % self.getName())
 
# 消費者類
class Consumer(threading.Thread):
    def __init__(self, name, queue):
        threading.Thread.__init__(self, name=name)
        self.queue = queue
 
    def run(self):
        for i in range(1, 5):
            val = self.queue.get()
            print("{} is consuming {} in the queue.".format(self.getName(), val))
            time.sleep(random.randrange(10))
        print("%s finished!" % self.getName())
 
def main():
    queue = Queue()
    producer = Producer('Producer', queue)
    consumer = Consumer('Consumer', queue)
 
    producer.start()
    consumer.start()
 
    producer.join()
    consumer.join()
    print('All threads finished!')
 
if __name__ == '__main__':
    main()
  • 對CPU密集型代碼(比如循環計算) – 多進程效率更高
  • 對IO密集型代碼(比如文件操作,網絡爬蟲) – 多線程效率更高。

對於IO密集型操作,大部分消耗時間其實是等待時間,在等待時間中CPU是不需要工作的,那你在此期間提供雙CPU資源也是利用不上的,相反對於CPU密集型代碼,2個CPU幹活肯定比一個CPU快很多。那麼為什麼多線程會對IO密集型代碼有用呢?這時因為python碰到等待會釋放GIL供新的線程使用,實現瞭線程間的切換。

以上就是python 多進程和多線程使用詳解的詳細內容,更多關於python 多進程和多線程的資料請關註WalkonNet其它相關文章!

推薦閱讀:

    None Found