超詳細講解Linux C++多線程同步的方式
背景問題:在特定的應用場景下,多線程不進行同步會造成什麼問題?
通過多線程模擬多窗口售票為例:
#include <iostream> #include<pthread.h> #include<stdio.h> #include<stdlib.h> #include<string.h> #include<unistd.h> using namespace std; int ticket_sum=20; void *sell_ticket(void *arg) { for(int i=0; i<20; i++) { if(ticket_sum>0) { sleep(1); cout<<"sell the "<<20-ticket_sum+1<<"th"<<endl; ticket_sum--; } } return 0; } int main() { int flag; pthread_t tids[4]; for(int i=0; i<4; i++) { flag=pthread_create(&tids[i],NULL,&sell_ticket,NULL); if(flag) { cout<<"pthread create error ,flag="<<flag<<endl; return flag; } } sleep(20); void *ans; for(int i=0; i<4; i++) { flag=pthread_join(tids[i],&ans); if(flag) { cout<<"tid="<<tids[i]<<"join erro flag="<<flag<<endl; return flag; } cout<<"ans="<<ans<<endl; } return 0; }
分析:總票數隻有20張,卻賣出瞭23張,是非常明顯的超買超賣問題,而造成這個問題的根本原因就是同時發生的各個線程都可以對ticket_sum進行讀取和寫入!
ps:
1.在並發情況下,指令執行的先後順序由內核決定,同一個線程內部,指令按照先後順序執行,但不同線程之間的指令很難說清楚是哪一個先執行,如果運行的結果依賴於不同線程執行的先後的話,那麼就會形成競爭條件,在這樣的情況下,計算的結果很難預知,所以應該盡量避免競爭條件的形成
2.最常見的解決競爭條件的方法是將原先分離的兩個指令構成一個不可分割的原子操作,而其他任務不能插入到原子操作中!
3.對多線程來說,同步指的是在一定時間內隻允許某一個線程訪問某個資源,而在此時間內,不允許其他線程訪問該資源!
4.線程同步的常見方法:互斥鎖,條件變量,讀寫鎖,信號量
一.互斥鎖
本質就是一個特殊的全局變量,擁有lock和unlock兩種狀態,unlock的互斥鎖可以由某個線程獲得,一旦獲得,這個互斥鎖會鎖上變成lock狀態,此後隻有該線程由權力打開該鎖,其他線程想要獲得互斥鎖,必須得到互斥鎖再次被打開之後
采用互斥鎖來同步資源:
#include <iostream> #include<pthread.h> #include<stdio.h> #include<stdlib.h> #include<string.h> #include<unistd.h> using namespace std; int ticket_sum=20; pthread_mutex_t mutex_x=PTHREAD_MUTEX_INITIALIZER;//static init mutex void *sell_ticket(void *arg) { for(int i=0; i<20; i++) { pthread_mutex_lock(&mutex_x);//atomic opreation through mutex lock if(ticket_sum>0) { sleep(1); cout<<"sell the "<<20-ticket_sum+1<<"th"<<endl; ticket_sum--; } pthread_mutex_unlock(&mutex_x); } return 0; } int main() { int flag; pthread_t tids[4]; for(int i=0; i<4; i++) { flag=pthread_create(&tids[i],NULL,&sell_ticket,NULL); if(flag) { cout<<"pthread create error ,flag="<<flag<<endl; return flag; } } sleep(20); void *ans; for(int i=0; i<4; i++) { flag=pthread_join(tids[i],&ans); if(flag) { cout<<"tid="<<tids[i]<<"join erro flag="<<flag<<endl; return flag; } cout<<"ans="<<ans<<endl; } return 0; }
分析:通過為售票的核心代碼段加互斥鎖使得其變成瞭一個原子性操作!不會被其他線程影響
1.互斥鎖的初始化
互斥鎖的初始化分為靜態初始化和動態初始化
靜態:pthread_mutex_t mutex_x=PTHREAD_MUTEX_INITIALIZER;//static init mutex
動態:pthread_mutex_init函數
ps:互斥鎖靜態初始化和動態初始化的區別?
待補充。。。。
2.互斥鎖的相關屬性及分類
//初始化互斥鎖屬性 pthread_mutexattr_init(pthread_mutexattr_t attr); //銷毀互斥鎖屬性 pthread_mutexattr_destroy(pthread_mutexattr_t attr); //用於獲取互斥鎖屬性 int pthread_mutexattr_getpshared(const pthread_mutexattr_t *restrict attr , int *restrict pshared); //用於設置互斥鎖屬性 int pthread_mutexattr_setpshared(pthread_mutexattr_t *attr , int pshared);
attr表示互斥鎖的屬性
pshared表示互斥鎖的共享屬性,由兩種取值:
1)PTHREAD_PROCESS_PRIVATE:鎖隻能用於一個進程內部的兩個線程進行互斥(默認情況)
2)PTHREAD_PROCESS_SHARED:鎖可用於兩個不同進程中的線程進行互斥,使用時還需要在進程共享內存中分配互斥鎖,然後為該互斥鎖指定屬性就可以瞭
互斥鎖的分類:
//獲取互斥鎖類型 int pthread_mutexattr_gettype(const pthread_mutexattr_t *restrict attr , int *restrict type); //設置互斥鎖類型 int pthread_mutexattr_settype(const pthread_mutexattr_t *restrict attr , int type);
參數type表示互斥鎖的類型,總共有以下四種類型:
1.PTHREAD_MUTEX_NOMAL:標準互斥鎖,第一次上鎖成功,第二次上鎖會失敗並阻塞
2.PTHREAD_MUTEX_RECURSIVE:遞歸互斥鎖,第一次上鎖成功,第二次上鎖還是會成功,可以理解為內部有一個計數器,每加一次鎖計數器加1,解鎖減1
3.PTHREAD_MUTEX_ERRORCHECK:檢查互斥鎖,第一次上鎖會成功,第二次上鎖出錯返回錯誤信息,不會阻塞
4.PTHREAD_MUTEX_DEFAULT:默認互斥鎖,第一次上鎖會成功,第二次上鎖會失敗
3.測試加鎖函數
int pthread_mutex_lock(&mutex):測試加鎖函數在鎖已經被占據時返回EBUSY而不是掛起等待,當然,如果鎖沒有被占領的話可以獲得鎖
為瞭清楚的看到兩個線程爭用資源的情況,我們使得其中一個函數使用測試加鎖函數進行加鎖,而另外一個使用正常的加鎖函數進行加鎖
#include <iostream> #include<pthread.h> #include<stdio.h> #include<stdlib.h> #include<string.h> #include<unistd.h> #include<errno.h> using namespace std; int ticket_sum=20; pthread_mutex_t mutex_x=PTHREAD_MUTEX_INITIALIZER;//static init mutex void *sell_ticket_1(void *arg) { for(int i=0; i<20; i++) { pthread_mutex_lock(&mutex_x); if(ticket_sum>0) { sleep(1); cout<<"thread_1 sell the "<<20-ticket_sum+1<<"th ticket"<<endl; ticket_sum--; } sleep(1); pthread_mutex_unlock(&mutex_x); sleep(1); } return 0; } void *sell_ticket_2(void *arg) { int flag; for(int i=0; i<10; i++) { flag=pthread_mutex_trylock(&mutex_x); if(flag==EBUSY) { cout<<"sell_ticket_2:the variable is locked by sell_ticket_1"<<endl; } else if(flag==0) { if(ticket_sum>0) { sleep(1); cout<<"thread_2 sell the "<<20-ticket_sum+1<<"th tickets"<<endl; ticket_sum--; } pthread_mutex_unlock(&mutex_x); } sleep(1); } return 0; } int main() { int flag; pthread_t tids[2]; flag=pthread_create(&tids[0],NULL,&sell_ticket_1,NULL); if(flag) { cout<<"pthread create error ,flag="<<flag<<endl; return flag; } flag=pthread_create(&tids[1],NULL,&sell_ticket_2,NULL); if(flag) { cout<<"pthread create error ,flag="<<flag<<endl; return flag; } void *ans; sleep(30); flag=pthread_join(tids[0],&ans); if(flag) { cout<<"tid="<<tids[0]<<"join erro flag="<<flag<<endl; return flag; } else { cout<<"ans="<<ans<<endl; } flag=pthread_join(tids[1],&ans); if(flag) { cout<<"tid="<<tids[1]<<"join erro flag="<<flag<<endl; return flag; } else { cout<<"ans="<<ans<<endl; } return 0; }
分析:通過測試加鎖函數我們可以清晰的看到兩個線程爭用資源的情況
二.條件變量
互斥量不是萬能的,比如某個線程正在等待共享數據內某個條件出現,可可能需要重復對數據對象加鎖和解鎖(輪詢),但是這樣輪詢非常耗費時間和資源,而且效率非常低,所以互斥鎖不太適合這種情況
我們需要這樣一種方法:當線程在等待滿足某些條件時使線程進入睡眠狀態,一旦條件滿足,就換線因等待滿足特定條件而睡眠的線程
如果我們能夠實現這樣一種方法,程序的效率無疑會大大提高,而這種方法正是條件變量!
樣例:
#include <iostream> #include<pthread.h> #include<stdio.h> #include<stdlib.h> #include<string.h> #include<unistd.h> #include<errno.h> using namespace std; pthread_cond_t qready=PTHREAD_COND_INITIALIZER; //cond pthread_mutex_t qlock=PTHREAD_MUTEX_INITIALIZER; //mutex int x=10,y=20; void *f1(void *arg) { cout<<"f1 start"<<endl; pthread_mutex_lock(&qlock); while(x<y) { pthread_cond_wait(&qready,&qlock); } pthread_mutex_unlock(&qlock); sleep(3); cout<<"f1 end"<<endl; return 0; } void *f2(void *arg) { cout<<"f2 start"<<endl; pthread_mutex_lock(&qlock); x=20; y=10; cout<<"has a change,x="<<x<<" y="<<y<<endl; pthread_mutex_unlock(&qlock); if(x>y) { pthread_cond_signal(&qready); } cout<<"f2 end"<<endl; return 0; } int main() { pthread_t tids[2]; int flag; flag=pthread_create(&tids[0],NULL,f1,NULL); if(flag) { cout<<"pthread 1 create error "<<endl; return flag; } sleep(2); flag=pthread_create(&tids[1],NULL,f2,NULL); if(flag) { cout<<"pthread 2 create erro "<<endl; return flag; } sleep(5); return 0; }
分析:線程1不滿足條件被阻塞,然後線程2運行,改變瞭條件,線程2發行條件改變瞭通知線程1運行,然後線程2結束,然後線程1繼續運行,然後線程1結束,為瞭確保線程1先執行,在創建線程2之前我們sleep瞭2秒
ps:
1.條件變量通過運行線程阻塞和等待另一個線程發送信號的方法彌補互斥鎖的不足,常常和互斥鎖一起使用,使用時,條件變量被用來阻塞一個線程,當條件不滿足時,線程往往解開響應的互斥鎖並等待條件發生變化,一旦其他的某個線程改變瞭條件變量,它將通知響應的條件變量換線一個或多個正被此條件變量阻塞的線程,這些線程將重新鎖定互斥鎖並且重新測試條件是否滿足
1.條件變量的相關函數
1)創建
靜態方式:pthread_cond_t cond PTHREAD_COND_INITIALIZER
動態方式:int pthread_cond_init(&cond,NULL)
Linux thread 實現的條件變量不支持屬性,所以NULL(cond_attr參數)
2)註銷
int pthread_cond_destory(&cond)
隻有沒有線程在該條件變量上,該條件變量才能註銷,否則返回EBUSY
因為Linux實現的條件變量沒有分配什麼資源,所以註銷動作隻包括檢查是否有等待線程!(請參考條件變量的底層實現)
3)等待
條件等待:int pthread_cond_wait(&cond,&mutex)
計時等待:int pthread_cond_timewait(&cond,&mutex,time)
1.其中計時等待如果在給定時刻前條件沒有被滿足,則返回ETIMEOUT,結束等待
2.無論那種等待方式,都必須有一個互斥鎖配合,以防止多個線程同時請求pthread_cond_wait形成競爭條件!
3.在調用pthread_cond_wait前必須由本線程加鎖
4)激發
激發一個等待線程:pthread_cond_signal(&cond)
激發所有等待線程:pthread_cond_broadcast(&cond)
重要的是,pthread_cond_signal不會存在驚群效應,也就是是它最多給一個等待線程發信號,不會給所有線程發信號喚醒提他們,然後要求他們自己去爭搶資源!
pthread_cond_signal會根據等待線程的優先級和等待時間來確定激發哪一個等待線程
下面看一個程序,找到程序存在的問題
#include <iostream> #include<pthread.h> #include<stdio.h> #include<stdlib.h> #include<string.h> #include<unistd.h> #include<errno.h> using namespace std; pthread_cond_t taxi_cond=PTHREAD_COND_INITIALIZER; //taix arrive cond pthread_mutex_t taxi_mutex=PTHREAD_MUTEX_INITIALIZER;// sync mutex void *traveler_arrive(void *name) { cout<<"Traveler:"<<(char*)name<<" needs a taxi now!"<<endl; pthread_mutex_lock(&taxi_mutex); pthread_cond_wait(&taxi_cond,&taxi_mutex); pthread_mutex_unlock(&taxi_mutex); cout<<"Traveler:"<<(char*)name<<" now got a taxi!"<<endl; pthread_exit((void*)0); } void *taxi_arrive(void *name) { cout<<"Taxi:"<<(char*)name<<" arriver."<<endl; pthread_cond_signal(&taxi_cond); pthread_exit((void*)0); } int main() { pthread_t tids[3]; int flag; flag=pthread_create(&tids[0],NULL,taxi_arrive,(void*)("Jack")); if(flag) { cout<<"pthread_create error:flag="<<flag<<endl; return flag; } cout<<"time passing by"<<endl; sleep(1); flag=pthread_create(&tids[1],NULL,traveler_arrive,(void*)("Susan")); if(flag) { cout<<"pthread_create error:flag="<<flag<<endl; return flag; } cout<<"time passing by"<<endl; sleep(1); flag=pthread_create(&tids[2],NULL,taxi_arrive,(void*)("Mike")); if(flag) { cout<<"pthread_create error:flag="<<flag<<endl; return flag; } cout<<"time passing by"<<endl; sleep(1); void *ans; for(int i=0; i<3; i++) { flag=pthread_join(tids[i],&ans); if(flag) { cout<<"pthread_join error:flag="<<flag<<endl; return flag; } cout<<"ans="<<ans<<endl; } return 0; }
分析:程序由一個條件變量,用於提示乘客有出租車到達,還有一個同步鎖,乘客到達之後就是等車(條件變量),出租車到達之後就是通知乘客,我們看到乘客Susan到達之後,並沒有乘坐先到的Jack的車,而是等到Mike的車到瞭之後再乘坐Mike的車,Jack的車白白的閑置瞭,為什麼會造成這種原因呢?分析一下代碼:我們發現Jack出租車到達之後調用pthread_cond_signal(&taxi_cond)發現沒有乘客,然後就直接結束線程瞭。。。。
正確的操作應該是:先到的Jack發現沒有乘客,然後一直等待乘客,有乘客到瞭就直接走,而且我們應該統計一下乘客的數量
做如下改進:
1.增加乘客計數器,使得出租車在有乘客到達之後可以直接走,而不是又在原地等待別的乘客(僵死線程)
2.出租車到達函數加個while循環,沒有乘客的時候一直等待,直到乘客到來
#include <iostream> #include<pthread.h> #include<stdio.h> #include<stdlib.h> #include<string.h> #include<unistd.h> #include<errno.h> using namespace std; pthread_cond_t taxi_cond=PTHREAD_COND_INITIALIZER; //taix arrive cond pthread_mutex_t taxi_mutex=PTHREAD_MUTEX_INITIALIZER;// sync mutex void *traveler_arrive(void *name) { cout<<"Traveler:"<<(char*)name<<" needs a taxi now!"<<endl; pthread_mutex_lock(&taxi_mutex); pthread_cond_wait(&taxi_cond,&taxi_mutex); pthread_mutex_unlock(&taxi_mutex); cout<<"Traveler:"<<(char*)name<<" now got a taxi!"<<endl; pthread_exit((void*)0); } void *taxi_arrive(void *name) { cout<<"Taxi:"<<(char*)name<<" arriver."<<endl; pthread_exit((void*)0); } int main() { pthread_t tids[3]; int flag; flag=pthread_create(&tids[0],NULL,taxi_arrive,(void*)("Jack")); if(flag) { cout<<"pthread_create error:flag="<<flag<<endl; return flag; } cout<<"time passing by"<<endl; sleep(1); flag=pthread_create(&tids[1],NULL,traveler_arrive,(void*)("Susan")); if(flag) { cout<<"pthread_create error:flag="<<flag<<endl; return flag; } cout<<"time passing by"<<endl; sleep(1); flag=pthread_create(&tids[2],NULL,taxi_arrive,(void*)("Mike")); if(flag) { cout<<"pthread_create error:flag="<<flag<<endl; return flag; } cout<<"time passing by"<<endl; sleep(1); void *ans; for(int i=0; i<3; i++) { flag=pthread_join(tids[i],&ans); if(flag) { cout<<"pthread_join error:flag="<<flag<<endl; return flag; } cout<<"ans="<<ans<<endl; } return 0; }
三.讀寫鎖
可以多個線程同時讀,但是不能多個線程同時寫
1.讀寫鎖比互斥鎖更加具有適用性和並行性
2.讀寫鎖最適用於對數據結構的讀操作讀操作次數多餘寫操作次數的場合!
3.鎖處於讀模式時可以線程共享,而鎖處於寫模式時隻能獨占,所以讀寫鎖又叫做共享-獨占鎖
4.讀寫鎖有兩種策略:強讀同步和強寫同步
在強讀同步中,總是給讀者更高的優先權,隻要寫者沒有進行寫操作,讀者就可以獲得訪問權限
在強寫同步中,總是給寫者更高的優先權,讀者隻能等到所有正在等待或者執行的寫者完成後才能進行讀
不同的系統采用不同的策略,比如航班訂票系統使用強寫同步,圖書館查閱系統采用強讀同步
根據不同的業務場景,采用不同的策略
1)初始化的銷毀讀寫鎖
靜態初始化:pthread_rwlock_t rwlock=PTHREAD_RWLOCK_INITIALIZER
動態初始化:int pthread_rwlock_init(rwlock,NULL),NULL代表讀寫鎖采用默認屬性
銷毀讀寫鎖:int pthread_rwlock_destory(rwlock)
在釋放某個讀寫鎖的資源之前,需要先通過pthread_rwlock_destory函數對讀寫鎖進行清理。釋放由pthread_rwlock_init函數分配的資源
如果你想要讀寫鎖使用非默認屬性,則attr不能為NULL,得給attr賦值
int pthread_rwlockattr_init(attr),給attr初始化
int pthread_rwlockattr_destory(attr),銷毀attr
2)以寫的方式獲取鎖,以讀的方式獲取鎖,釋放讀寫鎖
int pthread_rwlock_rdlock(rwlock),以讀的方式獲取鎖
int pthread_rwlock_wrlock(rwlock),以寫的方式獲取鎖
int pthread_rwlock_unlock(rwlock),釋放鎖
上面兩個獲取鎖的方式都是阻塞的函數,也就是說獲取不到鎖的話,調用線程不是立即返回,而是阻塞執行,在需要進行寫操作的時候,這種阻塞式獲取鎖的方式是非常不好的,你想一下,我需要進行寫操作,不但沒有獲取到鎖,我還一直在這裡等待,大大拖累效率
所以我們應該采用非阻塞的方式獲取鎖:
int pthread_rwlock_tryrdlock(rwlock)
int pthread_rwlock_trywrlock(rwlock)
讀寫鎖的樣例:
#include <iostream> #include<pthread.h> #include<stdio.h> #include<stdlib.h> #include<string.h> #include<unistd.h> #include<errno.h> using namespace std; int num=5; pthread_rwlock_t rwlock; void *reader(void *arg) { pthread_rwlock_rdlock(&rwlock); cout<<"reader "<<(long)arg<<" got the lock"<<endl; pthread_rwlock_unlock(&rwlock); return 0; } void *writer(void *arg) { pthread_rwlock_wrlock(&rwlock); cout<<"writer "<<(long)arg<<" got the lock"<<endl; pthread_rwlock_unlock(&rwlock); return 0; } int main() { int flag; long n=1,m=1; pthread_t wid,rid; pthread_attr_t attr; flag=pthread_rwlock_init(&rwlock,NULL); if(flag) { cout<<"rwlock init error"<<endl; return flag; } pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);//thread sepatate for(int i=0;i<num;i++) { if(i%3) { pthread_create(&rid,&attr,reader,(void *)n); cout<<"create reader "<<n<<endl; n++; }else { pthread_create(&wid,&attr,writer,(void *)m); cout<<"create writer "<<m<<endl; m++; } } sleep(5);//wait other done return 0; }
分析:3個讀線程,2個寫線程,讀線程比寫線程多
當讀寫鎖是寫狀態時,在鎖被解鎖之前,所有試圖對這個鎖加鎖的線程都會被阻塞
當讀寫鎖是讀狀態時,在鎖被解鎖之前,所有視圖以讀模式對它進行加鎖的線程都可以得到訪問權,但是以寫模式對它進行加鎖的線程會被阻塞
所以讀寫鎖默認是強讀模式!
四.信號量
信號量(sem)和互斥鎖的區別:互斥鎖隻允許一個線程進入臨界區,而信號量允許多個線程進入臨界區
1)信號量初始化
int sem_init(&sem,pshared,v)
pshared為0表示這個信號量是當前進程的局部信號量
pshared為1表示這個信號量可以在多個進程之間共享
v為信號量的初始值
成功返回0,失敗返回-1
2)信號量值的加減
int sem_wait(&sem):以原子操作的方式將信號量的值減去1
int sem_post(&sem):以原子操作的方式將信號量的值加上1
3)對信號量進行清理
int sem_destory(&sem)
通過信號量模擬2個窗口,10個客人進行服務的過程
樣例:
#include <iostream> #include<pthread.h> #include<stdio.h> #include<stdlib.h> #include<string.h> #include<unistd.h> #include<errno.h> #include<semaphore.h> using namespace std; int num=10; sem_t sem; void *get_service(void *cid) { int id=*((int*)cid); if(sem_wait(&sem)==0) { sleep(5); cout<<"customer "<<id<<" get the service"<<endl; cout<<"customer "<<id<<" done "<<endl; sem_post(&sem); } return 0; } int main() { sem_init(&sem,0,2); pthread_t customer[num]; int flag; for(int i=0;i<num;i++) { int id=i; flag=pthread_create(&customer[i],NULL,get_service,&id); if(flag) { cout<<"pthread create error"<<endl; return flag; }else { cout<<"customer "<<i<<" arrived "<<endl; } sleep(1); } //wait all thread done for(int j=0;j<num;j++) { pthread_join(customer[j],NULL); } sem_destroy(&sem); return 0; }
分析:信號量的值代表空閑的服務窗口,每個窗口一次隻能服務一個人,有空閑窗口,開始服務前,信號量-1,服務完成後信號量+1
總結完畢:Linux c++線程同步的四種方式:互斥鎖,條件變量,讀寫鎖,信號量
到此這篇關於超詳細講解Linux C++多線程同步的方式的文章就介紹到這瞭,更多相關Linux C++多線程同步內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!