The scheduler

In high-concurrency programs we typically spawn a large number of goroutines, and some of them will inevitably block on I/O, channels, or syscalls. To keep the CPUs busy despite that, Go implements its own scheduler inside the runtime, which multiplexes goroutines onto OS threads. Compared with threads, goroutines are very lightweight: they are cheap to create and destroy and fast to switch between, and this is a large part of why Go handles concurrency so well.

So how is the scheduling actually done? Roughly speaking, runnable goroutines are handed between logical processors so that no core sits idle while work is queued somewhere else. To understand how and when that hand-off happens, we need to look at P, G, M, and sched.

In src/runtime/runtime2.go, Go defines the structs that model the whole scheduler:

  • sched: the global scheduler state
  • G: a goroutine
  • M: an OS thread (machine)
  • P: a logical processor; by default there is one per CPU core
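To make the model a bit more tangible, here is a small example of my own (not from the runtime source): it pins the program to a single P and lets two goroutines share it by yielding voluntarily. The exact interleaving of the output is up to the scheduler.

// my own example: two goroutines sharing a single P
package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	// Force a single P so the two goroutines below must take turns on it.
	runtime.GOMAXPROCS(1)

	var wg sync.WaitGroup
	for _, name := range []string{"A", "B"} {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			for i := 0; i < 3; i++ {
				fmt.Println(name, i)
				// Yield the P so the scheduler can run the other goroutine.
				runtime.Gosched()
			}
		}(name)
	}
	wg.Wait()
}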

sched

The scheduler is initialized in src/runtime/proc.go:

// The bootstrap sequence is:
//
//	call osinit
//	call schedinit
//	make & queue new G
//	call runtime·mstart
//
// The new G calls runtime·main.
func schedinit() {
	// raceinit must be the first call to race detector.
	// In particular, it must be done before mallocinit below calls racemapshadow.
	// G: get the current goroutine (g0 during bootstrap)
	_g_ := getg()
	if raceenabled {
		_g_.racectx, raceprocctx0 = raceinit()
	}

	sched.maxmcount = 10000
	
	// M: initialize the bootstrap M
	mcommoninit(_g_.m)
	// CPU: detect CPU features
	cpuinit()       // must run before alginit
	.......
	// P: determine the number of Ps (defaults to the number of CPUs)
	procs := ncpu
	if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
		procs = n
	}
	......
}

So schedinit sets up G, M, and P during bootstrap.
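Two of the values set up here are visible from ordinary Go code: sched.maxmcount = 10000 is the default cap on the number of Ms, adjustable through runtime/debug.SetMaxThreads, and the P count comes from the GOMAXPROCS environment variable (defaulting to the number of CPUs). A small sketch of my own showing both knobs:

// my own example: the user-visible knobs behind schedinit's defaults
package main

import (
	"fmt"
	"runtime"
	"runtime/debug"
)

func main() {
	// Number of Ps: read from the GOMAXPROCS env var in schedinit,
	// defaulting to the number of CPUs; GOMAXPROCS(0) only queries it.
	fmt.Println("Ps:", runtime.GOMAXPROCS(0))

	// Number of Ms: schedinit caps it at 10000 by default;
	// SetMaxThreads changes the cap and returns the previous one.
	prev := debug.SetMaxThreads(20000)
	fmt.Println("previous M cap:", prev)
}

Running the program as GOMAXPROCS=2 go run main.go shows the environment variable taking effect.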

P

Struct definition

// src/runtime/runtime2.go
type p struct {
	id          int32
	status      uint32 // status of the P: _Pidle, _Prunning, _Psyscall, _Pgcstop or _Pdead
	......
	mcache      *mcache
	raceprocctx uintptr

	// per-P defer pool
	deferpool    [5][]*_defer // pool of available defer structs of different sizes (see panic.go)
	deferpoolbuf [5][32]*_defer

	// Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
	goidcache    uint64
	goidcacheend uint64

	// Queue of runnable goroutines. Accessed without lock.
	runqhead uint32
	runqtail uint32
	runq     [256]guintptr
	// runnext, if non-nil, is a runnable G that was ready'd by
	// the current G and should be run next instead of what's in
	// runq if there's time remaining in the running G's time
	// slice. It will inherit the time left in the current time
	// slice. If a set of goroutines is locked in a
	// communicate-and-wait pattern, this schedules that set as a
	// unit and eliminates the (potentially large) scheduling
	// latency that otherwise arises from adding the ready'd
	// goroutines to the end of the run queue.
	runnext guintptr

	// Available G's (status == Gdead)
	gFree struct {
		gList
		n int32
	}
	......
}
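runqhead, runqtail, and runq form a fixed-size ring buffer of runnable Gs. The real queue is accessed with atomics so other Ps can steal from it; the single-threaded toy model below (my own code, using a 4-slot ring instead of 256) only illustrates the head/tail indexing:

// my own simplified model of P's local run queue (single-threaded, no atomics)
package main

import "fmt"

type g struct{ id int }

type localRunQueue struct {
	head, tail uint32
	buf        [4]*g // the real runq has 256 slots
}

// put appends gp at the tail; it returns false when the ring is full
// (the real runtime would then move half of the queue to the global queue).
func (q *localRunQueue) put(gp *g) bool {
	if q.tail-q.head == uint32(len(q.buf)) {
		return false
	}
	q.buf[q.tail%uint32(len(q.buf))] = gp
	q.tail++
	return true
}

// get pops from the head; it returns nil when the queue is empty.
func (q *localRunQueue) get() *g {
	if q.head == q.tail {
		return nil
	}
	gp := q.buf[q.head%uint32(len(q.buf))]
	q.head++
	return gp
}

func main() {
	var q localRunQueue
	for i := 1; i <= 5; i++ {
		fmt.Println("put", i, q.put(&g{id: i})) // the 5th put fails: ring full
	}
	for gp := q.get(); gp != nil; gp = q.get() {
		fmt.Println("run g", gp.id)
	}
}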

P's life cycle (states)

  • A newly created P starts in _Pgcstop
// src/runtime/proc.go
// init initializes pp, which may be a freshly allocated p or a
// previously destroyed p, and transitions it to status _Pgcstop.
func (pp *p) init(id int32) {
	pp.id = id
	pp.status = _Pgcstop
	......
}
  • After initialization, if an M is already running on it, the P becomes _Prunning
// src/runtime/proc.go  method procresize 
if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
		// continue to use the current P
		_g_.m.p.ptr().status = _Prunning
		_g_.m.p.ptr().mcache.prepareForSweep()
	} else {
   	.......
		p.status = _Pidle
		......
	}
  • As the code above shows, if no M is running on it the P is set to _Pidle instead. Inside acquirep (which calls wirep), the P goes from _Pidle -> _Prunning:
// src/runtime/proc.go  method acquirep
func acquirep(_p_ *p) {
	// Do the part that isn't allowed to have write barriers.
	wirep(_p_)
	.....
}
func wirep(_p_ *p) {
	......
	_p_.status = _Prunning
}
  • releasep moves the P from _Prunning -> _Pidle
// src/runtime/proc.go  method releasep 
func releasep() *p {
	......
	_p_.status = _Pidle
	return _p_
}
  • _Prunning and _Psyscall switch back and forth around syscalls (a runnable demonstration of this hand-off follows at the end of this section)
// entersyscall moves the P from _Prunning to _Psyscall
func entersyscall() {
	reentersyscall(getcallerpc(), getcallersp())
}

// exitsyscall --> exitsyscallfast --> wirep moves the P from _Psyscall back to _Prunning
func exitsyscall() {
	......
	exitsyscallfast(oldp)
	.....
}
//go:nosplit
func exitsyscallfast(oldp *p) bool {
......
// Try to re-acquire the last P.
	if oldp != nil && oldp.status == _Psyscall && atomic.Cas(&oldp.status, _Psyscall, _Pidle) {
		// There's a cpu for us, so we can run.
		wirep(oldp)
		exitsyscallfast_reacquired()
		return true
	}
}
func wirep(_p_ *p) {
	......
	_p_.status = _Prunning
}
  • destroy moves the P to _Pdead
// src/runtime/proc.go  method destroy 
func (pp *p) destroy() {
	......
	pp.status = _Pdead
}

So when is destroy called? When procresize runs (for example because GOMAXPROCS was changed) and the number of Ps shrinks, the surplus Ps are destroyed:

	// src/runtime/proc.go  method procresize当中有一处 
	// release resources from unused P's
	for i := nprocs; i < old; i++ {
		p := allp[i]
		p.destroy()
		// can't free P itself because it can be referenced by an M in syscall
	}
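Before moving on to M: the practical payoff of the _Psyscall state is that a goroutine blocked in a syscall does not pin its P, so other goroutines keep running even with a single P. A rough demonstration of my own (Linux/macOS only, since it uses syscall.Pipe):

// my own example: a goroutine blocked in a raw read syscall does not block the P
package main

import (
	"fmt"
	"runtime"
	"syscall"
	"time"
)

func main() {
	runtime.GOMAXPROCS(1)

	var fds [2]int
	if err := syscall.Pipe(fds[:]); err != nil {
		panic(err)
	}

	go func() {
		buf := make([]byte, 1)
		syscall.Read(fds[0], buf) // blocks in a read syscall; the P is handed off meanwhile
		fmt.Println("reader woke up")
	}()

	// Even with a single P, the main goroutine keeps getting scheduled
	// while the reader sits in the syscall.
	for i := 0; i < 3; i++ {
		fmt.Println("main still running", i)
		time.Sleep(10 * time.Millisecond)
	}
	syscall.Write(fds[1], []byte{1}) // unblock the reader
	time.Sleep(10 * time.Millisecond)
}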

M

Struct definition

// src/runtime/runtime2.go
type m struct {
	g0      *g     // goroutine with scheduling stack
	morebuf gobuf  // gobuf arg to morestack
	divmod  uint32 // div/mod denominator for arm - known to liblink

	// Fields not known to debuggers.
	procid        uint64       // for debuggers, but offset not hard-coded
	gsignal       *g           // signal-handling g
	goSigStack    gsignalStack // Go-allocated signal handling stack
	sigmask       sigset       // storage for saved signal mask
	tls           [6]uintptr   // thread-local storage (for x86 extern register)
	mstartfn      func()
	curg          *g       // current running goroutine
	caughtsig     guintptr // goroutine running during fatal signal
	p             puintptr // attached p for executing go code (nil if not executing go code)
......
	spinning      bool // m is out of work and is actively looking for work
......
	alllink       *m // on allm
	schedlink     muintptr
}

From the struct above we can see:

  • An M must be bound to a P to execute Go code; the p field points to that P and is nil while the M is not running Go code
  • An M runs one goroutine at a time: curg points to the currently running goroutine
  • Each M also owns a special goroutine, g0, whose stack is used for scheduling work
  • (more fields to be covered later...)

M's states

  • When it holds a G, it is executing (non-spinning)
func mspinning() {
	// startm's caller incremented nmspinning. Set the new M's spinning.
	getg().m.spinning = true
}
//go:nowritebarrierrec
func startm(_p_ *p, spinning bool) {}
  • When it has no G, it waits (spinning); keeping an idle M spinning for a while avoids the cost of destroying and re-creating Ms when new work shows up
// Stops execution of the current m until new work is available.
// Returns with acquired P.
func stopm() {......}
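The spin-then-park trade-off is the same one any worker pool faces: keep polling for a short while (cheap to resume, burns CPU), or go to sleep on a condition variable (free while idle, slower to wake). The sketch below is my own analogy, not runtime code; submit plays the role of wakep/startm and the parked wait plays the role of stopm:

// my own analogy: a worker that spins briefly before parking, like a spinning M
package main

import (
	"fmt"
	"sync"
	"time"
)

type pool struct {
	mu    sync.Mutex
	cond  *sync.Cond
	tasks []func()
}

func newPool() *pool {
	p := &pool{}
	p.cond = sync.NewCond(&p.mu)
	return p
}

// submit queues a task and wakes one parked worker, similar to wakep/startm.
func (p *pool) submit(t func()) {
	p.mu.Lock()
	p.tasks = append(p.tasks, t)
	p.mu.Unlock()
	p.cond.Signal()
}

// tryGet is the "poll once for work" helper used during the spinning phase.
func (p *pool) tryGet() func() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if len(p.tasks) == 0 {
		return nil
	}
	t := p.tasks[0]
	p.tasks = p.tasks[1:]
	return t
}

// worker polls a bounded number of times (spinning), then parks on the
// condition variable until submit signals it, roughly spinning -> stopm.
func (p *pool) worker() {
	for {
		var t func()
		for spins := 0; spins < 1000 && t == nil; spins++ {
			t = p.tryGet()
		}
		if t == nil {
			p.mu.Lock()
			for len(p.tasks) == 0 {
				p.cond.Wait()
			}
			t = p.tasks[0]
			p.tasks = p.tasks[1:]
			p.mu.Unlock()
		}
		t()
	}
}

func main() {
	p := newPool()
	go p.worker()
	p.submit(func() { fmt.Println("task ran") })
	time.Sleep(50 * time.Millisecond)
}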

G

A question worth pausing on: what is the difference between a goroutine and a thread? See also the discussion on go-nuts.

  • Memory footprint: a goroutine starts with a stack of about 2 KB, while a thread stack is typically 1 MB, so goroutines are far cheaper to keep around (a quick measurement follows below)
  • Creation and destruction: creating a thread goes through the OS (system calls and kernel scheduling), whereas creating a goroutine is a pure user-space operation; the runtime already owns the threads, and a new goroutine is simply queued to run on an existing M
  • Switching: a thread switch has to save and restore the full register set of the running thread before picking up the next runnable one; a goroutine switch works the same way but only has to save three registers, so it takes roughly 200 ns instead of the 1000-1500 ns a thread switch costs
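The memory claim is easy to check. The example below (my own; exact numbers vary by Go version and platform) spawns 100,000 goroutines and reports how much the process grew; with roughly 2 KB stacks that is on the order of a few hundred megabytes, where 100,000 threads with 1 MB stacks would be out of the question:

// my own example: spawning 100,000 goroutines and measuring the cost
package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	var ms runtime.MemStats
	runtime.ReadMemStats(&ms)
	before := ms.Sys

	var wg sync.WaitGroup
	stop := make(chan struct{})
	for i := 0; i < 100000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-stop // keep the goroutine alive so its stack stays allocated
		}()
	}

	runtime.ReadMemStats(&ms)
	fmt.Println("goroutines:", runtime.NumGoroutine())
	fmt.Printf("extra memory: ~%d MB\n", (ms.Sys-before)/1024/1024)

	close(stop)
	wg.Wait()
}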

How is a G created?

src/runtime/proc.go

// Create a new g running fn with siz bytes of arguments.
// Put it on the queue of g's waiting to run.
// The compiler turns a go statement into a call to this.
// Cannot split the stack because it assumes that the arguments
// are available sequentially after &fn; they would not be
// copied if a stack split occurred.
//go:nosplit
func newproc(siz int32, fn *funcval) {
	argp := add(unsafe.Pointer(&fn), sys.PtrSize)
	gp := getg()
	pc := getcallerpc()
	systemstack(func() {
		newproc1(fn, (*uint8)(argp), siz, gp, pc)
	})
}
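The compiler comment above is literal: every go statement is lowered to a newproc call. In the trivial program below (my own example), go hello("scheduler") becomes a call to runtime.newproc, which can be confirmed by looking at the assembly printed by go tool compile -S main.go:

// my own example: each `go` statement is lowered to a runtime.newproc call
package main

import (
	"fmt"
	"time"
)

func hello(name string) {
	fmt.Println("hello,", name)
}

func main() {
	go hello("scheduler") // the compiler rewrites this into a newproc call
	time.Sleep(10 * time.Millisecond)
}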

How scheduling works

First, there has to be a G to run

Two queues matter here: the LRQ (local run queue, one per P) and the GRQ (global run queue, shared by all Ps):

// GRQ
type schedt struct{
	....
	// global queue of runnable Gs
	runq     gQueue
	runqsize int32
	....
}
// LRQ
type p struct{
	// Queue of runnable goroutines. Accessed without lock.
	runqhead uint32
	runqtail uint32
	runq     [256]guintptr
}
  • A: The scheduler first looks at the global queue: func globrunqget(_p_ *p, max int32) *g {...}
    • If it finds Gs there, a batch of them is moved onto the local P queue
    • If not, it checks the local queue: func runqget(_p_ *p) (gp *g, inheritTime bool) {...}
      • C: if a G is found, execute it
      • If not, poll the network for goroutines whose I/O has become ready
        • If there is one, go to C
        • If not, steal work from another P's local queue
  • B: What happens when the local queue is drained? Check the global queue again, i.e. repeat A
  • The full search lives in func findrunnable() (gp *g, inheritTime bool) {...}; a compilable sketch of the search order follows below
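Putting A, B and C together, the search order can be written down as the compilable sketch below. This is my own simplification: every type and helper in it is a stand-in with the same name as the runtime concept, and the real findrunnable additionally deals with GC workers, timers, spinning bookkeeping and more:

// my own simplified, compilable sketch of the search order in schedule/findrunnable;
// all types and helpers here are stand-ins, not the runtime's real ones.
package main

type G struct{ id int }

type queue struct{ gs []*G }

func (q *queue) get() *G {
	if len(q.gs) == 0 {
		return nil
	}
	gp := q.gs[0]
	q.gs = q.gs[1:]
	return gp
}

type P struct {
	schedtick uint32
	runq      queue
}

var (
	globalQueue queue
	allP        []*P
)

// netpoll stands in for asking the network poller for goroutines whose I/O is ready.
func netpoll() *G { return nil }

func findRunnableSketch(p *P) *G {
	// every so often, check the global queue first so it is not starved
	if p.schedtick%61 == 0 {
		if gp := globalQueue.get(); gp != nil {
			return gp
		}
	}
	if gp := p.runq.get(); gp != nil { // 1. local run queue
		return gp
	}
	if gp := globalQueue.get(); gp != nil { // 2. global run queue
		return gp
	}
	if gp := netpoll(); gp != nil { // 3. network poller
		return gp
	}
	for _, victim := range allP { // 4. steal from another P's queue
		if victim != p {
			if gp := victim.runq.get(); gp != nil {
				return gp
			}
		}
	}
	return nil // nothing found: the M will stopm and wait
}

func main() {}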

With a G in hand, there has to be an M

When a G has been found and the current M was in the spinning state, resetspinning is called; it clears the spinning flag and may wake up another M so remaining runnable work gets picked up:

	if _g_.m.spinning {
		resetspinning()
	}

With an M, a P is all that is left

If the M already has a P bound to it, the G can run right away. tryWakeP is set earlier in schedule when the goroutine about to run is not a normal one (a GC worker or the trace reader); in that case an additional idle P is woken so normal goroutines are not starved:

	// If about to schedule a not-normal goroutine (a GCworker or tracereader),
	// wake a P if there is one.
	if tryWakeP {
		if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
			wakep()
		}
	}

How does the work get done?

	execute(gp, inheritTime)

Roughly, execute does the following:

  • Switch the G's status from _Grunnable to _Grunning
	casgstatus(gp, _Grunnable, _Grunning)
  • Bind gp to the M and the M to gp, so the executing _g_ and the gp about to run both reference the same M
	_g_.m.curg = gp
	gp.m = _g_.m
  • Then call gogo to jump into the goroutine's code (the gobuf it restores from is shown right after the assembly)
	gogo(&gp.sched)

// src/runtime/asm_amd64.s
// func gogo(buf *gobuf)
// restore state from Gobuf; longjmp
TEXT runtime·gogo(SB), NOSPLIT, $16-8
	MOVQ	buf+0(FP), BX		// gobuf
	MOVQ	gobuf_g(BX), DX
	MOVQ	0(DX), CX		// make sure g != nil
	get_tls(CX)
	MOVQ	DX, g(CX)
	MOVQ	gobuf_sp(BX), SP	// restore SP; the saved SP works like a hook: when the goroutine finishes, control falls back into the scheduler
	MOVQ	gobuf_ret(BX), AX
	MOVQ	gobuf_ctxt(BX), DX
	MOVQ	gobuf_bp(BX), BP
	MOVQ	$0, gobuf_sp(BX)	// clear to help garbage collector
	MOVQ	$0, gobuf_ret(BX)
	MOVQ	$0, gobuf_ctxt(BX)
	MOVQ	$0, gobuf_bp(BX)
	MOVQ	gobuf_pc(BX), BX
	JMP	BX
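The gobuf_sp/gobuf_pc/gobuf_bp fields read above come from the gobuf struct in src/runtime/runtime2.go, which (abridged, with field types that differ slightly between Go versions) looks roughly like this:

// src/runtime/runtime2.go (abridged; check your Go version for the exact definition)
// gobuf is the saved execution context that gogo restores.
type gobuf struct {
	sp   uintptr        // saved stack pointer        -> gobuf_sp above
	pc   uintptr        // saved program counter      -> gobuf_pc, the JMP target
	g    guintptr       // the goroutine this context belongs to
	ctxt unsafe.Pointer // closure context
	ret  uintptr        // saved return value register
	lr   uintptr        // link register (on arm)
	bp   uintptr        // saved frame pointer        -> gobuf_bp
}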
  • When the goroutine's function returns it lands in goexit, which calls goexit1, and goexit1 in turn ends up in goexit0
// The top-most function running on a goroutine returns to goexit+PCQuantum.
TEXT runtime·goexit(SB),NOSPLIT,$0-0
	BYTE	$0x90	// NOP
	CALL	runtime·goexit1(SB)	// does not return
	// traceback from goexit1 must hit code range of goexit
	BYTE	$0x90	// NOP
// Finishes execution of the current goroutine.
func goexit1() {
	if raceenabled {
		racegoend()
	}
	if trace.enabled {
		traceGoEnd()
	}
	// goexit0 is called here, on the g0 stack, via mcall
	mcall(goexit0)
}
// goexit continuation on g0.
func goexit0(gp *g) {
	_g_ := getg()
	.......
	schedule()
}
  • The rough logic inside goexit0:
// goexit continuation on g0.
func goexit0(gp *g) {
	_g_ := getg()

	// switch gp's status from _Grunning to _Gdead
	casgstatus(gp, _Grunning, _Gdead)
	......
	// unbind the M from gp
	gp.m = nil
	locked := gp.lockedm != 0
	gp.lockedm = 0
	// clear the locked g held by _g_.m
	_g_.m.lockedg = 0
	gp.paniconfault = false
	// reset the remaining fields
	gp._defer = nil // should be true already but just in case.
	gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
	gp.writebuf = nil
	gp.waitreason = 0
	gp.param = nil
	gp.labels = nil
	gp.timer = nil

	// Note that gp's stack scan is now "valid" because it has no
	// stack.
	gp.gcscanvalid = true
	// drop the association between the current M and gp
	dropg()

	// put gp back on the P's free G list so it can be reused
	gfput(_g_.m.p.ptr(), gp)
	.....
	// start the next round of scheduling
	schedule()
}
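Everything above forms a loop: schedule picks a G, execute runs it via gogo, and when the goroutine finishes, goexit0 calls schedule again. An easy way to watch this loop on a live program is the runtime's built-in scheduler trace; the example below is my own, run with GODEBUG=schedtrace=1000 so the runtime prints the scheduler's state once per second:

// my own example: watch the scheduler with the built-in scheduler trace
// run it as: GODEBUG=schedtrace=1000 go run main.go
package main

import (
	"runtime"
	"time"
)

func main() {
	for i := 0; i < 4; i++ {
		go func() {
			for {
				runtime.Gosched() // stay runnable forever by constantly yielding
			}
		}()
	}
	time.Sleep(3 * time.Second)
}

Each trace line reports gomaxprocs (the number of Ps), threads (Ms), idleprocs, spinningthreads, the global run queue length and the per-P local queue lengths, which maps directly onto the G/M/P model described above.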

The above are my own study notes; there may well be mistakes or things I have not understood fully. The references below go into much more detail.

References