深入理解Golang之channel
前言
Golang在併發程式設計上有兩大利器,分別是channel
和goroutine
,這篇文章我們先聊聊channel
。熟悉Golang的人都知道一句名言:“使用通訊來共享記憶體,而不是通過共享記憶體來通訊”。這句話有兩層意思,Go語言確實在sync
包中提供了傳統的鎖機制,但更推薦使用channel
來解決併發問題。這篇文章會先從channel
的用法、channel
的原理兩部分對channel
做一個較為深入的探究。
channel用法
什麼是channel
從字面上看,channel
的意思大概就是管道的意思。channel
是一種go協程用以接收或傳送訊息的安全的訊息佇列,channel
channel
的用法很簡單:
func main() {
ch := make(chan int, 1) // 建立一個型別為int,緩衝區大小為1的channel
ch <- 2 // 將2傳送到ch
n, ok := <- ch // n接收從ch發出的值
if ok {
fmt.Println(n) // 2
}
close(ch) // 關閉channel
}
複製程式碼
使用channel
時有幾個注意點:
- 向一個
nil
channel
傳送訊息,會一直阻塞; - 向一個已經關閉的
channel
(panic)
; -
channel
關閉後不可以繼續向channel
傳送訊息,但可以繼續從channel
接收訊息; - 當
channel
關閉並且緩衝區為空時,繼續從從channel
接收訊息會得到一個對應型別的零值。
Unbuffered channels與Buffered channels
Unbuffered channels
是指緩衝區大小為0的channel
,這種channel
的接收者會阻塞直至接收到訊息,傳送者會阻塞直至接收者接收到訊息,這種機制可以用於兩個goroutine
進行狀態同步;Buffered channels
擁有緩衝區,當緩衝區已滿時,傳送者會阻塞;當緩衝區為空時,接收者會阻塞。
引用The Nature Of Channels In Go中的兩張圖來說明Unbuffered channels
與Buffered channels
, 非常形象,讀者可自行體會一下:
Unbuffered channels
:
Buffered channels
:
channel的遍歷
for range
channel
支援 for range
的方式進行遍歷:
package main
import "fmt"
main() {
ci := 5)
for i := 1; i <= 5; i++ {
ci <- i
}
close(ci)
for i := range ci {
fmt.Println(i)
}
}
複製程式碼
值得注意的是,在遍歷時,如果channel
沒有關閉,那麼會一直等待下去,出現 deadlock
的錯誤;如果在遍歷時channel
已經關閉,那麼在遍歷完資料後自動退出遍歷。也就是說,for range
的遍歷方式時阻塞型的遍歷方式。
for select
select
可以處理非阻塞式訊息傳送、接收及多路選擇。
2)
2; i++ {
ci <- i
}
close(ci)
cs := string,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-number">2)
cs <- "hi"
cs <- "golang"
close(cs)
ciClosed, csClosed := false,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-literal">false
for {
if ciClosed && csClosed {
return
}
select {
case i, ok := <-ci:
if ok {
fmt.Println(i)
} else {
ciClosed = true
fmt.Println("ci closed")
}
case s, ok := <-cs:
if ok {
fmt.Println(s)
} else {
csClosed = "cs closed")
}
default:
fmt.Println("waiting...")
}
}
}
複製程式碼
select
中有case
程式碼塊,用於channel
傳送或接收訊息,任意一個case
程式碼塊準備好時,執行其對應內容;多個case
程式碼塊準備好時,隨機選擇一個case
程式碼塊並執行;所有case
程式碼塊都沒有準備好,則等待;還可以有一個default
程式碼塊,所有case
程式碼塊都沒有準備好時執行default
程式碼塊。
channel原理
先貼一下channel
的原始碼地址,讀者可以對照來看。
資料結構
先看channel
的結構體:
type hchan struct {
qcount uint // total data in the queue
dataqsiz // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
// channel中元素大小
elemsize uint16
// 是否已關閉
closed uint32
// channel中元素型別
elemtype *_type // element type
sendx uint // send index
recvx // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
複製程式碼
channel
的緩衝區其實是一個環形佇列,qcount
表示佇列中元素的數量,dataqsiz
表示環形佇列的總大小,buf
表示一個指向迴圈陣列的指標;sendx
和recvx
分別用來標識當前傳送和接收的元素在迴圈佇列中的位置;recvq
和sendq
都是一個列表,分別用於儲存當前處於等待接收和等待傳送的Goroutine
。
再看一下waitq
的資料結構:
type waitq struct {
first *sudog
last *sudog
}
type sudog struct {
// 當前goroutine
g *g
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
複製程式碼
其中sudog
表示處於等待列表中的Goroutine
封裝,包含了一些上下文資訊,first
和last
分別指向等待列表的首位的Goroutine
。
編譯分析
在分析channel
的原理之前,我們先使用go tool
分析以下程式碼,看看channel
的各種操作在底層呼叫了什麼執行時方法:
ch := 2)
ch <- 2
ch <- 1
<-ch
n, ok := <-ch
if ok {
fmt.Println(n)
}
close(ch)
複製程式碼
編譯
go build test.go
go tool objdump -s "main\.main" test | grep CALL
複製程式碼
把CALL
過濾出來:
test.go:118 0x1092f55 e81612f7ff CALL runtime.makechan(SB)
test.go:119 0x1092f74 e82714f7ff CALL runtime.chansend1(SB)
test.go:120 0x1092f8e e80d14f7ff CALL runtime.chansend1(SB)
test.go:121 0x1092fa5 e8361ff7ff CALL runtime.chanrecv1(SB)
test.go:122 0x1092fbd e85e1ff7ff CALL runtime.chanrecv2(SB)
test.go:126 0x1092fd7 e8841cf7ff CALL runtime.closechan(SB)
test.go:124 0x1092fea e8b156f7ff CALL runtime.convT64(SB)
print.go:275 0x1093041 e88a98ffff CALL fmt.Fprintln(SB)
test.go:47 0x1093055 e896c1fbff CALL runtime.morestack_noctxt(SB)
複製程式碼
建立
從上面的編譯分析可以看出在建立channel
時呼叫了執行時方法makechan
:
makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 計算緩衝區需要的總大小(緩衝區大小*元素大小),並判斷是否超出最大可分配範圍
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// 緩衝區大小為0,或者channel中元素大小為0(struct{}{})時,只需分配channel必需的空間即可
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-literal">nil,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-literal">true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.kind&kindNoPointers != // 通過位運算知道channel中元素型別不是指標,分配一片連續記憶體空間,所需空間等於 緩衝區陣列空間 + hchan必需的空間。
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-literal">true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素中包含指標,為hchan和緩衝區分別分配空間
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-literal">true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size,68); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-string">"; elemalg=", elem.alg,68); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-string">"; dataqsiz=", size,68); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-string">"\n")
}
return c
}
複製程式碼
makechan
的程式碼邏輯還是比較簡單的,首先校驗元素型別和緩衝區空間大小,然後建立hchan
,分配所需空間。這裡有三種情況:當緩衝區大小為0,或者channel
中元素大小為0時,只需分配channel
必需的空間即可;當channel
元素型別不是指標時,則只需要為hchan
和緩衝區分配一片連續記憶體空間,空間大小為緩衝區陣列空間加上hchan
必需的空間;預設情況,緩衝區包含指標,則需要為hchan
和緩衝區分別分配記憶體。最後更新hchan
的其他欄位,包括elemsize
,elemtype
,dataqsiz
。
傳送
channel
的傳送操作呼叫了執行時方法chansend1
,在chansend1
內部又呼叫了chansend
,直接來看chansend
的實現:
chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// channel為nil
if c == nil {
// 如果是非阻塞,直接返回傳送不成功
if !block {
return false
}
// 否則,當前Goroutine阻塞掛起
gopark(2)
throw("unreachable")
}
"chansend: chan=",68); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-string">"\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
// 對於非阻塞且channel未關閉,如果無緩衝區且沒有等待接收的Goroutine,或者有緩衝區且緩衝區已滿,那麼都直接返回傳送不成功
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加鎖
lock(&c.lock)
// 如果channel已關閉
if c.closed != 0 {
// 解鎖,直接panic
unlock(&c.lock)
"send on closed channel"))
}
// 除了以上情況,當channel未關閉時,就有以下幾種情況:
// 1、當存在等待接收的Goroutine
if sg := c.recvq.dequeue(); sg != // Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
// 那麼直接把正在傳送的值傳送給等待接收的Goroutine
send(c, sg, ep, func() { unlock(&c.lock) },128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-number">3)
true
}
// 2、當緩衝區未滿時
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
// 獲取指向緩衝區陣列中位於sendx位置的元素的指標
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
// 將當前傳送的值拷貝到緩衝區
typedmemmove(c.elemtype, qp, ep)
// sendx索引加一
c.sendx++
// 因為是迴圈佇列,sendx等於佇列長度時置為0
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 佇列中元素總數加一,並解鎖,返回傳送成功
c.qcount++
unlock(&c.lock)
// 3、當既沒有等待接收的Goroutine,緩衝區也沒有剩餘空間,如果是非阻塞的傳送,那麼直接解鎖,返回傳送失敗
if !block {
unlock(&c.lock)
false
}
// Block on the channel. Some receiver will complete our operation for us.
// 4、如果是阻塞傳送,那麼就將當前的Goroutine打包成一個sudog結構體,並加入到channel的傳送佇列sendq裡
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// 呼叫goparkunlock將當前Goroutine設定為等待狀態並解鎖,進入休眠等待被喚醒
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-number">3)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// someone woke us up.
// 被喚醒之後執行清理工作並釋放sudog結構體
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
"send on closed channel"))
}
gp.param = if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-number">2)
}
mysg.c = nil
releaseSudog(mysg)
true
}
複製程式碼
chansend
的執行邏輯,上面的註釋已經寫得很清楚了,我們再來梳理一下。對於非阻塞傳送或者channel
已經關閉條件下的幾種傳送失敗的情況,處理邏輯比較簡單,讀者可以對照註釋來看;這裡我們重點關注channel
未關閉時幾種常規情況:
存在等待接收的Goroutine
如果等待接收的佇列recvq
中存在Goroutine
,那麼直接把正在傳送的值傳送給等待接收的Goroutine
。示意圖如下:
具體看一下
send
方法:
send(c *hchan, sg *sudog, unlockf func(), skip int) {
...
if sg.elem != // 將傳送的值直接拷貝到接收值(比如v = <-ch 中的v)的記憶體地址
sendDirect(c.elemtype, ep)
sg.elem = nil
}
// 獲取等待接收資料的Goroutine
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 喚醒之前等待接收資料的Goroutine
goready(gp, skip+1)
}
複製程式碼
這裡有必要說明一下Goroutine
在排程過程中的幾種狀態:
_Gidle = iota // goroutine剛剛分配,還沒有初始化
_Grunnable // goroutine處於執行佇列中, 還沒有執行,沒有自己的棧
_Grunning // goroutine在執行中,擁有自己的棧,被分配了M(執行緒)和P(排程上下文)
_Gsyscall // goroutine在執行系統呼叫
_Gwaiting // goroutine被阻塞
_Gdead // goroutine沒有被使用,可能是剛剛退出,或者正在初始化中
_Gcopystack // 表示g當前的棧正在被移除並分配新棧
複製程式碼
當呼叫goready
時,將Goroutine
的狀態從 _Gwaiting
置為_Grunnable
,等待下一次排程再次執行。
當緩衝區未滿時
當緩衝區未滿時,找到sendx
所指向的緩衝區陣列的位置,將正在傳送的值拷貝到該位置,並增加sendx
索引以及釋放鎖,示意圖如下:
阻塞傳送
如果是阻塞傳送,那麼就將當前的Goroutine
打包成一個sudog
結構體,並加入到channel
的傳送佇列sendq
裡。示意圖如下:
之後則呼叫goparkunlock
將當前Goroutine
設定為_Gwaiting
狀態並解鎖,進入阻塞狀態等待被喚醒(呼叫goready
);如果被排程器喚醒,執行清理工作並最終釋放對應的sudog
結構體。
接收
channel
的接收有兩種形式:
<-ch
n, ok := <-ch
複製程式碼
這兩種方式分別呼叫執行時方法chanrecv1
和chanrecv2
:
chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-literal">true)
}
chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-literal">true)
return
}
複製程式碼
這兩個方法最終都會呼叫chanrecv
方法:
chanrecv(c *hchan,51); font-weight: bold; word-wrap: inherit !important; word-break: inherit !important;" class="hljs-keyword">bool) (selected, received bool) {
"chanrecv: chan=",68); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-string">"\n")
}
// 非阻塞直接返回(false, false)
return
}
// 阻塞接收,則當前Goroutine阻塞掛起
gopark("unreachable")
}
// 非阻塞模式,對於以下兩種情況:
// 1、無緩衝區且等待傳送佇列也為空
// 2、有緩衝區但緩衝區陣列為空且channel未關閉
// 這兩種情況都是接收失敗, 直接返回(false, false)
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
// 加鎖
lock(&c.lock)
// 如果channel已關閉,並且緩衝區無元素
0 && c.qcount == if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
// 有等待接收的變數(即 v = <-ch中的v)
if ep != nil {
//根據channel元素的型別清理ep對應地址的記憶體,即ep接收了channel元素型別的零值
typedmemclr(c.elemtype, ep)
}
// 返回(true, false),即接收到值,但不是從channel中接收的有效值
true,136); font-style: italic; word-wrap: inherit !important; word-break: inherit !important;" class="hljs-comment">// 除了以上非常規情況,還有有以下幾種常見情況:
// 1、等待傳送的佇列sendq裡存在Goroutine,那麼有兩種情況:當前channel無緩衝區,或者當前channel已滿
if sg := c.sendq.dequeue(); sg != // Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
// 如果無緩衝區,那麼直接從sender接收資料;否則,從buf佇列的頭部接收資料,並把sender的資料加到buf佇列的尾部
recv(c,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-number">3)
// 接收成功
// 2、緩衝區buf中有元素
if c.qcount > // Receive directly from queue
// 從recvx指向的位置獲取元素
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
// 將從buf中取出的元素拷貝到當前協程
typedmemmove(c.elemtype, qp)
}
// 同時將取出的資料所在的記憶體清空
typedmemclr(c.elemtype, qp)
// 接收索引 +1
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = // buf元素總數 -1
c.qcount--
// 解鎖,返回接收成功
unlock(&c.lock)
// 3、非阻塞模式,且沒有資料可以接受
if !block {
// 解鎖,直接返回接收失敗
unlock(&c.lock)
// no sender available: block on this channel.
// 4、阻塞模式,獲取當前Goroutine,打包一個sudog
gp := getg()
mysg := acquireSudog()
mysg.releasetime = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 加入到channel的等待接收佇列recvq中
c.recvq.enqueue(mysg)
// 掛起當前Goroutine,設定為_Gwaiting狀態並解鎖,進入休眠等待被喚醒
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-number">3)
// someone woke us up
2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = 複製程式碼
chanrecv
方法的處理邏輯與chansend
非常類似,我們這裡仍然只分析幾種常見情況,其他情況上述註釋也解釋得比較清楚了,讀者可對照相應程式碼和註釋檢視。
存在等待傳送的Goroutine
如果等待傳送的佇列sendq
裡存在掛起的Goroutine
,那麼有兩種情況:當前channel
無緩衝區,或者當前channel
已滿。從sendq
中取出最先阻塞的Goroutine
,然後呼叫recv
方法:
recv(c *hchan,0); font-weight: bold; word-wrap: inherit !important; word-break: inherit !important;" class="hljs-title">int) {
if c.dataqsiz == // 無緩衝區
if raceenabled {
racesync(c, sg)
}
// copy data from sender
recvDirect(c.elemtype, ep)
}
} else {
// 緩衝區已滿
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c,51); font-weight: bold; word-wrap: inherit !important; word-break: inherit !important;" class="hljs-keyword">if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g,136); font-style: italic; word-wrap: inherit !important; word-break: inherit !important;" class="hljs-comment">// copy data from queue to receiver
nil {
typedmemmove(c.elemtype,136); font-style: italic; word-wrap: inherit !important; word-break: inherit !important;" class="hljs-comment">// copy data from sender to queue
typedmemmove(c.elemtype, sg.elem)
c.recvx++
0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
// 將等待傳送資料的Goroutine的狀態從_Gwaiting置為 _Grunnable,等待下一次排程。
goready(gp,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-number">1)
}
複製程式碼
1、如果無緩衝區,那麼直接從sender
接收資料;
2、如果緩衝區已滿,從buf
佇列的頭部接收資料,並把sender
的資料加到buf佇列的尾部;
3、最後呼叫goready
函式將等待傳送資料的Goroutine
的狀態從_Grunnable
,等待下一次排程。
下圖示意了當緩衝區已滿時的處理過程:
緩衝區buf中還有資料
如果緩衝區buf
中還有元素,那麼就走正常的接收,將從buf
中取出的元素拷貝到當前協程的接收資料目標記憶體地址中。值得注意的是,即使此時channel
已經關閉,仍然可以正常地從緩衝區buf
中接收資料。這種情況比較簡單,示意圖就不畫了。
阻塞接收
如果是阻塞模式,且當前沒有資料可以接收,那麼就需要將當前sudog
加入到channel
的等待接收佇列recvq
中,將當前Goroutine
的狀態置為_Gwaiting
,等待喚醒。示意圖如下:
如果之後當前Goroutine
被排程器喚醒,則執行清理工作並最終釋放對應的sudog
結構體。
關閉
說完收發資料,最後就是關閉channel
了:
closechan(c *hchan) {
// nil channel檢查
nil {
"close of nil channel"))
}
lock(&c.lock)
// 已關閉的channel不能再次關閉
0 {
unlock(&c.lock)
"close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), funcPC(closechan))
racerelease(c.raceaddr())
}
// 設定關閉狀態為1
c.closed = 1
var glist glist
// release all readers
// 遍歷recvq,清除sudog的資料,取出其中處於_Gwaiting狀態的Goroutine加入到glist中
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
// 遍歷sendq,清除sudog的資料,取出其中處於_Gwaiting狀態的Goroutine加入到glist中
for {
sg := c.sendq.dequeue()
break
}
sg.elem = // Ready all Gs now that we've dropped the channel lock.
將glist中所有Goroutine的狀態置為_Grunnable,等待排程器進行排程
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp,128); word-wrap: inherit !important; word-break: inherit !important;" class="hljs-number">3)
}
}
複製程式碼
1、關閉channel
時,會遍歷sendq
(實際只有recvq
或者sendq
),取出sudog
中掛起的Goroutine
加入到glist
列表中,並清除sudog
上的一些資訊和狀態。
2、然後遍歷glist
列表,為每個Goroutine
呼叫goready
函式,將所有Goroutine
置為_Grunnable
狀態,等待排程。
3、當Goroutine
被喚醒之後,會繼續執行chansend
和chanrecv
函式中當前Goroutine
被喚醒後的剩餘邏輯。
總結
總結一下,本文先通過channel
的基本用法對channel
的定義、用法細節進行了介紹,然後對channel
的基本操作包括髮送、接收和關閉進行了較為詳細和深入的探究。細心的讀者應該也會發現channel
的操作跟協程的排程關係密切,不過這篇文章關於goroutine
的排程只是一筆帶過,後續時機成熟會對這部分內容再作探究。