1. 程式人生 > 程式設計 >深度解密 Go 語言中的 sync.Pool

深度解密 Go 語言中的 sync.Pool

最近在工作中碰到了 GC 的問題:專案中大量重複地建立許多物件,造成 GC 的工作量巨大,CPU 頻繁掉底。準備使用 sync.Pool 來快取物件,減輕 GC 的消耗。為了用起來更順暢,我特地研究了一番,形成此文。本文從使用到原始碼解析,循序漸進,一一道來。

是什麼

sync.Pool 是 sync 包下的一個元件,可以作為儲存臨時取還物件的一個“池子”。個人覺得它的名字有一定的誤導性,因為 Pool 裡裝的物件可以被無通知地被回收,可能 sync.Cache 是一個更合適的名字。

有什麼用

對於很多需要重複分配、回收記憶體的地方,sync.Pool 是一個很好的選擇。頻繁地分配、回收記憶體會給 GC 帶來一定的負擔,嚴重的時候會引起 CPU 的毛刺,而 sync.Pool

可以將暫時不用的物件快取起來,待下次需要的時候直接使用,不用再次經過記憶體分配,複用物件的記憶體,減輕 GC 的壓力,提升系統的效能。

怎麼用

首先,sync.Pool 是協程安全的,這對於使用者來說是極其方便的。使用前,設定好物件的 New 函式,用於在 Pool 裡沒有快取的物件時,建立一個。之後,在程式的任何地方、任何時候僅通過 Get()Put() 方法就可以取、還物件了。

下面是 2018 年的時候,《Go 夜讀》上關於 sync.Pool 的分享,關於適用場景:

當多個 goroutine 都需要建立同⼀個物件的時候,如果 goroutine 數過多,導致物件的建立數⽬劇增,進⽽導致 GC 壓⼒增大。形成 “併發⼤-佔⽤記憶體⼤-GC 緩慢-處理併發能⼒降低-併發更⼤”這樣的惡性迴圈。

在這個時候,需要有⼀個物件池,每個 goroutine 不再⾃⼰單獨建立物件,⽽是從物件池中獲取出⼀個物件(如果池中已經有的話)。

因此關鍵思想就是物件的複用,避免重複建立、銷燬,下面我們來看看如何使用。

簡單的例子

首先來看一個簡單的例子:

package main
import (
	"fmt"
	"sync"
)

var pool *sync.Pool

type Person struct {
	Name string
}

func initPool() {
	pool = &sync.Pool {
		New: func()interface{} {
			fmt.Println("Creating a new Person")
			return new(Person)
		},}
}

func main() {
	initPool()

	p := pool.Get().(*Person)
	fmt.Println("首次從 pool 裡獲取:",p)

	p.Name = "first"
	fmt.Printf("設定 p.Name = %s\n",p.Name)

	pool.Put(p)

	fmt.Println("Pool 裡已有一個物件:&{first},呼叫 Get: ",pool.Get().(*Person))
	fmt.Println("Pool 沒有物件了,呼叫 Get: ",pool.Get().(*Person))
}

執行結果:

Creating a new Person
首次從 pool 裡獲取: &{}
設定 p.Name = first
Pool 裡已有一個物件:&{first},Get: &{first}
Creating a new Person
Pool 沒有物件了,Get: &{}

首先,需要初始化 Pool,唯一需要的就是設定好 New 函式。當呼叫 Get 方法時,如果池子裡快取了物件,就直接返回快取的物件。如果沒有存貨,則呼叫 New 函式建立一個新的物件。

另外,我們發現 Get 方法取出來的物件和上次 Put 進去的物件實際上是同一個,Pool 沒有做任何“清空”的處理。但我們不應當對此有任何假設,因為在實際的併發使用場景中,無法保證這種順序,最好的做法是在 Put 前,將物件清空。

fmt 包如何用

這部分主要看 fmt.Printf 如何使用:

func Printf(format string,a ...interface{}) (n int,err error) {
	return Fprintf(os.Stdout,format,a...)
}

繼續看 Fprintf

func Fprintf(w io.Writer,format string,err error) {
	p := newPrinter()
	p.doPrintf(format,a)
	n,err = w.Write(p.buf)
	p.free()
	return
}

Fprintf 函式的引數是一個 io.WriterPrintf 傳的是 os.Stdout,相當於直接輸出到標準輸出。這裡的 newPrinter 用的就是 Pool:

// newPrinter allocates a new pp struct or grabs a cached one.
func newPrinter() *pp {
	p := ppFree.Get().(*pp)
	p.panicking = false
	p.erroring = false
	p.wrapErrs = false
	p.fmt.init(&p.buf)
	return p
}

var ppFree = sync.Pool{
	New: func() interface{} { return new(pp) },}

回到 Fprintf 函式,拿到 pp 指標後,會做一些 format 的操作,並且將 p.buf 裡面的內容寫入 w。最後,呼叫 free 函式,將 pp 指標歸還到 Pool 中:

// free saves used pp structs in ppFree; avoids an allocation per invocation.
func (p *pp) free() {
	if cap(p.buf) > 64<<10 {
		return
	}

	p.buf = p.buf[:0]
	p.arg = nil
	p.value = reflect.Value{}
	p.wrappedErr = nil
	ppFree.Put(p)
}

歸還到 Pool 前將物件的一些欄位清零,這樣,通過 Get 拿到快取的物件時,就可以安全地使用了。

pool_test

通過 test 檔案學習原始碼是一個很好的途徑,因為它代表了“官方”的用法。更重要的是,測試用例會故意測試一些“坑”,學習這些坑,也會讓自己在使用的時候就能學會避免。

pool_test 檔案裡共有 7 個測試,4 個 BechMark。

TestPoolTestPoolNew 比較簡單,主要是測試 Get/Put 的功能。我們來看下 TestPoolNew

func TestPoolNew(t *testing.T) {
	// disable GC so we can control when it happens.
	defer debug.SetGCPercent(debug.SetGCPercent(-1))

	i := 0
	p := Pool{
		New: func() interface{} {
			i++
			return i
		},}
	if v := p.Get(); v != 1 {
		t.Fatalf("got %v; want 1",v)
	}
	if v := p.Get(); v != 2 {
		t.Fatalf("got %v; want 2",v)
	}

	// Make sure that the goroutine doesn't migrate to another P
	// between Put and Get calls.
	Runtime_procPin()
	p.Put(42)
	if v := p.Get(); v != 42 {
		t.Fatalf("got %v; want 42",v)
	}
	Runtime_procUnpin()

	if v := p.Get(); v != 3 {
		t.Fatalf("got %v; want 3",v)
	}
}

首先設定了 GC=-1,作用就是停止 GC。那為啥要用 defer?函式都跑完了,還要 defer 幹啥。注意到,debug.SetGCPercent 這個函式被呼叫了兩次,而且這個函式返回的是上一次 GC 的值。因此,defer 在這裡的用途是還原到呼叫此函式之前的 GC 設定,也就是恢復現場。

接著,調置了 Pool 的 New 函式:直接返回一個 int,變且每次呼叫 New,都會自增 1。然後,連續呼叫了兩次 Get 函式,因為這個時候 Pool 裡沒有快取的物件,因此每次都會呼叫 New 建立一個,所以第一次返回 1,第二次返回 2。

然後,呼叫 Runtime_procPin() 防止 goroutine 被強佔,目的是保護接下來的一次 Put 和 Get 操作,使得它們操作的物件都是同一個 P 的“池子”。並且,這次呼叫 Get 的時候並沒有呼叫 New,因為之前有一次 Put 的操作。

最後,再次呼叫 Get 操作,因為沒有“存貨”,因此還是會再次呼叫 New 建立一個物件。

TestPoolGCTestPoolRelease 則主要測試 GC 對 Pool 裡物件的影響。這裡用了一個函式,用於計數有多少物件會被 GC 回收:

runtime.SetFinalizer(v,func(vv *string) {
	atomic.AddUint32(&fin,1)
})

當垃圾回收檢測到 v 是一個不可達的物件時,並且 v 又有一個關聯的 Finalizer,就會另起一個 goroutine 呼叫設定的 finalizer 函式,也就是上面程式碼裡的引數 func。這樣,就會讓物件 v 重新可達,從而在這次 GC 過程中不被回收。之後,解綁物件 v 和它所關聯的 Finalizer,當下次 GC 再次檢測到物件 v 不可達時,才會被回收。

TestPoolStress 從名字看,主要是想測一下“壓力”,具體操作就是起了 10 個 goroutine 不斷地向 Pool 裡 Put 物件,然後又 Get 物件,看是否會出錯。

TestPoolDequeueTestPoolChain,都呼叫了 testPoolDequeue,這是具體幹活的。它需要傳入一個 PoolDequeue 介面:

// poolDequeue testing.
type PoolDequeue interface {
	PushHead(val interface{}) bool
	PopHead() (interface{},bool)
	PopTail() (interface{},bool)
}

PoolDequeue 是一個雙端佇列,可以從頭部入隊元素,從頭部和尾部出隊元素。呼叫函式時,前者傳入 NewPoolDequeue(16),後者傳入 NewPoolChain(),底層其實都是 poolDequeue 這個結構體。具體來看 testPoolDequeue 做了什麼:

深度解密 Go 語言中的 sync.Pool

總共起了 10 個 goroutine:1 個生產者,9 個消費者。生產者不斷地從佇列頭 pushHead 元素到雙端佇列裡去,並且每 push 10 次,就 popHead 一次;消費者則一直從佇列尾取元素。不論是從佇列頭還是從佇列尾取元素,都會在 map 裡做標記,最後檢驗每個元素是不是隻被取出過一次。

剩下的就是 Benchmark 測試了。第一個 BenchmarkPool 比較簡單,就是不停地 Put/Get,測試效能。

BenchmarkPoolSTW 函式會先關掉 GC,再向 pool 裡 put 10 個物件,然後強制觸發 GC,記錄 GC 的停頓時間,並且做一個排序,計算 P50 和 P95 的 STW 時間。這個函式可以加入個人的程式碼庫了:

func BenchmarkPoolSTW(b *testing.B) {
	// Take control of GC.
	defer debug.SetGCPercent(debug.SetGCPercent(-1))

	var mstats runtime.MemStats
	var pauses []uint64

	var p Pool
	for i := 0; i < b.N; i++ {
		// Put a large number of items into a pool.
		const N = 100000
		var item interface{} = 42
		for i := 0; i < N; i++ {
			p.Put(item)
		}
		// Do a GC.
		runtime.GC()
		// Record pause time.
		runtime.ReadMemStats(&mstats)
		pauses = append(pauses,mstats.PauseNs[(mstats.NumGC+255)%256])
	}

	// Get pause time stats.
	sort.Slice(pauses,func(i,j int) bool { return pauses[i] < pauses[j] })
	var total uint64
	for _,ns := range pauses {
		total += ns
	}
	// ns/op for this benchmark is average STW time.
	b.ReportMetric(float64(total)/float64(b.N),"ns/op")
	b.ReportMetric(float64(pauses[len(pauses)*95/100]),"p95-ns/STW")
	b.ReportMetric(float64(pauses[len(pauses)*50/100]),"p50-ns/STW")
}

我在 mac 上跑了一下:

go test -v -run=none -bench=BenchmarkPoolSTW

得到輸出:

goos: darwin
goarch: amd64
pkg: sync
BenchmarkPoolSTW-12 361 3708 ns/op 3583 p50-ns/STW 5008 p95-ns/STW
PASS
ok sync 1.481s

最後一個 BenchmarkPoolExpensiveNew 測試當 New 的代價很高時,Pool 的表現。也可以加入個人的程式碼庫。

其他

標準庫中 encoding/json 也用到了 sync.Pool 來提升效能。著名的 gin 框架,對 context 取用也到了 sync.Pool

來看下 gin 如何使用 sync.Pool。設定 New 函式:

engine.pool.New = func() interface{} {
	return engine.allocateContext()
}

func (engine *Engine) allocateContext() *Context {
	return &Context{engine: engine,KeysMutex: &sync.RWMutex{}}
}

使用:

// ServeHTTP conforms to the http.Handler interface.
func (engine *Engine) ServeHTTP(w http.ResponseWriter,req *http.Request) {
	c := engine.pool.Get().(*Context)
	c.writermem.reset(w)
	c.Request = req
	c.reset()

	engine.handleHTTPRequest(c)

	engine.pool.Put(c)
}

先呼叫 Get 取出來快取的物件,然後會做一些 reset 操作,再執行 handleHTTPRequest,最後再 Put 回 Pool。

另外,Echo 框架也使⽤了 sync.Pool 來管理 context,並且⼏乎達到了零堆記憶體分配:

It leverages sync pool to reuse memory and achieve zero dynamic memory allocation with no GC overhead.

原始碼分析

Pool 結構體

首先來看 Pool 的結構體:

type Pool struct {
	noCopy noCopy

 // 每個 P 的本地佇列,實際型別為 [P]poolLocal
	local unsafe.Pointer // local fixed-size per-P pool,actual type is [P]poolLocal
	// [P]poolLocal的大小
	localSize uintptr // size of the local array

	victim unsafe.Pointer // local from previous cycle
	victimSize uintptr // size of victims array

	// 自定義的物件建立回撥函式,當 pool 中無可用物件時會呼叫此函式
	New func() interface{}
}

因為 Pool 不希望被複制,所以結構體裡有一個 noCopy 的欄位,使用 go vet 工具可以檢測到使用者程式碼是否複製了 Pool。

noCopy 是 go1.7 開始引入的一個靜態檢查機制。它不僅僅工作在執行時或標準庫,同時也對使用者程式碼有效。

使用者只需實現這樣的不消耗記憶體、僅用於靜態分析的結構,來保證一個物件在第一次使用後不會發生複製。

實現非常簡單:

// noCopy 用於嵌入一個結構體中來保證其第一次使用後不會被複制
//
// 見 https://golang.org/issues/8005#issuecomment-190753527
type noCopy struct{}

// Lock 是一個空操作用來給 `go ve` 的 -copylocks 靜態分析
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}

local 欄位儲存指向 [P]poolLocal 陣列(嚴格來說,它是一個切片)的指標,localSize 則表示 local 陣列的大小。訪問時,P 的 id 對應 [P]poolLocal 下標索引。通過這樣的設計,多個 goroutine 使用同一個 Pool 時,減少了競爭,提升了效能。

在一輪 GC 到來時,victim 和 victimSize 會分別“接管” local 和 localSize。victim 的機制用於減少 GC 後冷啟動導致的效能抖動,讓分配物件更平滑。

Victim Cache 本來是計算機架構裡面的一個概念,是 CPU 硬體處理快取的一種技術,sync.Pool 引入的意圖在於降低 GC 壓力的同時提高命中率。

當 Pool 沒有快取的物件時,呼叫 New 方法生成一個新的物件。

type poolLocal struct {
	poolLocalInternal

	// 將 poolLocal 補齊至兩個快取行的倍數,防止 false sharing,// 每個快取行具有 64 bytes,即 512 bit
	// 目前我們的處理器一般擁有 32 * 1024 / 64 = 512 條快取行
	// 偽共享,僅佔位用,防止在 cache line 上分配多個 poolLocalInternal
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

// Local per-P Pool appendix.
type poolLocalInternal struct {
 // P 的私有快取區,使用時無需要加鎖
	private interface{}
	// 公共快取區。本地 P 可以 pushHead/popHead;其他 P 則只能 popTail
	shared poolChain
}

欄位 pad 主要是防止 false sharing,董大的《什麼是 cpu cache》裡講得比較好:

現代 cpu 中,cache 都劃分成以 cache line (cache block) 為單位,在 x86_64 體系下一般都是 64 位元組,cache line 是操作的最小單元。

程式即使只想讀記憶體中的 1 個位元組資料,也要同時把附近 63 節字載入到 cache 中,如果讀取超個 64 位元組,那麼就要載入到多個 cache line 中。

簡單來說,如果沒有 pad 欄位,那麼當需要訪問 0 號索引的 poolLocal 時,CPU 同時會把 0 號和 1 號索引同時載入到 cpu cache。在只修改 0 號索引的情況下,會讓 1 號索引的 poolLocal 失效。這樣,當其他執行緒想要讀取 1 號索引時,發生 cache miss,還得重新再載入,對效能有損。增加一個 pad,補齊快取行,讓相關的欄位能獨立地載入到快取行就不會出現 false sharding 了。

poolChain 是一個雙端佇列的實現:

type poolChain struct {
	// 只有生產者會 push to,不用加鎖
	head *poolChainElt

	// 讀寫需要原子控制。 pop from
	tail *poolChainElt
}

type poolChainElt struct {
	poolDequeue

	// next 被 producer 寫,consumer 讀。所以只會從 nil 變成 non-nil
	// prev 被 consumer 寫,producer 讀。所以只會從 non-nil 變成 nil
	next,prev *poolChainElt
}

type poolDequeue struct {
	// The head index is stored in the most-significant bits so
	// that we can atomically add to it and the overflow is
	// harmless.
	// headTail 包含一個 32 位的 head 和一個 32 位的 tail 指標。這兩個值都和 len(vals)-1 取模過。
	// tail 是佇列中最老的資料,head 指向下一個將要填充的 slot
 // slots 的有效範圍是 [tail,head),由 consumers 持有。
	headTail uint64

	// vals 是一個儲存 interface{} 的環形佇列,它的 size 必須是 2 的冪
	// 如果 slot 為空,則 vals[i].typ 為空;否則,非空。
	// 一個 slot 在這時宣告無效:tail 不指向它了,vals[i].typ 為 nil
	// 由 consumer 設定成 nil,由 producer 讀
	vals []eface
}

poolDequeue 被實現為單生產者、多消費者的固定大小的無鎖(atomic 實現) Ring 式佇列(底層儲存使用陣列,使用兩個指標標記 head、tail)。生產者可以從 head 插入、head 刪除,而消費者僅可從 tail 刪除。

headTail 指向佇列的頭和尾,通過位運算將 head 和 tail 存入 headTail 變數中。

我們用一幅圖來完整地描述 Pool 結構體:

深度解密 Go 語言中的 sync.Pool

結合木白的技術私廚的《請問sync.Pool有什麼缺點?》裡的一張圖,對於雙端佇列的理解會更容易一些:

深度解密 Go 語言中的 sync.Pool

我們看到 Pool 並沒有直接使用 poolDequeue,原因是它的大小是固定的,而 Pool 的大小是沒有限制的。因此,在 poolDequeue 之上包裝了一下,變成了一個 poolChainElt 的雙向連結串列,可以動態增長。

Get

直接上原始碼:

func (p *Pool) Get() interface{} {
 // ......
	l,pid := p.pin()
	x := l.private
	l.private = nil
	if x == nil {
		x,_ = l.shared.popHead()
		if x == nil {
			x = p.getSlow(pid)
		}
	}
	runtime_procUnpin()
 // ......
	if x == nil && p.New != nil {
		x = p.New()
	}
	return x
}

省略號的內容是 race 相關的,屬於閱讀原始碼過程中的一些噪音,暫時註釋掉。這樣,Get 的整個過程就非常清晰了:

  1. 首先,呼叫 p.pin() 函式將當前的 goroutine 和 P 繫結,禁止被搶佔,返回當前 P 對應的 poolLocal,以及 pid。
  2. 然後直接取 l.private,賦值給 x,並置 l.private 為 nil。
  3. 判斷 x 是否為空,若為空,則嘗試從 l.shared 的頭部 pop 一個物件出來,同時賦值給 x。
  4. 如果 x 仍然為空,則呼叫 getSlow 嘗試從其他 P 的 shared 雙端佇列尾部“偷”一個物件出來。
  5. Pool 的相關操作做完了,呼叫 runtime_procUnpin() 解除非搶佔。
  6. 最後如果還是沒有取到快取的物件,那就直接呼叫預先設定好的 New 函式,建立一個出來。

我用一張流程圖來展示整個過程:

深度解密 Go 語言中的 sync.Pool

整體流程梳理完了,我們再來看一下其中的一些關鍵函式。

pin

先來看 Pool.pin()

// src/sync/pool.go

// 呼叫方必須在完成取值後呼叫 runtime_procUnpin() 來取消搶佔。
func (p *Pool) pin() (*poolLocal,int) {
	pid := runtime_procPin()
	s := atomic.LoadUintptr(&p.localSize) // load-acquire
	l := p.local    // load-consume
	// 因為可能存在動態的 P(執行時調整 P 的個數)
	if uintptr(pid) < s {
		return indexLocal(l,pid),pid
	}
	return p.pinSlow()
}

pin 的作用就是將當前 groutine 和 P 繫結在一起,禁止搶佔。並且返回對應的 poolLocal 以及 P 的 id。

如果 G 被搶佔,則 G 的狀態從 running 變成 runnable,會被放回 P 的 localq 或 globaq,等待下一次排程。下次再執行時,就不一定是和現在的 P 相結合了。因為之後會用到 pid,如果被搶佔了,有可能接下來使用的 pid 與所繫結的 P 並非同一個。

“繫結”的任務最終交給了 procPin

// src/runtime/proc.go

func procPin() int {
	_g_ := getg()
	mp := _g_.m

	mp.locks++
	return int(mp.p.ptr().id)
}

實現的程式碼很簡潔:將當前 goroutine 繫結的 m 上的一個鎖欄位 locks 值加 1,即完成了“繫結”。關於 pin 的原理,可以參考《golang的物件池sync.pool原始碼解讀》,文章詳細分析了為什麼執行 procPin 之後,不可搶佔,且 GC 不會清掃 Pool 裡的物件。

我們再回到 p.pin(),原子操作取出 p.localSizep.local,如果當前 pid 小於 p.localSize,則直接取 poolLocal 陣列中的 pid 索引處的元素。否則,說明 Pool 還沒有建立 poolLocal,呼叫 p.pinSlow() 完成建立工作。

func (p *Pool) pinSlow() (*poolLocal,int) {
	// Retry under the mutex.
	// Can not lock the mutex while pinned.
	runtime_procUnpin()
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	pid := runtime_procPin()
	// poolCleanup won't be called while we are pinned.
	// 沒有使用原子操作,因為已經加了全域性鎖了
	s := p.localSize
	l := p.local
	// 因為 pinSlow 中途可能已經被其他的執行緒呼叫,因此這時候需要再次對 pid 進行檢查。 如果 pid 在 p.local 大小範圍內,則不用建立 poolLocal 切片,直接返回。
	if uintptr(pid) < s {
		return indexLocal(l,pid
	}
	if p.local == nil {
		allPools = append(allPools,p)
	}
	// If GOMAXPROCS changes between GCs,we re-allocate the array and lose the old one.
	// 當前 P 的數量
	size := runtime.GOMAXPROCS(0)
	local := make([]poolLocal,size)
	// 舊的 local 會被回收
	atomic.StorePointer(&p.local,unsafe.Pointer(&local[0])) // store-release
	atomic.StoreUintptr(&p.localSize,uintptr(size))  // store-release
	return &local[pid],pid
}

因為要上一把大鎖 allPoolsMu,所以函式名帶有 slow。我們知道,鎖粒度越大,競爭越多,自然就越“slow”。不過要想上鎖的話,得先解除“繫結”,鎖上之後,再執行“繫結”。原因是鎖越大,被阻塞的概率就越大,如果還佔著 P,那就浪費資源。

在解除繫結後,pinSlow 可能被其他的執行緒呼叫過了,p.local 可能會發生變化。因此這時候需要再次對 pid 進行檢查。如果 pid 在 p.localSize 大小範圍內,則不用再建立 poolLocal 切片,直接返回。

之後,根據 P 的個數,使用 make 建立切片,包含 runtime.GOMAXPROCS(0) 個 poolLocal,並且使用原子操作設定 p.local 和 p.localSize。

最後,返回 p.local 對應 pid 索引處的元素。

關於這把大鎖 allPoolsMu,曹大在《幾個 Go 系統可能遇到的鎖問題》裡講了一個例子。第三方庫用了 sync.Pool,內部有一個結構體 fasttemplate.Template,包含 sync.Pool 欄位。而 rd 在使用時,每個請求都會新建這樣一個結構體。於是,處理每個請求時,都會嘗試從一個空的 Pool 裡取快取的物件,最後 goroutine 都阻塞在了這把大鎖上,因為都在嘗試執行:allPools = append(allPools,p),從而造成效能問題。

popHead

回到 Get 函式,再來看另一個關鍵的函式:poolChain.popHead()

func (c *poolChain) popHead() (interface{},bool) {
	d := c.head
	for d != nil {
		if val,ok := d.popHead(); ok {
			return val,ok
		}
		// There may still be unconsumed elements in the
		// previous dequeue,so try backing up.
		d = loadPoolChainElt(&d.prev)
	}
	return nil,false
}

popHead 函式只會被 producer 呼叫。首先拿到頭節點:c.head,如果頭節點不為空的話,嘗試呼叫頭節點的 popHead 方法。注意這兩個 popHead 方法實際上並不相同,一個是 poolChain 的,一個是 poolDequeue 的,有疑惑的,不妨回頭再看一下 Pool 結構體的圖。我們來看 poolDequeue.popHead()

// /usr/local/go/src/sync/poolqueue.go

func (d *poolDequeue) popHead() (interface{},bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head,tail := d.unpack(ptrs)
		// 判斷佇列是否為空
		if tail == head {
			// Queue is empty.
			return nil,false
		}

		// head 位置是隊頭的前一個位置,所以此處要先退一位。
		// 在讀出 slot 的 value 之前就把 head 值減 1,取消對這個 slot 的控制
		head--
		ptrs2 := d.pack(head,tail)
		if atomic.CompareAndSwapUint64(&d.headTail,ptrs,ptrs2) {
			// We successfully took back slot.
			slot = &d.vals[head&uint32(len(d.vals)-1)]
			break
		}
	}

 // 取出 val
	val := *(*interface{})(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}
	
	// 重置 slot,typ 和 val 均為 nil
	// 這裡清空的方式與 popTail 不同,與 pushHead 沒有競爭關係,所以不用太小心
	*slot = eface{}
	return val,true
}

此函式會刪掉並且返回 queue 的頭節點。但如果 queue 為空的話,返回 false。這裡的 queue 儲存的實際上就是 Pool 裡快取的物件。

整個函式的核心是一個無限迴圈,這是 Go 中常用的無鎖化程式設計形式。

首先呼叫 unpack 函式分離出 head 和 tail 指標,如果 head 和 tail 相等,即首尾相等,那麼這個佇列就是空的,直接就返回 nil,false

否則,將 head 指標後移一位,即 head 值減 1,然後呼叫 pack 打包 head 和 tail 指標。使用 atomic.CompareAndSwapUint64 比較 headTail 在這之間是否有變化,如果沒變化,相當於獲取到了這把鎖,那就更新 headTail 的值。並且把 vals 相應索引處的元素賦值給 slot。

因為 vals 長度實際是隻能是 2 的 n 次冪,因此 len(d.vals)-1 實際上得到的值的低 n 位是全 1,它再與 head 相與,實際就是取 head 低 n 位的值。

得到相應 slot 的元素後,經過型別轉換並判斷是否是 dequeueNil,如果是,說明沒取到快取的物件,返回 nil。

// /usr/local/go/src/sync/poolqueue.go
// 因為使用 nil 代表空的 slots,因此使用 dequeueNil 表示 interface{}(nil)
type dequeueNil *struct{}

最後,返回 val 之前,將 slot “歸零”:*slot = eface{}

回到 poolChain.popHead(),呼叫 poolDequeue.popHead() 拿到快取的物件後,直接返回。否則,將 d 重新指向 d.prev,繼續嘗試獲取快取的物件。

getSlow

如果在 shared 裡沒有獲取到快取物件,則繼續呼叫 Pool.getSlow(),嘗試從其他 P 的 poolLocal 偷取:

func (p *Pool) getSlow(pid int) interface{} {
	// See the comment in pin regarding ordering of the loads.
	size := atomic.LoadUintptr(&p.localSize) // load-acquire
	locals := p.local   // load-consume
	// Try to steal one element from other procs.
	// 從其他 P 中竊取物件
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals,(pid+i+1)%int(size))
		if x,_ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// 嘗試從victim cache中取物件。這發生在嘗試從其他 P 的 poolLocal 偷去失敗後,
	// 因為這樣可以使 victim 中的物件更容易被回收。
	size = atomic.LoadUintptr(&p.victimSize)
	if uintptr(pid) >= size {
		return nil
	}
	locals = p.victim
	l := indexLocal(locals,pid)
	if x := l.private; x != nil {
		l.private = nil
		return x
	}
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals,(pid+i)%int(size))
		if x,_ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// 清空 victim cache。下次就不用再從這裡找了
	atomic.StoreUintptr(&p.victimSize,0)

	return nil
}

從索引為 pid+1 的 poolLocal 處開始,嘗試呼叫 shared.popTail() 獲取快取物件。如果沒有拿到,則從 victim 裡找,和 poolLocal 的邏輯類似。

最後,實在沒找到,就把 victimSize 置 0,防止後來的“人”再到 victim 裡找。

在 Get 函式的最後,經過這一番操作還是沒找到快取的物件,就呼叫 New 函式建立一個新的物件。

popTail

最後,還剩一個 popTail 函式:

func (c *poolChain) popTail() (interface{},bool) {
	d := loadPoolChainElt(&c.tail)
	if d == nil {
		return nil,false
	}

	for {
		d2 := loadPoolChainElt(&d.next)

		if val,ok := d.popTail(); ok {
			return val,ok
		}

		if d2 == nil {
			// 雙向連結串列只有一個尾節點,現在為空
			return nil,false
		}

		// 雙向連結串列的尾節點裡的雙端佇列被“掏空”,所以繼續看下一個節點。
		// 並且由於尾節點已經被“掏空”,所以要甩掉它。這樣,下次 popHead 就不會再看它有沒有快取物件了。
		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)),unsafe.Pointer(d),unsafe.Pointer(d2)) {
			// 甩掉尾節點
			storePoolChainElt(&d2.prev,nil)
		}
		d = d2
	}
}

for 迴圈的一開始,就把 d.next 載入到了 d2。因為 d 可能會短暫為空,但如果 d2 在 pop 或者 pop fails 之前就不為空的話,說明 d 就會永久為空了。在這種情況下,可以安全地將 d 這個結點“甩掉”。

最後,將 c.tail 更新為 d2,可以防止下次 popTail 的時候檢視一個空的 dequeue;而將 d2.prev 設定為 nil,可以防止下次 popHead 時檢視一個空的 dequeue

我們再看一下核心的 poolDequeue.popTail

// src/sync/poolqueue.go:147

func (d *poolDequeue) popTail() (interface{},tail := d.unpack(ptrs)
		// 判斷佇列是否空
		if tail == head {
			// Queue is empty.
			return nil,false
		}

		// 先搞定 head 和 tail 指標位置。如果搞定,那麼這個 slot 就歸屬我們了
		ptrs2 := d.pack(head,tail+1)
		if atomic.CompareAndSwapUint64(&d.headTail,ptrs2) {
			// Success.
			slot = &d.vals[tail&uint32(len(d.vals)-1)]
			break
		}
	}

	// We now own slot.
	val := *(*interface{})(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}

	slot.val = nil
	atomic.StorePointer(&slot.typ,nil)
	// At this point pushHead owns the slot.

	return val,true
}

popTail 從佇列尾部移除一個元素,如果佇列為空,返回 false。此函式可能同時被多個消費者呼叫。

函式的核心是一個無限迴圈,又是一個無鎖程式設計。先解出 head,tail 指標值,如果兩者相等,說明佇列為空。

因為要從尾部移除一個元素,所以 tail 指標前進 1,然後使用原子操作設定 headTail。

最後,將要移除的 slot 的 val 和 typ “歸零”:

slot.val = nil
atomic.StorePointer(&slot.typ,nil)

Put

// src/sync/pool.go

// Put 將物件新增到 Pool 
func (p *Pool) Put(x interface{}) {
	if x == nil {
		return
	}
	// ……
	l,_ := p.pin()
	if l.private == nil {
		l.private = x
		x = nil
	}
	if x != nil {
		l.shared.pushHead(x)
	}
	runtime_procUnpin()
 //…… 
}

同樣刪掉了 race 相關的函式,看起來清爽多了。整個 Put 的邏輯也很清晰:

  • 先繫結 g 和 P,然後嘗試將 x 賦值給 private 欄位。
  • 如果失敗,就呼叫 pushHead 方法嘗試將其放入 shared 欄位所維護的雙端佇列中。

同樣用流程圖來展示整個過程:

深度解密 Go 語言中的 sync.Pool

pushHead

我們來看 pushHead 的原始碼,比較清晰:

// src/sync/poolqueue.go

func (c *poolChain) pushHead(val interface{}) {
	d := c.head
	if d == nil {
		// poolDequeue 初始長度為8
		const initSize = 8 // Must be a power of 2
		d = new(poolChainElt)
		d.vals = make([]eface,initSize)
		c.head = d
		storePoolChainElt(&c.tail,d)
	}

	if d.pushHead(val) {
		return
	}

 // 前一個 poolDequeue 長度的 2 倍
	newSize := len(d.vals) * 2
	if newSize >= dequeueLimit {
		// Can't make it any bigger.
		newSize = dequeueLimit
	}

 // 首尾相連,構成連結串列
	d2 := &poolChainElt{prev: d}
	d2.vals = make([]eface,newSize)
	c.head = d2
	storePoolChainElt(&d.next,d2)
	d2.pushHead(val)
}

如果 c.head 為空,就要建立一個 poolChainElt,作為首結點,當然也是尾節點。它管理的雙端佇列的長度,初始為 8,放滿之後,再建立一個 poolChainElt 節點時,雙端佇列的長度就要翻倍。當然,有一個最大長度限制(2^30):

const dequeueBits = 32
const dequeueLimit = (1 << dequeueBits) / 4

呼叫 poolDequeue.pushHead 嘗試將物件放到 poolDeque 裡去:

// src/sync/poolqueue.go

// 將 val 新增到雙端佇列頭部。如果佇列已滿,則返回 false。此函式只能被一個生產者呼叫
func (d *poolDequeue) pushHead(val interface{}) bool {
	ptrs := atomic.LoadUint64(&d.headTail)
	head,tail := d.unpack(ptrs)
	if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
		// 佇列滿了
		return false
	}
	slot := &d.vals[head&uint32(len(d.vals)-1)]

	// 檢測這個 slot 是否被 popTail 釋放
	typ := atomic.LoadPointer(&slot.typ)
	if typ != nil {
		// 另一個 groutine 正在 popTail 這個 slot,說明佇列仍然是滿的
		return false
	}

	// The head slot is free,so we own it.
	if val == nil {
		val = dequeueNil(nil)
	}
	
	// slot佔位,將val存入vals中
	*(*interface{})(unsafe.Pointer(slot)) = val

	// head 增加 1
	atomic.AddUint64(&d.headTail,1<<dequeueBits)
	return true
}

首先判斷佇列是否已滿:

if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
	// Queue is full.
	return false
}

也就是將尾部指標加上 d.vals 的長度,再取低 31 位,看它是否和 head 相等。我們知道,d.vals 的長度實際上是固定的,因此如果佇列已滿,那麼 if 語句的兩邊就是相等的。如果佇列滿了,直接返回 false。

否則,佇列沒滿,通過 head 指標找到即將填充的 slot 位置:取 head 指標的低 31 位。

// Check if the head slot has been released by popTail.
typ := atomic.LoadPointer(&slot.typ)
if typ != nil {
	// Another goroutine is still cleaning up the tail,so
	// the queue is actually still full.
	// popTail 是先設定 val,再將 typ 設定為 nil。設定完 typ 之後,popHead 才可以操作這個 slot
	return false
}

上面這一段用來判斷是否和 popTail 有衝突發生,如果有,則直接返回 false。

最後,將 val 賦值到 slot,並將 head 指標值加 1。

// slot佔位,將val存入vals中
*(*interface{})(unsafe.Pointer(slot)) = val

這裡的實現比較巧妙,slot 是 eface 型別,將 slot 轉為 interface{} 型別,這樣 val 能以 interface{} 賦值給 slot 讓 slot.typ 和 slot.val 指向其記憶體塊,於是 slot.typ 和 slot.val 均不為空。

pack/unpack

最後我們再來看一下 pack 和 unpack 函式,它們實際上是一組繫結、解綁 head 和 tail 指標的兩個函式。

// src/sync/poolqueue.go

const dequeueBits = 32

func (d *poolDequeue) pack(head,tail uint32) uint64 {
	const mask = 1<<dequeueBits - 1
	return (uint64(head) << dequeueBits) |
		uint64(tail&mask)
}

mask 的低 31 位為全 1,其他位為 0,它和 tail 相與,就是隻看 tail 的低 31 位。而 head 向左移 32 位之後,低 32 位為全 0。最後把兩部分“或”起來,head 和 tail 就“繫結”在一起了。

相應的解綁函式:

func (d *poolDequeue) unpack(ptrs uint64) (head,tail uint32) {
	const mask = 1<<dequeueBits - 1
	head = uint32((ptrs >> dequeueBits) & mask)
	tail = uint32(ptrs & mask)
	return
}

取出 head 指標的方法就是將 ptrs 右移 32 位,再與 mask 相與,同樣只看 head 的低 31 位。而 tail 實際上更簡單,直接將 ptrs 與 mask 相與就可以了。

GC

對於 Pool 而言,並不能無限擴充套件,否則物件佔用記憶體太多了,會引起記憶體溢位。

幾乎所有的池技術中,都會在某個時刻清空或清除部分快取物件,那麼在 Go 中何時清理未使用的物件呢?

答案是 GC 發生時。

在 pool.go 檔案的 init 函式裡,註冊了 GC 發生時,如何清理 Pool 的函式:

// src/sync/pool.go

func init() {
	runtime_registerPoolCleanup(poolCleanup)
}

編譯器在背後做了一些動作:

// src/runtime/mgc.go

// Hooks for other packages

var poolcleanup func()

// 利用編譯器標誌將 sync 包中的清理註冊到執行時
//go:linkname sync_runtime_registerPoolCleanup sync.runtime_registerPoolCleanup
func sync_runtime_registerPoolCleanup(f func()) {
	poolcleanup = f
}

具體來看下:

func poolCleanup() {
	for _,p := range oldPools {
		p.victim = nil
		p.victimSize = 0
	}

	// Move primary cache to victim cache.
	for _,p := range allPools {
		p.victim = p.local
		p.victimSize = p.localSize
		p.local = nil
		p.localSize = 0
	}

	oldPools,allPools = allPools,nil
}

poolCleanup 會在 STW 階段被呼叫。整體看起來,比較簡潔。主要是將 local 和 victim 作交換,這樣也就不致於讓 GC 把所有的 Pool 都清空了,有 victim 在“兜底”。

如果 sync.Pool 的獲取、釋放速度穩定,那麼就不會有新的池物件進行分配。如果獲取的速度下降了,那麼物件可能會在兩個 GC 週期內被釋放,而不是以前的一個 GC 週期。

鳥窩的【Go 1.13中 sync.Pool 是如何優化的?】講了 1.13 中的優化。

參考資料【理解 Go 1.13 中 sync.Pool 的設計與實現】 手動模擬了一下呼叫 poolCleanup 函式前後 oldPools,allPools,p.vitcim 的變化過程,很精彩:

初始狀態下,oldPools 和 allPools 均為 nil。

第 1 次呼叫 Get,由於 p.local 為 nil,將會在 pinSlow 中建立 p.local,然後將 p 放入 allPools,此時 allPools 長度為 1,oldPools 為 nil。物件使用完畢,第 1 次呼叫 Put 放回物件。第 1 次GC STW 階段,allPools 中所有 p.local 將值賦值給 victim 並置為 nil。allPools 賦值給 oldPools,最後 allPools 為 nil,oldPools 長度為 1。第 2 次呼叫 Get,由於 p.local 為 nil,此時會從 p.victim 裡面嘗試取物件。物件使用完畢,第 2 次呼叫 Put 放回物件,但由於 p.local 為 nil,重新建立 p.local,並將物件放回,此時 allPools 長度為 1,oldPools 長度為 1。第 2 次 GC STW 階段,oldPools 中所有 p.victim 置 nil,前一次的 cache 在本次 GC 時被回收,allPools 所有 p.local 將值賦值給 victim 並置為nil,最後 allPools 為 nil,oldPools 長度為 1。

我根據這個流程畫了一張圖,可以理解地更清晰一些:

深度解密 Go 語言中的 sync.Pool

需要指出的是,allPoolsoldPools 都是切片,切片的元素是指向 Pool 的指標,Get/Put 操作不需要通過它們。在第 6 步,如果還有其他 Pool 執行了 Put 操作,allPools 這時就會有多個元素。

在 Go 1.13 之前的實現中,poolCleanup 比較“簡單粗暴”:

func poolCleanup() {
 for i,p := range allPools {
  allPools[i] = nil
  for i := 0; i < int(p.localSize); i++ {
   l := indexLocal(p.local,i)
   l.private = nil
   for j := range l.shared {
    l.shared[j] = nil
   }
   l.shared = nil
  }
  p.local = nil
  p.localSize = 0
 }
 allPools = []*Pool{}
}

直接清空了所有 Pool 的 p.localpoolLocal.shared

通過兩者的對比發現,新版的實現相比 Go 1.13 之前,GC 的粒度拉大了,由於實際回收的時間線拉長,單位時間內 GC 的開銷減小。

由此基本明白 p.victim 的作用。它的定位是次級快取,GC 時將物件放入其中,下一次 GC 來臨之前如果有 Get 呼叫則會從 p.victim 中取,直到再一次 GC 來臨時回收。

同時由於從 p.victim 中取出物件使用完畢之後並未放回 p.victim 中,在一定程度也減小了下一次 GC 的開銷。原來 1 次 GC 的開銷被拉長到 2 次且會有一定程度的開銷減小,這就是 p.victim 引入的意圖。

【理解 Go 1.13 中 sync.Pool 的設計與實現】 這篇文章最後還總結了 sync.Pool 的設計理念,包括:無鎖、操作物件隔離、原子操作代替鎖、行為隔離——連結串列、Victim Cache 降低 GC 開銷。寫得非常不錯,推薦閱讀。

另外,關於 sync.Pool 中鎖競爭優化的文章,推薦閱讀芮大神的【優化鎖競爭】。

總結

本文先是介紹了 Pool 是什麼,有什麼作用,接著給出了 Pool 的用法以及在標準庫、一些第三方庫中的用法,還介紹了 pool_test 中的一些測試用例。最後,詳細解讀了 sync.Pool 的原始碼。

本文的結尾部分,再來詳細地總結一下關於 sync.Pool 的要點:

  1. 關鍵思想是物件的複用,避免重複建立、銷燬。將暫時不用的物件快取起來,待下次需要的時候直接使用,不用再次經過記憶體分配,複用物件的記憶體,減輕 GC 的壓力。
  2. sync.Pool 是協程安全的,使用起來非常方便。設定好 New 函式後,呼叫 Get 獲取,呼叫 Put 歸還物件。
  3. Go 語言內建的 fmt 包,encoding/json 包都可以看到 sync.Pool 的身影;ginEcho 等框架也都使用了 sync.Pool。
  4. 不要對 Get 得到的物件有任何假設,更好的做法是歸還物件時,將物件“清空”。
  5. Pool 裡物件的生命週期受 GC 影響,不適合於做連線池,因為連線池需要自己管理物件的生命週期。
  6. Pool 不可以指定⼤⼩,⼤⼩只受制於 GC 臨界值。
  7. procPin 將 G 和 P 繫結,防止 G 被搶佔。在繫結期間,GC 無法清理快取的物件。
  8. 在加入 victim 機制前,sync.Pool 裡物件的最⼤快取時間是一個 GC 週期,當 GC 開始時,沒有被引⽤的物件都會被清理掉;加入 victim 機制後,最大快取時間為兩個 GC 週期。
  9. Victim Cache 本來是計算機架構裡面的一個概念,是 CPU 硬體處理快取的一種技術,sync.Pool 引入的意圖在於降低 GC 壓力的同時提高命中率。
  10. sync.Pool 的最底層使用切片加連結串列來實現雙端佇列,並將快取的物件儲存在切片中。

參考資料

【歐神 原始碼分析】https://changkun.us/archives/2018/09/256/

【Go 夜讀】https://reading.hidevops.io/reading/20180817/2018-08-17-sync-pool-reading.pdf

【夜讀第 14 期視訊】https://www.youtube.com/watch?v=jaepwn2PWPk&list=PLe5svQwVF1L5bNxB0smO8gNfAZQYWdIpI

【原始碼分析,偽共享】https://juejin.im/post/5d4087276fb9a06adb7fbe4a

【golang的物件池sync.pool原始碼解讀】https://zhuanlan.zhihu.com/p/99710992

【理解 Go 1.13 中 sync.Pool 的設計與實現】https://zhuanlan.zhihu.com/p/110140126

【優缺點,圖】http://cbsheng.github.io/posts/golang標準庫sync.pool原理及原始碼簡析/

【xiaorui 優化鎖競爭】http://xiaorui.cc/archives/5878

【效能優化之路,自定義多種規格的快取】https://blog.cyeam.com/golang/2017/02/08/go-optimize-slice-pool

【sync.Pool 有什麼缺點】https://mp.weixin.qq.com/s?__biz=MzA4ODg0NDkzOA==&mid=2247487149&idx=1&sn=f38f2d72fd7112e19e97d5a2cd304430&source=41

【1.12 和 1.13 的演變】https://github.com/watermelo/dailyTrans/blob/master/golang/sync_pool_understand.md

【董澤潤 演進】https://www.jianshu.com/p/2e08332481c5

【noCopy】https://github.com/golang/go/issues/8005

【董澤潤 cpu cache】https://www.jianshu.com/p/dc4b5562aad2

【gomemcache 例子】https://docs.kilvn.com/The-Golang-Standard-Library-by-Example/chapter16/16.01.html

【鳥窩 1.13 優化】https://colobu.com/2019/10/08/how-is-sync-Pool-improved-in-Go-1-13/

【A journey with go】https://medium.com/a-journey-with-go/go-understand-the-design-of-sync-pool-2dde3024e277

【封裝了一個計陣列件】https://www.akshaydeo.com/blog/2017/12/23/How-did-I-improve-latency-by-700-percent-using-syncPool/

【偽共享】http://ifeve.com/falsesharing/

到此這篇關於深度解密 Go 語言之 sync.Pool的文章就介紹到這了,更多相關go sync.pool內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!