C++ 對多線程/並發的支持(上)

前言:

本文翻譯自 C++ 之父 Bjarne Stroustrup 的 C++ 之旅( A Tour of C++ )一書的第 13 章 Concurrency。作者用短短數十頁,帶你一窺現代 C++ 對並發/多線程的支持。原文地址:現代 C++ 對多線程/並發的支持(上) — 節選自 C++ 之父的 《 A Tour of C++ 》 水平有限,有條件的建議直接閱讀原版書籍。

1、 並發介紹

並發,即同時執行多個任務,常用來提高吞吐量(通過利用多處理器進行同一個計算)或者改善響應性(等待回復的時候,允許程序的其他部分繼續執行)。所有現代語言都支持並發。C++ 標準庫提供瞭可移植、類型安全的並發支持,經過 20 多年的發展,幾乎被所有現代硬件所支持。標準庫提供的主要是系統級的並發支持,而非復雜的、更高層次的並發模型;其他庫可以基於標準庫,提供更高級別的並發支持。

C++ 提供瞭適當的內存模型(memory model)和一組原子操作(atomic operation),以支持在同一地址空間內並發執行多個線程。原子操作使得無鎖編程成為可能。內存模型保證瞭在避免數據競爭(data races,不受控地同時訪問可變數據)的前提下,一切按照預期工作。

本章將給出標準庫對並發的主要支持示例:threadmutexlock()packaged_task 以及 future。這些特征直接基於操作系統構建,相較於操作系統原生支持,不會帶來性能損失,也不保證會有顯著的性能提升。

那為什麼要用標準庫而非操作系統的並發?可移植性。

不要把並發當作靈丹妙藥:如果順序執行可以搞定,通常順序會比並發更簡單、更快速!

2、 任務和線程

如果一個計算有可能(potentially)和另一個計算並發執行,我們稱之為任務(task)。線程是任務的系統級表示。任務可以通過構造一個 std::thread 來啟動,任務作為參數。

  • 任務是一個函數或者函數對象。
  • 任務是一個函數或者函數對象。
  • 任務是一個函數或者函數對象。
void f();              // 函數

struct F {             // 函數對象
    void operator()()  // F 的調用操作符
};

void user()
{
    thread t1 {f};     // f() 在另一個線程中執行
    thread t2 {F()};   // F()() 在另一個線程中執行

    t1.join();  // 等待 t1
    t2.join();  // 等待 t2
}

join() 確保線程完成後才退出 user() ,“join 線程”的意思是“等待線程結束”。

一個程序的線程共享同一地址空間。線程不同於進程,進程通常不直接共享數據。線程間可以通過共享對象(shared object)通信,這類通信一般用鎖或其他機制控制,以避免數據競爭。

編寫並發任務可能會非常棘手,假如上述例子中的 f 和 F 實現如下:

void f() {cout << "Hello ";}

struct F {
    void operator()() {cout << "Parallel World!\n";}
};

這裡有個嚴重的錯誤:f 和 F() 都用到瞭 cout 對象,卻沒有任何形式的同步。這會導致輸出的結果不可預測,多次執行的結果可能會得到不同的結果:因為兩個任務的執行順序是未定義的。程序可能產生詭異的輸出,比如:

PaHerallllel o World!


定義一個並發程序中的任務時,我們的目標是保持任務之間完全獨立。最簡單的方法就是把並發任務看作是一個恰巧可以和調用者同時運行的函數:我們隻要傳遞參數、取回結果,保證該過程中沒有使用共享數據(沒有數據競爭)即可。

3、傳遞參數

一般來說,任務需要處理一些數據。我們可以通過參數傳遞數據(或者數據的指針或引用)。

void f(vector<double>& v); // 處理 v 的函數

struct F {                 // 處理 v 的函數對象
    vector<double>& v;
    F(vector<double>& vv) : v(vv) {}
    void operator()();
};

int main()
{
    vector<double> some_vec{1,2,3,4,5,6,7,8,9};
    vector<double> vec2{10,11,12,13,14};

    thread t1{f,ref(some_vec)}; // f(some_vec) 在另一個線程中執行
    thread t2{F{vec2}};         // F{vec2}() 在另一個線程中執行

    t1.join();
    t2.join();
}

F{vec2} 在 F 中保存瞭參數 vector 的引用。F 現在可以使用這個 vector。但願在 F 執行時,沒有其他任務訪問 vec2。如果通過值傳遞 vec2 則可以消除這個隱患。

t1 通過 {f,ref(some_vec)} 初始化,用到瞭 thread 的可變參數模板構造,可以接受任意序列的參數。ref() 是來自 <functional> 的類型函數。為瞭讓可變參數模板把 some_vec 當作一個引用而非對象,ref() 不能省略。編譯器檢查第一個參數可以通過其後面的參數調用,並構建必要的函數對象,傳遞給線程。如果 F::operator()() 和 f() 執行瞭相同的算法,兩個任務的處理幾乎是等同的:兩種情況下,都各自構建瞭一個函數對象,讓 thread 去執行。

可變參數模板需要用 ref()、cref() 傳遞引用

4、返回結果

3 的例子中,我傳瞭一個非 const 的引用。隻有在希望任務修改引用數據時我才這麼做。這是一種很常見的獲取返回結果的方式,但這麼做並不能清晰、明確地向他人傳達你的意圖。稍好一點的方式是通過 const 引用傳遞輸入數據,通過另外單獨的參數傳遞儲存結果的指針。

void f(const vector<double>& v, double *res); // 從 v 獲取輸入; 結果存入 *res

class F {
public:
    F(const vector<double>& vv, double *p) : v(vv), res(p) {}
    void operator()();  // 結果保存到 *res

private:
    const vector<double>& v;  // 輸入源
    double *p;                // 輸出地址
};

int main()
{
    vector<double> some_vec;
    vector<double> vec2;

    double res1;
    double res2;

    thread t1{f,cref(some_vec),&res1}; // f(some_vec,&res1) 在另一個線程中執行
    thread t2{F{vec2,&res2}};          // F{vec2,&res2}() 在另一個線程中執行

    t1.join();
    t2.join();
}

這麼做沒問題,也很常見。但我不覺得通過參數傳遞返回結果有多優雅,我會在 13.7.1 節再次討論這個話題。

通過參數(出參)傳遞結果並不優雅

5、共享數據

有時任務需要共享數據,這種情況下,對共享數據的訪問需要進行同步,同一時刻隻能有一個任務訪問數據(但是多任務同時讀取不變量是沒有問題的)。我們要考慮如何保證在同一時刻最多隻有一個任務能夠訪問一組對象。

解決這個問題需要通過 mutex(mutual exclusion object,互斥對象)。thread 通過 lock() 獲取 mutex

int shared_data;
mutex m;          // 用於控制 shared_data 的 mutex

void f()
{
    unique_lock<mutex> lck{m};  // 獲取 mutex
    shared_data += 7;           // 操作共享數據
}   // 離開 f() 作用域,隱式自動釋放 mutex

unique_lock 的構造函數通過調用 m.lock() 獲取 mutex。如果另一個線程已經獲取這個 mutex,當前線程等待(阻塞)直到另一個線程(通過 m.unlock( ) )釋放該 mutex。當 mutex 釋放,等待該 mutex 的線程恢復執行(喚醒)。互斥、鎖在 <mutex> 頭文件中。

共享數據和 mutex 之間的關聯需要自行約定:程序員需要知道哪個 mutex 對應哪個數據。這樣很容易出錯,但是我們可以通過一些方式使得他們之間的關聯更清晰明確:

class Record {
public:
    mutex rm;
};

不難猜到,對於一個 Record 對象 rec,在訪問 rec 其他數據之前,你應該先獲取 rec.rm。最好通過註釋或者良好的命名讓讀者清楚地知道 mutex 和數據的關聯。

有時執行某些操作需要同時訪問多個資源,有可能導致死鎖。例如,thread1 已經獲取瞭 mutex1,然後嘗試獲取 mutex2;與此同時,thread2 已經獲取 mutex2,嘗試獲取 mutex1。在這種情況下,兩個任務都無法進行下去。為解決這一問題,標準庫支持同時獲取多個鎖:

void f()
{
    unique_lock<mutex> lck1{m1,defer_lock};  // defer_lock:不立即獲取 mutex
    unique_lock<mutex> lck2{m2,defer_lock};
    unique_lock<mutex> lck3{m3,defer_lock};

    lock(lck1,lck2,lck3);                    // 嘗試獲取所有鎖
    // 操作共享數據
}   // 離開 f() 作用域,隱式自動釋放所有 mutexes

lock() 隻有在獲取參數裡所有的 mutex 之後才會繼續執行,並且在其持有 mutex 期間,不會阻塞(go to sleep)。每個 unique_lock 的析構會確保離開作用域時,自動釋放所有的 mutex

通過共享數據通信是相對底層的操作。編程人員要設計一套機制,弄清楚哪些任務完成瞭哪些工作,還有哪些未完成。從這個角度看, 使用共享數據不如直接調用函數、返回結果。另一方面,有些人認為共享數據比拷貝參數和返回值效率更高。這個觀點可能在涉及大量數據的時候成立,但是 locking unlocking 也是相對耗時的操作。不僅如此,現代計算機很擅長拷貝數據,尤其是像 vector 這種元素連續存儲的結構。所以,不要僅僅因為“效率”而選用共享數據進行通信,除非你真正實際測量過。

6、等待事件

有時線程需要等待外部事件,比如另一個線程完成瞭任務或者經過瞭一段時間。最簡單的事件是時間。借助 <chrono>,可以寫出:

using namespace std::chrono;

auto t0 = high_resolution_clock::now();
this_thread::sleep_for(milliseconds{20});
auto t1 = high_resolution_clock::now();

cout << duration_cast<nanoseconds>(t1-t0).count() << " nanoseconds passed\n";

註意,我甚至沒有啟動一個線程;默認情況下,this_thread 指當前唯一的線程。我用 duration_cast 把時間單位轉成瞭我想要的 nanoseconds

condition_variable 提供瞭對通過外部事件通信的支持,允許一個線程等待另一個線程,比如等待另一個線程(完成某個工作,然後)觸發一個事件/條件。

condition_variable 支持很多優雅、高效的共享形式,但也可能會很棘手。考慮一個經典的生產者-消費者例子,兩個線程通過一個隊列傳遞消息:

class Message { /**/ }; // 通信的對象

queue<Message> q;       // 消息隊列
condition_variable cv;  // 傳遞事件的變量
mutex m;                // locking 機制
queue、condition_variable 以及 mutex 由標準庫提供。

消費者讀取並處理 Message

void consumer()
{
    while(true){
        unique_lock<mutex> lck{m}; // 獲取 mutex m
        cv.wait(lck);              // 先釋放 lck,等待事件/條件喚醒
                                   // 喚醒時再次重新獲得 lck
        auto m = q.front();        // 從隊列中取出 Message m
        q.pop();
        lck.unlock();              // 後續處理消息不再操作隊列 q,提前釋放 lck
        // 處理 m
    }
}

這裡我顯式地用 unique_lock<mutex> 保護 queue condition_variable 上的操作。condition_variable 上的 cv.wait(lck) 會釋放參數中的鎖 lck,直到等待結束(隊列非空),然後再次獲取 lck。

相應的生產者代碼:

void producer()
{
    while(true) {
        Message m;
        // 填充 m
        unique_lock<mutex> lck{m}; // 保護操作
        q.push(m);
        cv.notify_one();           // 通知/喚醒等待中的 condition_variable
    } // 作用域結束自動釋放鎖
}

到目前為止,不論是 threadmutexlock 還是 condition_variable,都還是低層次的抽象。接下來我們馬上就能看到 C++ 對並發的高級抽象支持。

7、通信任務

標準庫還在頭文件 <future> 中提供瞭一些機制,能夠讓程序員在更高的任務的概念層次上工作,而不是直接使用低層的線程、鎖:

  • future promise:用於從另一個線程中返回一個值
  • packaged_task:幫助啟動任務,封裝瞭 future promise,並且建立兩者之間的關聯
  • async():像調用一個函數那樣啟動一個任務。形式最簡單,但也最強大!

到此這篇關於C++ 對多線程/並發的支持(上)的文章就介紹到這瞭,更多相關C++ 對多線程並發的支持內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: