1. 程式人生 > >基於單鏈表、環形佇列(併發有鎖)的多執行緒生產者消費者模型

基於單鏈表、環形佇列(併發有鎖)的多執行緒生產者消費者模型

基於單鏈表

基於環形佇列

1、環形緩衝區

緩衝區的好處,就是空間換時間和協調快慢執行緒。緩衝區可以用很多設計法,這裡說一下環形緩衝區的幾種設計方案,可以看成是幾種環形緩衝區的模式。設計環形緩衝區涉及到幾個點,一是超出緩衝區大小的的索引如何處理,二是如何表示緩衝區滿和緩衝區空,三是如何入隊、出隊,四是緩衝區中資料長度如何計算。
(以下所有方案,在緩衝區滿時不可再寫入資料,緩衝區空時不能讀資料)

1.1、常規陣列環形緩衝區

設緩衝區大小為N,隊頭out,隊尾in,out、in均是下標表示:

初始時,in=out=0
隊頭隊尾的更新用取模操作,out=(out+1)%N,in=(in+1)%N
out==in表示緩衝區空,(in+1)%N==out表示緩衝區滿
入隊que[in]=value;in=(in+1)%N;
出隊ret =que[out];out=(out+1)%N;
資料長度 len =( in - out + N) % N

1.2、改進版陣列環形緩衝區

同樣假設緩衝區大小為N,隊頭out,隊尾in,out、in為陣列下標,但資料型別為unsigned int。

初始時,in=out=0
上調緩衝區大小N為2的冪,假設為M
隊頭隊尾更新不再取模,直接++out,++in
out==in表示緩衝區空,(in-out)==M表示緩衝區滿
入隊que[in&(M-1)] = value ; ++in;
出隊ret = que[out&(M-1)] ; ++out;
in-out表示資料長度
這個改進的思想來自linux核心迴圈佇列kfifo,這裡解釋一下幾個行為的含義及原理

⑴上調緩衝區大小至2的冪

這是方便取模,x%M == x&(M-1) 為真,位運算的效率比取模要高。用一個例子來分析一下為什麼等式成立的:

假設M=8=2³,那麼M-1=7,二進位制為0000 0111

①若 x<8 —-> x&7=x , x%8 = x,等式成立

②若 x>8 —-> x = 2^a+2^b+2^c+… 比如,51 = 1+2+16+32 = 2^0+2^1+2^4+2^5 ,求 51&7時,由於7的二進位制0000 0111,所以2的冪只要大於等於2³的數,與上7結果都是0,所以2^4 & 7 = 0 , 2^5 & 7 = 0, (2^0+2^1+2^4+2^5) & (7) = 2^0+2^1=3。而根據①,(2^0+2^1)&7 = (2^0+2^1)%8 ,所以51&7=51%8

綜上得證。

⑵out、in型別設計為unsigned int

無符號整形的溢位之後,又從0開始計數:MAX_ UNSIGNED_ INT + 1 = 0 , MAX _ UNSIGNED_INT + 2 = 1 ,… 。

in、out溢位之前,都能通過&把in、out對映到正確的位置上,那溢位之後呢?可以舉個例子來:

假設現在in=MAX_ UNSIGNED_INT,那麼in & (M-1) = M-1 ,也就是最後一個位置,再入隊時,應該從頭開始入隊,也就是0,而in+1也為0,所以即使溢位了,(in+1)&(M-1)仍然能對映到正確的位置。這就是為什麼我們入隊出隊只要做個與對映和++操作就能保證正確的原因。

而根據入隊和出隊的操作,佇列中的元素總是維持在[out,in)這個區間中,由於溢位可能存在,這個區間有三種情況:

out沒溢位,in沒溢位,in-out就是這個緩衝區中資料的長度。
out沒溢位,in溢位,此時資料長度應該是MAX_UNSIGNED_INT - out +1 + in = in - out + MAX_UNSIGNED_INT +1 = in-out。
out溢位,in溢位,此時資料長度也是in-out。
根據上面三種情況,in-out總是表示環形佇列中資料的長度

不得不驚歎,linux核心中的kfifo實現實在是太精妙了。相比前面的版本,所有的取餘操作都改成了與運算,入隊出隊,求緩衝區資料長度都變得非常簡單。

1.3、連結串列實現的環形緩衝區

環形緩衝區的連結串列實現比陣列實現要簡單一些,可以用下圖的這種設計方案:

這裡寫圖片描述

假設要求環形緩衝區大小為N

佇列長度:可以設計一個size的成員,每次O(1)取size,也可以O(N)遍歷佇列求size
佇列空:head->next == NULL
佇列滿:size == N
出隊核心
ret = out;
  out = out->next;
  head->next = out;
入隊核心new_node表示新申請的結點
  new_node->next = in->next;
  in->next = in_node;
  ++size;
當然,連結串列結點的設定是自由的,連結串列結點本身可以內含陣列、連結串列、雜湊表等等,例如下面這樣,內含一個數組
這裡寫圖片描述

這時,可以增設兩個變數out_pos,in_pos。假設結點內陣列的大小為N_ ELEMS,整個連結串列結點的數量為node_nums

佇列長度:(nodes_nums-2)*N+N-out_pos+in_pos
佇列空:head->next == NULL
佇列滿:佇列長度 == N
出隊核心
out_pos == N_ELEMS;
     delete_node = out;
free(delete_node);
out = out->next;
out_pos = 0;
   head->next = out;
ret = out[out_pos++];
入隊核心,new_node表示新申請的核心
  in_pos == N_ELEMS;
new_node->next = in->next;
in = new_node;
in_pos = 0;
in[in_pos++] = value;

1.4、改進連結串列環形緩衝區

上面連結串列環形隊列出佇列可能釋放記憶體,入佇列可能申請記憶體,所以,可以用個空閒連結串列把該釋放的記憶體管理起來,入佇列時,如果要增加結點,先從空閒連結串列中取結點,取不到再去申請記憶體,這樣就可以避免多次分配釋放記憶體了,至於其他的操作都是一樣的。

上邊只是簡單的說了下入隊出隊等操作,事實上,緩衝區往往是和讀寫執行緒伴隨出現的,緩衝區中的每一個資源,對於同類執行緒可能需要互斥訪問,也可能可以共享使用,而不同類執行緒間(讀寫執行緒)往往需要做同步操作。比如,讀執行緒之間可能共享緩衝區的每一個資源,也可能互斥使用每個資源,通常,在緩衝區滿時寫執行緒不能寫,緩衝區空時讀執行緒不能讀,也就是讀寫執行緒要求同步。這其實就是作業系統課程上PV操作的幾個經典模式,如果讀讀之間、寫寫之間要求互斥使用資源,並且讀寫執行緒間不要求互斥,就是生產者消費者問題,如果讀讀之間不要求互斥(每個資源可供多個讀執行緒共同使用),寫寫之間要求互斥(每個資源僅供一個寫執行緒使用),並且讀寫執行緒也要求互斥(讀的時候不能寫,寫的時候不能讀),就是讀寫者問題。

下面會以生產者消費者模式和1.2節改進版的迴圈緩衝區為例,來說說併發迴圈佇列有鎖實現,下一篇說無鎖實現。關於讀寫者的問題,以後有時間再詳談。

2、生產者消費者

先提一下生產者消費者的優點吧
併發,若緩衝區中資料處理方式一致,可以開多個執行緒或程序處理資料或生產資料。
非同步,生產者無需乾等著消費者消費資料,消費者也無需乾等著生產者生產資料,只需根據緩衝區的狀態做出相應反應,如果結合io多用複用技術,也就是所謂的反應器模式,可以設計很好的非同步通訊架構,像zeromq底層的執行緒通訊就是使用這種方案來做的。
解耦,解耦可以說是一個附帶作用,由於生產者和消費者無直接關聯,也就是生產者中不會去呼叫任何消費者的方法或者反過來,所以任何一方的變動不影響另一方。
緩衝,主要是保持各自的效能,比如生產者很快,那沒關係,消費者雖然消費不過來,但可以把資料放緩衝區裡。
現在正式開工,根據生產者和消費者的數量,可以把生產者消費者劃分為四種類型,1:1,1:N,M:1,M:N。

然後再做個規定,規定環形緩衝區的大小為M,M為2的冪次方,in、out統一稱為slot。

2.1、單生產者單消費者

一個生產者,一個消費者,緩衝區可用資源數為M。

這種情況只要同步生產者和消費者,同步的方法是用兩個訊號量available_in_slots,available_out_slots分別表示生產者有多個可用資源、消費者有多個可用資源,每生產一個產品,生產者可用資源減1,消費者可用資源加1,這點可用PV操作來實現,用P操作可以消耗1個資源,P操作結束資源數減1,V操作可以生產1個資源,V操作結束後資源數加1。初始時,available_ in_ slots=M,表示生產者有M個空間可放產品,available_ out_slots=0,表示消費者還沒有可用資源:

available_in_slots = M;
available_out_slots  = 0;

in=out=0;

void producer()
{
    while(true){
        P(available_in_slots);
        queue[(in++)&(M-1)] = data;
        V(available_out_slots)
    }
}

void consumer()
{
    while(true){
        P(available_out_slots);
        queue[(out++)&(M-1)] = data;
        V(available_in_slots)
    }
}

2.2、單生產者多消費者

一個生產者,多個消費者,緩衝區可用資源數位M。

這種情況下,消費者有多個,消費者之間對out slot要互斥訪問,用out_ slot_ mutex來實現消費者間的互斥,拿到out_ slot_ mutex的消費者執行緒才得以繼續執行,沒拿到的只能阻塞。生產者消費者要同步,用available_ in_ slots,available_ out_slots來實現生產者消費者的同步。

available_writes_slots = M;
available_read_slots   = 0;

out_mutex = 1;

in=out=0;

void producer()
{
    while(true){
        P(available_writes_slots);
        queue[(in++)&(M-1)] = data;
        V(available_read_slots)
    }
}

void consumer()
{
    while(true){
        P(available_read_slots);
        P(out_mutex);
        queue[(out++)&(M-1)] = data;
        V(out_mutex);
        V(available_writes_slots)
    }
}

2.3、多生產者單消費者

這種情況與2.2是一致的,所用方法也一樣

多個生產者,生產者之間對in slot要互斥訪問,用in_slot_mutex來實現生產者間的互斥,拿到in_slot_mutex的生產者執行緒才得以繼續執行,沒拿到的只能阻塞。生產者消費者要同步,用available_in_slots,available_out_slots來實現生產者消費者的同步。

available_in_slots  = M;
available_out_slots = 0;

in_slot_mutex = 1;

in=out=0;

void producer()
{
    while(true){
        P(available_in_slots);
        P(in_slot_mutex);
        queue[(in++)&(M-1)] = data;
        V(in_slot_mutex);
        V(available_out_slots)
    }
}

void consumer()
{
    while(true){
        P(available_out_slots);
        queue[(out++)&(M-1)] = data;
        V(available_in_slots)
    }
}

2.4、多生產者多消費者

多個生產者,多個消費者,緩衝區可用資源數位M。

多個生產者,所以對in slot要互斥訪問,用in_slot_mutex來實現生產者間的互斥;多個消費者,所以對out slot也要互斥訪問,用out_slot_mutex來實現消費者間的互斥;生產者消費間的同步用available_ in_ slots,available_ out_slots來實現

available_in_slots  = M;
available_out_slots = 0;

in_slot_mutex  = 1;
out_slot_mutex = 1

in=out=0;

void producer()
{
    while(true){
        P(available_in_slots);
        P(in_slot_mutex);
        queue[(in++)&(M-1)] = data;
        V(in_slot_mutex);
        V(available_out_slots)
    }
}

void consumer()
{
    while(true){
        P(available_out_slots);
        P(out_slot_mutex);
        queue[(out++)&(M-1)] = data;
        P(out_slot_mutex);
        V(available_in_slots)
    }
}

以上就是以生產者消費者為使用場景的併發有鎖環形佇列的實現演算法。可以看到鎖機制確實很好用,但是鎖機制有個很大的問題,如果由於某些原因擁有鎖的一方掛掉了,可能導致死鎖,所以這種方法存在一定隱患的,另一方面它導致作業系統暫停當前的任務或使其進入睡眠狀態(等待,不佔用任何的處理器),直到資源(例如互斥鎖)可用,被阻塞的任務才可以解除阻塞狀態(喚醒)。在一個負載較重的應用程式中使用這樣的阻塞佇列來線上程之間傳遞訊息會導致嚴重的爭用問題。也就是說,任務將大量的時間(睡眠,等待,喚醒)浪費在獲得保護佇列資料的互斥鎖,而不是處理佇列中的資料上。