C11線程管理:條件變量
1、簡介
C11提供另外一種用於等待的同步機制,它可以阻塞一個或者多個線程,直到收到另外一個線程發出的通知或者超時,才會喚醒當前阻塞的線程。條件變量要和互斥量配合起來使用。
condition_variable,配合std::unique_lock<std::mutex>進行wait操作。
condition_variable_any,和任意帶有lock、unlock語意 的mutex搭配使用,比較靈活,但是效率比condition_variable低。
條件變量的使用過程如下:
a.擁有條件變量的線程獲取互斥量。
b.循環檢查某個條件,如果條件不滿足,則阻塞線程直到滿足;如果條件滿足,則向下執行。
c.某個線程滿足條件並執行完成之後,調用notify_one或者notify_all來喚醒一個或者多個線程。
2、實踐
可以用條件變量來實現一個同步隊列,同步隊列作為一個線程安全的數據共享區,經常用於線程之間的讀取,比如半同步半異步線程池的同步隊列。
#include <iostream> #include <chrono> #include <thread> #include <mutex> #include<condition_variable> #include <list> template<typename T> class SyncQueue { public: SyncQueue(int maxSize) :m_maxSize(maxSize){} void Put(const T & t) { std::lock_guard<std::mutex> locker(m_mutex); while (IsFull()) { std::cout << "緩沖區滿了,需要等待..." << std::endl; m_notFull.wait(m_mutex); } m_queue.push_back(t); m_notEmpty.notify_one(); }void Take(const T & t) { std::lock_guard<std::mutex> locker(m_mutex); while (IsEmpty()) { std::cout << "緩沖區空了,需要等待..." << std::endl; m_notEmpty.wait(m_mutex); } t = m_queue.front(); m_queue.pop_front(t); m_notFull.notify_one(); } bool Empty() { std::lock_guard<std::mutex> locker(m_mutex); return m_queue.empty(); } bool Full() { std::lock_guard<std::mutex> locker(m_mutex); return m_queue.size() == m_maxSize; } size_t Size() { std::lock_guard<std::mutex> locker(m_mutex); return m_queue.size(); } private: bool IsFull() const { return m_queue.size() == m_maxSize; } bool IsEmpty() const { return m_queue.empty(); } private: std::list<T> m_queue; //緩沖區 std::mutex m_mutex; //互斥量 std::condition_variable_any m_notEmpty; //不為空的條件變量 std::condition_variable_any m_notFull; //沒有滿的條件變量 int m_maxSize; //同步隊列最大容量 };
這個隊列中,沒有滿的情況下可以插入數據,如果滿了,則會調用m_notFull阻塞線程等待,等待消費線程取出數據之後發出一個未滿的通知,然後前面阻塞的線程會被喚醒繼續往下執行;如果隊列為空,不能取出數據,調用m_notEmpty來阻塞當前線程,等待插入數據的線程插入數據發出不為空的通知,喚醒被阻塞的線程,往下執行讀出數據。
條件變量的wait方法還有個重載方法,可以接受一個條件。
std::lock_guard<std::mutex> locker(m_mutex); while (IsFull()) { std::cout << "緩沖區滿了,需要等待..." << std::endl; m_notFull.wait(m_mutex); }
可以寫為這樣:
std::lock_guard<std::mutex> locker(m_mutex); m_notFull.wait(locker, [this]{ return !IsFull();});
兩種寫法都一樣,後者代碼更加簡潔,條件變量先檢查判斷式是否滿足條件,如果滿足,重新獲取mutex,結束wait,繼續往下執行;如果不滿足條件,則釋放mutex,將線程置為waiting狀態,繼續等待。
需要註意的是,wait函數會釋放掉mutex,而lock_guard還擁有mutex,他只在出了作用域之後才會釋放掉mutex,所以這時並不會釋放,但是執行wait會提前釋放,而在wait提前釋放掉鎖之後,會處於等待狀態,在notify_one/all喚醒之後,會先獲取mutex,相當於之前的mutex又獲取到了,所以在出作用域的時候,lock_guard釋放鎖不會產生問題。
在這種情況下,如果用unique_lock語意更準確,因為unique_lock不像lock_guard一樣只能在析構的時候才能釋放鎖,它可以隨時釋放鎖,在wait的時候讓uniq_lock釋放鎖,語意更加準確。
上述例子中,可以用unique_lock來替換掉lock_guard,condition_variable來替換掉condition_variable_any,會使代碼更加清晰,效率也更高。
3、超時等待
除了wait還可以使用超時等待函數std::condition_variable::wait_for和std::condition_variable::wait_until。
與 std::condition_variable::wait() 類似,不過 wait_for 可以指定一個時間段,在當前線程收到通知或者指定的時間 rel_time 超時之前,該線程都會處於阻塞狀態。而一旦超時或者收到了其他線程的通知,wait_for 返回,剩下的處理步驟和 wait() 類似。
與 std::condition_variable::wait_for 類似,但是 wait_until 可以指定一個時間點,在當前線程收到通知或者指定的時間點 abs_time 超時之前,該線程都會處於阻塞狀態。而一旦超時或者收到了其他線程的通知,wait_until 返回,剩下的處理步驟和 wait_for() 類似。
// condition_variable::wait_for example #include <iostream> // std::cout #include <thread> // std::thread #include <chrono> // std::chrono::seconds #include <mutex> // std::mutex, std::unique_lock #include <condition_variable> // std::condition_variable, std::cv_status std::condition_variable cv; int value; void read_value() { std::cin >> value; cv.notify_one(); } int main () { std::cout << "Please, enter an integer (I‘ll be printing dots): \n"; std::thread th(read_value); std::mutex mtx; std::unique_lock<std::mutex> lck(mtx); while (cv.wait_for(lck,std::chrono::seconds(1))==std::cv_status::timeout) { std::cout << ‘.‘ << std::endl; } std::cout << "You entered: " << value << ‘\n‘; //等待th線程執行完 th.join(); return 0; }
如果用wait_until,只需要將條件改為時間點即可:
while (cv.wait_for(lck,std::chrono::seconds(1))==std::cv_status::timeout) while (cv.wait_until(lck, std::chrono::system_clock::now() +std::chrono::seconds(1)) == std::cv_status::timeout)
C11線程管理:條件變量