1. 程式人生 > >綜合運用: C++11 多線程下生產者消費者模型詳解(轉)

綜合運用: C++11 多線程下生產者消費者模型詳解(轉)

並發 rep 生產 我會 交流 模型 操作 const ref

生產者消費者問題是多線程並發中一個非常經典的問題,相信學過操作系統課程的同學都清楚這個問題的根源。本文將就四種情況分析並介紹生產者和消費者問題,它們分別是:單生產者-單消費者模型,單生產者-多消費者模型,多生產者-單消費者模型,多生產者-多消費者模型,我會給出四種情況下的 C++11 並發解決方案,如果文中出現了錯誤或者你對代碼有異議,歡迎交流 ;-)。

單生產者-單消費者模型

顧名思義,單生產者-單消費者模型中只有一個生產者和一個消費者,生產者不停地往產品庫中放入產品,消費者則從產品庫中取走產品,產品庫容積有限制,只能容納一定數目的產品,如果生產者生產產品的速度過快,則需要等待消費者取走產品之後,產品庫不為空才能繼續往產品庫中放置新的產品,相反,如果消費者取走產品的速度過快,則可能面臨產品庫中沒有產品可使用的情況,此時需要等待生產者放入一個產品後,消費者才能繼續工作。C++11實現單生產者單消費者模型的代碼如下:

 1 #include <unistd.h>
 2 
 3 #include <cstdlib>
 4 #include <condition_variable>
 5 #include <iostream>
 6 #include <mutex>
 7 #include <thread>
 8 
 9 static const int kItemRepositorySize  = 10; // Item buffer size.
10 static const int kItemsToProduce  = 1000;   // How many items we plan to produce.
11 12 struct ItemRepository { 13 int item_buffer[kItemRepositorySize]; // 產品緩沖區, 配合 read_position 和 write_position 模型環形隊列. 14 size_t read_position; // 消費者讀取產品位置. 15 size_t write_position; // 生產者寫入產品位置. 16 std::mutex mtx; // 互斥量,保護產品緩沖區 17 std::condition_variable repo_not_full; // 條件變量, 指示產品緩沖區不為滿.
18 std::condition_variable repo_not_empty; // 條件變量, 指示產品緩沖區不為空. 19 } gItemRepository; // 產品庫全局變量, 生產者和消費者操作該變量. 20 21 typedef struct ItemRepository ItemRepository; 22 23 24 void ProduceItem(ItemRepository *ir, int item) 25 { 26 std::unique_lock<std::mutex> lock(ir->mtx); 27 while(((ir->write_position + 1) % kItemRepositorySize) 28 == ir->read_position) { // item buffer is full, just wait here. 29 std::cout << "Producer is waiting for an empty slot...\n"; 30 (ir->repo_not_full).wait(lock); // 生產者等待"產品庫緩沖區不為滿"這一條件發生. 31 } 32 33 (ir->item_buffer)[ir->write_position] = item; // 寫入產品. 34 (ir->write_position)++; // 寫入位置後移. 35 36 if (ir->write_position == kItemRepositorySize) // 寫入位置若是在隊列最後則重新設置為初始位置. 37 ir->write_position = 0; 38 39 (ir->repo_not_empty).notify_all(); // 通知消費者產品庫不為空. 40 lock.unlock(); // 解鎖. 41 } 42 43 int ConsumeItem(ItemRepository *ir) 44 { 45 int data; 46 std::unique_lock<std::mutex> lock(ir->mtx); 47 // item buffer is empty, just wait here. 48 while(ir->write_position == ir->read_position) { 49 std::cout << "Consumer is waiting for items...\n"; 50 (ir->repo_not_empty).wait(lock); // 消費者等待"產品庫緩沖區不為空"這一條件發生. 51 } 52 53 data = (ir->item_buffer)[ir->read_position]; // 讀取某一產品 54 (ir->read_position)++; // 讀取位置後移 55 56 if (ir->read_position >= kItemRepositorySize) // 讀取位置若移到最後,則重新置位. 57 ir->read_position = 0; 58 59 (ir->repo_not_full).notify_all(); // 通知消費者產品庫不為滿. 60 lock.unlock(); // 解鎖. 61 62 return data; // 返回產品. 63 } 64 65 66 void ProducerTask() // 生產者任務 67 { 68 for (int i = 1; i <= kItemsToProduce; ++i) { 69 // sleep(1); 70 std::cout << "Produce the " << i << "^th item..." << std::endl; 71 ProduceItem(&gItemRepository, i); // 循環生產 kItemsToProduce 個產品. 72 } 73 } 74 75 void ConsumerTask() // 消費者任務 76 { 77 static int cnt = 0; 78 while(1) { 79 sleep(1); 80 int item = ConsumeItem(&gItemRepository); // 消費一個產品. 81 std::cout << "Consume the " << item << "^th item" << std::endl; 82 if (++cnt == kItemsToProduce) break; // 如果產品消費個數為 kItemsToProduce, 則退出. 83 } 84 } 85 86 void InitItemRepository(ItemRepository *ir) 87 { 88 ir->write_position = 0; // 初始化產品寫入位置. 89 ir->read_position = 0; // 初始化產品讀取位置. 90 } 91 92 int main() 93 { 94 InitItemRepository(&gItemRepository); 95 std::thread producer(ProducerTask); // 創建生產者線程. 96 std::thread consumer(ConsumerTask); // 創建消費之線程. 97 producer.join(); 98 consumer.join(); 99 }

單生產者-多消費者模型

與單生產者和單消費者模型不同的是,單生產者-多消費者模型中可以允許多個消費者同時從產品庫中取走產品。所以除了保護產品庫在多個讀寫線程下互斥之外,還需要維護消費者取走產品的計數器,代碼如下:

  1 #include <unistd.h>
  2 
  3 #include <cstdlib>
  4 #include <condition_variable>
  5 #include <iostream>
  6 #include <mutex>
  7 #include <thread>
  8 
  9 static const int kItemRepositorySize  = 4; // Item buffer size.
 10 static const int kItemsToProduce  = 10;   // How many items we plan to produce.
 11 
 12 struct ItemRepository {
 13     int item_buffer[kItemRepositorySize];
 14     size_t read_position;
 15     size_t write_position;
 16     size_t item_counter;
 17     std::mutex mtx;
 18     std::mutex item_counter_mtx;
 19     std::condition_variable repo_not_full;
 20     std::condition_variable repo_not_empty;
 21 } gItemRepository;
 22 
 23 typedef struct ItemRepository ItemRepository;
 24 
 25 
 26 void ProduceItem(ItemRepository *ir, int item)
 27 {
 28     std::unique_lock<std::mutex> lock(ir->mtx);
 29     while(((ir->write_position + 1) % kItemRepositorySize)
 30         == ir->read_position) { // item buffer is full, just wait here.
 31         std::cout << "Producer is waiting for an empty slot...\n";
 32         (ir->repo_not_full).wait(lock);
 33     }
 34 
 35     (ir->item_buffer)[ir->write_position] = item;
 36     (ir->write_position)++;
 37 
 38     if (ir->write_position == kItemRepositorySize)
 39         ir->write_position = 0;
 40 
 41     (ir->repo_not_empty).notify_all();
 42     lock.unlock();
 43 }
 44 
 45 int ConsumeItem(ItemRepository *ir)
 46 {
 47     int data;
 48     std::unique_lock<std::mutex> lock(ir->mtx);
 49     // item buffer is empty, just wait here.
 50     while(ir->write_position == ir->read_position) {
 51         std::cout << "Consumer is waiting for items...\n";
 52         (ir->repo_not_empty).wait(lock);
 53     }
 54 
 55     data = (ir->item_buffer)[ir->read_position];
 56     (ir->read_position)++;
 57 
 58     if (ir->read_position >= kItemRepositorySize)
 59         ir->read_position = 0;
 60 
 61     (ir->repo_not_full).notify_all();
 62     lock.unlock();
 63 
 64     return data;
 65 }
 66 
 67 
 68 void ProducerTask()
 69 {
 70     for (int i = 1; i <= kItemsToProduce; ++i) {
 71         // sleep(1);
 72         std::cout << "Producer thread " << std::this_thread::get_id()
 73             << " producing the " << i << "^th item..." << std::endl;
 74         ProduceItem(&gItemRepository, i);
 75     }
 76     std::cout << "Producer thread " << std::this_thread::get_id()
 77                 << " is exiting..." << std::endl;
 78 }
 79 
 80 void ConsumerTask()
 81 {
 82     bool ready_to_exit = false;
 83     while(1) {
 84         sleep(1);
 85         std::unique_lock<std::mutex> lock(gItemRepository.item_counter_mtx);
 86         if (gItemRepository.item_counter < kItemsToProduce) {
 87             int item = ConsumeItem(&gItemRepository);
 88             ++(gItemRepository.item_counter);
 89             std::cout << "Consumer thread " << std::this_thread::get_id()
 90                 << " is consuming the " << item << "^th item" << std::endl;
 91         } else ready_to_exit = true;
 92         lock.unlock();
 93         if (ready_to_exit == true) break;
 94     }
 95     std::cout << "Consumer thread " << std::this_thread::get_id()
 96                 << " is exiting..." << std::endl;
 97 }
 98 
 99 void InitItemRepository(ItemRepository *ir)
100 {
101     ir->write_position = 0;
102     ir->read_position = 0;
103     ir->item_counter = 0;
104 }
105 
106 int main()
107 {
108     InitItemRepository(&gItemRepository);
109     std::thread producer(ProducerTask);
110     std::thread consumer1(ConsumerTask);
111     std::thread consumer2(ConsumerTask);
112     std::thread consumer3(ConsumerTask);
113     std::thread consumer4(ConsumerTask);
114 
115     producer.join();
116     consumer1.join();
117     consumer2.join();
118     consumer3.join();
119     consumer4.join();
120 }

多生產者-單消費者模型

與單生產者和單消費者模型不同的是,多生產者-單消費者模型中可以允許多個生產者同時向產品庫中放入產品。所以除了保護產品庫在多個讀寫線程下互斥之外,還需要維護生產者放入產品的計數器,代碼如下:

  1 #include <unistd.h>
  2 
  3 #include <cstdlib>
  4 #include <condition_variable>
  5 #include <iostream>
  6 #include <mutex>
  7 #include <thread>
  8 
  9 static const int kItemRepositorySize  = 4; // Item buffer size.
 10 static const int kItemsToProduce  = 10;   // How many items we plan to produce.
 11 
 12 struct ItemRepository {
 13     int item_buffer[kItemRepositorySize];
 14     size_t read_position;
 15     size_t write_position;
 16     size_t item_counter;
 17     std::mutex mtx;
 18     std::mutex item_counter_mtx;
 19     std::condition_variable repo_not_full;
 20     std::condition_variable repo_not_empty;
 21 } gItemRepository;
 22 
 23 typedef struct ItemRepository ItemRepository;
 24 
 25 
 26 void ProduceItem(ItemRepository *ir, int item)
 27 {
 28     std::unique_lock<std::mutex> lock(ir->mtx);
 29     while(((ir->write_position + 1) % kItemRepositorySize)
 30         == ir->read_position) { // item buffer is full, just wait here.
 31         std::cout << "Producer is waiting for an empty slot...\n";
 32         (ir->repo_not_full).wait(lock);
 33     }
 34 
 35     (ir->item_buffer)[ir->write_position] = item;
 36     (ir->write_position)++;
 37 
 38     if (ir->write_position == kItemRepositorySize)
 39         ir->write_position = 0;
 40 
 41     (ir->repo_not_empty).notify_all();
 42     lock.unlock();
 43 }
 44 
 45 int ConsumeItem(ItemRepository *ir)
 46 {
 47     int data;
 48     std::unique_lock<std::mutex> lock(ir->mtx);
 49     // item buffer is empty, just wait here.
 50     while(ir->write_position == ir->read_position) {
 51         std::cout << "Consumer is waiting for items...\n";
 52         (ir->repo_not_empty).wait(lock);
 53     }
 54 
 55     data = (ir->item_buffer)[ir->read_position];
 56     (ir->read_position)++;
 57 
 58     if (ir->read_position >= kItemRepositorySize)
 59         ir->read_position = 0;
 60 
 61     (ir->repo_not_full).notify_all();
 62     lock.unlock();
 63 
 64     return data;
 65 }
 66 
 67 void ProducerTask()
 68 {
 69     bool ready_to_exit = false;
 70     while(1) {
 71         sleep(1);
 72         std::unique_lock<std::mutex> lock(gItemRepository.item_counter_mtx);
 73         if (gItemRepository.item_counter < kItemsToProduce) {
 74             ++(gItemRepository.item_counter);
 75             ProduceItem(&gItemRepository, gItemRepository.item_counter);
 76             std::cout << "Producer thread " << std::this_thread::get_id()
 77                 << " is producing the " << gItemRepository.item_counter
 78                 << "^th item" << std::endl;
 79         } else ready_to_exit = true;
 80         lock.unlock();
 81         if (ready_to_exit == true) break;
 82     }
 83     std::cout << "Producer thread " << std::this_thread::get_id()
 84                 << " is exiting..." << std::endl;
 85 }
 86 
 87 void ConsumerTask()
 88 {
 89     static int item_consumed = 0;
 90     while(1) {
 91         sleep(1);
 92         ++item_consumed;
 93         if (item_consumed <= kItemsToProduce) {
 94             int item = ConsumeItem(&gItemRepository);
 95             std::cout << "Consumer thread " << std::this_thread::get_id()
 96                 << " is consuming the " << item << "^th item" << std::endl;
 97         } else break;
 98     }
 99     std::cout << "Consumer thread " << std::this_thread::get_id()
100                 << " is exiting..." << std::endl;
101 }
102 
103 void InitItemRepository(ItemRepository *ir)
104 {
105     ir->write_position = 0;
106     ir->read_position = 0;
107     ir->item_counter = 0;
108 }
109 
110 int main()
111 {
112     InitItemRepository(&gItemRepository);
113     std::thread producer1(ProducerTask);
114     std::thread producer2(ProducerTask);
115     std::thread producer3(ProducerTask);
116     std::thread producer4(ProducerTask);
117     std::thread consumer(ConsumerTask);
118 
119     producer1.join();
120     producer2.join();
121     producer3.join();
122     producer4.join();
123     consumer.join();
124 }

多生產者-多消費者模型

該模型可以說是前面兩種模型的綜合,程序需要維護兩個計數器,分別是生產者已生產產品的數目和消費者已取走產品的數目。另外也需要保護產品庫在多個生產者和多個消費者互斥地訪問。

代碼如下:

  1 #include <unistd.h>
  2 
  3 #include <cstdlib>
  4 #include <condition_variable>
  5 #include <iostream>
  6 #include <mutex>
  7 #include <thread>
  8 
  9 static const int kItemRepositorySize  = 4; // Item buffer size.
 10 static const int kItemsToProduce  = 10;   // How many items we plan to produce.
 11 
 12 struct ItemRepository {
 13     int item_buffer[kItemRepositorySize];
 14     size_t read_position;
 15     size_t write_position;
 16     size_t produced_item_counter;
 17     size_t consumed_item_counter;
 18     std::mutex mtx;
 19     std::mutex produced_item_counter_mtx;
 20     std::mutex consumed_item_counter_mtx;
 21     std::condition_variable repo_not_full;
 22     std::condition_variable repo_not_empty;
 23 } gItemRepository;
 24 
 25 typedef struct ItemRepository ItemRepository;
 26 
 27 
 28 void ProduceItem(ItemRepository *ir, int item)
 29 {
 30     std::unique_lock<std::mutex> lock(ir->mtx);
 31     while(((ir->write_position + 1) % kItemRepositorySize)
 32         == ir->read_position) { // item buffer is full, just wait here.
 33         std::cout << "Producer is waiting for an empty slot...\n";
 34         (ir->repo_not_full).wait(lock);
 35     }
 36 
 37     (ir->item_buffer)[ir->write_position] = item;
 38     (ir->write_position)++;
 39 
 40     if (ir->write_position == kItemRepositorySize)
 41         ir->write_position = 0;
 42 
 43     (ir->repo_not_empty).notify_all();
 44     lock.unlock();
 45 }
 46 
 47 int ConsumeItem(ItemRepository *ir)
 48 {
 49     int data;
 50     std::unique_lock<std::mutex> lock(ir->mtx);
 51     // item buffer is empty, just wait here.
 52     while(ir->write_position == ir->read_position) {
 53         std::cout << "Consumer is waiting for items...\n";
 54         (ir->repo_not_empty).wait(lock);
 55     }
 56 
 57     data = (ir->item_buffer)[ir->read_position];
 58     (ir->read_position)++;
 59 
 60     if (ir->read_position >= kItemRepositorySize)
 61         ir->read_position = 0;
 62 
 63     (ir->repo_not_full).notify_all();
 64     lock.unlock();
 65 
 66     return data;
 67 }
 68 
 69 void ProducerTask()
 70 {
 71     bool ready_to_exit = false;
 72     while(1) {
 73         sleep(1);
 74         std::unique_lock<std::mutex> lock(gItemRepository.produced_item_counter_mtx);
 75         if (gItemRepository.produced_item_counter < kItemsToProduce) {
 76             ++(gItemRepository.produced_item_counter);
 77             ProduceItem(&gItemRepository, gItemRepository.produced_item_counter);
 78             std::cout << "Producer thread " << std::this_thread::get_id()
 79                 << " is producing the " << gItemRepository.produced_item_counter
 80                 << "^th item" << std::endl;
 81         } else ready_to_exit = true;
 82         lock.unlock();
 83         if (ready_to_exit == true) break;
 84     }
 85     std::cout << "Producer thread " << std::this_thread::get_id()
 86                 << " is exiting..." << std::endl;
 87 }
 88 
 89 void ConsumerTask()
 90 {
 91     bool ready_to_exit = false;
 92     while(1) {
 93         sleep(1);
 94         std::unique_lock<std::mutex> lock(gItemRepository.consumed_item_counter_mtx);
 95         if (gItemRepository.consumed_item_counter < kItemsToProduce) {
 96             int item = ConsumeItem(&gItemRepository);
 97             ++(gItemRepository.consumed_item_counter);
 98             std::cout << "Consumer thread " << std::this_thread::get_id()
 99                 << " is consuming the " << item << "^th item" << std::endl;
100         } else ready_to_exit = true;
101         lock.unlock();
102         if (ready_to_exit == true) break;
103     }
104     std::cout << "Consumer thread " << std::this_thread::get_id()
105                 << " is exiting..." << std::endl;
106 }
107 
108 void InitItemRepository(ItemRepository *ir)
109 {
110     ir->write_position = 0;
111     ir->read_position = 0;
112     ir->produced_item_counter = 0;
113     ir->consumed_item_counter = 0;
114 }
115 
116 int main()
117 {
118     InitItemRepository(&gItemRepository);
119     std::thread producer1(ProducerTask);
120     std::thread producer2(ProducerTask);
121     std::thread producer3(ProducerTask);
122     std::thread producer4(ProducerTask);
123 
124     std::thread consumer1(ConsumerTask);
125     std::thread consumer2(ConsumerTask);
126     std::thread consumer3(ConsumerTask);
127     std::thread consumer4(ConsumerTask);
128 
129     producer1.join();
130     producer2.join();
131     producer3.join();
132     producer4.join();
133 
134     consumer1.join();
135     consumer2.join();
136     consumer3.join();
137     consumer4.join();
138 }

轉自:http://www.cnblogs.com/haippy/p/3252092.html

綜合運用: C++11 多線程下生產者消費者模型詳解(轉)