C++線程池實現代碼
前言
這段時間看瞭《C++並發編程實戰》的基礎內容,想著利用最近學的知識自己實現一個簡單的線程池。
什麼是線程池
線程池(thread pool)是一種線程使用模式。線程過多或者頻繁創建和銷毀線程會帶來調度開銷,進而影響緩存局部性和整體性能。而線程池維護著多個線程,等待著管理器分配可並發執行的任務。這避免瞭在處理短時間任務時創建與銷毀線程的代價,以及保證瞭線程的可復用性。線程池不僅能夠保證內核的充分利用,還能防止過分調度。
思路
個人對線程池的理解是:利用已經創建的固定數量的線程去執行指定的任務,從而避免線程重復創建和銷毀帶來的額外開銷。
C++11中,線程我們可以理解為對應一個thread對象,任務可以理解為要執行的函數,通常是耗時的函數。
我們的任務多少和順序並非固定的,因此需要有一個方法能添加指定的任務,任務存放的地方應該是一個任務隊列,因為我們的線程數量有限,當任務很多時同時執行的任務數量也有限,因此任務需要排隊,遵循先來後到的原則。
當要執行一個任務時,意味著先將這個任務從隊列取出,再執行相應任務,而“取出”動作的執行者是線程池中的線程,這意味我們的隊列需要考慮多個線程在同一隊列上執行“取出”操作的問題,實際上,取出任務操作和添加任務操作也不能同時進行,否則會產生競爭條件;另一方面,程序本身如果就是多線程的,多個線程同時添加任務的操作也應該是互斥的。
當沒有任務可以執行時,所有線程應該什麼也不做,當出現瞭一個任務時,應該將這個任務分配到任一線程中執行。實現上我們固然可以使用輪詢的方式判斷當前隊列是否有任務,有則取出(即使加瞭互斥鎖似乎也無法避免競爭條件?),但這樣會消耗無謂的CPU資源,寫輪詢周期難以選取。其實,我們可以使用condition_variable代替輪詢。
上述任務的創建和取出其實就是經典的生產者消費者模型。
我們將上面的內容都封裝在一個類中,取名ThreadPool,用戶可以在構造ThreadPool對象時指定線程池大小,之後可以隨時添加要執行的任務。
實現
class ThreadPool { public: ThreadPool(int n); ~ThreadPool(); void pushTask(packaged_task<void()> &&task); private: vector<thread*> threadPool; deque<packaged_task<void()>> taskQueue; void taskConsumer(); mutex taskMutex; condition_variable taskQueueCond; }; ThreadPool::ThreadPool(int n) { for (int i = 0; i < n; i++) { thread *t = new thread(&ThreadPool::taskConsumer,this); threadPool.push_back(t); t->detach(); } } ThreadPool::~ThreadPool() { while (!threadPool.empty()) { thread *t=threadPool.back(); threadPool.pop_back(); delete t; } } void ThreadPool::pushTask(packaged_task<void()> &&task) { { lock_guard<mutex> guard(taskMutex); taskQueue.push_back(std::move(task)); } taskQueueCond.notify_one(); } void ThreadPool::taskConsumer() { while (true) { unique_lock<mutex> lk(taskMutex); taskQueueCond.wait(lk, [&] {return !taskQueue.empty(); }); packaged_task<void()> task=std::move(taskQueue.front()); taskQueue.pop_front(); lk.unlock(); task(); } }
這裡我使用packaged_task作為任務,每當添加一個任務,就調用condition_variable::notify_one方法,調用condition_variable::wait的線程就會被喚醒,並檢查等待條件。這裡有個小細節是notify_one在解鎖後執行,這樣避免線程喚醒後還要等待互斥鎖解鎖。
使用示例:
void Task1() { Sleep(1000); cout << "Task1"<<endl; } void Task5() { Sleep(5000); cout << "Task5" << endl; } class Worker { public: void run(); }; void Worker::run() { cout << "Worker::run start" << endl; Sleep(5000); cout << "Worker::run end" << endl; } int main() { ThreadPool pool(2); pool.pushTask(packaged_task<void()>(Task5)); pool.pushTask(packaged_task<void()>(Task1)); pool.pushTask(packaged_task<void()>(Task1)); Worker worker; pool.pushTask(packaged_task<void()>(bind(&Worker::run,&worker))); pool.pushTask(packaged_task<void()>([&](){worker.run();})); Sleep(20000); }
這個線程池目前有幾個缺點:
- 隻能傳入調用形式為void()形式的函數或可調用對象,不能返回任務執行的值,隻能通過其他方式同步任務執行結果(如果有)
- 傳入參數較為復雜,必須封裝一層packaged_task,調用對象方法時需要使用bind或者lambda表達式的方法封裝
以上缺點在當前版本的實現不予解決,日後另寫博文優化。
2021/12/29 更新之一:
事實上,我們隻要將packaged_task改為funtion模板類,就可以簡化我們的調用參數:
class ThreadPool { public: ThreadPool(int n); ~ThreadPool(); void pushTask(function<void()> task); private: vector<thread*> threadPool; deque<function<void()>> taskQueue; void taskConsumer(); mutex taskMutex; condition_variable taskQueueCond; }; ThreadPool::ThreadPool(int n) { for (int i = 0; i < n; i++) { thread *t = new thread(&ThreadPool::taskConsumer,this); threadPool.push_back(t); t->detach(); } } ThreadPool::~ThreadPool() { while (!threadPool.empty()) { thread *t=threadPool.back(); threadPool.pop_back(); delete t; } } void ThreadPool::pushTask(function<void()> task) { { lock_guard<mutex> guard(taskMutex); taskQueue.push_back(std::move(task)); } taskQueueCond.notify_one(); } void ThreadPool::taskConsumer() { while (true) { unique_lock<mutex> lk(taskMutex); taskQueueCond.wait(lk, [&] {return !taskQueue.empty(); }); function<void()> task=taskQueue.front(); taskQueue.pop_front(); lk.unlock(); task(); } }
調用代碼改為如下:
ThreadPool pool(2);
pool.pushTask(&Task5);
pool.pushTask(&Task1);
pool.pushTask(&Task1);
Worker worker;
pool.pushTask((bind(&Worker::run, &worker)));
pool.pushTask([&](){worker.run(); });//1
Sleep(15000);
我們可以執行指定的函數,也可以將要執行的代碼放入lambda表達式的函數體中,正如1處所示,這樣就能在其他線程中執行指定的代碼瞭。
2021/12/29 更新之二:
我們發現,main最後都要調用sleep函數來避免主線程在線程任務完成之前就退出,因此我們希望添加一個接口,等待線程所有任務完成,改進如下,其他函數同前:
class ThreadPool { public: ThreadPool(int n); ~ThreadPool(); void pushTask(function<void()> task); void waitAllTask(); private: vector<thread*> threadPool; deque<function<void()>> taskQueue; atomic<int> busyCount; bool bStop; void taskConsumer(); mutex taskQueueMutex; condition_variable taskQueueCond; condition_variable taskFinishedCond; }; void ThreadPool::taskConsumer() { while (!bStop) { unique_lock<mutex> lk(taskQueueMutex); taskQueueCond.wait(lk, [&] {return !taskQueue.empty(); }); busyCount++; function<void()> task=taskQueue.front(); taskQueue.pop_front(); lk.unlock(); task(); busyCount--; taskFinishedCond.notify_one(); } } void ThreadPool::waitAllTask() { unique_lock<mutex> lk(taskQueueMutex); taskFinishedCond.wait(lk, [&] {return taskQueue.empty() && busyCount==0; });//所有任務均已完成 }
這樣我們隻要調用waitAllTask就可以等待所有任務完成啦。
到此這篇關於C++線程池實現代碼的文章就介紹到這瞭,更多相關C++線程池內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- C++ 對多線程/並發的支持(上)
- C++單例模式實現線程池的示例代碼
- C++11各種鎖的具體使用
- C++11 condition_variable條件變量的用法說明
- C++ 多線程編程建議之 C++ 對多線程/並發的支持(下)