C++ 多線程編程建議之 C++ 對多線程/並發的支持(下)

前言:

本文承接前文  C++ 對多線程/並發的支持(上) ,翻譯自 C++ 之父 Bjarne Stroustrup 的 C++ 之旅(A Tour of C++)一書的第 13 章 Concurrency。本文將繼續介紹 C++ 並發中的 future/promise,packaged_task 以及 async() 的用法。

1、通信任務

標準庫還在頭文件 <future> 中提供瞭一些機制,能夠讓編程人員基於更高的抽象層次任務來開發,而不是直接使用低層的線程、鎖:

  • future promise:用於從任務(另一個線程)中返回一個值
  • packaged_task:幫助啟動任務,封裝瞭 future promise,並且建立兩者之間的關聯
  • async() :像調用一個函數那樣啟動一個任務。形式最簡單,但也最強大!

1.1 future 和 promise

future promise 可以在兩個任務之間傳值,而無需顯式地使用鎖,實現瞭高效地數據傳輸。其基本想法很簡單:當一個任務向另一個任務傳值時,把值放入 promise,通過特定的實現,使得值可以通過與之關聯的 future 讀出(一般誰啟動瞭任務,誰從 future 中取結果)。

假如有一個 future<X>fx,我們可以通過 get() 獲取類型 X 的值:

X v = fx.get(); // if necessary, wait for the value to get computed

如果值還沒有計算出,則調用 get() 的線程阻塞,直到有值返回。如果值無法計算出,get()可能拋出異常。

promise 的主要目的是提供一個簡單的“put”的操作(set_value set_exception),和 future get() 相呼應。

如果你有一個 promise,需要發送一個類型為 X 的結果到一個 future,你要麼傳遞一個值,要麼傳遞一個異常。舉個例子:

void f(promise<X>& px) // 一個任務:把結果放入 px
{
    try {
        X res;
        // 計算 res 的值
        px.set_value(res);
    }
    catch(...) { // 如果無法計算 res 的值
        px.set_exception(current_exception()); // 傳異常到 future 的線程
    }
}


current_exception() 即捕獲到的異常。

要處理通過 future 傳遞的異常,get() 的調用者必須在什麼地方捕獲,例如:

void g(future<X>& fx) // 一個任務;從 fx 提取結果
{
    try {
        X v = fx.get(); // 如有必要,等待值計算完成
        // 使用 v
    }
    catch(...){ // 無法計算 v
        // 錯誤處理
    }
}

如果 g() 不需要自己處理錯誤,代碼可以進一步簡化:

void g(future<X>& fx) // 一個任務;從 fx 提取結果
{
    X v = fx.get(); // 如有必要,等待值計算完成
    // 使用 v
}

思考:future 和 promise 是怎麼關聯起來的?

1.2 packaged_task

如何把 future 放入一個需要結果的任務,並且把與之關聯的、產生結果的 promise 放入線程?packaged_task 可以簡化任務的設置,關聯 future/promisepackaged_task 封裝瞭把返回值或異常放入 promise 的操作,並且調用 packaged_task get_future() 方法,可以得到一個與 promise 關聯的 future。舉個例子,我們可以設置兩個任務,借助標準庫的 accumulate() 分別累加 vector<double> 的前後部分:

double accum (double* beg, double* end, double init) // 計算以 init 為初值,[beg,end) 的和
{
    return accumulate(beg,end,init);
}

double comp2(vector<double>& v)
{
    using Task_type = double(double*,double*,double); // 任務的類型

    packaged_task<Task_type> pt0 {accum}; // 打包任務(即 accum)
    packaged_task<Task_type> pt1 {accum};

    future<double> f0 {pt0.get_future()}; // 取得 pt0 的 future
    future<double> f1 {pt1.get_future()}; // 取得 pt1 的 future

    double* first = &v[0];
    thread t1{move(pt0),first,first+v.size()/2,0};          // 為 pt0 啟動線程
    thread t2{move(pt1),first+v.size()/2,first+v.size(),0}; // 為 pt1 啟動線程

    return f0.get() + f1.get();
}

packaged_task 模板以任務的類型(Task_type,double(double*,double*,double) 的別名)作為其模板參數,以任務(accum)作為其構造函數的參數。move() 操作是必要的,因為 packaged_task 不可拷貝(隻能移動)。packaged_task 不可拷貝是因為它是一個資源處理程序(resource handler),擁有 promise 的所有權,並且(間接地)負責與之關聯的任務可能擁有的資源。

請註意,這裡的代碼沒有顯式地使用鎖:我們能夠專註於要完成的任務,而不是來管理它們通信的機制。這兩個任務在不同的線程中執行,具有瞭潛在的並發性。

1.3 async()

我在本章所追求的思路,最簡單,但也非常強大:把任務看成是一個恰巧可能和其他任務同時運行的函數。這並不是 C++ 標準庫所支持的唯一模型,但它能很好地滿足各類廣泛的需求。其他更微妙、棘手的模型,如依賴於共享內存的編程風格也可以根據實際需要使用。

要啟動潛在異步執行的任務,我們可以用 async():

double comp4(vector<double>& v) // 如果 v 足夠大,派生多個任務
{
    if(v.size()<10000) // 犯得著用並發嗎?
        return accum(v.begin(),v.end(),0);
    
    auto v0 = &v[0];
    auto sz = v.size();
    
    auto f0 = async(accum,v0,v0+sz/4,0.0);
    auto f1 = async(accum,v0+sz/4,v0+sz/2,0.0);
    auto f2 = async(accum,v0+sz/2,v0+sz*3/4,0.0);
    auto f3 = async(accum,v0+sz*3/4,v0+sz,0.0);
    
    return f0.get()+f1.get()+f2.get()+f3.get(); // 收集 4 部分的結果,求和
}

大體上,async() 把“調用部分”和“獲取結果部分“分離開來,並且將兩者和實際執行的任務分離。使用 async() 你不需要考慮線程、鎖;你隻要從任務(潛在地、異步地計算結果)的角度去考慮就可以瞭。async() 也有明顯的限制:使用瞭共享資源、需要上鎖的任務無法使用 async() ,你甚至不知道會用到多少線程,這完全是由 async() 決定的,它會根據調用時系統可用資源的情況,決定使用多少線程。例如,async() 在決定使用幾個線程前,會檢查有多少核心(處理器)空閑。

示例代碼中的猜測計算開銷和啟動線程的相對開銷(v.size()<10000)隻是一個很原始、粗略的性能估計。這裡不適合展開討論怎麼去管理線程,但這個估計僅僅是一個簡單(可能很爛)的猜測。

請註意,async()不僅僅是專門用於並行計算、提高性能的機制。例如,它也能用於派生任務,從用戶獲取輸入,讓“主程序”忙其他事情。

2、建議

使用並發改善響應性和吞吐量
盡可能在最高級別的抽象上工作(比如優先考慮 asyncpackaged_task 而不是 threadmutex
考慮使用進程作為線程的替代方案
標準庫的並發支持是類型安全的
內存模型把多數程序員從考慮機器架構的工作中解放出來
內存模型使得內存的表現和我們的預期基本一致
原子操作為無鎖編程提供瞭可能性
把無鎖編程留給專傢
有時順序操作比起並發更簡單、更快
避免數據競爭(不受控地同時訪問可變數據)
std::thread 是類型安全的系統線程接口
join() 等待一個線程結束
盡量避免顯式共享數據
unique_lock 管理 mutexes
lock() 一次性獲取多個鎖
condition_variable 管理線程之間的通信
從(可以並行執行的)任務的角度思考,而非線程
不要低估“簡單性”的價值
選擇 packaged_task future,而不是直接使用 thread mutex
promise 返回結果,從 future 獲取結果
packaged_task 處理任務拋出的異常或返回值
packaged_task future 來表示對外部服務的請求,以及等待其回復
async() 啟動簡單的任務

到此這篇關於 C++ 多線程編程建議之 C++ 對多線程/並發的支持的文章就介紹到這瞭,更多相關C++ 對多線程/並發的支持內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

上一篇:  C++ 對多線程/並發的支持(上)

推薦閱讀: