
Understanding Go in Depth: How goroutines Are Implemented, with an Analysis of the Scheduler

While learning Go, the most impressive feature is surely the goroutine. But what exactly is a goroutine? We create one with the go keyword, so how are all of these goroutines scheduled?

1. Structure Overview

Reading the Go source you see g, p and m everywhere, so let's first look at these key structures and how they relate to one another

1.1. Data Structures

Only the more important members of each struct are listed here

1.1.1. G (goroutine)

A goroutine is the smallest execution unit in the runtime

type g struct {
	// Stack parameters.
	// stack describes the actual stack memory: [stack.lo, stack.hi).
	// stackguard0 is the stack pointer compared in the Go stack growth prologue.
	// It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
	// stackguard1 is the stack pointer compared in the C stack growth prologue.
	// It is stack.lo+StackGuard on g0 and gsignal stacks.
	// It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).

	// the stack space currently used by this g; stack holds the two bounds [lo, hi]
	stack stack // offset known to runtime/cgo
	// used to check whether the stack needs to grow, checked by Go code
	stackguard0 uintptr // offset known to liblink
	// used to check whether the stack needs to grow, checked by native (C) code
	stackguard1 uintptr // offset known to liblink
	// the m this g is currently bound to
	m *m // current m; offset known to arm liblink
	// the g's scheduling context; when the goroutine is switched out, its context is saved here so it can be restored later
	sched gobuf
	// the g's current status
	atomicstatus uint32
	// the g's id
	goid int64
	// pointer to the next g; guintptr's ptr/set methods read and write it; together with sched.gfreeStack and sched.gfreeNoStack it links free g's into a list
	schedlink guintptr
	// whether this g may be preempted
	preempt bool // preemption signal, duplicates stackguard0 = stackpreempt
	// whether this g must run on a particular M; a g that was interrupted sometimes has to resume on its original M
	lockedm muintptr
}

1.1.2. P (processor)

P is the resource an M needs in order to run G's

type p struct {
   lock mutex

   id          int32
   // status of the p, explained below
   status      uint32 // one of pidle/prunning/...
   // address of the next p; compare g.schedlink
   link        puintptr
   // the m this p is associated with
   m           muintptr   // back-link to associated m (nil if idle)
   // used for memory allocation; the mcache of the m that owns this p is the same one
   mcache      *mcache
  
   // Cache of goroutine ids,amortizes accesses to runtime·sched.goidgen.
   // goids fetched from sched and cached here, so that not every goid allocation has to go to sched
	 goidcache    uint64
	 goidcacheend uint64

   // Queue of runnable goroutines. Accessed without lock.
   // queue formed by this p's local runnable goroutines
   runqhead uint32
   runqtail uint32
   runq     [256]guintptr
   // runnext,if non-nil,is a runnable G that was ready'd by
   // the current G and should be run next instead of what's in
   // runq if there's time remaining in the running G's time
   // slice. It will inherit the time left in the current time
   // slice. If a set of goroutines is locked in a
   // communicate-and-wait pattern,this schedules that set as a
   // unit and eliminates the (potentially large) scheduling
   // latency that otherwise arises from adding the ready'd
   // goroutines to the end of the run queue.
   // the g to run next; if nil, the next g is taken from the queue
   runnext guintptr

   // Available G's (status == Gdead)
   // list of g's in state Gdead, kept for reuse
   gfree    *g
   gfreecnt int32
}

1.1.3. M (machine)

type m struct {
   // g0 is a special g used for scheduling and for running system calls
   g0      *g     // goroutine with scheduling stack
   // the g this m is currently running
   curg          *g       // current running goroutine
   // the p currently owned
   p             puintptr // attached p for executing go code (nil if not executing go code)
   // the thread's local storage
   tls           [6]uintptr   // thread-local storage
   // when the m is woken up, it takes ownership of this p
   nextp         puintptr
   id            int64
   // if != "", keep curg running on this m
   preemptoff    string // if != "",keep curg running on this m
   // spinning: the m has run out of work and is actively looking for a g to run
   spinning      bool // m is out of work and is actively looking for work
   // whether the m is sleeping (blocked on a note)
   blocked       bool // m is blocked on a note
   // the m sleeps and is woken through this; note has a member key, and changing the value at the address key points to is what puts the m to sleep or wakes it up
   park          note
   // linked list of all m's
   alllink       *m // on allm
   // the next m; together with sched.midle this forms the idle m list
   schedlink     muintptr
   // mcache; when the m owns a p, it hands its mcache to that p
   mcache        *mcache
   // counterpart of g.lockedm
   lockedg       guintptr
   // list of m's waiting to be freed, linked through sched.freem
   freelink      *m      // on sched.freem
}

1.1.4. sched

type schedt struct {
   // global goid allocation
   goidgen  uint64
   // time of the last poll for g's from network i/o
   lastpoll uint64

   lock mutex

   // When increasing nmidle,nmidlelocked,nmsys,or nmfreed,be
   // sure to call checkdead().
   // the idle m list; combined with m.schedlink this forms a linked list
   midle        muintptr // idle m's waiting for work
   nmidle       int32    // number of idle m's waiting for work
   nmidlelocked int32    // number of locked m's waiting for work
   // id of the next m; also counts how many m's have been created
   mnext        int64    // number of m's that have been created and next M ID
   // maximum number of m's allowed
   maxmcount    int32    // maximum number of m's allowed (or die)
   nmsys        int32    // number of system m's not counted for deadlock
   // cumulative number of freed m's, i.e. m's that have exited
   nmfreed      int64    // cumulative number of freed m's

   ngsys uint32 // number of system goroutines; updated atomically

   pidle      puintptr // idle p's
   npidle     uint32
   nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.

   // Global runnable queue.
   // this is the global queue of g's; if a p's local queue has no g's, or too many, it rebalances against this queue
   // runqhead gives the g at the head of the queue, and g.schedlink gives the next one, forming a linked list
   runqhead guintptr
   runqtail guintptr
   runqsize int32

   // freem is the list of m's waiting to be freed when their
   // m.exited is set. Linked through m.freelink.
   // list of m's waiting to be freed
   freem *m
}

Before going further, here is a breakdown of the status values

1.1.5. g.status

  • _Gidle: the goroutine has just been created and is not yet initialized
  • _Grunnable: the goroutine is on a run queue; it is not executing user code yet and does not own its stack
  • _Grunning: a g in this state may be executing user code; it owns an m and a p
  • _Gsyscall: executing a system call
  • _Gwaiting: the goroutine is blocked, for example waiting on a channel
  • _Gdead: the g is unused; it may have just exited or may be in the middle of being initialized
  • _Gcopystack: the g's current stack is being moved and a new stack is being allocated

1.1.6. p.status

  • _Pidle: idle; the p is not bound to any m
  • _Prunning: the p enters this state when an m acquires it; the m can then use the p's resources to run g's
  • _Psyscall: the state of a p whose g has entered a system call (for example a cgo call into native code)
  • _Pdead: when the number of p's is reduced at run time, the removed p's end up in this state

1.1.7. m.status

m has no status field as explicit as p's or g's, but in the analysis of the execution flow it is mainly in one of the following states

  • Running: it holds a p and is executing a g
  • Running native code: it is executing native code or blocked in a syscall
  • Sleeping: the m found no runnable g, went to sleep, and was added to the idle list
  • Spinning: its current work is finished and it is looking for the next g to run

The structures above contain many linked lists, and the g, m and p structs also hold pointers to one another, so what do their relationships actually look like?

From the figure above we can roughly describe the relationship between m, p and g
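
Since the original figures are not reproduced here, the following is a minimal, hedged sketch of the intrusive-list pattern these structs share: each element carries the link to the next one in a schedlink-style field, and the list head lives in sched. All names here (gNode, schedLike, push, pop) are illustrative stand-ins, not runtime identifiers.

package main

import "fmt"

// gNode stands in for runtime.g: the "next" pointer is embedded in the
// element itself (like g.schedlink), so no separate list node is needed.
type gNode struct {
	goid      int64
	schedlink *gNode // a plain pointer here; the runtime uses guintptr
}

// schedLike stands in for the global sched structure that holds list heads.
type schedLike struct {
	gfree *gNode // head of the free-g list
}

// push links a g onto the head of the free list.
func (s *schedLike) push(gp *gNode) {
	gp.schedlink = s.gfree
	s.gfree = gp
}

// pop unlinks a g from the head of the free list.
func (s *schedLike) pop() *gNode {
	gp := s.gfree
	if gp != nil {
		s.gfree = gp.schedlink
		gp.schedlink = nil
	}
	return gp
}

func main() {
	s := &schedLike{}
	for i := int64(1); i <= 3; i++ {
		s.push(&gNode{goid: i})
	}
	for gp := s.pop(); gp != nil; gp = s.pop() {
		fmt.Println("reused g", gp.goid)
	}
}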

2. Flow Overview

The figure below gives a rough overview of Go's entire scheduling flow

Next we analyze the whole scheduling flow in detail from the source code (my assembly is not great, so the assembly parts are skipped)

3. Source Code Analysis

3.1. Initialization

Go's startup flow has 4 steps

  1. call osinit: this simply sets the global variable ncpu = number of CPU cores
  2. call schedinit
  3. make & queue new G (runtime.newproc; go func() also goes through this function to create a goroutine)
  4. call runtime·mstart

Here schedinit is the scheduler initialization. Leaving aside the memory-allocator and garbage-collector setup that schedinit also performs, the scheduler-specific initialization boils down to: initialize the current m, set the maximum maxmcount, and determine the number of p's and initialize them

3.1.1. schedinit

schedinit initializes the current m and determines the number of p's from the CPU core count obtained in osinit and the GOMAXPROCS setting, then initializes those p's

func schedinit() {
	// get the pointer to the current g from TLS or a dedicated register
	_g_ := getg()
	// set the maximum number of m's
	sched.maxmcount = 10000

	// initialize the stack reuse pools
	stackinit()
	// initialize the current m
	mcommoninit(_g_.m)

	// osinit set the global variable ncpu; here the number of p's is decided from the CPU core count and the GOMAXPROCS environment variable
	procs := ncpu
	if n,ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
		procs = n
	}
	// create the configured number of p's
	if procresize(procs) != nil {
		throw("unknown runnable goroutine during bootstrap")
	}
}

3.1.2. mcommoninit

func mcommoninit(mp *m) {
	_g_ := getg()

	lock(&sched.lock)
	// check that mnext does not overflow; mnext is assigned to m.id
	if sched.mnext+1 < sched.mnext {
		throw("runtime: thread ID overflow")
	}
	mp.id = sched.mnext
	sched.mnext++
	// check whether the number of m's exceeds maxmcount; throw if it does
	checkmcount()
	// create a new g for handling signals and allocate its stack
	mpreinit(mp)
	if mp.gsignal != nil {
		mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard
	}

	// Add to allm so garbage collector doesn't free g->m
	// when it is just in a register or thread-local storage.
	// in the next two lines, the current m's alllink is pointed at allm, and then allm is atomically set to the current m, adding it to the head of the allm list
	mp.alllink = allm

	// NumCgoCall() iterates over allm w/o schedlock,
	// so we need to publish it safely.
	atomicstorep(unsafe.Pointer(&allm),unsafe.Pointer(mp))
	unlock(&sched.lock)

	// Allocate memory to hold a cgo traceback if the cgo call crashes.
	if iscgo || GOOS == "solaris" || GOOS == "windows" {
		mp.cgoCallers = new(cgoCallers)
	}
}

Here we meet the first of the m linked lists, which can be drawn as in the figure below; the p and g lists are analogous, they just use different struct fields

3.1.3. Diagram of the allm list

3.1.4. procresize

Changes the number of p's, trimming the excess and adding what is missing. During initialization there are no p's at all, so here it simply creates and initializes the configured number of p's

procresize is not only called during initialization: when the user calls runtime.GOMAXPROCS, nprocs is reset and startTheWorld() is executed, and startTheWorld() calls procresize again with the new nprocs
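
As a quick illustration from the user side (ordinary application code, not runtime internals), a call to runtime.GOMAXPROCS at run time is what ultimately drives this resize path:

package main

import (
	"fmt"
	"runtime"
)

func main() {
	// GOMAXPROCS(0) only queries the current value without changing it.
	fmt.Println("current number of P's:", runtime.GOMAXPROCS(0))

	// Setting a new value stops the world, runs procresize with the new
	// nprocs, and then starts the world again.
	prev := runtime.GOMAXPROCS(2)
	fmt.Println("previous number of P's:", prev)
	fmt.Println("new number of P's:", runtime.GOMAXPROCS(0))
}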

func procresize(nprocs int32) *p {
	old := gomaxprocs
	if old < 0 || nprocs <= 0 {
		throw("procresize: invalid arg")
	}
	// update statistics
	now := nanotime()
	if sched.procresizetime != 0 {
		sched.totaltime += int64(old) * (now - sched.procresizetime)
	}
	sched.procresizetime = now

	// Grow allp if necessary.
	// if the new number of p's is larger than before, create the additional p's
	if nprocs > int32(len(allp)) {
		// Synchronize with retake,which could be running
		// concurrently since it doesn't run on a P.
		lock(&allpLock)
		// check whether allp's cap already covers the new length; if so just reslice, otherwise the slice has to be grown
		if nprocs <= int32(cap(allp)) {
			allp = allp[:nprocs]
		} else {
			nallp := make([]*p,nprocs)
			// Copy everything up to allp's cap so we
			// never lose old allocated Ps.
			copy(nallp,allp[:cap(allp)])
			allp = nallp
		}
		unlock(&allpLock)
	}

	// initialize new P's
	// initialize the newly added p's
	for i := int32(0); i < nprocs; i++ {
		pp := allp[i]
		if pp == nil {
			pp = new(p)
			pp.id = i
			pp.status = _Pgcstop
			pp.sudogcache = pp.sudogbuf[:0]
			for i := range pp.deferpool {
				pp.deferpool[i] = pp.deferpoolbuf[i][:0]
			}
			pp.wbBuf.reset()
			// allp is a slice, so the new p is simply stored at its index
			atomicstorep(unsafe.Pointer(&allp[i]),unsafe.Pointer(pp))
		}
		if pp.mcache == nil {
			// during initialization old == 0, and the first new p is given to the current m
			if old == 0 && i == 0 {
				if getg().m.mcache == nil {
					throw("missing mcache?")
				}
				pp.mcache = getg().m.mcache // bootstrap
			} else {
				// allocate an mcache for this p
				pp.mcache = allocmcache()
			}
		}
	}

	// free unused P's
	// free the surplus p's; this path is taken when the new number of p's is smaller than before
	// nprocs can be changed dynamically through runtime.GOMAXPROCS
	for i := nprocs; i < old; i++ {
		p := allp[i]
		// move all runnable goroutines to the global queue
		// move the g's in this p's run queue to the global g queue
		for p.runqhead != p.runqtail {
			// pop from tail of local queue
			p.runqtail--
			gp := p.runq[p.runqtail%uint32(len(p.runq))].ptr()
			// push onto head of global queue
			globrunqputhead(gp)
		}
		// move the g in runnext to the global queue as well
		if p.runnext != 0 {
			globrunqputhead(p.runnext.ptr())
			p.runnext = 0
		}
		// if there's a background worker,make it runnable and put
		// it on the global queue so it can clean itself up
		// if there is a gc background worker, make it runnable and put it on the global queue
		if gp := p.gcBgMarkWorker.ptr(); gp != nil {
			casgstatus(gp,_Gwaiting,_Grunnable)
			globrunqput(gp)
			// This assignment doesn't race because the
			// world is stopped.
			p.gcBgMarkWorker.set(nil)
		}
		// clear the sudog buf and cache as well as the deferpool
		for i := range p.sudogbuf {
			p.sudogbuf[i] = nil
		}
		p.sudogcache = p.sudogbuf[:0]
		for i := range p.deferpool {
			for j := range p.deferpoolbuf[i] {
				p.deferpoolbuf[i][j] = nil
			}
			p.deferpool[i] = p.deferpoolbuf[i][:0]
		}
		// free this p's mcache
		freemcache(p.mcache)
		p.mcache = nil
		// move this p's gfree list to the global one
		gfpurge(p)
		// mark the p dead and leave it to its fate
		p.status = _Pdead
		// can't free P itself because it can be referenced by an M in syscall
	}

	// Trim allp.
	if int32(len(allp)) != nprocs {
		lock(&allpLock)
		allp = allp[:nprocs]
		unlock(&allpLock)
	}
	// if the current g still has a p and that p survives, update its status and keep using it
	_g_ := getg()
	if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
		// continue to use the current P
		_g_.m.p.ptr().status = _Prunning
	} else {
		// release the current P and acquire allp[0]
		// the current g has a p, but it is one of the p's just released, so stop using it and pick a new one
		if _g_.m.p != 0 {
			_g_.m.p.ptr().m = 0
		}
		// give allp[0] to the current g
		_g_.m.p = 0
		_g_.m.mcache = nil
		p := allp[0]
		p.m = 0
		p.status = _Pidle
		// bind p, m and g together, point m.mcache at p.mcache, and set the p's status to _Prunning
		acquirep(p)
	}
	var runnablePs *p
	for i := nprocs - 1; i >= 0; i-- {
		p := allp[i]
		if _g_.m.p.ptr() == p {
			continue
		}
		p.status = _Pidle
		// runqempty tells whether this p's g run queue is empty
		if runqempty(p) {
			// p's with an empty g run queue go onto sched's pidle list
			pidleput(p)
		} else {
			// p's with a non-empty g run queue are chained into a runnable list, which is returned at the end
			p.m.set(mget())
			p.link.set(runnablePs)
			runnablePs = p
		}
	}
	stealOrder.reset(uint32(nprocs))
	var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
	atomic.Store((*uint32)(unsafe.Pointer(int32p)),uint32(nprocs))
	return runnablePs
}
  • runqempty: this function is simple so we won't dig into it; it looks at p.runqtail == p.runqhead and p.runnext to decide whether there is any runnable g (a small sketch follows this list)
  • pidleput: puts the p at the head of sched.pidle and chains the idle p's together through p.link; compare the allm list diagram above
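
A minimal sketch of that check, under the simplifying assumption that nothing touches the queue concurrently (the real runtime.runqempty has to re-read head, tail and runnext in a carefully chosen order to stay correct against concurrent stealers):

// runqemptyLike: the local queue is empty when head == tail and the
// runnext slot is unset. The three parameters are simplified stand-ins
// for p.runqhead, p.runqtail and p.runnext.
func runqemptyLike(head, tail uint32, runnext uintptr) bool {
	return head == tail && runnext == 0
}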

3.2. Tasks

Creating a goroutine only takes go func; the compiler translates go func into a call to newproc. So how does the newly created task end up running? Let's trace it from creation
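
For reference, this is all the user has to write; the compiler lowers the go statement below into a call to runtime.newproc carrying the function value and its argument:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	wg.Add(1)

	// The compiler rewrites this statement into a runtime.newproc call,
	// which creates (or reuses) a g and queues it on the current P.
	go func(msg string) {
		defer wg.Done()
		fmt.Println(msg)
	}("hello from a new goroutine")

	wg.Wait()
}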

3.2.1. newproc

newproc collects the arguments and the caller's pc, and then calls newproc1 on g0 to actually create or reuse a g

func newproc(siz int32,fn *funcval) {
	// address of the first argument
	argp := add(unsafe.Pointer(&fn),sys.PtrSize)
	// the currently running g
	gp := getg()
	// the caller's pc
	pc := getcallerpc()
	systemstack(func() {
		// run newproc1 on the g0 (system) stack
		newproc1(fn,(*uint8)(argp),siz,gp,pc)
	})
}

3.2.2. newproc1

newproc1 creates or reuses a free g, initializes it, and then tries to find a p and an m to run it

func newproc1(fn *funcval,argp *uint8,narg int32,callergp *g,callerpc uintptr) {
	_g_ := getg()

	if fn == nil {
		_g_.m.throwing = -1 // do not dump full stacks
		throw("go of nil func value")
	}
	// bump locks to disable preemption
	_g_.m.locks++ // disable preemption because it can be holding p in a local var
	siz := narg
	siz = (siz + 7) &^ 7

	// We could allocate a larger initial stack if necessary.
	// Not worth it: this is almost always an error.
	// 4*sizeof(uintreg): extra space added below
	// sizeof(uintreg): caller's LR (arm) or return address (x86,in gostartcall).
	// if the arguments are too large, throw; the initial stack is only 2 KB
	if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
		throw("newproc: function arguments too large for new goroutine")
	}

	_p_ := _g_.m.p.ptr()
	// try to get a free g; if there is none, create a new one and add it to allg
	// gfget first looks at the p's local free list, and if that is empty it moves a batch from the global free lists to the local p
	newg := gfget(_p_)
	if newg == nil {
		newg = malg(_StackMin)
		casgstatus(newg,_Gidle,_Gdead)
		// a newly created g is added to the global allg, which is a slice, so a simple append suffices
		allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
	}
	// sanity check: the g must have a stack
	if newg.stack.hi == 0 {
		throw("newproc1: newg missing stack")
	}
	// sanity check: the g must be in state Gdead
	if readgstatus(newg) != _Gdead {
		throw("newproc1: new g is not Gdead")
	}
	// reserve a little extra space in case of reads slightly beyond the frame
	totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
	// align the total size to spAlign
	totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign
	sp := newg.stack.hi - totalSize
	spArg := sp
	// usesLR is false on x86, so this branch is skipped
	if usesLR {
		// caller's LR
		*(*uintptr)(unsafe.Pointer(sp)) = 0
		prepGoExitFrame(sp)
		spArg += sys.MinFrameSize
	}
	if narg > 0 {
		// copy the arguments onto the new g's stack
		memmove(unsafe.Pointer(spArg),unsafe.Pointer(argp),uintptr(narg))
		// ... omitted ...
	}
	// clear the area used to save the execution context and set up the basic state
	memclrNoHeapPointers(unsafe.Pointer(&newg.sched),unsafe.Sizeof(newg.sched))
	newg.sched.sp = sp
	newg.stktopsp = sp
	// the address of goexit is saved here; when the user function returns, the pc makes execution continue in goexit
	newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
	newg.sched.g = guintptr(unsafe.Pointer(newg))
	// gostartcallfn adjusts sched: the goexit pc saved above becomes fn's return address, and sched.pc is pointed at fn itself
	gostartcallfn(&newg.sched,fn)
	newg.gopc = callerpc
	newg.ancestors = saveAncestors(callergp)
	newg.startpc = fn.fn
	if _g_.m.curg != nil {
		newg.labels = _g_.m.curg.labels
	}
	if isSystemGoroutine(newg) {
		atomic.Xadd(&sched.ngsys,+1)
	}
	newg.gcscanvalid = false
	casgstatus(newg,_Gdead,_Grunnable)
	// if the p's cached goids are used up, fetch another batch from sched
	if _p_.goidcache == _p_.goidcacheend {
		// Sched.goidgen is the last allocated id,
		// this batch must be [sched.goidgen+1,sched.goidgen+GoidCacheBatch].
		// At startup sched.goidgen=0,so main goroutine receives goid=1.
		_p_.goidcache = atomic.Xadd64(&sched.goidgen,_GoidCacheBatch)
		_p_.goidcache -= _GoidCacheBatch - 1
		_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
	}
	// assign the goid
	newg.goid = int64(_p_.goidcache)
	_p_.goidcache++
	// put the new g on the p's runnable queue
	runqput(_p_,newg,true)
	// if there are idle p's and no spinning m's, wake an m to run the g
	if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
		wakep()
	}
	_g_.m.locks--
	if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack
		_g_.stackguard0 = stackPreempt
	}
}

3.2.2.1. gfget

The logic of this function is fairly simple: check whether the p has a free g, and if not go to the global free-g lists; this is the first place we see the balancing between a p's local state and the global state

func gfget(_p_ *p) *g {
retry:
	gp := _p_.gfree
	// the local free list is empty but the global lists are not, so move up to 32 g's down from the global lists; if the global lists run out sooner, so be it
	if gp == nil && (sched.gfreeStack != nil || sched.gfreeNoStack != nil) {
		lock(&sched.gflock)
		for _p_.gfreecnt < 32 {
			if sched.gfreeStack != nil {
				// Prefer Gs with stacks.
				gp = sched.gfreeStack
				sched.gfreeStack = gp.schedlink.ptr()
			} else if sched.gfreeNoStack != nil {
				gp = sched.gfreeNoStack
				sched.gfreeNoStack = gp.schedlink.ptr()
			} else {
				break
			}
			_p_.gfreecnt++
			sched.ngfree--
			gp.schedlink.set(_p_.gfree)
			_p_.gfree = gp
		}
		// g's have been pulled down from the global lists, so go back and retry from the top
		unlock(&sched.gflock)
		goto retry
	}
	// if a g was obtained, check whether it has a stack, and allocate one if not
	// stack allocation works much like memory allocation: there are arrays of fixed-size stacks and an allocation goes to the array for its size; oversized stacks are allocated globally
	if gp != nil {
		_p_.gfree = gp.schedlink.ptr()
		_p_.gfreecnt--
		if gp.stack.lo == 0 {
			// Stack was deallocated in gfput. Allocate a new one.
			systemstack(func() {
				gp.stack = stackalloc(_FixedStack)
			})
			gp.stackguard0 = gp.stack.lo + _StackGuard
		} else {
			// ... omitted ...
		}
	}
	// note: if neither the global lists nor the p had a free g, gp is still nil here
	return gp
}

3.2.2.2. runqput

runqput puts the g into the p's local queue or into p.runnext; if the local queue is full, the g goes to the global queue, and half of the p's local queue is rebalanced to the global queue along with it

func runqput(_p_ *p,gp *g,next bool) {
	if randomizeScheduler && next && fastrand()%2 == 0 {
		next = false
	}
	// if next is true, put the g into p.runnext and kick out whatever g was there before
	if next {
	retryNext:
		oldnext := _p_.runnext
		if !_p_.runnext.cas(oldnext,guintptr(unsafe.Pointer(gp))) {
			goto retryNext
		}
		if oldnext == 0 {
			return
		}
		// Kick the old runnext out to the regular run queue.
		gp = oldnext.ptr()
	}

retry:
	h := atomic.Load(&_p_.runqhead) // load-acquire,synchronize with consumers
	t := _p_.runqtail
	// check whether the p's queue is full; runq is an array of length 256, and overflow goes to the global queue
	if t-h < uint32(len(_p_.runq)) {
		_p_.runq[t%uint32(len(_p_.runq))].set(gp)
		atomic.Store(&_p_.runqtail,t+1) // store-release,makes the item available for consumption
		return
	}
	// put the g (plus half the local queue) on the global queue
	if runqputslow(_p_, gp, h, t) {
		return
	}
	// the queue is not full,now the put above must succeed
	goto retry
}

3.2.2.3. runqputslow

func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
	var batch [len(_p_.runq)/2 + 1]*g

	// First,grab a batch from local queue.
	n := t - h
	n = n / 2
	if n != uint32(len(_p_.runq)/2) {
		throw("runqputslow: queue is not full")
	}
	// grab half of the p's local queue, starting from the head
	for i := uint32(0); i < n; i++ {
		batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
	}
	if !atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consume
		return false
	}
	batch[n] = gp

	// Link the goroutines.
	for i := uint32(0); i < n; i++ {
		batch[i].schedlink.set(batch[i+1])
	}

	// Now put the batch on global queue.
	// put the batch at the tail of the global queue
	lock(&sched.lock)
	globrunqputbatch(batch[0],batch[n],int32(n+1))
	unlock(&sched.lock)
	return true
}

That basically wraps up creating a new task; once created it just waits to be scheduled. From the above, the priority of task sources is p.runnext > p.runq > sched.runq
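
A hedged sketch of that priority order; the types and the pickNext helper below are illustrative stand-ins only, while the real lookup lives in runqget/globrunqget and adds locking plus the every-61-ticks fairness check shown later in schedule():

// gTask stands in for a runnable g.
type gTask struct{ goid int64 }

// fakeP and fakeSched stand in for p and schedt.
type fakeP struct {
	runnext *gTask
	runq    []*gTask
}

type fakeSched struct {
	runq []*gTask
}

// pickNext shows the order a P uses when looking for work:
// runnext first, then the local run queue, then the global queue.
func pickNext(p *fakeP, sched *fakeSched) *gTask {
	if gp := p.runnext; gp != nil { // 1. p.runnext
		p.runnext = nil
		return gp
	}
	if len(p.runq) > 0 { // 2. p.runq (lock-free in the real runtime)
		gp := p.runq[0]
		p.runq = p.runq[1:]
		return gp
	}
	if len(sched.runq) > 0 { // 3. sched.runq (needs sched.lock in the real runtime)
		gp := sched.runq[0]
		sched.runq = sched.runq[1:]
		return gp
	}
	return nil
}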

The state transitions a g goes through from creation to finishing and being put back on the free list are roughly as shown in the figure below

3.2.3 wakep

After newproc1 has created the task it tries to wake an m to run it

func wakep() {
	// be conservative about spinning threads
	// be conservative about spinning threads: if an m is already spinning, just return
	if !atomic.Cas(&sched.nmspinning,0,1) {
		return
	}
	// call startm to do the work
	startm(nil,true)
}

3.2.4 startm

Schedules an existing m, or creates a new one, to run a p. If p == nil it tries to grab an idle p; the g's live in the p's queue, so it needs a p before it can get a g

func startm(_p_ *p,spinning bool) {
	lock(&sched.lock)
	if _p_ == nil {
		// no p was specified, so take an idle p from sched.pidle
		_p_ = pidleget()
		if _p_ == nil {
			unlock(&sched.lock)
			// no p available either, so undo the nmspinning increment
			if spinning {
				// The caller incremented nmspinning,but there are no idle Ps,
				// so it's okay to just undo the increment and give up.
				if int32(atomic.Xadd(&sched.nmspinning,-1)) < 0 {
					throw("startm: negative nmspinning")
				}
			}
			return
		}
	}
	// first try to get an idle m from sched.midle
	mp := mget()
	unlock(&sched.lock)
	if mp == nil {
		// no idle m available, so create one with m.spinning = true, bind the p to it, and return
		var fn func()
		if spinning {
			// The caller incremented nmspinning,so set m.spinning in the new M.
			fn = mspinning
		}
		newm(fn,_p_)
		return
	}
	// the idle m we got must not already be spinning
	if mp.spinning {
		throw("startm: m is spinning")
	}
	// the idle m we got must not already have a p
	if mp.nextp != 0 {
		throw("startm: m has p")
	}
	if spinning && !runqempty(_p_) {
		throw("startm: p has runnable gs")
	}
	// The caller incremented nmspinning,so set m.spinning in the new M.
	// the caller already incremented nmspinning, so here we only set m.spinning and attach the p
	mp.spinning = spinning
	mp.nextp.set(_p_)
	// wake the m
	notewakeup(&mp.park)
}

3.2.4.1. newm

newm creates a new m through allocm

func newm(fn func(),_p_ *p) {
	// allocate a new m
	mp := allocm(_p_,fn)
	// bind the specified p to the new m as its nextp
	mp.nextp.set(_p_)
	// ... omitted ...
	// create the OS thread
	newm1(mp)
}

3.2.4.2. newm1

func newm1(mp *m) {
	// the runtime/cgo package sets iscgo to true; not analyzed here
	if iscgo {
		var ts cgothreadstart
		if _cgo_thread_start == nil {
			throw("_cgo_thread_start missing")
		}
		ts.g.set(mp.g0)
		ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))
		ts.fn = unsafe.Pointer(funcPC(mstart))
		if msanenabled {
			msanwrite(unsafe.Pointer(&ts),unsafe.Sizeof(ts))
		}
		execLock.rlock() // Prevent process clone.
		asmcgocall(_cgo_thread_start,unsafe.Pointer(&ts))
		execLock.runlock()
		return
	}
	execLock.rlock() // Prevent process clone.
	newosproc(mp)
	execLock.runlock()
}

3.2.4.3. newosproc

newosproc creates a new OS thread that runs mstart_stub, which then calls mstart and enters the scheduler; this is analyzed later in the execution section

func newosproc(mp *m) {
	stk := unsafe.Pointer(mp.g0.stack.hi)
	// Initialize an attribute object.
	var attr pthreadattr
	var err int32
	err = pthread_attr_init(&attr)

	// Finally,create the thread. It starts at mstart_stub,which does some low-level
	// setup and then calls mstart.
	var oset sigset
	sigprocmask(_SIG_SETMASK,&sigset_all,&oset)
	// create the thread with start function mstart_stub, which then calls mstart
	err = pthread_create(&attr,funcPC(mstart_stub),unsafe.Pointer(mp))
	sigprocmask(_SIG_SETMASK,&oset,nil)
	if err != 0 {
		write(2,unsafe.Pointer(&failthreadcreate[0]),int32(len(failthreadcreate)))
		exit(1)
	}
}

3.2.4.4. allocm

allocm first releases the m's on sched.freem and then creates and initializes a new m

func allocm(_p_ *p,fn func()) *m {
	_g_ := getg()
	_g_.m.locks++ // disable GC because it can be called from sysmon
	if _g_.m.p == 0 {
		acquirep(_p_) // temporarily borrow p for mallocs in this function
	}

	// Release the free M list. We need to do this somewhere and
	// this may free up a stack we can use.
	// first drain the freem list
	if sched.freem != nil {
		lock(&sched.lock)
		var newList *m
		for freem := sched.freem; freem != nil; {
			if freem.freeWait != 0 {
				next := freem.freelink
				freem.freelink = newList
				newList = freem
				freem = next
				continue
			}
			stackfree(freem.g0.stack)
			freem = freem.freelink
		}
		sched.freem = newList
		unlock(&sched.lock)
	}

	mp := new(m)
	// start function; judging from the startm call, this fn is mspinning, which sets m.spinning to true
	mp.mstartfn = fn
	// initialize the m (analyzed above)
	mcommoninit(mp)
	// In case of cgo or Solaris or Darwin,pthread_create will make us a stack.
	// Windows and Plan 9 will layout sched stack on OS stack.
	// create the g0 for the new m
	if iscgo || GOOS == "solaris" || GOOS == "windows" || GOOS == "plan9" || GOOS == "darwin" {
		mp.g0 = malg(-1)
	} else {
		mp.g0 = malg(8192 * sys.StackGuardMultiplier)
	}
	// point mp's g0 back at mp
	mp.g0.m = mp
	// if the current m is bound to the p passed in, release it, because that p is about to be bound to the new m
	if _p_ == _g_.m.p.ptr() {
		releasep()
	}

	_g_.m.locks--
	if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack
		_g_.stackguard0 = stackPreempt
	}

	return mp
}

3.2.4.5. notewakeup

func notewakeup(n *note) {
	var v uintptr
	// set the note key to locked
	for {
		v = atomic.Loaduintptr(&n.key)
		if atomic.Casuintptr(&n.key,v,locked) {
			break
		}
	}

	// Successfully set waitm to locked.
	// What was it before?
	// the previous value decides what happens next: 0 means nothing was waiting, locked means a double wakeup, anything else is the waiting m, which gets woken
	switch {
	case v == 0:
		// Nothing was waiting. Done.
	case v == locked:
		// Two notewakeups! Not allowed.
		throw("notewakeup - double wakeup")
	default:
		// Must be the waiting m. Wake it up.
		// wake the OS thread
		semawakeup((*m)(unsafe.Pointer(v)))
	}
}

At this point the task g has been created and placed on the p's local queue or the global queue, and an idle m has been fetched or a new m created to run it. m, p and g are all ready; what remains is the scheduling that actually runs the task g

3.3. Execution

From the analysis of startm we can see there are two ways an m is obtained

  • Newly created: newosproc is executed under newm1 and eventually calls mstart to enter the scheduler
  • An idle m woken up: it resumes from wherever it went to sleep

So an m has two starting points for running g's: the thread start function mstart, and the schedule call it resumes after being woken from sleep. Let's start from the beginning, i.e. mstart; mstart also ends up in schedule

3.3.1. mstart

func mstart() {
	_g_ := getg()

	osStack := _g_.stack.lo == 0
	if osStack {
		// Initialize stack bounds from system stack.
		// Cgo may have left stack size in stack.hi.
		// minit may update the stack bounds.
		// carve the required range directly out of the system stack
		size := _g_.stack.hi
		if size == 0 {
			size = 8192 * sys.StackGuardMultiplier
		}
		_g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
		_g_.stack.lo = _g_.stack.hi - size + 1024
	}
	// Initialize stack guards so that we can start calling
	// both Go and C functions with stack growth prologues.
	_g_.stackguard0 = _g_.stack.lo + _StackGuard
	_g_.stackguard1 = _g_.stackguard0
	// hand off to mstart1
	mstart1()

	// Exit this thread.
	if GOOS == "windows" || GOOS == "solaris" || GOOS == "plan9" || GOOS == "darwin" {
		// Window,Solaris,Darwin and Plan 9 always system-allocate
		// the stack,but put it in _g_.stack before mstart,
		// so the logic above hasn't set osStack yet.
		osStack = true
	}
	// exit this m; normally mstart1's call to schedule() never returns, so there is no need to worry about OS threads being created and torn down all the time
	mexit(osStack)
}

3.3.2. mstart1

func mstart1() {
	_g_ := getg()

	if _g_ != _g_.m.g0 {
		throw("bad runtime·mstart")
	}

	// Record the caller for use as the top of stack in mcall and
	// for terminating the thread.
	// We're never coming back to mstart1 after we call schedule,
	// so other calls can reuse the current frame.
	// save the caller's pc, sp and so on
	save(getcallerpc(),getcallersp())
	asminit()
	// initialize the m's signal stack and signal mask
	minit()

	// Install signal handlers; after minit so that minit can
	// prepare the thread to be able to handle the signals.
	// install the signal handlers
	if _g_.m == &m0 {
		mstartm0()
	}
	// if mstartfn is set, run it first
	if fn := _g_.m.mstartfn; fn != nil {
		fn()
	}

	if _g_.m.helpgc != 0 {
		_g_.m.helpgc = 0
		stopm()
	} else if _g_.m != &m0 {
		// acquire nextp
		acquirep(_g_.m.nextp.ptr())
		_g_.m.nextp = 0
	}
	schedule()
}

3.3.2.1. acquirep

acquirep mainly changes the p's status and binds m and p together, sharing the p's mcache with the m

func acquirep(_p_ *p) {
	// Do the part that isn't allowed to have write barriers.
	acquirep1(_p_)

	// have p; write barriers now allowed
	_g_ := getg()
	// share the p's mcache with the m
	_g_.m.mcache = _p_.mcache
}

3.3.2.2. acquirep1

func acquirep1(_p_ *p) {
	_g_ := getg()

	// bind m and p to each other
	_g_.m.p.set(_p_)
	_p_.m.set(_g_.m)
	_p_.status = _Prunning
}

3.3.2.3. schedule

Now we reach the scheduling function itself. It forms a loop of schedule, execute, the goroutine's fn and goexit; even an m that was asleep resumes from its saved point and comes back here

func schedule() {
	_g_ := getg()

	if _g_.m.locks != 0 {
		throw("schedule: holding locks")
	}
	// if this m has a lockedg, it must only run that g
	if _g_.m.lockedg != 0 {
		// hand off the p, sleep until the lockedg can run, then execute it
		stoplockedm()
		execute(_g_.m.lockedg.ptr(),false) // Never returns.
	}

	// We should not schedule away from a g that is executing a cgo call,
	// since the cgo call is using the m's g0 stack.
	if _g_.m.incgo {
		throw("schedule: in cgo")
	}

top:
	// gc is waiting to stop the world
	if sched.gcwaiting != 0 {
		gcstopm()
		goto top
	}

	var gp *g
	var inheritTime bool

	if gp == nil {
		// Check the global runnable queue once in a while to ensure fairness.
		// Otherwise two goroutines can completely occupy the local runqueue
		// by constantly respawning each other.
		// for fairness, check the global queue once every 61 scheduling ticks
		if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
			lock(&sched.lock)
			gp = globrunqget(_g_.m.p.ptr(),1)
			unlock(&sched.lock)
		}
	}
	if gp == nil {
		// nothing from the global queue, so take a g from the p's local queue
		gp,inheritTime = runqget(_g_.m.p.ptr())
		if gp != nil && _g_.m.spinning {
			throw("schedule: spinning with local work")
		}
	}
	if gp == nil {
		// nothing in the local queue either, so start the full search: findrunnable looks at the global queue, netpoll, the local queue and other p's local queues; analyzed in detail later
		gp,inheritTime = findrunnable() // blocks until work is available
	}

	// This thread is going to run a goroutine and is not spinning anymore,
	// so if it was marked as spinning we need to reset it now and potentially
	// start a new spinning M.
	if _g_.m.spinning {
		// the m found work, so it stops spinning
		resetspinning()
	}

	if gp.lockedm != 0 {
		// Hands off own p to the locked m,
		// then blocks waiting for a new p.
		// the g has a lockedm, so hand the p over to that m, put this m to sleep waiting for a new p, and jump back to top once woken
		startlockedm(gp)
		goto top
	}
	// run this g
	execute(gp,inheritTime)
}
3.3.2.3.1. stoplockedm

The current m is bound to a lockedg, but the g to run now is not that lockedg, so this m cannot run it: it hands over its p and sleeps until its lockedg gets scheduled

func stoplockedm() {
	_g_ := getg()

	if _g_.m.lockedg == 0 || _g_.m.lockedg.ptr().lockedm.ptr() != _g_.m {
		throw("stoplockedm: inconsistent locking")
	}
	if _g_.m.p != 0 {
		// Schedule another M to run this p.
		// release the current p
		_p_ := releasep()
		handoffp(_p_)
	}
	incidlelocked(1)
	// Wait until another thread schedules lockedg again.
	notesleep(&_g_.m.park)
	noteclear(&_g_.m.park)
	status := readgstatus(_g_.m.lockedg.ptr())
	if status&^_Gscan != _Grunnable {
		print("runtime:stoplockedm: g is not Grunnable or Gscanrunnable\n")
		dumpgstatus(_g_)
		throw("stoplockedm: not runnable")
	}
	// the old p was handed off above; acquire the p stashed in nextp as the one to run with
	acquirep(_g_.m.nextp.ptr())
	_g_.m.nextp = 0
}
3.3.2.3.2. startlockedm

Schedules the lockedm to run its lockedg

func startlockedm(gp *g) {
	_g_ := getg()

	mp := gp.lockedm.ptr()
	if mp == _g_.m {
		throw("startlockedm: locked to me")
	}
	if mp.nextp != 0 {
		throw("startlockedm: m has p")
	}
	// directly handoff current P to the locked m
	incidlelocked(-1)
	// hand the current p over to the lockedm by setting lockedm.nextp, so the lockedm can pick it up once woken
	_p_ := releasep()
	mp.nextp.set(_p_)
	// once woken, the lockedm resumes where it went to sleep, i.e. inside schedule()
	notewakeup(&mp.park)
	stopm()
}
3.3.2.3.3. handoffp
func handoffp(_p_ *p) {
	// handoffp must start an M in any situation where
	// findrunnable would return a G to run on _p_.

	// if it has local work,start it straight away
	if !runqempty(_p_) || sched.runqsize != 0 {
		// call startm to schedule it
		startm(_p_,false)
		return
	}

	// no local work,check that there are no spinning/idle M's,
	// otherwise our help is not required
	// check whether there are any spinning m's or idle p's
	if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
		startm(_p_,true)
		return
	}
	lock(&sched.lock)

	if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn,1,0) {
		sched.safePointFn(_p_)
		sched.safePointWait--
		if sched.safePointWait == 0 {
			notewakeup(&sched.safePointNote)
		}
	}
	// if the global runnable queue is not empty, try to schedule it with startm
	if sched.runqsize != 0 {
		unlock(&sched.lock)
		startm(_p_,false)
		return
	}
	// If this is the last running P and nobody is polling network,
	// need to wakeup another M to poll network.
	if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
		unlock(&sched.lock)
		startm(_p_,false)
		return
	}
	// put the p on the global idle list; putting it back works just like the allm case above
	pidleput(_p_)
	unlock(&sched.lock)
}
3.3.2.3.4. execute

Now the g's code actually gets to run

func execute(gp *g,inheritTime bool) {
	_g_ := getg()
	// switch the g to running and clear any pending preemption
	casgstatus(gp,_Grunnable,_Grunning)
	gp.waitsince = 0
	gp.preempt = false
	gp.stackguard0 = gp.stack.lo + _StackGuard
	if !inheritTime {
		// bump the scheduling tick
		_g_.m.p.ptr().schedtick++
	}
	_g_.m.curg = gp
	gp.m = _g_.m
	// jump into the g's code
	gogo(&gp.sched)
}
3.3.2.3.5. gogo

gogo's job is to switch to the g's stack and start running the g's code. The assembly is not analyzed here, but one question remains: after gogo has run the function, how do we get back into the scheduler?

Recall the line in newproc1, newg.sched.pc = funcPC(goexit) + sys.PCQuantum: the pc saved there is the address of goexit, so once the user code finishes, execution falls into the goexit function

3.3.2.3.6. goexit0

At the assembly level goexit simply calls runtime.goexit1, and goexit1 calls goexit0 through mcall, so we analyze goexit0 directly

goexit0 resets the g's state and schedules again, which brings us back to schedule(), and the scheduling loop goes round and round

func goexit0(gp *g) {
	_g_ := getg()
	// switch the g to dead so it can go back on the free list
	casgstatus(gp,_Grunning,_Gdead)
	if isSystemGoroutine(gp) {
		atomic.Xadd(&sched.ngsys,-1)
	}
	// clear the g's state
	gp.m = nil
	locked := gp.lockedm != 0
	gp.lockedm = 0
	_g_.m.lockedg = 0
	gp.paniconfault = false
	gp._defer = nil // should be true already but just in case.
	gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
	gp.writebuf = nil
	gp.waitreason = 0
	gp.param = nil
	gp.labels = nil
	gp.timer = nil

	// Note that gp's stack scan is now "valid" because it has no
	// stack.
	gp.gcscanvalid = true
	dropg()

	// put the g back on the free list for reuse
	gfput(_g_.m.p.ptr(),gp)
	// enter the scheduling loop again
	schedule()
}

That completes a single round of scheduling, and the scheduler starts over again

3.3.2.3.7. findrunnable
func findrunnable() (gp *g,inheritTime bool) {
	_g_ := getg()

	// The conditions here and in handoffp must agree: if
	// findrunnable would return a G to run,handoffp must start
	// an M.

top:
	_p_ := _g_.m.p.ptr()

	// local runq
	// take a g from the p's local queue
	if gp,inheritTime := runqget(_p_); gp != nil {
		return gp,inheritTime
	}

	// global runq
	// take a g from the global runnable queue
	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(_p_,0)
		unlock(&sched.lock)
		if gp != nil {
			return gp,false
		}
	}

	// Poll network.
	// This netpoll is only an optimization before we resort to stealing.
	// We can safely skip it if there are no waiters or a thread is blocked
	// in netpoll already. If there is any kind of logical race with that
	// blocked thread (e.g. it has already returned from netpoll,but does
	// not set lastpoll yet),this thread will do blocking netpoll below
	// anyway.
	// see whether netpoll already has g's that are ready
	if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
		if gp := netpoll(false); gp != nil { // non-blocking
			// netpoll returns list of goroutines linked by schedlink.
			injectglist(gp.schedlink.ptr())
			casgstatus(gp, _Gwaiting, _Grunnable)
			if trace.enabled {
				traceGoUnpark(gp,0)
			}
			return gp,false
		}
	}

	// Steal work from other P's.
	// if sched.npidle == procs-1, every other p is idle, so there is no point traversing them
	procs := uint32(gomaxprocs)
	if atomic.Load(&sched.npidle) == procs-1 {
		// Either GOMAXPROCS=1 or everybody,except for us,is idle already.
		// New work can appear from returning syscall/cgocall,network or timers.
		// Neither of that submits to local run queues,so no point in stealing.
		goto stop
	}
	// If number of spinning M's >= number of busy P's,block.
	// This is necessary to prevent excessive CPU consumption
	// when GOMAXPROCS>>1 but the program parallelism is low.
	// if the number of spinning m's is already at least half the number of busy p's, stop searching
	if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
		goto stop
	}
	// mark the current m as spinning
	if !_g_.m.spinning {
		_g_.m.spinning = true
		atomic.Xadd(&sched.nmspinning,1)
	}
	// start stealing runnable g's from other p's
	for i := 0; i < 4; i++ {
		for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
			if sched.gcwaiting != 0 {
				goto top
			}
			stealRunNextG := i > 2 // first look for ready queues with more than 1 g
			// steal half of another p's queue, and sometimes even its runnext (a bit much!); the stealing itself is just slice manipulation and is not analyzed here
			if gp := runqsteal(_p_,allp[enum.position()],stealRunNextG); gp != nil {
				return gp,false
			}
		}
	}

stop:
	// take a snapshot of allp
	allpSnapshot := allp

	// return P and block
	lock(&sched.lock)

	if sched.runqsize != 0 {
		gp := globrunqget(_p_,0)
		unlock(&sched.lock)
		return gp,false
	}
	if releasep() != _p_ {
		throw("findrunnable: wrong p")
	}
	pidleput(_p_)
	unlock(&sched.lock)

	wasSpinning := _g_.m.spinning
	if _g_.m.spinning {
		// leave the spinning state, the search for a p is over
		_g_.m.spinning = false
		if int32(atomic.Xadd(&sched.nmspinning,-1)) < 0 {
			throw("findrunnable: negative nmspinning")
		}
	}

	// check all runqueues once again
	for _,_p_ := range allpSnapshot {
		if !runqempty(_p_) {
			lock(&sched.lock)
			_p_ = pidleget()
			unlock(&sched.lock)
			if _p_ != nil {
				acquirep(_p_)
				if wasSpinning {
					_g_.m.spinning = true
					atomic.Xadd(&sched.nmspinning,1)
				}
				goto top
			}
			break
		}
	}
	// poll network
	if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll,0) != 0 {
		if _g_.m.p != 0 {
			throw("findrunnable: netpoll with p")
		}
		if _g_.m.spinning {
			throw("findrunnable: netpoll with spinning")
		}
		gp := netpoll(true) // block until new work is available
		atomic.Store64(&sched.lastpoll,uint64(nanotime()))
		if gp != nil {
			lock(&sched.lock)
			_p_ = pidleget()
			unlock(&sched.lock)
			if _p_ != nil {
				acquirep(_p_)
				injectglist(gp.schedlink.ptr())
				casgstatus(gp, _Gwaiting, _Grunnable)
				if trace.enabled {
					traceGoUnpark(gp,0)
				}
				return gp,false
			}
			injectglist(gp)
		}
	}
	stopm()
	goto top
}

The runtime really goes to great lengths here: to find a runnable g it even refuses to give up after reaching the stop label and searches once more. The search can be summarized roughly as follows (a sketch follows the list):

  • take a runnable g from the p's own local queue
  • take a runnable g from the global queue
  • take an already-ready g from netpoll
  • steal a runnable g from another p's local queue, sometimes even its runnext, rather cheekily
  • if nothing can be found at all, call stopm
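
As a compact recap of that list, here is a pseudocode-style sketch in Go syntax; every type and field is a simplified stand-in, and the real findrunnable interleaves these steps with locking, spinning bookkeeping and a second pass over all P's before the m finally parks:

// searchSources bundles the places findrunnable looks, in order.
type searchSources struct {
	localRunq  []int64   // this P's local queue (goids, for illustration)
	globalRunq []int64   // sched.runq
	netpollRdy []int64   // goroutines whose network I/O has completed
	otherRunqs [][]int64 // other P's local queues, candidates for stealing
}

func findRunnableSketch(s *searchSources) (int64, bool) {
	if len(s.localRunq) > 0 { // 1. own local queue
		return s.localRunq[0], true
	}
	if len(s.globalRunq) > 0 { // 2. global queue
		return s.globalRunq[0], true
	}
	if len(s.netpollRdy) > 0 { // 3. netpoll
		return s.netpollRdy[0], true
	}
	for _, q := range s.otherRunqs { // 4. steal from another P
		if len(q) > 0 {
			return q[0], true // the real code steals half the victim's queue
		}
	}
	return 0, false // 5. nothing anywhere: the caller stopm()'s and retries later
}
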
3.3.2.3.8. stopm

stopm puts the current m on the idle list and sleeps; when woken it binds the p in m.nextp to the m

func stopm() {
	_g_ := getg()
retry:
	lock(&sched.lock)
	// put the current m on the sched.midle idle list
	mput(_g_.m)
	unlock(&sched.lock)
	// sleep until woken
	notesleep(&_g_.m.park)
	noteclear(&_g_.m.park)
	// bind the p stashed in nextp
	acquirep(_g_.m.nextp.ptr())
	_g_.m.nextp = 0
}

3.4. Monitoring

3.4.1. sysmon

Go's monitoring is handled by the sysmon function, which mainly does the following

  • release the physical memory of spans that have been idle for more than 5 minutes
  • force a garbage collection if none has run for more than 2 minutes
  • add netpoll results that have gone unhandled for too long to the run queue
  • preempt g's that have been running for a long time
  • retake p's that have been blocked in syscalls for a long time

The monitoring thread does not run all the time: it starts by sleeping 20us, doubles its sleep once it has been idle for a while, and never sleeps more than 10ms
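
A minimal sketch of that back-off, using the same constants as the loop below (delay is in microseconds; the idle > 50 threshold means the doubling only starts after about fifty idle rounds):

// sysmonDelay mirrors the sleep calculation at the top of the sysmon loop:
// reset to 20us whenever the last round found work (idle == 0), keep 20us
// for the first 50 idle rounds, then double each round, capped at 10ms.
func sysmonDelay(idle int, delay uint32) uint32 {
	if idle == 0 {
		delay = 20
	} else if idle > 50 {
		delay *= 2
	}
	if delay > 10*1000 {
		delay = 10 * 1000
	}
	return delay
}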

func sysmon() {
	lock(&sched.lock)
	sched.nmsys++
	checkdead()
	unlock(&sched.lock)

	// If a heap span goes unused for 5 minutes after a garbage collection,
	// we hand it back to the operating system.
	scavengelimit := int64(5 * 60 * 1e9)

	if debug.scavenge > 0 {
		// Scavenge-a-lot for testing.
		forcegcperiod = 10 * 1e6
		scavengelimit = 20 * 1e6
	}

	lastscavenge := nanotime()
	nscavenge := 0

	lasttrace := int64(0)
	idle := 0 // how many cycles in succession we had not wokeup somebody
	delay := uint32(0)
	for {
		// work out how long this iteration should sleep
		if idle == 0 { // start with 20us sleep...
			delay = 20
		} else if idle > 50 { // start doubling the sleep after 1ms...
			delay *= 2
		}
		if delay > 10*1000 { // up to 10ms
			delay = 10 * 1000
		}
		usleep(delay)
		// put sysmon to sleep during STW or while all p's are idle
		if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
			lock(&sched.lock)
			if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
				atomic.Store(&sched.sysmonwait,1)
				unlock(&sched.lock)
				// Make wake-up period small enough
				// for the sampling to be correct.
				maxsleep := forcegcperiod / 2
				if scavengelimit < forcegcperiod {
					maxsleep = scavengelimit / 2
				}
				shouldRelax := true
				if osRelaxMinNS > 0 {
					next := timeSleepUntil()
					now := nanotime()
					if next-now < osRelaxMinNS {
						shouldRelax = false
					}
				}
				if shouldRelax {
					osRelax(true)
				}
				// go to sleep
				notetsleep(&sched.sysmonnote,maxsleep)
				if shouldRelax {
					osRelax(false)
				}
				lock(&sched.lock)
				// once woken, clear the wait state and carry on
				atomic.Store(&sched.sysmonwait,0)
				noteclear(&sched.sysmonnote)
				idle = 0
				delay = 20
			}
			unlock(&sched.lock)
		}
		// trigger libc interceptors if needed
		if *cgo_yield != nil {
			asmcgocall(*cgo_yield,nil)
		}
		// poll network if not polled for more than 10ms
		lastpoll := int64(atomic.Load64(&sched.lastpoll))
		now := nanotime()
		// if netpoll is active and has not been polled for more than 10ms, check whether anything is ready
		if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
			atomic.Cas64(&sched.lastpoll,uint64(lastpoll),uint64(now))
			// returns the list of goroutines whose results are ready
			gp := netpoll(false) // non-blocking - returns list of goroutines
			if gp != nil {
				incidlelocked(-1)
				// add the returned list of g's to the global run queue
				injectglist(gp)
				incidlelocked(1)
			}
		}
		// retake P's blocked in syscalls
		// and preempt long running G's
		// retake p's blocked too long in syscalls and preempt g's that have run too long
		if retake(now) != 0 {
			idle = 0
		} else {
			idle++
		}
		// check if we need to force a GC
		// gcTrigger.test() checks whether the configured forced-GC interval has been exceeded
		if t := (gcTrigger{kind: gcTriggerTime,now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
			lock(&forcegc.lock)
			forcegc.idle = 0
			forcegc.g.schedlink = 0
			// put the forcegc g on the run queue so the scheduler will run it
			injectglist(forcegc.g)
			unlock(&forcegc.lock)
		}
		// scavenge heap once in a while
		// if there are spans unused for 5 minutes, hand them back to the OS
		if lastscavenge+scavengelimit/2 < now {
			mheap_.scavenge(int32(nscavenge),uint64(now),uint64(scavengelimit))
			lastscavenge = now
			nscavenge++
		}
		if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
			lasttrace = now
			schedtrace(debug.scheddetail > 0)
		}
	}
}

Scanning netpoll and putting the g's on the global queue is easy enough to follow, much like the p and m handling earlier, but the preemption is less obvious: sysmon simply decides to preempt, the running g gets no say in it, so how is the preemption actually carried out?

3.4.2. retake

const forcePreemptNS = 10 * 1000 * 1000 // 10ms

func retake(now int64) uint32 {
	n := 0
	// Prevent allp slice changes. This lock will be completely
	// uncontended unless we're already stopping the world.
	lock(&allpLock)
	// We can't use a range loop over allp because we may
	// temporarily drop the allpLock. Hence,we need to re-fetch
	// allp each time around the loop.
	for i := 0; i < len(allp); i++ {
		_p_ := allp[i]
		if _p_ == nil {
			// This can happen if procresize has grown
			// allp but not yet created new Ps.
			continue
		}
		pd := &_p_.sysmontick
		s := _p_.status
		if s == _Psyscall {
			// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
			// pd.syscalltick (i.e. _p_.sysmontick.syscalltick) is only updated here in sysmon, while _p_.syscalltick is bumped on every syscall, so the first sysmon pass after a syscall never retakes the p; retaking starts from the second pass, giving an interval of at least 20us and at most about 10ms
			t := int64(_p_.syscalltick)
			if int64(pd.syscalltick) != t {
				pd.syscalltick = uint32(t)
				pd.syscallwhen = now
				continue
			}
			// On the one hand we don't want to retake Ps if there is no other work to do,
			// but on the other hand we want to retake them eventually
			// because they can prevent the sysmon thread from deep sleep.
			// don't retake if the p's queue is empty, there are spinning m's or idle p's, and the syscall started less than 10ms ago
			if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
				continue
			}
			// Drop allpLock so we can take sched.lock.
			unlock(&allpLock)
			// Need to decrement number of idle locked M's
			// (pretending that one more is running) before the CAS.
			// Otherwise the M from which we retake can exit the syscall,
			// increment nmidle and report deadlock.
			incidlelocked(-1)
			// retake the p by flipping its status to idle
			if atomic.Cas(&_p_.status,s,_Pidle) {
				if trace.enabled {
					traceGoSysBlock(_p_)
					traceProcStop(_p_)
				}
				n++
				_p_.syscalltick++
				// hand the p off, as analyzed above
				handoffp(_p_)
			}
			incidlelocked(1)
			lock(&allpLock)
		} else if s == _Prunning {
			// Preempt G if it's running for too long.
			// the p is running: if its g has been running too long, preempt it
			t := int64(_p_.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				pd.schedwhen = now
				continue
			}
			// only preempt if it has been running for more than 10ms
			if pd.schedwhen+forcePreemptNS > now {
				continue
			}
			// request the preemption
			preemptone(_p_)
		}
	}
	unlock(&allpLock)
	return uint32(n)
}

3.4.3. preemptone

The comment on this function in the runtime itself admits this kind of preemption is not very reliable; let's look at the implementation first

func preemptone(_p_ *p) bool {
	mp := _p_.m.ptr()
	if mp == nil || mp == getg().m {
		return false
	}
	gp := mp.curg
	if gp == nil || gp == mp.g0 {
		return false
	}
	// set the preemption flag
	gp.preempt = true

	// Every call in a go routine checks for stack overflow by
	// comparing the current stack pointer to gp->stackguard0.
	// Setting gp->stackguard0 to StackPreempt folds
	// preemption into the normal stack overflow check.
	// set stackguard0 to stackPreempt so that the next stack-overflow check is guaranteed to trigger
	gp.stackguard0 = stackPreempt
	return true
}

Here the runtime sets gp.stackguard0 = stackPreempt, which tricks the g into thinking its stack is exhausted, so it obediently goes off to grow it. Growing the stack means calling newstack to allocate a new stack and copy the old contents over, and inside newstack there is the following snippet

if preempt {
	if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
		// Let the goroutine keep running for now.
		// gp->preempt is set,so it will be preempted next time.
		gp.stackguard0 = gp.stack.lo + _StackGuard
		gogo(&gp.sched) // never return
	}
}

At this point newstack notices the g is being preempted, so the "stack is too small" signal may well have been fake; no matter, the g is sent back to the scheduler and no new stack is allocated. Both the runtime comments and 雨痕 poke a bit of fun at this mechanism, but this style of preemption has been in place since roughly Go 1.5 (possibly earlier) and has run stably, which says a lot for it
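
One practical consequence, hedged to the Go versions this article describes (Go 1.14 later added signal-based asynchronous preemption): a goroutine spinning in a loop that makes no function calls never hits a stack-growth prologue, so the stackguard0 trick has nothing to hook into. A small demonstration, assuming GOMAXPROCS(1) and a pre-1.14 runtime:

package main

import (
	"fmt"
	"runtime"
	"time"
)

func main() {
	runtime.GOMAXPROCS(1)

	go func() {
		// No function calls inside this loop, so there is no stack-growth
		// prologue for the preemption check to piggyback on. On runtimes
		// without async preemption (before Go 1.14) this loop can hog the
		// only P and the print below may never run.
		for {
		}
	}()

	time.Sleep(100 * time.Millisecond)
	fmt.Println("main got scheduled again") // may never print on old runtimes
}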

4. Summary

The most obvious design choice in the scheduler is reuse: the free list of g's, the free list of m's and the free list of p's, which avoids the cost of constantly creating and destroying them

The second is multi-level caching, the same idea as in the memory allocator: each p keeps its own queue of runnable g's and only rebalances against the global queue when it has none or too many. The global queue needs a lock, local operations do not, which saves a great deal of locking overhead

That covers the relationships and state transitions of g, m and p. Since my assembly is weak those parts were mostly skipped; I hope to fill them in when I get the chance

5. References