Python線程池thread pool創建使用及實例代碼分享

前言

首先線程和線程池不管在哪個語言裡面,理論都是通用的。對於開發來說,解決高並發問題離不開對多個線程處理。我們先從線程到線程池,從每個線程的運行到多個線程並行,再到線程池管理。由淺入深的理解如何在實際開發中,使用線程池來提高處理線程的效率。

一、線程

1.線程介紹

線程(英語:thread)是操作系統能夠進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運作單位。一條線程指的是進程中一個單一順序的控制流,一個進程中可以並發多個線程,每條線程並行執行不同的任務。在Unix System V及Sun中也被稱為輕量進程(lightweight processes),但輕量進程更多指內核線程(kernel thread),而把用戶線程(user thread)稱為線程。

60年代,在OS中能擁有資源和獨立運行的基本單位是進程,然而隨著計算機技術的發展,進程出現瞭很多弊端,一是由於進程是資源擁有者,創建、撤消與切換存在較大的時空開銷,因此需要引入輕型進程;二是由於對稱多處理機(SMP)出現,可以滿足多個運行單位,而多個進程並行開銷過大。因此在80年代,出現瞭能獨立運行的基本單位——線程(Threads)。

線程是獨立調度和分派的基本單位。線程可以為操作系統內核調度的內核線程,如Win32線程;由用戶進程自行調度的用戶線程,如Linux平臺的POSIX Thread;或者由內核與用戶進程,如Windows 10的線程,進行混合調度。

同一進程中的多條線程將共享該進程中的全部系統資源,如虛擬地址空間,文件描述符和信號處理等等。但同一進程中的多個線程有各自的調用棧(call stack),自己的寄存器環境(register context),自己的線程本地存儲(thread-local storage)。

一個進程可以有很多線程,每條線程並行執行不同的任務。        

2.線程特性

輕型實體

線程中的實體基本上不擁有系統資源,隻是有一點必不可少的、能保證獨立運行的資源。

線程的實體包括程序、數據和TCB。線程是動態概念,它的動態特性由線程控制塊TCB(Thread Control Block)描述。TCB包括以下信息:

  • (1)線程狀態
  • (2)當線程不運行時,被保存的現場資源。
  • (3)一組執行堆棧。
  • (4)存放每個線程的局部變量主存區。
  • (5)訪問同一個進程中的主存和其它資源。

用於指示被執行指令序列的、保留局部變量、少數狀態參數和返回地址等的一組寄存器和堆棧。

獨立調度和分派的基本單位

在多線程OS中,線程是能獨立運行的基本單位,因而也是獨立調度和分派的基本單位。由於線程很“輕”,故線程的切換非常迅速且開銷小(在同一進程中的)。

可並發執行

在一個進程中的多個線程之間,可以並發執行,甚至允許在一個進程中所有線程都能並發執行;同樣,不同進程中的線程也能並發執行,充分利用和發揮瞭處理機與外圍設備並行工作的能力。

共享進程資源

在同一進程中的各個線程,都可以共享該進程所擁有的資源,這首先表現在:所有線程都具有相同的地址空間(進程的地址空間),這意味著,線程可以訪問該地址空間的每一個虛地址;此外,還可以訪問進程所擁有的已打開文件、定時器等。由於同一個進程內的線程共享內存和文件,所以線程之間互相通信不必調用內核。

二、線程池

線程池(英語:thread pool):一種線程使用模式。線程過多會帶來調度開銷,進而影響緩存局部性和整體性能。而線程池維護著多個線程,等待著監督管理者分配可並發執行的任務。這避免瞭在處理短時間任務時創建與銷毀線程的代價。線程池不僅能夠保證內核的充分利用,還能防止過分調度。可用線程數量應該取決於可用的並發處理器、處理器內核、內存、網絡sockets等的數量。 例如,線程數一般取cpu數量+2比較合適,線程數過多會導致額外的線程切換開銷。

任務調度以執行線程的常見方法是使用同步隊列,稱作任務隊列。池中的線程等待隊列中的任務,並把執行完的任務放入完成隊列中。

線程池模式一般分為兩種:HS/HA半同步/半異步模式、L/F領導者與跟隨者模式。

HS/HA半同步/半異步模式:

半同步/半異步模式又稱為生產者消費者模式,是比較常見的實現方式,比較簡單。分為同步層、隊列層、異步層三層。同步層的主線程處理工作任務並存入工作隊列,工作線程從工作隊列取出任務進行處理,如果工作隊列為空,則取不到任務的工作線程進入掛起狀態。由於線程間有數據通信,因此不適於大數據量交換的場合。

L/F領導者與跟隨者模式:

        領導者跟隨者模式,在線程池中的線程可處在3種狀態之一:領導者leader、追隨者follower或工作者processor。任何時刻線程池隻有一個領導者線程。事件到達時,領導者線程負責消息分離,並從處於追隨者線程中選出一個來當繼任領導者,然後將自身設置為工作者狀態去處置該事件。處理完畢後工作者線程將自身的狀態置為追隨者。這一模式實現復雜,但避免瞭線程間交換任務數據,提高瞭CPU cache相似性。在ACE(Adaptive Communication Environment)中,提供瞭領導者跟隨者模式實現。

線程池的伸縮性對性能有較大的影響。

  • 創建太多線程,將會浪費一定的資源,有些線程未被充分使用。
  • 銷毀太多線程,將導致之後浪費時間再次創建它們。
  • 創建線程太慢,將會導致長時間的等待,性能變差。
  • 銷毀線程太慢,導致其它線程資源饑餓。

在面向對象編程中,創建和銷毀對象是很費時間的,因為創建一個對象要獲取內存資源或者其它更多資源。在Java中更是如此,虛擬機將試圖跟蹤每一個對象,以便能夠在對象銷毀後進行垃圾回收。所以提高服務程序效率的一個手段就是盡可能減少創建和銷毀對象的次數,特別是一些很耗資源的對象創建和銷毀。如何利用已有對象來服務就是一個需要解決的關鍵問題,其實這就是一些""池化資源""技術產生的原因。比如大傢所熟悉的數據庫連接池正是遵循這一思想而產生的,本文將介紹的線程池技術同樣符合這一思想。

三、線程池的設計思路

首先我們根據上述已經瞭解瞭線程和線程池創建目的以及作用。讓我們自己思考一下,如果是自己的業務上要用到大量的請求或者是查詢處理,而我們隻能的機器並不能一下就接受這麼多的task湧入計算,這將消耗我們計算機大量資源。這時我們就該創建線程池來對線程進行管理,我們可以給線程預留一定的空間,讓請求逐個進入線程處理,當請求超過我們給的線程數量時,等一個線程跑完瞭再跑下一個,這樣就不會造成資源的浪費和達到資源重復利用。

那麼我們建立線程池的思路就有一下幾點:

  • 控制線程,給予每個線程任務保證線程正常運行。
  • 限制線程數量,保證系統有足夠的運行空間。
  • 資源復用,保證每個線程運行完成任務後能再度利用。
  • 控制運行時間,線程運行超過一定時間後停止任務轉接下個任務,防止線程堵塞。

有瞭這些思路,我們就可以充分利用Python自帶的庫來構建線程池瞭。

四、Python線程池構建

1.構建思路

第一步,我們需要在線程池裡面創建出很多個線程。第二步,當得到一個請求時候,就使用一個線程來運行·它。第三步,若多個任務則分配多個線程來運行。當其中一個線程運行完它的任務之後,將再次進行下一個任務使用。

2.實現庫功能函數

首先python標準庫裡面是有threading庫的,但是該庫並沒有線程池這個模塊。要快速構建線程池,可以利用concurrent.futures,該庫提供瞭ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現瞭對threading和multiprocessing的進一步抽象。

這裡我們隻討論ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor

這裡我們可以看JAVA關於線程池的設計:

構造方法:

public ThreadPoolExecutor(int corePoolSize, //核心線程數量
                              int maximumPoolSize,//     最大線程數
                              long keepAliveTime, //       最大空閑時間
                              TimeUnit unit,         //        時間單位
                              BlockingQueue<Runnable> workQueue,   //   任務隊列
                              ThreadFactory threadFactory,    // 線程工廠
                              RejectedExecutionHandler handler  //  飽和處理機制
    ) 
{ ... }

參數和Python創建線程池是一樣的,python創建線程池:

#encoding:utf-8
from concurrent.futures import ThreadPoolExecutor
import threading
#創建一個包含2條線程的線程池
pool = ThreadPoolExecutor(max_workers = 2)  #定義兩個線程

這樣就建立瞭一條簡單的線程池,其中最大線程數為2 .

def task(i):
    sleep_seconds = random.randint(1, 3)    #隨機睡眠時間
    print('線程名稱:%s,參數:%s,睡眠時間:%s' % (threading.current_thread().name, i, sleep_seconds))
    time.sleep(sleep_seconds)   #定義睡眠時間
for i in range(10):#創建十個任務
    future1 = pool.submit(task, i)

ThreadPoolExecutor()

構造線程池實例,傳入max_workers可以設置線程池中最多能同時運行的線程數目

submit()

提交線程需要執行的任務(函數名和參數)到線程池中,立刻返回一個future對象。

result()

取task的執行結果:

cancel()

取消該 Future 代表的線程任務。如果該任務正在執行,不可取消,則該方法返回 False;否則,程序會取消該任務,並返回 True。

 調高點睡眠時間:

cancelled()

返回 Future 代表的線程任務是否被成功取消。

for i in range(5):#創建十個線程
    future1 = pool.submit(task, i)
    print(future1.cancelled())

running()

for i in range(5):#創建十個線程
    future1 = pool.submit(task, i)
    print(future1.running())

as_completed()

as_completed()方法是一個生成器,在沒有任務完成的時候,會阻塞,在有某個任務完成的時候,會yield這個任務,就能執行for循環下面的語句,然後繼續阻塞住,循環到所有的任務結束。從結果也可以看出,先完成的任務會先通知主線程。

map()

除瞭submit,ThreadPoolExecutor還提供瞭map函數來添加線程,與常規的map類似,區別在於線程池的 map() 函數會為 iterables 的每個元素啟動一個線程,以並發方式來執行 func 函數. 同時,使用map函數,還會自動獲取返回值。

#向線程池提交5個任務
x = np.arange(5)
for i in pool.map(task,x):
      print('successful')

到此這篇關於Python線程池(thread pool)創建使用及實例代碼分享的文章就介紹到這瞭,更多相關Python線程池創建內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: