1. 程式人生 > 程式設計 >深入理解Golang之channel

深入理解Golang之channel

前言

Golang在併發程式設計上有兩大利器,分別是channelgoroutine,這篇文章我們先聊聊channel。熟悉Golang的人都知道一句名言:“使用通訊來共享記憶體,而不是通過共享記憶體來通訊”。這句話有兩層意思,Go語言確實在sync包中提供了傳統的鎖機制,但更推薦使用channel來解決併發問題。這篇文章會先從channel的用法、channel的原理兩部分對channel做一個較為深入的探究。

channel用法

什麼是channel

從字面上看,channel的意思大概就是管道的意思。channel是一種go協程用以接收或傳送訊息的安全的訊息佇列,channel

就像兩個go協程之間的導管,來實現各種資源的同步。可以用下圖示意:

channel的用法很簡單:

func main() {
    ch := make(chan int1// 建立一個型別為int,緩衝區大小為1的channel
    ch <- 2 // 將2傳送到ch
    n, ok := <- ch // n接收從ch發出的值
    if ok {
        fmt.Println(n) // 2
    }
    close(ch) // 關閉channel
}
複製程式碼

使用channel時有幾個注意點:

  • 向一個nil channel傳送訊息,會一直阻塞;
  • 向一個已經關閉的channel
    傳送訊息,會引發執行時恐慌(panic)
  • channel關閉後不可以繼續向channel傳送訊息,但可以繼續從channel接收訊息;
  • channel關閉並且緩衝區為空時,繼續從從channel接收訊息會得到一個對應型別的零值。

Unbuffered channels與Buffered channels

Unbuffered channels是指緩衝區大小為0的channel,這種channel的接收者會阻塞直至接收到訊息,傳送者會阻塞直至接收者接收到訊息,這種機制可以用於兩個goroutine進行狀態同步;Buffered channels擁有緩衝區,當緩衝區已滿時,傳送者會阻塞;當緩衝區為空時,接收者會阻塞。

引用The Nature Of Channels In Go中的兩張圖來說明Unbuffered channelsBuffered channels, 非常形象,讀者可自行體會一下:

Unbuffered channels

Unbuffered channels
Unbuffered channels

Buffered channels

Buffered channels
Buffered channels

channel的遍歷

for range

channel支援 for range 的方式進行遍歷:

package main  

import "fmt"  

main() {  
    ci := 5)  
    for i := 1; i <= 5; i++ {
        ci <- i
    }    
    close(ci)  

    for i := range ci {  
        fmt.Println(i)  
    }  
}  
複製程式碼

值得注意的是,在遍歷時,如果channel 沒有關閉,那麼會一直等待下去,出現 deadlock 的錯誤;如果在遍歷時channel已經關閉,那麼在遍歷完資料後自動退出遍歷。也就是說,for range 的遍歷方式時阻塞型的遍歷方式。

for select

select可以處理非阻塞式訊息傳送、接收及多路選擇。

2)
    2; i++ {
        ci <- i
    }
    close(ci)

    cs := string,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-number">2)
    cs <- "hi"
    cs <- "golang"
    close(cs)

    ciClosed, csClosed := false,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-literal">false
    for {
        if ciClosed && csClosed {
            return
        }
        select {
        case i, ok := <-ci:
            if ok {
                fmt.Println(i)
            } else {
                ciClosed = true
                fmt.Println("ci closed")
            }
        case s, ok := <-cs:
            if ok {
                fmt.Println(s)
            } else {
                csClosed = "cs closed")
            }
        default:
            fmt.Println("waiting...")
        }
    }
}  
複製程式碼

select中有case程式碼塊,用於channel傳送或接收訊息,任意一個case程式碼塊準備好時,執行其對應內容;多個case程式碼塊準備好時,隨機選擇一個case程式碼塊並執行;所有case程式碼塊都沒有準備好,則等待;還可以有一個default程式碼塊,所有case程式碼塊都沒有準備好時執行default程式碼塊。

channel原理

先貼一下channel原始碼地址,讀者可以對照來看。

資料結構

先看channel的結構體:

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    // channel中元素大小
    elemsize uint16 
    // 是否已關閉
    closed   uint32
    // channel中元素型別
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}
複製程式碼

channel的緩衝區其實是一個環形佇列,qcount表示佇列中元素的數量,dataqsiz表示環形佇列的總大小,buf表示一個指向迴圈陣列的指標;sendxrecvx分別用來標識當前傳送和接收的元素在迴圈佇列中的位置;recvqsendq都是一個列表,分別用於儲存當前處於等待接收和等待傳送的Goroutine

再看一下waitq的資料結構:

type waitq struct {
    first *sudog
    last  *sudog
}

type sudog struct {
    // 當前goroutine
    g *g

    // isSelect indicates g is participating in a select, so
    // g.selectDone must be CAS'd to win the wake-up race.
    isSelect bool
    next     *sudog
    prev     *sudog
    elem     unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above)
    // are only accessed when holding a semaRoot lock.

    acquiretime int64
    releasetime int64
    ticket      uint32
    parent      *sudog // semaRoot binary tree
    waitlink    *sudog // g.waiting list or semaRoot
    waittail    *sudog // semaRoot
    c           *hchan // channel
}
複製程式碼

其中sudog表示處於等待列表中的Goroutine封裝,包含了一些上下文資訊,firstlast分別指向等待列表的首位的Goroutine

編譯分析

在分析channel的原理之前,我們先使用go tool分析以下程式碼,看看channel的各種操作在底層呼叫了什麼執行時方法:

ch := 2)
ch <- 2
ch <- 1
<-ch
n, ok := <-ch
if ok {
    fmt.Println(n)
}
close(ch)
複製程式碼

編譯

go build test.go
go tool objdump -s "main\.main" test | grep CALL
複製程式碼

CALL過濾出來:

  test.go:118           0x1092f55               e81612f7ff              CALL runtime.makechan(SB)
  test.go:119           0x1092f74               e82714f7ff              CALL runtime.chansend1(SB)
  test.go:120           0x1092f8e               e80d14f7ff              CALL runtime.chansend1(SB)
  test.go:121           0x1092fa5               e8361ff7ff              CALL runtime.chanrecv1(SB)
  test.go:122           0x1092fbd               e85e1ff7ff              CALL runtime.chanrecv2(SB)
  test.go:126           0x1092fd7               e8841cf7ff              CALL runtime.closechan(SB)
  test.go:124           0x1092fea               e8b156f7ff              CALL runtime.convT64(SB)
  print.go:275          0x1093041               e88a98ffff              CALL fmt.Fprintln(SB)
  test.go:47            0x1093055               e896c1fbff              CALL runtime.morestack_noctxt(SB)
複製程式碼

建立

從上面的編譯分析可以看出在建立channel時呼叫了執行時方法makechan:

makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }

    // 計算緩衝區需要的總大小(緩衝區大小*元素大小),並判斷是否超出最大可分配範圍
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    case mem == 0:
        // 緩衝區大小為0,或者channel中元素大小為0(struct{}{})時,只需分配channel必需的空間即可
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-literal">nil,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-literal">true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.kind&kindNoPointers != // 通過位運算知道channel中元素型別不是指標,分配一片連續記憶體空間,所需空間等於 緩衝區陣列空間 + hchan必需的空間。
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-literal">true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // 元素中包含指標,為hchan和緩衝區分別分配空間
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-literal">true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size,68); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-string">"; elemalg=", elem.alg,68); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-string">"; dataqsiz=", size,68); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-string">"\n")
    }
    return c
}
複製程式碼

makechan的程式碼邏輯還是比較簡單的,首先校驗元素型別和緩衝區空間大小,然後建立hchan,分配所需空間。這裡有三種情況:當緩衝區大小為0,或者channel中元素大小為0時,只需分配channel必需的空間即可;當channel元素型別不是指標時,則只需要為hchan和緩衝區分配一片連續記憶體空間,空間大小為緩衝區陣列空間加上hchan必需的空間;預設情況,緩衝區包含指標,則需要為hchan和緩衝區分別分配記憶體。最後更新hchan的其他欄位,包括elemsizeelemtypedataqsiz

傳送

channel的傳送操作呼叫了執行時方法chansend1,在
chansend1內部又呼叫了chansend,直接來看chansend的實現:

chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // channel為nil
    if c == nil {
        // 如果是非阻塞,直接返回傳送不成功
        if !block {
            return false
        }
        // 否則,當前Goroutine阻塞掛起
        gopark(2)
        throw("unreachable")
    }

    "chansend: chan=",68); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-string">"\n")
    }

    if raceenabled {
        racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.

    // 對於非阻塞且channel未關閉,如果無緩衝區且沒有等待接收的Goroutine,或者有緩衝區且緩衝區已滿,那麼都直接返回傳送不成功
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 加鎖
    lock(&c.lock)

    // 如果channel已關閉
    if c.closed != 0 {
        // 解鎖,直接panic
        unlock(&c.lock)
        "send on closed channel"))
    }

    // 除了以上情況,當channel未關閉時,就有以下幾種情況:

    // 1、當存在等待接收的Goroutine
    if sg := c.recvq.dequeue(); sg != // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).

        // 那麼直接把正在傳送的值傳送給等待接收的Goroutine
        send(c, sg, ep, func() { unlock(&c.lock) },128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-number">3)
        true
    }

    // 2、當緩衝區未滿時
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        // 獲取指向緩衝區陣列中位於sendx位置的元素的指標
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        // 將當前傳送的值拷貝到緩衝區
        typedmemmove(c.elemtype, qp, ep)
        // sendx索引加一
        c.sendx++
        // 因為是迴圈佇列,sendx等於佇列長度時置為0
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        // 佇列中元素總數加一,並解鎖,返回傳送成功
        c.qcount++
        unlock(&c.lock)
        // 3、當既沒有等待接收的Goroutine,緩衝區也沒有剩餘空間,如果是非阻塞的傳送,那麼直接解鎖,返回傳送失敗
    if !block {
        unlock(&c.lock)
        false
    }

    // Block on the channel. Some receiver will complete our operation for us.
    // 4、如果是阻塞傳送,那麼就將當前的Goroutine打包成一個sudog結構體,並加入到channel的傳送佇列sendq裡
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)

    // 呼叫goparkunlock將當前Goroutine設定為等待狀態並解鎖,進入休眠等待被喚醒
    goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-number">3)
    // Ensure the value being sent is kept alive until the
    // receiver copies it out. The sudog has a pointer to the
    // stack object, but sudogs aren't considered as roots of the
    // stack tracer.
    KeepAlive(ep)

    // someone woke us up.
    // 被喚醒之後執行清理工作並釋放sudog結構體
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        "send on closed channel"))
    }
    gp.param = if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-number">2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    true
}
複製程式碼

chansend的執行邏輯,上面的註釋已經寫得很清楚了,我們再來梳理一下。對於非阻塞傳送或者channel已經關閉條件下的幾種傳送失敗的情況,處理邏輯比較簡單,讀者可以對照註釋來看;這裡我們重點關注channel未關閉時幾種常規情況:

存在等待接收的Goroutine

如果等待接收的佇列recvq中存在Goroutine,那麼直接把正在傳送的值傳送給等待接收的Goroutine。示意圖如下:


具體看一下send方法:

send(c *hchan, sg *sudog, unlockf func()skip int) {
    ...

    if sg.elem != // 將傳送的值直接拷貝到接收值(比如v = <-ch 中的v)的記憶體地址
        sendDirect(c.elemtype, ep)
        sg.elem = nil
    }
    // 獲取等待接收資料的Goroutine
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    // 喚醒之前等待接收資料的Goroutine
    goready(gp, skip+1)
}
複製程式碼

這裡有必要說明一下Goroutine在排程過程中的幾種狀態:

_Gidle = iota // goroutine剛剛分配,還沒有初始化

_Grunnable // goroutine處於執行佇列中, 還沒有執行,沒有自己的棧

_Grunning // goroutine在執行中,擁有自己的棧,被分配了M(執行緒)和P(排程上下文)

_Gsyscall // goroutine在執行系統呼叫

_Gwaiting // goroutine被阻塞

_Gdead // goroutine沒有被使用,可能是剛剛退出,或者正在初始化中

_Gcopystack // 表示g當前的棧正在被移除並分配新棧
複製程式碼

當呼叫goready時,將Goroutine的狀態從 _Gwaiting置為_Grunnable,等待下一次排程再次執行。

當緩衝區未滿時

當緩衝區未滿時,找到sendx所指向的緩衝區陣列的位置,將正在傳送的值拷貝到該位置,並增加sendx索引以及釋放鎖,示意圖如下:

阻塞傳送

如果是阻塞傳送,那麼就將當前的Goroutine打包成一個sudog結構體,並加入到channel的傳送佇列sendq裡。示意圖如下:

之後則呼叫goparkunlock將當前Goroutine設定為_Gwaiting狀態並解鎖,進入阻塞狀態等待被喚醒(呼叫goready);如果被排程器喚醒,執行清理工作並最終釋放對應的sudog結構體。

接收

channel的接收有兩種形式:

<-ch
n, ok := <-ch
複製程式碼

這兩種方式分別呼叫執行時方法chanrecv1chanrecv2:

chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-literal">true)
}

chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-literal">true)
    return
}
複製程式碼

這兩個方法最終都會呼叫chanrecv方法:

chanrecv(c *hchan,51); font-weight: bold; word-wrap: inherit !important; word-break: inherit !important;" class="hljs-keyword">bool) (selected, received bool) {

    "chanrecv: chan=",68); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-string">"\n")
    }

    // 非阻塞直接返回(false, false)
        return
        }
        // 阻塞接收,則當前Goroutine阻塞掛起
        gopark("unreachable")
    }

    // 非阻塞模式,對於以下兩種情況:
    // 1、無緩衝區且等待傳送佇列也為空
    // 2、有緩衝區但緩衝區陣列為空且channel未關閉
    // 這兩種情況都是接收失敗, 直接返回(false, false)
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

    // 加鎖
    lock(&c.lock)
    // 如果channel已關閉,並且緩衝區無元素
    0 && c.qcount == if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        // 有等待接收的變數(即 v = <-ch中的v)
        if ep != nil {
            //根據channel元素的型別清理ep對應地址的記憶體,即ep接收了channel元素型別的零值
            typedmemclr(c.elemtype, ep)
        }
        // 返回(true, false),即接收到值,但不是從channel中接收的有效值
        true,136); font-style: italic; word-wrap: inherit !important; word-break: inherit !important;" class="hljs-comment">// 除了以上非常規情況,還有有以下幾種常見情況:

    // 1、等待傳送的佇列sendq裡存在Goroutine,那麼有兩種情況:當前channel無緩衝區,或者當前channel已滿
    if sg := c.sendq.dequeue(); sg != // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        // 如果無緩衝區,那麼直接從sender接收資料;否則,從buf佇列的頭部接收資料,並把sender的資料加到buf佇列的尾部
        recv(c,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-number">3)
        // 接收成功
        // 2、緩衝區buf中有元素
    if c.qcount > // Receive directly from queue
        // 從recvx指向的位置獲取元素
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        // 將從buf中取出的元素拷貝到當前協程
            typedmemmove(c.elemtype, qp)
        }
        // 同時將取出的資料所在的記憶體清空
        typedmemclr(c.elemtype, qp)
        // 接收索引 +1
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = // buf元素總數 -1
        c.qcount--
        // 解鎖,返回接收成功
        unlock(&c.lock)
        // 3、非阻塞模式,且沒有資料可以接受
    if !block {
        // 解鎖,直接返回接收失敗
        unlock(&c.lock)
        // no sender available: block on this channel.
    // 4、阻塞模式,獲取當前Goroutine,打包一個sudog
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    // 加入到channel的等待接收佇列recvq中
    c.recvq.enqueue(mysg)
    // 掛起當前Goroutine,設定為_Gwaiting狀態並解鎖,進入休眠等待被喚醒
    goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-number">3)

    // someone woke us up
    2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = 複製程式碼

chanrecv方法的處理邏輯與chansend非常類似,我們這裡仍然只分析幾種常見情況,其他情況上述註釋也解釋得比較清楚了,讀者可對照相應程式碼和註釋檢視。

存在等待傳送的Goroutine

如果等待傳送的佇列sendq裡存在掛起的Goroutine,那麼有兩種情況:當前channel無緩衝區,或者當前channel已滿。從sendq中取出最先阻塞的Goroutine,然後呼叫recv方法:

recv(c *hchan,0); font-weight: bold; word-wrap: inherit !important; word-break: inherit !important;" class="hljs-title">int) {
    if c.dataqsiz == // 無緩衝區
        if raceenabled {
            racesync(c, sg)
        }
        // copy data from sender
            recvDirect(c.elemtype, ep)
        }
    } else {
        // 緩衝區已滿
        // Queue is full. Take the item at the
        // head of the queue. Make the sender enqueue
        // its item at the tail of the queue. Since the
        // queue is full, those are both the same slot.
        qp := chanbuf(c,51); font-weight: bold; word-wrap: inherit !important; word-break: inherit !important;" class="hljs-keyword">if raceenabled {
            raceacquire(qp)
            racerelease(qp)
            raceacquireg(sg.g, qp)
            racereleaseg(sg.g,136); font-style: italic; word-wrap: inherit !important; word-break: inherit !important;" class="hljs-comment">// copy data from queue to receiver
        nil {
            typedmemmove(c.elemtype,136); font-style: italic; word-wrap: inherit !important; word-break: inherit !important;" class="hljs-comment">// copy data from sender to queue
        typedmemmove(c.elemtype, sg.elem)
        c.recvx++
        0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    // 將等待傳送資料的Goroutine的狀態從_Gwaiting置為 _Grunnable,等待下一次排程。
    goready(gp,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-number">1)
}
複製程式碼

1、如果無緩衝區,那麼直接從sender接收資料;
2、如果緩衝區已滿,從buf佇列的頭部接收資料,並把sender的資料加到buf佇列的尾部;
3、最後呼叫goready函式將等待傳送資料的Goroutine的狀態從_Grunnable,等待下一次排程。

下圖示意了當緩衝區已滿時的處理過程:

緩衝區buf中還有資料

如果緩衝區buf中還有元素,那麼就走正常的接收,將從buf中取出的元素拷貝到當前協程的接收資料目標記憶體地址中。值得注意的是,即使此時channel已經關閉,仍然可以正常地從緩衝區buf中接收資料。這種情況比較簡單,示意圖就不畫了。

阻塞接收

如果是阻塞模式,且當前沒有資料可以接收,那麼就需要將當前sudog加入到channel的等待接收佇列recvq中,將當前Goroutine的狀態置為_Gwaiting,等待喚醒。示意圖如下:

如果之後當前Goroutine被排程器喚醒,則執行清理工作並最終釋放對應的sudog結構體。

關閉

說完收發資料,最後就是關閉channel了:

closechan(c *hchan) {
    // nil channel檢查
    nil {
        "close of nil channel"))
    }

    lock(&c.lock)
    // 已關閉的channel不能再次關閉
    0 {
        unlock(&c.lock)
        "close of closed channel"))
    }

    if raceenabled {
        callerpc := getcallerpc()
        racewritepc(c.raceaddr(), funcPC(closechan))
        racerelease(c.raceaddr())
    }
    // 設定關閉狀態為1
    c.closed = 1

    var glist glist

    // release all readers
    // 遍歷recvq,清除sudog的資料,取出其中處於_Gwaiting狀態的Goroutine加入到glist中
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }

    // release all writers (they will panic)
    // 遍歷sendq,清除sudog的資料,取出其中處於_Gwaiting狀態的Goroutine加入到glist中
    for {
        sg := c.sendq.dequeue()
        break
        }
        sg.elem = // Ready all Gs now that we've dropped the channel lock.
    將glist中所有Goroutine的狀態置為_Grunnable,等待排程器進行排程
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-number">3)
    }
}
複製程式碼

1、關閉channel時,會遍歷sendq(實際只有recvq或者sendq),取出sudog中掛起的Goroutine加入到glist列表中,並清除sudog上的一些資訊和狀態。

2、然後遍歷glist列表,為每個Goroutine呼叫goready函式,將所有Goroutine置為_Grunnable狀態,等待排程。

3、當Goroutine被喚醒之後,會繼續執行chansendchanrecv函式中當前Goroutine被喚醒後的剩餘邏輯。

總結

總結一下,本文先通過channel的基本用法對channel的定義、用法細節進行了介紹,然後對channel的基本操作包括髮送、接收和關閉進行了較為詳細和深入的探究。細心的讀者應該也會發現channel的操作跟協程的排程關係密切,不過這篇文章關於goroutine的排程只是一筆帶過,後續時機成熟會對這部分內容再作探究。

參考資料

1、The Nature Of Channels In Go
2、Concurrency in Golang