C++線程池實現代碼

前言

這段時間看瞭《C++並發編程實戰》的基礎內容,想著利用最近學的知識自己實現一個簡單的線程池。

什麼是線程池

線程池(thread pool)是一種線程使用模式。線程過多或者頻繁創建和銷毀線程會帶來調度開銷,進而影響緩存局部性和整體性能。而線程池維護著多個線程,等待著管理器分配可並發執行的任務。這避免瞭在處理短時間任務時創建與銷毀線程的代價,以及保證瞭線程的可復用性。線程池不僅能夠保證內核的充分利用,還能防止過分調度。

思路

個人對線程池的理解是:利用已經創建的固定數量的線程去執行指定的任務,從而避免線程重復創建和銷毀帶來的額外開銷。
C++11中,線程我們可以理解為對應一個thread對象,任務可以理解為要執行的函數,通常是耗時的函數。
我們的任務多少和順序並非固定的,因此需要有一個方法能添加指定的任務,任務存放的地方應該是一個任務隊列,因為我們的線程數量有限,當任務很多時同時執行的任務數量也有限,因此任務需要排隊,遵循先來後到的原則。
當要執行一個任務時,意味著先將這個任務從隊列取出,再執行相應任務,而“取出”動作的執行者是線程池中的線程,這意味我們的隊列需要考慮多個線程在同一隊列上執行“取出”操作的問題,實際上,取出任務操作和添加任務操作也不能同時進行,否則會產生競爭條件;另一方面,程序本身如果就是多線程的,多個線程同時添加任務的操作也應該是互斥的。
當沒有任務可以執行時,所有線程應該什麼也不做,當出現瞭一個任務時,應該將這個任務分配到任一線程中執行。實現上我們固然可以使用輪詢的方式判斷當前隊列是否有任務,有則取出(即使加瞭互斥鎖似乎也無法避免競爭條件?),但這樣會消耗無謂的CPU資源,寫輪詢周期難以選取。其實,我們可以使用condition_variable代替輪詢。
上述任務的創建和取出其實就是經典的生產者消費者模型。
我們將上面的內容都封裝在一個類中,取名ThreadPool,用戶可以在構造ThreadPool對象時指定線程池大小,之後可以隨時添加要執行的任務。

實現

class ThreadPool
{
public:
	ThreadPool(int n);
	~ThreadPool();

	void pushTask(packaged_task<void()> &&task);

private:
	vector<thread*> threadPool;
	deque<packaged_task<void()>> taskQueue;

	void taskConsumer();
	mutex taskMutex;
	condition_variable taskQueueCond;
};

ThreadPool::ThreadPool(int n)
{
	for (int i = 0; i < n; i++)
	{
		thread *t = new thread(&ThreadPool::taskConsumer,this);
		threadPool.push_back(t);
		t->detach();
	}
}

ThreadPool::~ThreadPool()
{
	while (!threadPool.empty())
	{
		thread *t=threadPool.back();
		threadPool.pop_back();
		delete t;
	}
}

void ThreadPool::pushTask(packaged_task<void()> &&task)
{
	{
		lock_guard<mutex> guard(taskMutex);
		taskQueue.push_back(std::move(task));
	}
	taskQueueCond.notify_one();
}

void ThreadPool::taskConsumer()
{
	while (true)
	{
		unique_lock<mutex> lk(taskMutex);
		taskQueueCond.wait(lk, [&] {return !taskQueue.empty(); });
		packaged_task<void()> task=std::move(taskQueue.front());
		taskQueue.pop_front();
		lk.unlock();
		task();
	}
}

這裡我使用packaged_task作為任務,每當添加一個任務,就調用condition_variable::notify_one方法,調用condition_variable::wait的線程就會被喚醒,並檢查等待條件。這裡有個小細節是notify_one在解鎖後執行,這樣避免線程喚醒後還要等待互斥鎖解鎖。
使用示例:

void Task1()
{
	Sleep(1000);
	cout << "Task1"<<endl;
}

void Task5()
{
	Sleep(5000);
	cout << "Task5" << endl;
}

class Worker
{
public:
	void run();
};

void Worker::run()
{
	cout << "Worker::run start" << endl;
	Sleep(5000);
	cout << "Worker::run end" << endl;
}

int main()
{
	ThreadPool pool(2);
	pool.pushTask(packaged_task<void()>(Task5));
	pool.pushTask(packaged_task<void()>(Task1));
	pool.pushTask(packaged_task<void()>(Task1));
	Worker worker;
	pool.pushTask(packaged_task<void()>(bind(&Worker::run,&worker)));
	pool.pushTask(packaged_task<void()>([&](){worker.run();}));
	Sleep(20000);
}

這個線程池目前有幾個缺點:

  • 隻能傳入調用形式為void()形式的函數或可調用對象,不能返回任務執行的值,隻能通過其他方式同步任務執行結果(如果有)
  • 傳入參數較為復雜,必須封裝一層packaged_task,調用對象方法時需要使用bind或者lambda表達式的方法封裝

以上缺點在當前版本的實現不予解決,日後另寫博文優化。
2021/12/29 更新之一
事實上,我們隻要將packaged_task改為funtion模板類,就可以簡化我們的調用參數:

class ThreadPool
{
public:
	ThreadPool(int n);
	~ThreadPool();

	void pushTask(function<void()> task);

private:
	vector<thread*> threadPool;
	deque<function<void()>> taskQueue;

	void taskConsumer();
	mutex taskMutex;
	condition_variable taskQueueCond;
};

ThreadPool::ThreadPool(int n)
{
	for (int i = 0; i < n; i++)
	{
		thread *t = new thread(&ThreadPool::taskConsumer,this);
		threadPool.push_back(t);
		t->detach();
	}
}

ThreadPool::~ThreadPool()
{
	while (!threadPool.empty())
	{
		thread *t=threadPool.back();
		threadPool.pop_back();
		delete t;
	}
}

void ThreadPool::pushTask(function<void()> task)
{
	{
		lock_guard<mutex> guard(taskMutex);
		taskQueue.push_back(std::move(task));
	}
	taskQueueCond.notify_one();
}

void ThreadPool::taskConsumer()
{
	while (true)
	{
		unique_lock<mutex> lk(taskMutex);
		taskQueueCond.wait(lk, [&] {return !taskQueue.empty(); });
		function<void()> task=taskQueue.front();
		taskQueue.pop_front();
		lk.unlock();
		task();
	}
}

調用代碼改為如下:

ThreadPool pool(2);

pool.pushTask(&Task5);

pool.pushTask(&Task1);

pool.pushTask(&Task1);

Worker worker;

pool.pushTask((bind(&Worker::run, &worker)));

pool.pushTask([&](){worker.run(); });//1

Sleep(15000);

我們可以執行指定的函數,也可以將要執行的代碼放入lambda表達式的函數體中,正如1處所示,這樣就能在其他線程中執行指定的代碼瞭。
2021/12/29 更新之二
我們發現,main最後都要調用sleep函數來避免主線程在線程任務完成之前就退出,因此我們希望添加一個接口,等待線程所有任務完成,改進如下,其他函數同前:

class ThreadPool
{
public:
	ThreadPool(int n);
	~ThreadPool();

	void pushTask(function<void()> task);
	void waitAllTask();

private:
	vector<thread*> threadPool;
	deque<function<void()>> taskQueue;
	atomic<int> busyCount;
	bool bStop;

	void taskConsumer();
	mutex taskQueueMutex;
	condition_variable taskQueueCond;
	condition_variable taskFinishedCond;
};

void ThreadPool::taskConsumer()
{
	while (!bStop)
	{
		unique_lock<mutex> lk(taskQueueMutex);
		taskQueueCond.wait(lk, [&] {return !taskQueue.empty(); });
		busyCount++;
		function<void()> task=taskQueue.front();
		taskQueue.pop_front();
		lk.unlock();
		task();
		busyCount--;
		taskFinishedCond.notify_one();
	}
}

void ThreadPool::waitAllTask()
{
	unique_lock<mutex> lk(taskQueueMutex);
	taskFinishedCond.wait(lk, [&] {return taskQueue.empty() && busyCount==0; });//所有任務均已完成
}

這樣我們隻要調用waitAllTask就可以等待所有任務完成啦。

到此這篇關於C++線程池實現代碼的文章就介紹到這瞭,更多相關C++線程池內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: