1. 程式人生 > >【Qt開發】QThread中的互斥、讀寫鎖、訊號量、條件變數

【Qt開發】QThread中的互斥、讀寫鎖、訊號量、條件變數

在gemfield的《從pthread到QThread》一文中我們瞭解了執行緒的基本使用,但是有一大部分的內容當時說要放到這片文章裡討論,那就是執行緒的同步問題。關於這個問題,gemfield在《從進 程到執行緒》中有一個比喻,有必要重新放在下面溫習下:

*******************************
最後用一個比喻來總結下:
1、一個程序就好比一個房子裡有一個人;
2、clone建立執行緒就相當於在這個房子裡新增一個人;
3、fork建立程序就相當於再造一個房子,然後在新房子裡新增一個人;

有了上面的比喻後,我們就清楚很多了:
1、執行緒之間有很多資源可以共享:比如廚房資源、洗手間資源、熱水器資源等;
2、而對於程序來說,一個概念就是程序間通訊(你要和另外一個房子裡的人通訊要比一個房子裡的兩個人之間通訊複雜);
3、執行緒之間因為共享記憶體,所以通過一個全域性的變數就可以交換資料了;
4、但與此同時,對於執行緒來說,又有新的概念產生了:
a、一個人使用洗手間的時候,得鎖上以防止另一個人對洗手間的訪問;
b、一個人(或幾個人)睡覺的時候,另外一個人可以按照之前約定的方式來叫醒他;
c、熱水器的電源要一直開著,直到想洗澡的人數減為0;

上面的概念,在gemfield的後文中術語化的時候,你就不會再覺得很深奧或者枯燥了。
********************************
對於上面的a:一個人使用洗手間的時候,得鎖上以防止另一個人對洗手間的訪問。我們在QThread裡使用的就是QMutext這個互斥了。mutex是mutual exclusion(互相排斥)的簡寫。在pthread中也有pthread_mutex_*族,但是在QThread中我們能在Qt的框架下通過原始碼看到具體實現,所以pthread_mutex_*就靠你自行研究了。

第一部分、QMutex的研究

1、來一小段程式碼背景:
************************
int number = 6;

void gemfield1()
{
number *= 5;
number /= 4;
}

void gemfield2()
{
number *= 3;
number /= 2;
}
**************************
如果下面的程式碼是順序執行的,則會有下面這樣的輸出邏輯:
**************************
// gemfield1()
number *= 5; // number 為 30
number /= 4; // number 為 7

// gemfield2()
number *= 3; // number 為 21
number /= 2; // number 為 10
**************************
但如果是在2個執行緒中(執行緒1、執行緒2)分別同時呼叫了gemfield1()、gemfield2()呢?
**************************
// 執行緒1呼叫gemfield1()
number *= 5; // number 為30

// 執行緒2 呼叫 gemfield2().
// 執行緒1 被系統排程出去了,而把執行緒2排程進來執行
number *= 3; // number 為90
number /= 2; // number 為45

// 執行緒1 結束執行
number /= 4; // number 為11, 而不是上面的10
**************************

2、如何解決這個問題?

很明顯我們想要一個執行緒(比如執行緒1)在訪問變數number的時候,除非該執行緒(比如執行緒1)允許,否則其他執行緒(比如執行緒2)不能訪問number;這就好比一個人訪問洗手間,另一個人就無法訪問一樣(我們把對number的訪問區域,或者洗手間這個區域稱作臨界區域);下面就是QMutex的使用:
***************************
QMutex mutex;
int number = 6;

void gemfield1()
{
mutex.lock();
number *= 5;
number /= 4;
mutex.unlock();
}

void gemfield2()
{
mutex.lock();
number *= 3;
number /= 2;
mutex.unlock();
}
****************************
當mutex這個互斥lock上之後,直到unlock之前,都只有1個執行緒訪問number;注意:mutex變數和number一樣是全域性變數!

在QMutex的使用中,我們關注以下4個方法和2個屬性:

1、QMutex ()//構造1個mutex
2、lock ()//鎖
3、tryLock ()//嘗試著鎖
4、unlock ()//釋放鎖

另外兩個屬性是:遞迴和非遞迴。如果這個mutex是遞迴的話,表明它可以被一個執行緒鎖多次,也就是鎖和解鎖中再巢狀鎖和解鎖;非遞迴的話,就表明mutex只能被鎖一次。

這四個的用法已經在上面的程式碼中展示過了,現在來看看QMutex是怎麼做到這一點的?

3、QMutex是如何做到保護臨界區域的?

設想一下我們的洗手間問題:洗手間提供了什麼機制,讓一個人在使用的時候,另一個人無法闖入?門鎖!現在開始我們的QMutex之旅:

a、首先得構造出一個QMutex物件吧,要了解這一點,我們得先了解下QMutex的型別層次及成員。

class QBasicMutex
{
public:
inline void lock() {
if (!fastTryLock())
lockInternal();
}

inline void unlock() {
Q_ASSERT(d_ptr.load()); //mutex must be locked
if (!d_ptr.testAndSetRelease(dummyLocked(), 0))
unlockInternal();
}

bool tryLock(int timeout = 0) {
return fastTryLock() || lockInternal(timeout);
}

private:
inline bool fastTryLock() {
return d_ptr.testAndSetAcquire(0, dummyLocked());
}
bool lockInternal(int timeout = -1);
void unlockInternal();
QBasicAtomicPointer<QMutexData> d_ptr;

static inline QMutexData *dummyLocked() {
return reinterpret_cast<QMutexData *>(quintptr(1));
}

friend class QMutex;
friend class QMutexData;
};
————————————————–
class QMutex : public QBasicMutex {
public:
enum RecursionMode { NonRecursive, Recursive };
explicit QMutex(RecursionMode mode = NonRecursive);
};

————————————————–
class QMutexData
{
public:
bool recursive;
QMutexData(QMutex::RecursionMode mode = QMutex::NonRecursive)
: recursive(mode == QMutex::Recursive) {}
};

————————————————–
class QMutexPrivate : public QMutexData {
public:
QMutexPrivate();
bool wait(int timeout = -1);
void wakeUp();
// Conrol the lifetime of the privates
QAtomicInt refCount;
int id;

bool ref() {
Q_ASSERT(refCount.load() >= 0);
int c;
do {
c = refCount.load();
if (c == 0)
return false;
} while (!refCount.testAndSetRelaxed(c, c + 1));
Q_ASSERT(refCount.load() >= 0);
return true;
}
void deref() {
Q_ASSERT(refCount.load() >= 0);
if (!refCount.deref())
release();
Q_ASSERT(refCount.load() >= 0);
}
void release();
static QMutexPrivate *allocate();
QAtomicInt waiters; //number of thread waiting
QAtomicInt possiblyUnlocked; //bool saying that a timed wait timed out
enum { BigNumber = 0×100000 }; //Must be bigger than the possible number of waiters (number of threads)
void derefWaiters(int value);
bool wakeup;
pthread_mutex_t mutex;
pthread_cond_t cond;
};
———————————-
QMutex的類層次上面已經展現了,我們來看看怎麼構造一個QMutex物件吧:

QMutex::QMutex(RecursionMode mode)
{
d_ptr.store(mode == Recursive ? new QRecursiveMutexPrivate : 0);
}
其中的d_ptr是在QBasicMutex中定義的:

QBasicAtomicPointer<QMutexData> d_ptr;
根據QMutex構造時的引數,將QMutexData中的recursive成員賦值:預設是0,也就是QMutex::NonRecursive。

b、使用lock(),那麼lock()又是怎麼實現的呢?

從上面的型別層次可以看出,這個介面是QBasicMutex類實現的,如下:

inline void lock() {
if (!fastTryLock())
lockInternal();
}

也就是說,必須是fastTryLock()返回值為0才有實際的動作,那fastTryLock()又是什麼呢?

inline bool fastTryLock() {
return d_ptr.testAndSetAcquire(0, dummyLocked());
}

testAndSetAcquire()又是什麼呢?
****************************************************************************************
原型:bool testAndSetAcquire(T *expectedValue, T *newValue);

對於x86平臺來說,實現在arch\qatomic_i386.h中:

template <typename T>
Q_INLINE_TEMPLATE bool QBasicAtomicPointer<T>::testAndSetAcquire(T *expectedValue, T *newValue)
{
return testAndSetOrdered(expectedValue, newValue);
}

testAndSetOrdered(expectedValue, newValue)又是怎樣實現的?根平臺和編譯器相關,對於gemfield本文來說,就是 Linux上的GCC編譯器,那麼:

template <typename T>
Q_INLINE_TEMPLATE bool QBasicAtomicPointer<T>::testAndSetOrdered(T *expectedValue, T *newValue)
{
unsigned char ret;
asm volatile(“lock\n”
“cmpxchgl %3,%2\n”
“sete %1\n”
: “=a” (newValue), “=qm” (ret), “+m” (_q_value)
: “r” (newValue), “0″ (expectedValue)
: “memory”);
return ret != 0;
}

**************************************************************************************
d_ptr.testAndSetAcquire(0, dummyLocked());的含義就是判斷d_ptr當前的值是不是0,如果是0的話,則將dummyLocked()的值賦給d_ptr,並返回真值;否則什麼都不做,並返回false。

static inline QMutexData *dummyLocked() {
return reinterpret_cast<QMutexData *>(quintptr(1));
}

對不起了,各位,我剛洗了個澡回來。我發現照這樣寫下去本文就寫不完了。我決定把本文介紹的內容的底層實現部分放在《Qt的原子操作》一文之後,本文從簡介紹下互斥、讀寫鎖、條件變數、訊號量這些概念及用法。所以,上面紅顏色裝飾的內容就先不要看了。

第二部分、QMutexLocker的誕生

QMutexLocker相當於QMutex的簡化,提供了簡化了的互斥上的操作(也即簡化了的加鎖和解鎖)。

QMutex實現的互斥功能用的不是挺好的嗎?怎麼又出現了一個QMutexLocker?其實不然,觀察下面的這個程式碼:
****************************************************************
int complexFunction(int flag)
{
mutex.lock();
int retVal = 0;

switch (flag) {
case 0:
case 1:
mutex.unlock();
return moreComplexFunction(flag);
case 2:
{
int status = anotherFunction();
if (status < 0) {
mutex.unlock();
return -2;
}
retVal = status + flag;
}
break;
default:
if (flag > 10) {
mutex.unlock();
return -1;
}
break;
}

mutex.unlock();
return retVal;
}
*******************************************************************
上面的程式碼真實的揭露了QMutex的無力,因為只要有mutex.lock(),必然要有mutex.unlock(),否則臨界區裡的資源將再不能被訪問;而上面的程式碼並不能保證QMutex的物件一定會unlock(程式碼可能從某個地方就走了,再不回來了)。這個時候QMutexLocker就發揮用處了,因為QMutexLocker一定是以函式內部的區域性變數的形式出現的,當它的作用域結束的時候,這個互斥就自然unlock了。程式碼如下:
*******************************************************************
int complexFunction(int flag)
{
QMutexLocker locker(&mutex);//定義的時候就上鎖了

int retVal = 0;

switch (flag) {
case 0:
case 1:
return moreComplexFunction(flag);
case 2:
{
int status = anotherFunction();
if (status < 0)
return -2;
retVal = status + flag;
}
break;
default:
if (flag > 10)
return -1;
break;
}

return retVal;//超出函式的作用域就解鎖了
}
******************************************************************

第三部分:QReadWriteLock的作用

雖然互斥的功能保證了臨界區資源的安全,但是在某些方面並不符合實際;比如一般情況下,資源可以被併發讀!舉個實際的例子:有一本書(比如CivilNet BOOK),當某個人讀到一頁時,另外一個人(或者多個人)也可以過來讀;但是,當1個人往上面寫筆記時,其他人不能一起寫,而且只有這個人把筆記寫完了,再讓大家一起看。

QReadWriteLock的作用就是保證各個執行緒能併發的讀某個資源,但是要寫的話,就得真的lock了(所以,QReadWriteLock適合大量併發讀,偶爾會有寫的情況);程式碼如下:

*************************************************************
QReadWriteLock lock;

void ReaderThread::run()

{

lock.lockForRead();
read_file();
lock.unlock();

}

void WriterThread::run()
{

lock.lockForWrite();
write_file();
lock.unlock();

}
**************************************************************
特別的,對於lock這個全域性鎖來說:

1、只要有任意一個執行緒lock.lockForWrite()之後,所有之後的lock.lockForRead()都將會被阻塞;
2、只要有任意一個執行緒的lock.lockForWrite()動作還在被阻塞著的話,所有之後的lock.lockForRead()都會失敗;
3、如果在被阻塞的佇列中既有lock.lockForWrite()又有lock.lockForRead(),那麼write的優先順序比read高,下一個執行的將會是lock.lockForWrite()。

大多數情況下,QReadWriteLock都是QMutex的直接競爭者.和QMutex類似,QReadWriteLock也提供了它的簡化類來應付複雜的加鎖解鎖(也是通過函式作用域的手段),程式碼如下:
****************************************************************
QReadWriteLock lock;

QByteArray readData()
{
QReadLocker locker(&lock);

return data;
}

void writeData(const QByteArray &data)
{
QWriteLocker locker(&lock);

}
*************************************************************

第四部分:QSemaphore 提供了QMutex的通用情況

反過來,QMutex是QSemaphore的特殊情況,QMutex只能被lock一次,而QSemaphore卻可以獲得多次;當然了,那是因為Semaphores要保護的資源和mutex保護的不是一類;Semaphores保護的一 般是一堆相同的資源; 比如:
1、mutex保護的像是洗手間這樣的,只能供1人使用的資源(不是公共場所的洗手間);
2、Semaphores保護的是像停車場、餐館這樣有很多位子資源的場所;

Semaphores 使用兩個基本操作acquire() 和 release():

比如對於一個停車場來說,一般會在停車場的入口用個LED指示牌來指示已使用車位、可用車位等;你要泊車進去,那就要acquire(1)了,這樣available()就會減一;如果你開車離開停車場 ,那麼就要release(1)了,同時available()就會加一。

讓gemfield用程式碼來演示一個環形緩衝區和其上的訊號量(生產-消費模型):

const int DataSize = 1000;//這個店一共要向這個環形桌上供應1000份涮肉
const int BufferSize = 100;//環形自助餐桌上可以最多容納下100份涮肉
char buffer[BufferSize];//buffer就是這個環形自助餐桌了

QSemaphore freePlace(BufferSize);//freeBytes訊號量控制的是沒有放涮肉盤子的區域,初始化值是100,很明顯,程式剛開始的時候桌子還是空的;
QSemaphore usedPlace;//usedPlace 控制的是已經被使用的位置,也很明顯,程式剛開始的時候,還沒開始吃呢。

好了,對於飯店配羊肉的服務員來說,
class Producer : public QThread

public:
void run();//重新實施run虛擬函式
};

void Producer::run()
{
for (int i = 0; i < DataSize; ++i) {
freePlace.acquire();//空白位置減一
buffer[i % BufferSize] = “M”;//放肉(麥當勞  )
usedPlace.release();//已使用的位置加一
}
}

服務員(producer)要生產1000份涮肉( DataSize), 當他要把生產好的一份涮肉往環形桌子上放之前,必須使用freePlace訊號量從環形桌上獲得一個空地方(一共就100個)。 如果消費 者吃的節奏沒跟的上的話,QSemaphore::acquire() 呼叫可能會被阻塞。

最後,服務員使用usedPlace訊號量來釋放一個名額。一個“空的位置”被成功的轉變為“已被佔用的位置”,而這個位置消費者正準備吃。

對於食客(消費者)來說:

class Consumer : public QThread
{
public:
void run();//重新實施run虛擬函式
};

void Consumer::run()
{
//消費者一共要吃1000份涮肉
for (int i = 0; i < DataSize; ++i) {
usedPlace.acquire();//如果還沒有位置被使用(表明沒有放肉),阻塞
eat(buffer[i % BufferSize]);
freePlace.release();//吃完後,空白位置可以加一了
}
leaveCanting();
}

在main函式中,gemfield建立了2個執行緒,並且通過QThread::wait()來確保在程式退出之前,執行緒都已經執行完了(也即完成了各自的1000次for迴圈)

int main(int argc, char *argv[])
{
QCoreApplication app(argc, argv);
Producer producer;
Consumer consumer;
producer.start();
consumer.start();
producer.wait();
consumer.wait();
return 0;
}

程式是怎麼執行的呢?

初始的時候,只有服務員執行緒可以做任何事; 消費者執行緒被阻塞了——等待著usedPlace訊號量被釋放(available()初始值是0);當服務員把第一份涮肉放到桌子上的時候,
freePlace.available() 的值就變為了BufferSize – 1, 並且usedPlace.available() is 1.這時,兩個執行緒都可以工作了: 消費者可以吃這第一份涮肉,並且服務員再生產第二份涮肉;

在一個多處理器的機器上,這個程式將有可能達到基於mutex的程式的2倍快, 因為兩個執行緒可以同時工作在不同的緩衝區上。

第五部分: QWaitCondition,與QSemaphore的競爭

const int DataSize = 1000;//這個店一共要向這個環形桌上供應1000份涮肉
const int BufferSize = 100;//環形自助餐桌上可以最多容納下100份涮肉
char buffer[BufferSize];

QWaitCondition placeNotEmpty;//當有肉放上來,就發出這個訊號
QWaitCondition placeNotFull;//當消費者吃完一份涮肉後發出這個訊號
QMutex mutex;
int numUsedPlace = 0;//已經放了肉的位置數
為了同步服務員和消費者, 我們需要2個條件變數和1個mutex。變數解釋參考上面的程式碼註釋。讓我們看看服務員這個生產者的類:
************************************************
class Producer : public QThread
{
public:
void run();
};
void Producer::run()
{
for (int i = 0; i < DataSize; ++i) {
mutex.lock();
if (numUsedBytes == BufferSize)
placeNotFull.wait(&mutex);
mutex.unlock();

buffer[i % BufferSize] = “M”;(又是麥當勞)

mutex.lock();
++numUsedBytes;
placeNotEmpty.wakeAll();
mutex.unlock();
}
}
******************************************************

在服務員將肉放到環形桌上之前,先要檢查下桌子是不是放滿了。如果滿了,服務員就等待placeNotFull條件.

在最後,服務員將numUsedBytes自增1,並且發出bufferNotEmpty條件是真的這個訊號,因為numUsedBytes肯定大於0;

注意,QWaitCondition::wait() 函式使用一個mutex作為它的引數,這樣做的意義是:mutex剛開始是lock的,然後當這個執行緒因為placeNotFull.wait(&mutex);而休眠時,這個mutex就會被unlock,而當這個執行緒被喚醒時,mutex再次被加鎖。

另外, 從locked狀態到wait狀態是原子的,以此來防止競態條件的發生。

再來看看消費者類:
**************************************************
class Consumer : public QThread
{
public:
void run();
};

void Consumer::run()
{
for (int i = 0; i < DataSize; ++i) {
mutex.lock();
if (numUsedBytes == 0)
placeNotEmpty.wait(&mutex);
mutex.unlock();

eat(buffer[i % BufferSize]);

mutex.lock();
–numUsedBytes;
placeNotFull.wakeAll();
mutex.unlock();
}
leaveCanting();
}
***************************************************
程式碼和服務員的差不多。再來看看main函式:
***************************************************
int main(int argc, char *argv[])
{
QCoreApplication app(argc, argv);
Producer producer;
Consumer consumer;
producer.start();
consumer.start();
producer.wait();
consumer.wait();
return 0;
}
***************************************************
和訊號一節差不多,程式剛開始的時候,只有服務員執行緒可以做一些事;消費者被阻塞(等肉),直到“位置不為空”訊號發出。

在一個多處理器的機器上,這個程式將有可能達到基於mutex的程式的2倍快, 因為兩個執行緒可以同時工作在不同的緩衝區上。

其實,Qt的執行緒庫所包含的內容正是gemfield上一文《從pthread到QThread》中介紹的QThread類,以及QMutexLocker, QReadWriteLock, QSemaphore, QWaitCondition這些類,再外加一個atomic原子操作的內容(這時gemfield下一篇文章的內容哦);瞭解了這些,我們就可以更加自信的使用Qt的執行緒庫了。

備註:本文屬於gemfield的CivilNet部落格(http://syszux.com/blog)[Qt樂園]版塊,轉載此文時,請保證包括備註在內的本文的完整性。