python 多線程實現多任務的方法示例

1 多線程實現多任務

1.1 什麼是線程?

        進程是操作系統分配程序執行資源的單位,而線程是進程的一個實體,是CPU調度和分配的單位。一個進程肯定有一個主線程,我們可以在一個進程裡創建多個線程來實現多任務。

1.2 一個程序實現多任務的方法

實現多任務,我們可以用幾種方法。

(1)在主進程裡面開啟多個子進程,主進程和多個子進程一起處理任務。

(2)在主進程裡開啟多個子線程,主線程和多個子線程一起處理任務。

(3)在主進程裡開啟多個協程,多個協程一起處理任務。

        註意:因為用多個線程一起處理任務,會產生線程安全問題,所以在開發中一般使用多進程+多協程來實現多任務。

1.3 多線程的創建方式

1.3.1 創建threading.Thread對象

import threading
p1 = threading.Thread(target=[函數名],args=([要傳入函數的參數]))
p1.start()  # 啟動p1線程

        我們來模擬一下多線程實現多任務。

        假如你在用網易雲音樂一邊聽歌一邊下載。網易雲音樂就是一個進程。假設網易雲音樂內部程序是用多線程來實現多任務的,網易雲音樂開兩個子線程。一個用來緩存音樂,用於現在的播放。一個用來下載用戶要下載的音樂的。這時候的代碼框架是這樣的:

import threading
import time
 
def listen_music(name):
    while True:
        time.sleep(1)
        print(name,"正在播放音樂")
 
 
def download_music(name):
    while True:
        time.sleep(2)
        print(name,"正在下載音樂")
 
 
if __name__ == '__main__':
    p1 = threading.Thread(target=listen_music,args=("網易雲音樂",))
    p2 = threading.Thread(target=download_music,args=("網易雲音樂",))
    p1.start()
    p2.start()

輸出:

觀察上面的輸出代碼可以知道:

CPU是按照時間片輪詢的方式來執行子線程的。cpu內部會合理分配時間片。時間片到a程序的時候,a程序如果在休眠,就會自動切換到b程序。

嚴謹來說,CPU在某個時間點,隻在執行一個任務,但是由於CPU運行速度和切換速度快,因為看起來像多個任務在一起執行而已。

1.3.2 繼承threading.Thread,並重寫run

        除瞭上面的方法創建線程,還有另一種方法。可以編寫一個類,繼承threaing.Thread類,然後重寫父類的run方法。

import threading
import time
 
class MyThread(threading.Thread):
    def run(self):
        for i in range(5):
            time.sleep(1)
            print(self.name,i)
 
t1 = MyThread()
t2 = MyThread()
t3 = MyThread()
t1.start()
t2.start()
t3.start()

輸出:

        運行時無序的,說明已經啟用瞭多任務。

下面是threading.Thread提供的線程對象方法和屬性:

  • start():創建線程後通過start啟動線程,等待CPU調度,為run函數執行做準備;
  • run():線程開始執行的入口函數,函數體中會調用用戶編寫的target函數,或者執行被重載的run函數;
  • join([timeout]):阻塞掛起調用該函數的線程,直到被調用線程執行完成或超時。通常會在主線程中調用該方法,等待其他線程執行完成。
  • name、getName()&setName():線程名稱相關的操作;
  • ident:整數類型的線程標識符,線程開始執行前(調用start之前)為None;
  • isAlive()、is_alive():start函數執行之後到run函數執行完之前都為True;
  • daemon、isDaemon()&setDaemon():守護線程相關;

1.4 線程何時開啟,何時結束

(1)子線程何時開啟,何時運行 當調用thread.start()時 開啟線程,再運行線程的代碼

(2)子線程何時結束 子線程把target指向的函數中的語句執行完畢後,或者線程中的run函數代碼執行完畢後,立即結束當前子線程

(3)查看當前線程數量 通過threading.enumerate()可枚舉當前運行的所有線程

(4)主線程何時結束 所有子線程執行完畢後,主線程才結束

示例一:

import threading
import time
  
def run():
    for i in range(5):
        time.sleep(1)
        print(i)
 
t1 = threading.Thread(target=run)
t1.start()
print("我會在哪裡出現")

輸出:

        為什麼主進程(主線程)的代碼會先出現呢?因為CPU采用時間片輪詢的方式,如果輪詢到子線程,發現他要休眠1s,他會先去運行主線程。所以說CPU的時間片輪詢方式可以保證CPU的最佳運行。

        那如果我想主進程輸出的那句話運行在結尾呢?該怎麼辦呢?這時候就需要用到 join() 方法瞭。

1.5 線程的 join() 方法

import threading
import time
 
def run():
    for i in range(5):
        time.sleep(1)
        print(i)
 
t1 = threading.Thread(target=run)
t1.start()
t1.join()  
print("我會在哪裡出現")

輸出:

        join() 方法可以阻塞主線程(註意隻能阻塞主線程其他子線程是不能阻塞的),直到 t1 子線程執行完,再解阻塞。

1.6 多線程共享全局變量出現的問題

        我們開兩個子線程,全局變量是0,我們每個線程對他自加1,每個線程加一百萬次,這時候就會出現問題瞭,來,看代碼:

import threading
import time
 
num = 0
 
def work1(loop):
    global num
    for i in range(loop):
        # 等價於 num += 1
        temp = num
        num = temp + 1
    print(num)
 
 
def work2(loop):
    global num
    for i in range(loop):
        # 等價於 num += 1
        temp = num
        num = temp + 1
    print(num)
 
 
if __name__ == '__main__':
    t1 = threading.Thread(target=work1,args=(1000000,))
    t2 = threading.Thread(target=work2, args=(1000000,))
    t1.start()
    t2.start()
 
    while len(threading.enumerate()) != 1:
        time.sleep(1)
    print(num)

輸出

1459526 # 第一個子線程結束後全局變量一共加到這個數
1588806 # 第二個子線程結束後全局變量一共加到這個數
1588806 # 兩個線程都結束後,全局變量一共加到這個數

        奇怪瞭,我不是每個線程都自加一百萬次嗎?照理來說,應該最後的結果是200萬才對的呀。問題出在哪裡呢?

        我們知道CPU是采用時間片輪詢的方式進行幾個線程的執行。

        假設我CPU先輪詢到work1(),num此時為100,在我運行到第10行時,時間結束瞭!此時,賦值瞭,但是還沒有自加!即temp=100num=100

        然後,時間片輪詢到瞭work2(),進行賦值自加。num=101瞭。

        又回到work1()的斷點處,num=temp+1,temp=100,所以num=101。

        就這樣!num少瞭一次自加!在次數多瞭之後,這樣的錯誤積累在一起,結果隻得到158806!

        這就是線程安全問題

1.7 互斥鎖可以彌補部分線程安全問題。(互斥鎖和GIL鎖是不一樣的東西!)

        當多個線程幾乎同時修改某一個共享數據的時候,需要進行同步控制

        線程同步能夠保證多個線程安全訪問競爭資源,最簡單的同步機制是引入互斥鎖。

        互斥鎖為資源引入一個狀態:鎖定/非鎖定

        某個線程要更改共享數據時,先將其鎖定,此時資源的狀態為“鎖定”,其他線程不能更改;直到該線程釋放資源,將資源的狀態變成“非鎖定”,其他的線程才能再次鎖定該資源。互斥鎖保證瞭每次隻有一個線程進行寫入操作,從而保證瞭多線程情況下數據的正確性。

        互斥鎖有三個常用步驟:

lock = threading.Lock()  # 取得鎖
lock.acquire()  # 上鎖
lock.release()  # 解鎖

        下面讓我們用互斥鎖來解決上面例子的線程安全問題。

import threading
import time
 
num = 0
lock = threading.Lock()  # 取得鎖
def work1(loop):
    global num
    for i in range(loop):
        # 等價於 num += 1
        lock.acquire()  # 上鎖
        temp = num
        num = temp + 1
        lock.release()  # 解鎖
    print(num)
 
 
def work2(loop):
    global num
    for i in range(loop):
        # 等價於 num += 1
        lock.acquire()  # 上鎖
        temp = num
        num = temp + 1
        lock.release()  # 解鎖
    print(num)
 
 
if __name__ == '__main__':
    t1 = threading.Thread(target=work1,args=(1000000,))
    t2 = threading.Thread(target=work2, args=(1000000,))
    t1.start()
    t2.start()
 
    while len(threading.enumerate()) != 1:
        time.sleep(1)
    print(num)

輸出:

1945267 # 第一個子線程結束後全局變量一共加到這個數
2000000 # 第二個子線程結束後全局變量一共加到這個數
2000000 # 兩個線程都結束後,全局變量一共加到這個數

1.8 線程池ThreadPoolExecutor

        從Python3.2開始,標準庫為我們提供瞭concurrent.futures模塊,它提供瞭ThreadPoolExecutorProcessPoolExecutor兩個類,實現瞭對threadingmultiprocessing的進一步抽象(這裡主要關註線程池),不僅可以幫我們自動調度線程,還可以做到:

  • 主線程可以獲取某一個線程(或者任務的)的狀態,以及返回值。
  • 當一個線程完成的時候,主線程能夠立即知道。
  • 讓多線程和多進程的編碼接口一致。

1.8.1 創建線程池

示例:

from concurrent.futures import ThreadPoolExecutor
import time
 
# 參數times用來模擬網絡請求的時間
def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times
 
executor = ThreadPoolExecutor(max_workers=2)
# 通過submit函數提交執行的函數到線程池中,submit函數立即返回,不阻塞
task1 = executor.submit(get_html, (3))
task2 = executor.submit(get_html, (2))
# done方法用於判定某個任務是否完成
print("1: ", task1.done())
# cancel方法用於取消某個任務,該任務沒有放入線程池中才能取消成功
print("2: ", task2.cancel())
time.sleep(4)
print("3: ", task1.done())
# result方法可以獲取task的執行結果
print("4: ", task1.result())

輸出:

  • ThreadPoolExecutor構造實例的時候,傳入max_workers參數來設置線程池中最多能同時運行的線程數目。
  • 使用submit函數來提交線程需要執行的任務(函數名和參數)到線程池中,並返回該任務的句柄(類似於文件、畫圖),註意submit()不是阻塞的,而是立即返回。
  • 通過submit函數返回的任務句柄,能夠使用done()方法判斷該任務是否結束。上面的例子可以看出,由於任務有2s的延時,在task1提交後立刻判斷,task1還未完成,而在延時4s之後判斷,task1就完成瞭。
  • 使用cancel()方法可以取消提交的任務,如果任務已經在線程池中運行瞭,就取消不瞭。這個例子中,線程池的大小設置為2,任務已經在運行瞭,所以取消失敗。如果改變線程池的大小為1,那麼先提交的是task1,task2還在排隊等候,這是時候就可以成功取消。
  • 使用result()方法可以獲取任務的返回值。查看內部代碼,發現這個方法是阻塞的。

1.8.2 as_completed

        上面雖然提供瞭判斷任務是否結束的方法,但是不能在主線程中一直判斷啊。有時候我們是得知某個任務結束瞭,就去獲取結果,而不是一直判斷每個任務有沒有結束。這是就可以使用as_completed方法一次取出所有任務的結果。

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
 
# 參數times用來模擬網絡請求的時間
def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times
 
executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4] # 並不是真的url
all_task = [executor.submit(get_html, (url)) for url in urls]
 
for future in as_completed(all_task):
    data = future.result()
    print("in main: get page {}s success".format(data))
 
# 執行結果
# get page 2s finished
# in main: get page 2s success
# get page 3s finished
# in main: get page 3s success
# get page 4s finished
# in main: get page 4s success

   as_completed()方法是一個生成器,在沒有任務完成的時候,會阻塞,在有某個任務完成的時候,會yield這個任務,就能執行for循環下面的語句,然後繼續阻塞住,循環到所有的任務結束。從結果也可以看出,先完成的任務會先通知主線程

1.8.3 map

        除瞭上面的as_completed方法,還可以使用executor.map方法,但是有一點不同。

from concurrent.futures import ThreadPoolExecutor
import time
 
# 參數times用來模擬網絡請求的時間
def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times
 
executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4] # 並不是真的url
 
for data in executor.map(get_html, urls):
    print("in main: get page {}s success".format(data))
# 執行結果
# get page 2s finished
# get page 3s finished
# in main: get page 3s success
# in main: get page 2s success
# get page 4s finished
# in main: get page 4s success

        使用map方法,無需提前使用submit方法map方法python標準庫中的map含義相同,都是將序列中的每個元素都執行同一個函數。上面的代碼就是對urls的每個元素都執行get_html函數,並分配各線程池。可以看到執行結果與上面的as_completed方法的結果不同,輸出順序和urls列表的順序相同,就算2s的任務先執行完成,也會先打印出3s的任務先完成,再打印2s的任務完成。

1.8.4 wait

   wait方法可以讓主線程阻塞,直到滿足設定的要求。

from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
import time
 
# 參數times用來模擬網絡請求的時間
def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times
 
executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4] # 並不是真的url
all_task = [executor.submit(get_html, (url)) for url in urls]
wait(all_task, return_when=ALL_COMPLETED)
print("main")
# 執行結果 
# get page 2s finished
# get page 3s finished
# get page 4s finished
# main

   wait方法接收3個參數,等待的任務序列、超時時間以及等待條件。等待條件return_when默認為ALL_COMPLETED,表明要等待所有的任務都結束。可以看到運行結果中,確實是所有任務都完成瞭,主線程才打印出main。等待條件還可以設置為FIRST_COMPLETED,表示第一個任務完成就停止等待。

2 多進程實行多任務

2.1 多線程的創建方式

創建進程的方式和創建線程的方式類似:

  • 實例化一個multiprocessing.Process的對象,並傳入一個初始化函數對象(initial function )作為新建進程執行入口;
  • 繼承multiprocessing.Process,並重寫run函數;

2.1.1 方式1

       在開始之前,我們要知道什麼是進程。道理很簡單,你平時電腦打開QQ客戶端,就是一個進程。再打開一個QQ客戶端,又是一個進程。那麼,在python中如何用一篇代碼就可以開啟幾個進程呢?通過一個簡單的例子來演示:

import multiprocessing
import time
 
 
def task1():
    while True:
        time.sleep(1)
        print("I am task1")
 
def task2():
    while True:
        time.sleep(2)
        print("I am task2")
 
 
if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task1)  # multiprocessing.Process創建瞭子進程對象p1
    p2 = multiprocessing.Process(target=task2)  # multiprocessing.Process創建瞭子進程對象p2
    p1.start()  # 子進程p1啟動
    p2.start()  # 子進程p2啟動
    print("I am main task")  # 這是主進程的任務

輸出:

        可以看到子進程對象是由multiprocessing模塊中的Process類創建的。除瞭p1,p2兩個被創建的子進程外。當然還有主進程。主進程就是我們從頭到尾的代碼,包括子進程也是由主進程創建的。

註意的點有:

(1)首先解釋一下並發:並發就是當任務數大於cpu核數時,通過操作系統的各種任務調度算法,實現多個任務“一起”執行。(實際上總有一些任務不在執行,因為切換任務相當快,看上去想同時執行而已。)

(2)當是並發的情況下,子進程主進程的運行都是沒有順序的,CPU會采用時間片輪詢的方式,哪個程序先要運行就先運行哪個。

(3)主進程會默認等待所有子進程執行完畢後,它才會退出。所以在上面的例子中,p1,p2子進程是死循環進程,主進程的最後一句代碼print(“I am main task”)雖然運行完瞭,但是主進程並不會關閉,他會一直等待著子進程。

(4)主進程默認創建的是非守護進程。註意,結合3.和5.看。

(5)但是!如果子進程守護進程的話,那麼主進程運行完最後一句代碼後,主進程會直接關閉,不管你子進程運行完瞭沒有!

2.1.2 方式2

from multiprocessing import Process  
import os, time
 
class CustomProcess(Process):
    def __init__(self, p_name, target=None):
        # step 1: call base __init__ function()
        super(CustomProcess, self).__init__(name=p_name, target=target, args=(p_name,))
 
    def run(self):
        # step 2:
        # time.sleep(0.1)
        print("Custom Process name: %s, pid: %s "%(self.name, os.getpid()))
 
if __name__ == '__main__':
    p1 = CustomProcess("process_1")
    p1.start()
    p1.join()
    print("subprocess pid: %s"%p1.pid)
    print("current process pid: %s" % os.getpid())

輸出:

        這裡可以思考一下,如果像多線程一樣,存在一個全局的變量share_data,不同進程同時訪問share_data會有問題嗎?

        由於每一個進程擁有獨立的內存地址空間且互相隔離,因此不同進程看到的share_data是不同的、分別位於不同的地址空間,同時訪問不會有問題。這裡需要註意一下。

2.2 守護進程

 測試下:

import multiprocessing
import time
 
 
def task1():
    while True:
        time.sleep(1)
        print("I am task1")
 
def task2():
    while True:
        time.sleep(2)
        print("I am task2")
 
 
if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task1)
    p2 = multiprocessing.Process(target=task2)
    p1.daemon = True  # 設置p1子進程為守護進程
    p2.daemon = True  # 設置p2子進程為守護進程
    p1.start()
    p2.start()
    print("I am main task")

輸出:

I am main task

輸出結果是不是有點奇怪。為什麼p1,p2子進程都沒有輸出的?

讓我們來整理一下思路:

  • 創建p1,p2子進程
  • 設置p1,p2子進程為守護進程
  • p1,p2子進程開啟
  • p1,p2子進程代碼裡面都有休眠時間,所以cpu為瞭不浪費時間,先做主進程後續的代碼。
  • 執行主進程後續的代碼,print(“I am main task”)
  • 主進程後續的代碼執行完成瞭,所以剩下的子進程是守護進程的,全都要關閉瞭。但是,如果主進程的代碼執行完瞭,有兩個子進程,一個是守護的,一個非守護的,怎麼辦呢?其實,他會等待非守護的那個子進程運行完,然後三個進程一起關閉。
  • p1,p2還在休眠時間內就被終結生命瞭,所以什麼輸出都沒有。

例如,把P1設為非守護進程:

import multiprocessing
import time
 
 
def task1():
    i = 1
    while i < 5:
        time.sleep(1)
        i += 1
        print("I am task1")
 
def task2():
    while True:
        time.sleep(2)
        print("I am task2")
 
 
if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task1)
    p2 = multiprocessing.Process(target=task2)
    p2.daemon = True  # 設置p2子進程為守護進程
    p1.start()
    p2.start()
    print("I am main task")

輸出:

裡面涉及到兩個知識點:

(1)當主進程結束後,會發一個消息給子進程(守護進程),守護進程收到消息,則立即結束

(2)CPU是按照時間片輪詢的方式來運行多進程的。哪個合適的哪個運行,如果你的子進程裡都有time.sleep。那我CPU為瞭不浪費資源,肯定先去幹點其他的事情啊。

        那麼,守護進程隨時會被中斷,他的存在意義在哪裡的?

        其實,守護進程主要用來做與業務無關的任務,無關緊要的任務,可有可無的任務,比如內存垃圾回收,某些方法的執行時間的計時等。

2.3 創建的子進程要傳入參數

import multiprocessing
 
 
def task(a,b,*args,**kwargs):
    print("a")
    print("b")
    print(args)
    print(kwargs)
 
 
if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task,args=(1,2,3,4,5,6),kwargs={"name":"chichung","age":23})
    p1.start()
    print("主進程已經運行完最後一行代碼啦")

輸出:

        子進程要運行的函數需要傳入變量a,b,一個元組,一個字典。我們創建子進程的時候,變量a,b要放進元組裡面,task函數取的時候會把前兩個取出來,分別賦值給a,b瞭。

2.4 子進程幾個常用的方法

 

p.start 開始執行子線程
p.name 查看子進程的名稱
p.pid 查看子進程的id
p.is_alive 判斷子進程是否存活
p.join(timeout)

阻塞主進程,當子進程p運行完畢後,再解開阻塞,讓主進程運行後續的代碼

如果timeout=2,就是阻塞主進程2s,這2s內主進程不能運行後續的代碼。過瞭2s後,就算子進程沒有運行完畢,主進程也能運行後續的代碼

p.terminate 終止子進程p的運行
import multiprocessing
 
def task(a,b,*args,**kwargs):
    print("a")
    print("b")
    print(args)
    print(kwargs)
 
 
if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task,args=(1,2,3,4,5,6),kwargs={"name":"chichung","age":23})
    p1.start()
    print("p1子進程的名字:%s" % p1.name)
    print("p1子進程的id:%d" % p1.pid)
    p1.join()
    print(p1.is_alive())

輸出:

2.5 進程之間是不可以共享全局變量

        進程之間是不可以共享全局變量的,即使子進程與主進程。道理很簡單,一個新的進程,其實就是占用一個新的內存空間,不同的內存空間,裡面的變量肯定不能夠共享的。實驗證明如下:

示例一:

import multiprocessing
 
g_list = [123]
 
def task1():
    g_list.append("task1")
    print(g_list)
 
def task2():
    g_list.append("task2")
    print(g_list)
 
def main_process():
    g_list.append("main_processs")
    print(g_list)
 
if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task1)
    p2 = multiprocessing.Process(target=task2)
    p1.start()
    p2.start()
    main_process()
    print("11111: ", g_list)

輸出:

[123, ‘main_processs’]
11111: [123, ‘main_processs’]
[123, ‘task1’]
[123, ‘task2’]

 示例二:

import multiprocessing
import time
 
 
def task1(loop):
    global num
    for i in range(loop):
        # 等價於 num += 1
        temp = num
        num = temp + 1
    print(num)
    print("I am task1")
 
def task2(loop):
    global num
    for i in range(loop):
        # 等價於 num += 1
        temp = num
        num = temp + 1
    print(num)
    print("I am task2")
 
 
if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task1, args=(100000,)  # multiprocessing.Process創建瞭子進程對象p1
    p2 = multiprocessing.Process(target=task2, args=(100000,)  # multiprocessing.Process創建瞭子進程對象p2
    p1.start()  # 子進程p1啟動
    p2.start()  # 子進程p2啟動
    print("I am main task")  # 這是主進程的任務

輸出:

2.6 python進程池:multiprocessing.pool

        進程池可以理解成一個隊列,該隊列可以容易指定數量的子進程,當隊列被任務占滿之後,後續新增的任務就得排隊,直到舊的進程有任務執行完空餘出來,才會去執行新的任務。

        在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多臺主機,並行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。

        Pool可以提供指定數量的進程供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那麼就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,才會創建新的進程來它。

2.6.1 使用進程池(非阻塞)

#coding: utf-8
import multiprocessing
import time
 
def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")
 
if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3) # 設定進程的數量為3
    for i in range(4):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))   #維持執行的進程總數為processes,當一個進程執行完畢後會添加新的進程進去
 
    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()   #調用join之前,先調用close函數,否則會出錯。執行完close後不會有新的進程加入到pool,join函數等待所有子進程結束
    print("Sub-process(es) done.")

輸出:

函數解釋

  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解區別,看例1例2結果區別)
  • close() 關閉pool,使其不在接受新的任務。
  • terminate() 結束工作進程,不在處理未完成的任務。
  • join() 主進程阻塞,等待子進程的退出, join方法要在close或terminate之後使用。

apply(), apply_async():

  • apply(): 阻塞主進程, 並且一個一個按順序地執行子進程, 等到全部子進程都執行完畢後 ,繼續執行 apply()後面主進程的代碼
  • apply_async() 非阻塞異步的, 他不會等待子進程執行完畢, 主進程會繼續執行, 他會根據系統調度來進行進程切換

執行說明:創建一個進程池pool,並設定進程的數量為3,xrange(4)會相繼產生四個對象[0, 1, 2, 4],四個對象被提交到pool中,因pool指定進程數為3,所以0、1、2會直接送到進程中執行,當其中一個執行完事後才空出一個進程處理對象3,所以會出現輸出“msg: hello 3”出現在”end”後。因為為非阻塞,主函數會自己執行自個的,不搭理進程的執行,所以運行完for循環後直接輸出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()處等待各個進程的結束。

2.6.2 使用進程池(阻塞)

#coding: utf-8
import multiprocessing
import time
 
def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")
 
if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3) # 設定進程的數量為3
    for i in range(4):
        msg = "hello %d" %(i)
        pool.apply(func, (msg, ))   #維持執行的進程總數為processes,當一個進程執行完畢後會添加新的進程進去
 
    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    pool.close()
    pool.join()   #調用join之前,先調用close函數,否則會出錯。執行完close後不會有新的進程加入到pool,join函數等待所有子進程結束
    print("Sub-process(es) done.")

輸出:

2.6.3 使用進程池,並關註結果

import multiprocessing
import time
 
def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")
    return "done" + msg
 
if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in range(3):
        msg = "hello %d" %(i)
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print(":::", res.get())
    print("Sub-process(es) done.")

輸出:

  :get()函數得出每個返回結果的值

3 python多線程與多進程比較

先來看兩個例子:

(1)示例一,多線程與單線程,開啟兩個python線程分別做一億次加一操作,和單獨使用一個線程做一億次加一操作:

import threading
import time
 
def tstart(arg):
    var = 0
    for i in range(100000000):
        var += 1
    print(arg, var)
 
if __name__ == '__main__':
    t1 = threading.Thread(target=tstart, args=('This is thread 1',))
    t2 = threading.Thread(target=tstart, args=('This is thread 2',))
    start_time = time.time()
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("Two thread cost time: %s" % (time.time() - start_time))
    start_time = time.time()
    tstart("This is thread 0")
    print("Main thread cost time: %s" % (time.time() - start_time))

輸出:

 上面的例子如果隻開啟t1和t2兩個線程中的一個,那麼運行時間和主線程基本一致。

 (2)示例二,使用兩個進程

from multiprocessing import Process  
import os, time
 
def pstart(arg):
    var = 0
    for i in range(100000000):
        var += 1
    print(arg, var)
 
if __name__ == '__main__':
    p1 = Process(target = pstart, args = ("1", ))
    p2 = Process(target = pstart, args = ("2", ))
    start_time = time.time()
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("Two process cost time: %s" % (time.time() - start_time))
    start_time = time.time()
    pstart("0")
    print("Current process cost time: %s" % (time.time() - start_time))

輸出:

 對比分析:

        雙進程並行執行單進程執行相同的運算代碼,耗時基本相同,雙進程耗時會稍微多一些,可能的原因是進程創建和銷毀會進行系統調用,造成額外的時間開銷。

        但是對於python線程,雙線程並行執行耗時比單線程要高的多,效率相差近10倍。如果將兩個並行線程改成串行執行,即:

import threading
import time
 
def tstart(arg):
    var = 0
    for i in range(100000000):
        var += 1
    print(arg, var)
 
if __name__ == '__main__':
    t1 = threading.Thread(target=tstart, args=('This is thread 1',))
    t2 = threading.Thread(target=tstart, args=('This is thread 2',))
    start_time = time.time()
    t1.start()
    t1.join()
    print("thread1 cost time: %s" % (time.time() - start_time))
    start_time = time.time()
    t2.start()
    t2.join()
    print("thread2 cost time: %s" % (time.time() - start_time))
    start_time = time.time()
    tstart("This is thread 0")
    print("Main thread cost time: %s" % (time.time() - start_time))

輸出:

可以看到三個線程串行執行,每一個執行的時間基本相同。

本質原因雙線程是並發執行的,而不是真正的並行執行。原因就在於GIL鎖

4 GIL鎖

        提起python多線程就不得不提一下GIL(Global Interpreter Lock 全局解釋器鎖),這是目前占統治地位的python解釋器CPython中為瞭保證數據安全所實現的一種鎖。不管進程中有多少線程,隻有拿到瞭GIL鎖的線程才可以在CPU上運行,即使是多核處理器對一個進程而言,不管有多少線程,任一時刻,隻會有一個線程在執行。對於CPU密集型的線程,其效率不僅僅不高,反而有可能比較低。python多線程比較適用於IO密集型的程序。對於的確需要並行運行的程序,可以考慮多進程。

        多線程對鎖的爭奪,CPU對線程的調度,線程之間的切換等均會有時間開銷。

5 線程和進程比較

5.1 線程和進程的區別

下面簡單的比較一下線程與進程

  • 進程是資源分配的基本單位,線程是CPU執行和調度的基本單位;
  • 通信/同步方式:
    • 進程:
      • 通信方式:管道,FIFO,消息隊列,信號,共享內存,socket,stream流;
      • 同步方式:PV信號量,管程
    • 線程:
      • 同步方式:互斥鎖,遞歸鎖,條件變量,信號量
      • 通信方式:位於同一進程的線程共享進程資源,因此線程間沒有類似於進程間用於數據傳遞的通信方式,線程間的通信主要是用於線程同步。
  • CPU上真正執行的是線程,線程比進程輕量,其切換和調度代價比進程要小;
  • 線程間對於共享的進程數據需要考慮線程安全問題,由於進程之間是隔離的,擁有獨立的內存空間資源,相對比較安全,隻能通過上面列出的IPC(Inter-Process Communication)進行數據傳輸;
  • 系統有一個個進程組成,每個進程包含代碼段、數據段、堆空間和棧空間,以及操作系統共享部分 ,有等待,就緒和運行三種狀態;
  • 一個進程可以包含多個線程,線程之間共享進程的資源(文件描述符、全局變量、堆空間等),寄存器變量和棧空間等是線程私有的;
  • 操作系統中一個進程掛掉不會影響其他進程,如果一個進程中的某個線程掛掉而且OS對線程的支持是多對一模型,那麼會導致當前進程掛掉;
  • 如果CPU和系統支持多線程與多進程,多個進程並行執行的同時,每個進程中的線程也可以並行執行,這樣才能最大限度的榨取硬件的性能;

å¨è¿éæå¥å¾çæè¿°

5.2 線程和進程的上下文切換

進程切換過程切換牽涉到非常多的東西,寄存器內容保存到任務狀態段TSS,切換頁表,堆棧等。簡單來說可以分為下面兩步:

  • 頁全局目錄切換,使CPU到新進程的線性地址空間尋址;
  • 切換內核態堆棧和硬件上下文,硬件上下文包含CPU寄存器的內容,存放在TSS中;

線程運行於進程地址空間,切換過程不涉及到空間的變換,隻牽涉到第二步;

5.3 使用多線程還是多進程?

  • CPU密集型:程序需要占用CPU進行大量的運算和數據處理;適合多進程;
  • I/O密集型:程序中需要頻繁的進行I/O操作;例如網絡中socket數據傳輸和讀取等;適合多線程

        由於python多線程並不是並行執行,因此較適合與I/O密集型程序,多進程並行執行適用於CPU密集型程序;

python多線程實現多任務:https://www.cnblogs.com/chichung/p/9566734.html

python通過多進程實行多任務:https://www.cnblogs.com/chichung/p/9532962.html

python多線程與多進程及其區別:https://www.cnblogs.com/yssjun/p/11302500.html

python進程池:multiprocessing.pool:https://www.cnblogs.com/kaituorensheng/p/4465768.html

到此這篇關於python 多線程實現多任務的方法示例的文章就介紹到這瞭,更多相關python 多線程實現多任務的方法示例內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: