1. 程式人生 > >C11線程管理:條件變量

C11線程管理:條件變量

read 條件 而在 最大 integer ase tex thread 如果

1、簡介

  C11提供另外一種用於等待的同步機制,它可以阻塞一個或者多個線程,直到收到另外一個線程發出的通知或者超時,才會喚醒當前阻塞的線程。條件變量要和互斥量配合起來使用。

  condition_variable,配合std::unique_lock<std::mutex>進行wait操作。
  condition_variable_any,和任意帶有lock、unlock語意 的mutex搭配使用,比較靈活,但是效率比condition_variable低。

  條件變量的使用過程如下:

  a.擁有條件變量的線程獲取互斥量。
  b.循環檢查某個條件,如果條件不滿足,則阻塞線程直到滿足;如果條件滿足,則向下執行。
  c.某個線程滿足條件並執行完成之後,調用notify_one或者notify_all來喚醒一個或者多個線程。

2、實踐

  可以用條件變量來實現一個同步隊列,同步隊列作為一個線程安全的數據共享區,經常用於線程之間的讀取,比如半同步半異步線程池的同步隊列。

#include <iostream>   
#include <chrono>  
#include <thread> 
#include <mutex>  
#include<condition_variable>
#include <list>

template<typename T>
class SyncQueue
{
public:
    SyncQueue(
int maxSize) :m_maxSize(maxSize){} void Put(const T & t) { std::lock_guard<std::mutex> locker(m_mutex); while (IsFull()) { std::cout << "緩沖區滿了,需要等待..." << std::endl; m_notFull.wait(m_mutex); } m_queue.push_back(t); m_notEmpty.notify_one(); }
void Take(const T & t) { std::lock_guard<std::mutex> locker(m_mutex); while (IsEmpty()) { std::cout << "緩沖區空了,需要等待..." << std::endl; m_notEmpty.wait(m_mutex); } t = m_queue.front(); m_queue.pop_front(t); m_notFull.notify_one(); } bool Empty() { std::lock_guard<std::mutex> locker(m_mutex); return m_queue.empty(); } bool Full() { std::lock_guard<std::mutex> locker(m_mutex); return m_queue.size() == m_maxSize; } size_t Size() { std::lock_guard<std::mutex> locker(m_mutex); return m_queue.size(); } private: bool IsFull() const { return m_queue.size() == m_maxSize; } bool IsEmpty() const { return m_queue.empty(); } private: std::list<T> m_queue;              //緩沖區 std::mutex m_mutex;              //互斥量 std::condition_variable_any m_notEmpty; //不為空的條件變量 std::condition_variable_any m_notFull; //沒有滿的條件變量 int m_maxSize;            //同步隊列最大容量 };

  這個隊列中,沒有滿的情況下可以插入數據,如果滿了,則會調用m_notFull阻塞線程等待,等待消費線程取出數據之後發出一個未滿的通知,然後前面阻塞的線程會被喚醒繼續往下執行;如果隊列為空,不能取出數據,調用m_notEmpty來阻塞當前線程,等待插入數據的線程插入數據發出不為空的通知,喚醒被阻塞的線程,往下執行讀出數據。

  條件變量的wait方法還有個重載方法,可以接受一個條件。

std::lock_guard<std::mutex> locker(m_mutex);
while (IsFull())
{
    std::cout << "緩沖區滿了,需要等待..." << std::endl;
    m_notFull.wait(m_mutex);
}

  可以寫為這樣:

std::lock_guard<std::mutex> locker(m_mutex);
m_notFull.wait(locker, [this]{ return !IsFull();});

  兩種寫法都一樣,後者代碼更加簡潔,條件變量先檢查判斷式是否滿足條件,如果滿足,重新獲取mutex,結束wait,繼續往下執行;如果不滿足條件,則釋放mutex,將線程置為waiting狀態,繼續等待。

  需要註意的是,wait函數會釋放掉mutex,而lock_guard還擁有mutex,他只在出了作用域之後才會釋放掉mutex,所以這時並不會釋放,但是執行wait會提前釋放,而在wait提前釋放掉鎖之後,會處於等待狀態,在notify_one/all喚醒之後,會先獲取mutex,相當於之前的mutex又獲取到了,所以在出作用域的時候,lock_guard釋放鎖不會產生問題。

  在這種情況下,如果用unique_lock語意更準確,因為unique_lock不像lock_guard一樣只能在析構的時候才能釋放鎖,它可以隨時釋放鎖,在wait的時候讓uniq_lock釋放鎖,語意更加準確。

  上述例子中,可以用unique_lock來替換掉lock_guard,condition_variable來替換掉condition_variable_any,會使代碼更加清晰,效率也更高。

3、超時等待

  除了wait還可以使用超時等待函數std::condition_variable::wait_for和std::condition_variable::wait_until。

  與 std::condition_variable::wait() 類似,不過 wait_for 可以指定一個時間段,在當前線程收到通知或者指定的時間 rel_time 超時之前,該線程都會處於阻塞狀態。而一旦超時或者收到了其他線程的通知,wait_for 返回,剩下的處理步驟和 wait() 類似。

  與 std::condition_variable::wait_for 類似,但是 wait_until 可以指定一個時間點,在當前線程收到通知或者指定的時間點 abs_time 超時之前,該線程都會處於阻塞狀態。而一旦超時或者收到了其他線程的通知,wait_until 返回,剩下的處理步驟和 wait_for() 類似。

// condition_variable::wait_for example
#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <chrono>             // std::chrono::seconds
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable, std::cv_status

std::condition_variable cv;

int value;

void read_value() {
  std::cin >> value;
  cv.notify_one();
}

int main ()
{
  std::cout << "Please, enter an integer (I‘ll be printing dots): \n";
  std::thread th(read_value);

  std::mutex mtx;
  std::unique_lock<std::mutex> lck(mtx);
  while (cv.wait_for(lck,std::chrono::seconds(1))==std::cv_status::timeout) {
    std::cout << . << std::endl;
  }
  std::cout << "You entered: " << value << \n;

  //等待th線程執行完
  th.join();

  return 0;
}

  如果用wait_until,只需要將條件改為時間點即可:

while (cv.wait_for(lck,std::chrono::seconds(1))==std::cv_status::timeout)
while (cv.wait_until(lck, std::chrono::system_clock::now() +std::chrono::seconds(1)) == std::cv_status::timeout)

C11線程管理:條件變量