1. 程式人生 > >C++11中的併發

C++11中的併發

在 C++98 的時代,C++標準並沒有包含多執行緒的支援,人們只能直接呼叫作業系統提供的 SDK API 來編寫多執行緒程式,不同的作業系統提供的 SDK API 以及執行緒控制能力不盡相同。到了 C++11,終於在標準之中加入了正式的多執行緒的支援,從而我們可以使用標準形式的類來建立與執行執行緒,也使得我們可以使用標準形式的鎖、原子操作、執行緒本地儲存 (TLS) 等來進行復雜的各種模式的多執行緒程式設計,而且,C++11 還提供了一些高階概念,比如 promise/future,packaged_task,async 等以簡化某些模式的多執行緒程式設計。

 

一:thread

         標頭檔案<thread>中提供了std::thread 對執行緒進行封裝。C++11 所定義的執行緒是和操作系的執行緒是一一對應的,也就是說我們生成的執行緒都是直接接受作業系統的排程的,通過作業系統的相關命令(比如 ps -M 命令)是可以看到的,一個程序所能建立的執行緒數目以及一個作業系統所能建立的總的執行緒數目等都由執行時作業系統限定。

         std::thread類如果在建構函式中指明瞭執行緒入口函式的話,則從建立好thread物件那一刻起,執行緒就開始運行了,此時std::thread物件就表示一個正在執行的執行緒了。但是std::thread建立的執行緒,執行緒入口函式的返回值被忽略了,如果執行緒中丟擲了異常,則會直接呼叫std::terminate結束程序。

         std::thread也可以與底層執行緒不關聯,比如使用預設建構函式建立的std::thread物件、被移動了的std::thread物件、detach或join過的std::thread物件。沒有兩個std::thread物件會表示同一個執行緒,std::thread不支援複製構造或複製賦值的,只能移動構造或移動賦值。

         std::thread有可能丟擲std::system_error異常,表示執行緒無法啟動。該異常表示要麼是std::errc::resource_unavailable_try_again,要麼就是底層實現建立執行緒時發生了問題。

         建立std::thread同時指明執行緒入口函式的建構函式是:

template< class Function, class... Args > 
explicit thread( Function&& f, Args&&... args );

          呼叫執行緒入口函式f時,引數是按值複製或移動傳遞的。因此如果需要傳遞一個引用給入口函式的話,需要使用std::ref或std::cref封裝。

 

std::thread中定義了native_handle_type型別,它具體是什麼型別取決於底層的執行緒庫。 native_handle_type是連線 std::thread 和作業系統執行緒庫之間的橋樑,在 g++(libstdc++) for Linux 裡面,native_handle_type 其實就是 pthread 裡面的 pthread_t 型別,當 std::thread 類的功能不能滿足我們的要求的時候(比如改變某個執行緒的優先順序),可以通過 std::thread 類例項的 native_handle() 返回值作為引數來呼叫相關的 pthread 函式達到目的。

std::thread中還定義了內部id類std::thread::id,id物件表示std::thread物件的唯一標識,如果std::thread目前沒有關聯的執行緒,則其id值為一個預設值std::thread::id()。std::thread定義了get_id成員函式返回其id值。

std::thread物件如果有關聯的活動執行緒的話,則稱其為joinable的,成員函式joinable()可用於檢視std::thread是否為joinable的。如果std::thread是joinable的,則其get_id() != std::thread::id()。如果std::thread其底層執行緒已經執行完,但是尚未被join,則認為其依然是joinable的。下面幾種情況,std::thread::joinable會返回false:std::thread是通過預設建構函式建立(未指定執行緒入口函式);std::thread被移動了(複製或賦值給其他std::thread);std::thread呼叫了detach之後;std::thread呼叫了join之後。

 std::thread成員函式detach()斷開std::thread物件與底層執行緒的關聯,底層執行緒退出後,其資源自動free掉。如果當前std::thread物件沒有關聯底層執行緒,則呼叫detach會丟擲std::system_error異常。呼叫了detach之後,就不能再該std::thread物件上呼叫join了;

std::thread成員函式join,能阻塞當前執行緒的執行,直到執行join的std::thread其底層執行緒執行完成。針對同一個std::thread物件在多個另外執行緒中執行join是未定義行為;如果發生錯誤則會丟擲std::system_error異常,比如若std::thread的joinable為false,則是invalid_argument錯誤;如果執行緒內部自己呼叫join,則是resource_deadlock_would_occur錯誤。

std::thread成員函式swap可以將當前std::thread與其他std::thread物件的底層控制代碼進行互換。相當於二者互換了身份,各自掌握對方的執行緒。

如果std::thread執行析構時,其依然關聯著底層執行緒(也就是joinable返回true),則會呼叫std::terminate。

如果將std::thread物件A移動賦值給物件B,則若B依然關聯底層執行緒(joinable返回true)的話,則會呼叫std::terminate。移動賦值之後,B就掌握了A的執行緒,而A就成了預設構造狀態。

std::thread還有一個靜態成員函式hardware_concurrency,用於返回當前底層實現支援的最大執行緒併發數,其值只能用於參考。

 

除了定義std::thread類,<thread>標頭檔案中還定義了名稱空間this_thread表示當前執行緒,並在其中定義了4個輔助函式:yield、get_id、sleep_for、sleep_until。這四個函式一般是線上程執行函式中呼叫。

std::this_thread::yield用於提示底層實現排程執行其他執行緒。它的具體實現依賴於底層實現,特別是作業系統當前使用的排程策略。比如對於實時先入先出排程策略(如linux中的SCHED_FIFO)而言,該函式會掛起當前執行緒,將其插入到相同優先順序的就緒佇列末尾(如果當前沒有相同優先順序的其他執行緒,則yield無效果);

std::this_thread::get_id使用者獲取當前執行緒的id;

std::this_thread::sleep_for和std::this_thread::sleep_until用於將當前執行緒的執行休眠一段時間;

 

 

二:鎖

         標頭檔案<mutex>中定義了std::mutex類,用於表示非遞迴鎖。如果某個執行緒已經擁有了std::mutex,仍然呼叫std::mutex::lock的話,是未定義行為;如果std::mutex析構時,仍有任一執行緒擁有它,則是未定義行為;如果某執行緒結束時,依然擁有某個std::mutex,則是未定義行為。std::mutex不可複製,不可移動,它只有一個預設建構函式。

         std::mutex定義了native_handle_type型別,類似於std::thread::native_handle_type,它取決於具體的底層實現,成員函式std::mutex::native_handle用於返回該鎖的底層實現控制代碼;

         std::mutex定義了lock、try_lock和unlock成員函式函式。一般情況下,不直接呼叫這些函式,而是使用std::unique_lock或std::lock_guard等以RAII的方式管理鎖。

 

         <mutex>中還定義了std::timed_mutex,它類似於std::mutex,只不過另外提供了try_lock_for和try_lock_until函式,這倆函式要麼在能立即持有鎖時返回true,要麼最多阻塞一段時間後返回false。

         <mutex>中還定義了recursive_mutex、recursive_timed_mutex兩種遞迴鎖,以及shared_mutex、shared_timed_mutex兩種共享鎖(讀寫鎖)。

 

         <mutex>中定義了lock_guard,用於對鎖進行RAII式的封裝,建立lock_guard時會持有鎖,析構lock_guard時會釋放鎖。lock_guard不可複製。

int g_i = 0;
std::mutex g_i_mutex;  // protects g_i
 
void safe_increment() {
    std::lock_guard<std::mutex> lock(g_i_mutex);
    ++g_i;
 
    std::cout << std::this_thread::get_id() << ": " << g_i << '\n';
 
    // g_i_mutex is automatically released when lock goes out of scope
}

std::thread t1(safe_increment);
std::thread t2(safe_increment);

t1.join();
t2.join();

         

         <mutex>中還提供了defer_lock_t、try_to_lock_t、adopt_lock_t以及unique_lock、shared_lock,結合std::lock或std::try_lock函式,可以方便的對上面的鎖進行封裝,最常見的就是封裝多個鎖,以避免死鎖的發生。具體可參考en.cppreference.com中的例子。

 

         <mutex>中還提供了std::call_once函式,保證某個函式即使在多個執行緒中同時呼叫時,也只被呼叫一次。

template< class Callable, class... Args >
void call_once( std::once_flag& flag, Callable&& f, Args&&... args );

          如果呼叫call_once時flag已經被設定,說明函式f已經被呼叫過了,這種情況下call_once直接返回;如果flag未被設定,則呼叫call_once時會直接呼叫std​::​forward<Callable>(f),並向其傳遞std​::​forward<Args>(args)...引數。如果此時f內丟擲了異常,則異常會傳遞給call_once的呼叫者,並且不會設定flag,這樣可以使得後續使用同一標誌呼叫call_once時能繼續呼叫f函式。

std::once_flag flag1, flag2;
 
void simple_do_once() {
    std::call_once(flag1, [](){ std::cout << "Simple example: called once\n"; });
}
 
void may_throw_function(bool do_throw) {
  if (do_throw) {
    std::cout << "throw: call_once will retry\n"; // this may appear more than once
    throw std::exception();
  }
  std::cout << "Didn't throw, call_once will not attempt again\n"; // guaranteed once
}
 
void do_once(bool do_throw) {
  try {
    std::call_once(flag2, may_throw_function, do_throw);
  }
  catch (...) {
  }
}
 
std::thread st1(simple_do_once);
std::thread st2(simple_do_once);
std::thread st3(simple_do_once);
std::thread st4(simple_do_once);
st1.join();
st2.join();
st3.join();
st4.join();

std::thread t1(do_once, true);
std::thread t2(do_once, true);
std::thread t3(do_once, false);
std::thread t4(do_once, true);
t1.join();
t2.join();
t3.join();
t4.join();

          上面程式碼的結果是:

Simple example: called once
throw: call_once will retry
throw: call_once will retry
Didn't throw, call_once will not attempt again

  

 

三:條件變數

         <condition_variable>標頭檔案中提供了condition_variable類以對條件變數進行支援。條件變數也是一種同步原語,它可以使多個執行緒阻塞,直到另一個執行緒修改了某共享變數並且對條件變數進行通知之後,才解除阻塞。

         修改共享變數的執行緒需要:持有某種鎖(一般是通過std::lock_guard),在持有鎖的情況下修改變數,在std::condition_variable上執行notify_one或notify_all(執行通知時不需要持有鎖)。即使共享變數是原子的,也需要在持有鎖的情況下進行修改,以便能夠正確的通知到等待條件變數的執行緒。

         等待條件變數std::condition_variable的執行緒需要:持有一個std::unique_lock<std::mutex>(該鎖也是修改共享變數執行緒需要持有的鎖);執行wait、wait_for或wait_until,這些等待操作會原子的釋放鎖並掛起執行緒;當條件變數得到通知時,或超時時間到時,或者發生虛假喚醒時,執行緒醒來並且原子性的獲取鎖。此時執行緒應該檢查條件,如果條件未滿足(虛假喚醒、超時時間到時)繼續等待。

         std::condition_variable只能與std::unique_lock<std::mutex>一起使用,這種約束使得在一些平臺上能夠獲得最大效率。<condition_variable>中提供了std::condition_variable_any條件變數,可以與任意型別的鎖(如std::shared_lock)一起工作。

         std::condition_variable不能進行復制構造、移動構造,也不能複製賦值或移動賦值。

 

         類似於std::thread和std::mutex,std::condition_variable也提供了native_handle_type型別和native_handle函式,用於返回條件變數底層實現的控制代碼。

 

         std::condition_variable的wait成員函式:

void wait( std::unique_lock<std::mutex>& lock );

template< class Predicate >
void wait( std::unique_lock<std::mutex>& lock, Predicate pred );

          wait會阻塞當前執行緒,直到其他執行緒在相同條件變數上呼叫了nofify_one或notify_all,或者直到發生了虛假喚醒。呼叫wait之前需要先鎖住lock,呼叫wait會原子的釋放lock,阻塞當前執行緒,將當前執行緒新增到等待條件變數(*this)的執行緒列表中。當在條件變數上呼叫nofify_one或notify_all,或是發生虛假喚醒時,當前執行緒解除阻塞,並且再次鎖住lock。

         第二個過載實際上相當於:

while (!pred()) {
    wait(lock);
}

          注意lock只能是std::unique_lock<std::mutex>。如果lock沒有鎖住就呼叫wait,這是未定義行為;如果鎖住的lock與其他等待相同條件變數的執行緒使用lock不是同一個lock,這也是未定義行為。如果在notify_one之後才呼叫wait,則該wait不會喚醒;

std::condition_variable cv;
std::mutex cv_m; // This mutex is used for three purposes:
                 // 1) to synchronize accesses to i
                 // 2) to synchronize accesses to std::cerr
                 // 3) for the condition variable cv
int i = 0;
 
void waits() {
    std::unique_lock<std::mutex> lk(cv_m);
    std::cerr << "Waiting... \n";
    cv.wait(lk, []{return i == 1;});
    std::cerr << "...finished waiting. i == 1\n";
}
 
void signals() {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    {
        std::lock_guard<std::mutex> lk(cv_m);
        std::cerr << "Notifying...\n";
    }
    cv.notify_all();
 
    std::this_thread::sleep_for(std::chrono::seconds(1));
 
    {
        std::lock_guard<std::mutex> lk(cv_m);
        i = 1;
        std::cerr << "Notifying again...\n";
    }
    cv.notify_all();
}
 
int main() {
    std::thread t1(waits), t2(waits), t3(waits), t4(signals);
    t1.join(); 
    t2.join(); 
    t3.join();
    t4.join();
}

 結果是:

Waiting...
Waiting...
Waiting...
Notifying...
Notifying again...
...finished waiting. i == 1
...finished waiting. i == 1
...finished waiting. i == 1

 wait_for和wait_until類似於wait,只不過它們能指定等待的時間。

 

如果當前有任一執行緒等待條件變數,則notify_one函式會喚醒其中的一個。注意,呼叫notify_one的執行緒沒必要鎖住等待條件變數執行緒使用的lock,實際上這麼做有時候是有害無益的,因為這會導致被喚醒的執行緒重新阻塞,直到發起notify的執行緒釋放該lock。不過在需要精確排程事件的場景中,這樣做也是有必要的。比如如果等待執行緒在條件滿足之後會結束程序,這就會導致條件變數被析構,如果在解鎖之後notify之前發生了虛假喚醒,將會導致在已析構的條件變數上呼叫notify。

notify_all將會喚醒所有等待在條件變數上的執行緒。

 

         std::condition_variable只能和std::unique_lock<std::mutex>一起工作,如果需要能夠使用其他鎖,則可以使用std::condition_variable_any。該類是不可複製,也不可移動的。如果std::condition_variable_any也使用std::unique_lock<std::mutex>的話,其效率沒有std::condition_variable高。

 

         <condition_variable>中還提供了std::notify_all_at_thread_exit函式:

void notify_all_at_thread_exit( std::condition_variable& cond,
                                std::unique_lock<std::mutex> lk );

 該函式用於通知其他執行緒當前執行緒已經結束。該函式的操作是:將已經獲取的鎖lk的擁有權轉移到內部儲存,然後當前執行緒退出時,呼叫條件變數的notify_all:

lk.unlock();
cond.notify_all();

          lk會一直鎖住,直到執行緒退出。該函式一般用於detached執行緒退出時呼叫。

 

 

四:Futures

         std::thread有個缺點,就是無法獲取呼叫函式的返回值或者捕獲其丟擲的異常。因此,<future>提供了一系列用於獲取非同步執行緒的返回值或者其丟擲的異常的機制。這些返回值通過共享狀態(shared state)進行通訊,執行緒將返回值或丟擲的異常儲存在共享狀態中,然後其他執行緒通過操作與這些共享狀態關聯的std::future或std::shared_future來獲取內部的值。

        

std::promise提供了這樣一種機制:它關聯一個共享狀態(shared state),該共享狀態中儲存了一些狀態資訊,並用於儲存值或異常。該值或異常之後的某個時刻會被std::promise建立的std::future物件非同步的取出。std::promise的使用是一次性的,而且它不支援複製,只支援移動,當物件A移動構造/賦值給物件B後,A就沒有關聯的共享狀態了。

std::promise對共享狀態可以做三件事:

標記為ready,std::promise將值或異常儲存到共享狀態中,將狀態標記為ready。這樣就解除了其他執行緒在關聯該共享狀態上的期值上的阻塞;

         release,std::promise放棄與共享狀態的關聯。如果當前是共享狀態最後的引用,則共享狀態被銷燬。如果該共享狀態是std::async建立的,並且尚未ready,則該動作會阻塞;

         abandon,std::promise將以std::future_errc::broken_promise為錯誤碼的std::future_error的異常儲存在共享狀態中,然後將共享狀態標記為ready後release。可以這樣理解,promise意思是許諾,許諾將來會把值儲存在共享狀態中,現在它放棄了這個諾言,因而是broken_promise。

如果當前關聯的共享狀態是ready的,std::promise的解構函式會release共享狀態;如果共享狀態不是ready的,則解構函式以std::future_errc::broken_promise為錯誤碼將異常std::future_error儲存到共享狀態中,並將其標記為ready後release。

std::promise的set_value是原子的將值儲存到共享狀態中,並且將其置為ready;set_exception用於在共享狀態中儲存異常;而set_value_at_thread_exit將值儲存到共享狀態後並不會立即將其置為ready,而是線上程退出時才將其置為ready;set_exception_at_thread_exit的行為類似。set_value、set_exception、set_value_at_thread_exit或set_exception_at_thread_exit的行為,就好像他們在更新promise狀態時獲取一個鎖。如果當前沒有關聯的共享狀態,或者共享狀態中已經儲存了值或異常,則這些操作就會丟擲異常。

        

std::promise處於promise-future通訊通道的寫端,在共享狀態中寫入了值之後,任何在該共享狀態上等待的操作(比如std::future::get)就會成功返回。std::promise的get_future成員函式,返回一個關聯相同共享狀態的std::future物件。如果當前沒有關聯的共享狀態,或是get_future已經被呼叫過一次了,則會丟擲異常。如果需要在promise-future傳輸通道上有多個讀端,則可以使用std::future::share。呼叫get_future並不會和set_value、set_exception、set_value_at_thread_exit或set_exception_at_thread_exit產生data race。

         std::future用於訪問非同步操作的結果,非同步操作(通過std::async,std::package_task或std::promise建立)可以提供給非同步操作的建立者一個std::future,然後建立者使用std::future的各種方法訪問、等待,或從std::future中取值。當非同步操作尚未提供一個值時,這些操作可能會導致阻塞。

         std::future的valid成員函式可以判斷該std::future物件是否關聯了共享狀態。預設構造的std::future就沒有關聯的共享狀態,因而其valid返回false;std::future不支援複製(構造或賦值),只支援移動(構造或賦值)。當移動A到B時,A的valid就會返回false;

std::future的解構函式會release共享狀態,如果當前是關聯到共享狀態最後一個引用,則該共享狀態被銷燬;std::future斷開與共享狀態的關聯。如果當前std::future由std::async返回,且共享狀態尚未ready,且當前std::future是該共享狀態的最後引用,則析構時會阻塞。

         std::future的get操作將阻塞到future能夠從共享狀態中取到值。如果當前未關聯共享狀態,則呼叫get是未定義行為;呼叫get後,共享狀態被release了,且valid會返回false。如果是常規的future,如std::future<int>,則get從共享狀態中取到的值,相當於執行了std::move操作;如果模板實參為T&,如std::future<int&>,則get返回的是共享狀態中儲存的值的引用;如果共享狀態中儲存的是異常的話,則呼叫get將會丟擲該異常;

         std::future的wait,wait_for和wait_until用於從共享狀態中等待其變為ready;

         std::future的share操作將共享狀態轉移給std::shared_future,呼叫完share之後,當前future的valid返回false。多個std::shared_future可以關聯同一個共享狀態,std::shared_future支援複製,多執行緒通過自己的shared_future副本同時訪問同一個共享狀態是安全的。

 

         <future>提供的std::packaged_task,可以封裝任意可呼叫物件,而且其本身也具有operator()成員函式,所以它可以作為std::thread的呼叫物件,與普通可呼叫物件不同的是,std::packaged_task關聯了共享狀態,它的返回值或者丟擲的異常可以儲存到該共享狀態中。該值或異常就可以通過get_future函式(get_future只能呼叫一次)返回的std::future進行訪問。

std::packaged_task不可複製,只能移動;類似於std::promise,如果共享狀態在ready之前,std::packaged_task被析構了,則該共享狀態中會儲存一個以std::future_errc::broken_promise為錯誤碼的std::future_error異常。

        

         <future>提供的std::async模板函式可以非同步的呼叫f,返回一個std::future,用於獲取f的返回值,或者其丟擲的異常。std::async呼叫f的方式有兩種,一種是std::launch::async,表示啟動一個新執行緒執行f;另一種是std::launch::deferred,表示在獲取f返回值(通過future)時,才同步的執行f(延遲計算)。

        

https://en.cppreference.com/w/cpp/thread

https://www.ibm.com/developerworks/cn/linux/1412_zhupx_thread/index.html