再談多執行緒模型之生產者消費者(多生產者和單一消費者 )(c++11實現)
阿新 • • 發佈:2020-10-21
1.多生產者&單一消費者
-
1.1 與 單一生產者和多消費者模型類似, 因為存在多個生產者,需要考慮生產者之間的互斥訪問; 消費者只有一個,因此不存在消費者之間的互斥與競爭。
-
1.2 多個生產者, 可能同時放入商品,類比吃水果,父母同時向果盤放入水果,只有子女中的一個吃水果的情況。
-
1.3 具體點
情況 處理 生產者速率 > 消費者速率 消費者只有一個,因此,不存在消費者之間的競爭。生產者存在多個,多個生產者之間生產好資料就需要按照競爭將資料放入緩衝區,誰先拿到鎖,誰就先放入。最開始,生產者有多個,只能通過競爭生產。但是由於生產效率大於消費速率, 所以定然會出現商品數量 > 消費者數量。當商品總量達到總數,則需要暫停生產,等待消費者消費 生產者速率 < 消費者速率 最開始,剩餘放入總數 > 生產者總數,可以同時放入,隨著時間的推移,可能會出現: 剩餘放入空間 > 生產者總數 和 剩餘放入空間 < 生產者總數。 當出現 剩餘空間 < 生產者總數 時,已經不滿足同時放入,此時就需要鎖。來保證 -
1.4 結構體模型
template<typename T> struct repo_ { // 用作互斥訪問緩衝區 std::mutex _mtx_queue; // 緩衝區最大size unsigned int _count_max_queue_10 = 10; // 緩衝區 std::queue<T> _queue; // 緩衝區沒有滿,通知生產者繼續生產 std::condition_variable _cv_queue_not_full; // 緩衝區不為空,通知消費者繼續消費 std::condition_variable _cv_queue_not_empty; // 用於生產者之間的競爭 std::mutex _mtx_pro; // 計算當前已經生產了多少資料了 unsigned int _cnt_cur_pro = 0; repo_(const unsigned int count_max_queue = 10) :_count_max_queue_10(count_max_queue) , _cnt_cur_con(0) { ; } repo_(const repo_&instance) = delete; repo_& operator = (const repo_& instance) = delete; repo_(const repo_&&instance) = delete; repo_& operator = (const repo_&& instance) = delete; };
對比單一生產者和單一消費者 可知,僅僅增加了下面的程式碼
// 用於生產者之間的競爭
std::mutex _mtx_pro;
// 計算當前已經生產了多少資料了
unsigned int _cnt_cur_pro = 0;
- 1.5 生產者執行緒變化如下
template< typename T > void thread_pro(const int thread_index, const int count_max_produce, repo<T>* param_repo) { if (nullptr == param_repo || NULL == param_repo) return; while (true) { bool is_running = true; { // 用於生產者之間競爭 std::unique_lock<std::mutex> lock(param_repo->_mtx_pro); // 緩衝區沒有滿,繼續生產 if (param_repo->_cnt_cur_pro < cnt_total_10) { thread_produce_item<T>(thread_index, *param_repo, param_repo->_cnt_cur_pro); ++param_repo->_cnt_cur_pro; } else is_running = false; } std::this_thread::sleep_for(std::chrono::microseconds(16)); if (!is_running) break; } }
- 1.6 完整原始碼
#pragma once
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
std::mutex _mtx;
std::condition_variable _cv_not_full;
std::condition_variable _cv_not_empty;
const int max_queue_size_10 = 10;
enum
{
// 總生產數目
cnt_total_10 = 10,
};
template<typename T>
struct repo_
{
// 用作互斥訪問緩衝區
std::mutex _mtx_queue;
// 緩衝區最大size
unsigned int _count_max_queue_10 = 10;
// 緩衝區
std::queue<T> _queue;
// 緩衝區沒有滿,通知生產者繼續生產
std::condition_variable _cv_queue_not_full;
// 緩衝區不為空,通知消費者繼續消費
std::condition_variable _cv_queue_not_empty;
// 用於生產者之間的競爭
std::mutex _mtx_pro;
// 計算當前已經生產了多少資料了
unsigned int _cnt_cur_pro = 0;
repo_(const unsigned int count_max_queue = 10) :_count_max_queue_10(count_max_queue)
, _cnt_cur_con(0)
{
;
}
repo_(const repo_&instance) = delete;
repo_& operator = (const repo_& instance) = delete;
repo_(const repo_&&instance) = delete;
repo_& operator = (const repo_&& instance) = delete;
};
template <typename T>
using repo = repo_<T>;
//----------------------------------------------------------------------------------------
// 生產者生產資料
template <typename T>
void thread_produce_item(const int &thread_index, repo<T>& param_repo, const T& repo_item)
{
std::unique_lock<std::mutex> lock(param_repo._mtx_queue);
// 1. 生產者只要發現緩衝區沒有滿, 就繼續生產
param_repo._cv_queue_not_full.wait(lock, [&] { return param_repo._queue.size() < param_repo._count_max_queue_10; });
// 2. 將生產好的商品放入緩衝區
param_repo._queue.push(repo_item);
// log to console
std::cout << "生產者" << thread_index << "生產資料:" << repo_item << "\n";
// 3. 通知消費者可以消費了
//param_repo._cv_queue_not_empty.notify_one();
param_repo._cv_queue_not_empty.notify_one();
}
//----------------------------------------------------------------------------------------
// 消費者消費資料
template <typename T>
T thread_consume_item(const int thread_index, repo<T>& param_repo)
{
std::unique_lock<std::mutex> lock(param_repo._mtx_queue);
// 1. 消費者需要等待【緩衝區不為空】的訊號
param_repo._cv_queue_not_empty.wait(lock, [&] {return !param_repo._queue.empty(); });
// 2. 拿出資料
T item;
item = param_repo._queue.front();
param_repo._queue.pop();
std::cout << "消費者" << thread_index << "從緩衝區中拿出一組資料:" << item << std::endl;
// 3. 通知生產者,繼續生產
param_repo._cv_queue_not_full.notify_one();
return item;
}
//----------------------------------------------------------------------------------------
/**
* @ brief: 生產者執行緒
* @ thread_index - 執行緒標識,區分是哪一個執行緒
* @ count_max_produce - 最大生產次數
* @ param_repo - 緩衝區
* @ return - void
*/
template< typename T >
void thread_pro(const int thread_index, const int count_max_produce, repo<T>* param_repo)
{
if (nullptr == param_repo || NULL == param_repo)
return;
while (true)
{
bool is_running = true;
{
// 用於生產者之間競爭
std::unique_lock<std::mutex> lock(param_repo->_mtx_pro);
// 緩衝區沒有滿,繼續生產
if (param_repo->_cnt_cur_pro < cnt_total_10)
{
thread_produce_item<T>(thread_index, *param_repo, param_repo->_cnt_cur_pro);
++param_repo->_cnt_cur_pro;
}
else
is_running = false;
}
std::this_thread::sleep_for(std::chrono::microseconds(16));
if (!is_running)
break;
}
}
/**
* @ brief: 消費者執行緒
* @ thread_index - 執行緒標識,區分執行緒
* @ param_repo - 緩衝區
* @ return - void
*/
template< typename T >
void thread_con(const int thread_index, repo<T>* param_repo)
{
if (nullptr == param_repo || NULL == param_repo)
return;
static int cnt_cur_con = 0;
while (true)
{
bool is_running = true;
{
// std::unique_lock<std::mutex> lock(param_repo->_mtx_con);
// 還沒消費到指定的數目,繼續消費
if (cnt_cur_con < cnt_total_10)
{
thread_consume_item<T>(thread_index, *param_repo);
++cnt_cur_con;
}
else
is_running = false;
}
std::this_thread::sleep_for(std::chrono::microseconds(16));
// 結束執行緒
if ((!is_running))
break;
}
}
// 入口函式
//----------------------------------------------------------------------------------------
int main(int argc, char *argv, char *env[])
{
// 緩衝區
repo<int> repository;
// 執行緒池
std::vector<std::thread> vec_thread;
// 生產者
vec_thread.push_back(std::thread(thread_pro<int>, 1, cnt_total_10, &repository));
vec_thread.push_back(std::thread(thread_pro<int>, 2, cnt_total_10, &repository));
// 消費者
vec_thread.push_back(std::thread(thread_con<int>, 1, &repository));
for (auto &item : vec_thread)
{
item.join();
}
return 0;
}
- 1.7 可能結果