2.6 抢占式调度
2.6.0 sysmon监听抢占时机
sysmon是一个Go里面的一个特殊的线程,不与任何P绑定,不参与调度,主要用于监控整个Go进程,主要有如下作用:
释放闲置超过5分钟的span物理内存
超过2分钟没有垃圾回收,强制启动垃圾回收
将长时间没有处理的netpoll结果添加到任务队列
向长时间执行的G任务发起抢占调度
收回因syscall而长时间阻塞的P
sysmon线程在runtime.main函数里面创建:
1 2 3 4 5 6 7 8 9 10 11 12 func main () { ... if GOARCH != "wasm" { atomic.Store(&sched.sysmonStarting, 1 ) systemstack(func () { newm(sysmon, nil , -1 ) }) } ... }
在线程M的创建过程 中有提到newm函数,该函数用来创建一个内核线程m,设置启动函数为首个参数。执行流程为newm–>newm1–>newosporc->pthread_create–>runtime.mstart–>runtime.mstart1–>mstartfn–>schedule;此处mstartfn便是sysmon函数。由于sysmon函数是死循环的,所以监控线程永远不会执行到后面的调度程序schedule
以下为sysmon函数循环检查Go进程的过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 func sysmon () { lasttrace := int64 (0 ) idle := 0 delay := uint32 (0 ) for { usleep(delay) lastpoll := sched.lastpoll.Load() if netpollinited() && lastpoll != 0 && lastpoll+10 *1000 *1000 < now { sched.lastpoll.CompareAndSwap(lastpoll, now) list := netpoll(0 ) if !list.empty() { injectglist(&list) } } if retake(now) != 0 { idle = 0 } else { idle++ } if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && forcegc.idle.Load() { lock(&forcegc.lock) forcegc.idle.Store(false ) var list gList list.push(forcegc.g) injectglist(&list) unlock(&forcegc.lock) } } }
sysmon监控线程判断是否需要抢占主要通过retake函数进行检查,遍历所有的P,如果某个P经过10ms没有切换都没有协程,那么就需要被抢占了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 const forcePreemptNS = 10 * 1000 * 1000 func retake (now int64 ) uint32 { for i := 0 ; i < len (allp); i++ { pp := allp[i] pd := &pp.sysmontick s := pp.status sysretake := false if s == _Prunning || s == _Psyscall { t := int64 (pp.schedtick) if int64 (pd.schedtick) != t { pd.schedtick = uint32 (t) pd.schedwhen = now } else if pd.schedwhen+forcePreemptNS <= now { preemptone(pp) sysretake = true } } } }
找到需要抢占的P后,调用preemptone(pp)对P当前运行的协程进行抢占。抢占的方式有两种,一种是基于协作的抢占,一种是基于信号的抢占
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 const ( stackPreempt = 0xfffffade ) func preemptone (pp *p) bool { mp := pp.m.ptr() gp := mp.curg gp.preempt = true gp.stackguard0 = stackPreempt if preemptMSupported && debug.asyncpreemptoff == 0 { pp.preempt = true preemptM(mp) } return true }
2.6.1 基于协作的抢占式调度
在1.14版本之前,只有基于协作的抢占式调度,即preemptone函数中只有设置gp.stackguard0 = stackPreempt
,而没有后面的preemptM(mp)
过程。
由于goroutine初始栈桢很小(2kb),为了避免栈溢出,go语言编译期会在函数头部加上栈增长检测代码,如果在函数调用时编译器发现栈不够用了或者g.stackguard0 = stackPreempt,将会调用runtime.morestack来进行栈增长,而runtime.morestack是个汇编函数,又会调用runtime.newstack。
再morestack中首先要保存好当前协程的上下文,之后该协程继续从这个位置执行。保存完成之后调用newstack
1 2 3 4 5 6 7 8 9 10 11 TEXT runtime·morestack(SB),NOSPLIT,$0 -0 ... MOVQ 0 (SP), AX MOVQ AX, (g_sched+gobuf_pc)(SI) MOVQ SI, (g_sched+gobuf_g)(SI) LEAQ 8 (SP), AX MOVQ AX, (g_sched+gobuf_sp)(SI) MOVQ BP, (g_sched+gobuf_bp)(SI) MOVQ DX, (g_sched+gobuf_ctxt)(SI) ... CALL runtime·newstack(SB)
newstack函数执行的栈扩张逻辑,在扩张之前,首先会检查是否是要协程抢占
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 func newstack () { thisg := getg() ... gp := thisg.m.curg ... preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt if preempt { if !canPreemptM(thisg.m) { gp.stackguard0 = gp.stack.lo + _StackGuard gogo(&gp.sched) } } ... if preempt { ... if gp.preemptShrink { gp.preemptShrink = false shrinkstack(gp) } if gp.preemptStop { preemptPark(gp) } ... gopreempt_m(gp) } ... } func gopreempt_m (gp *g) { goschedImpl(gp) }
当newstack判断是发生了抢占时,会调用到goschedImpl函数,可以看到,会先把当前的g放到全局队列,然后开始调度
1 2 3 4 5 6 7 8 9 10 func goschedImpl (gp *g) { casgstatus(gp, _Grunning, _Grunnable) dropg() lock(&sched.lock) globrunqput(gp) unlock(&sched.lock) schedule() }
2.6.2 基于信号的抢占式调度
一个不参与任何函数调用的函数,直到执行完毕之前, 是不会被抢占的。如协程里面就一个for{}循环,将无法被抢占
1.14版本增加了基于信号的抢占式调度,preemptM(mp)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 func preemptM (mp *m) { if GOOS == "darwin" || GOOS == "ios" { execLock.rlock() } if atomic.Cas(&mp.signalPending, 0 , 1 ) { if GOOS == "darwin" || GOOS == "ios" { atomic.Xadd(&pendingPreemptSignals, 1 ) } signalM(mp, sigPreempt) } if GOOS == "darwin" || GOOS == "ios" { execLock.runlock() } } func signalM (mp *m, sig int ) { pthread_kill(pthread(mp.procid), uint32 (sig)) } func signalM (mp *m, sig int ) { tgkill(getpid(), int (mp.procid), sig) }
可见,基于信号的抢占式调度是通过监控线程sysmon发现有10ms以上未调度的P时,通过执行signalM对Go进程发送抢占信号(0x17)
Go进程收到该信号之后是如何执行抢占的呢,我们先来看信号是如何注册的
在初始化时注册信号
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 func initsig (preinit bool ) { if !preinit { signalsOK = true } for i := uint32 (0 ); i < _NSIG; i++ { t := &sigtable[i] if t.flags == 0 || t.flags&_SigDefault != 0 { continue } ... setsig(i, funcPC(sighandler)) } } func setsig (i uint32 , fn uintptr ) { var sa sigactiont sa.sa_flags = _SA_SIGINFO | _SA_ONSTACK | _SA_RESTORER | _SA_RESTART sigfillset(&sa.sa_mask) if fn == abi.FuncPCABIInternal(sighandler) { if iscgo { fn = abi.FuncPCABI0(cgoSigtramp) } else { fn = abi.FuncPCABI0(sigtramp) } } sa.sa_handler = fn sigaction(i, &sa, nil ) }
在initsig中先遍历信号数组调用setsig进行注册,setsig中会执行系统调用来安装信号和信号处理函数。我们继续看信号处理函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 func sighandler (sig uint32 , info *siginfo, ctxt unsafe.Pointer, gp *g) { gsignal := getg() mp := gsignal.m c := &sigctxt{info, ctxt} if sig == sigPreempt { doSigPreempt(gp, c) } ... } func doSigPreempt (gp *g, ctxt *sigctxt) { if wantAsyncPreempt(gp) { if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok { ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc) } } gp.m.preemptGen.Add(1 ) gp.m.signalPending.Store(0 ) }
在信号处理函数sighandler中,对于抢占信号,会执行doSigPreempt函数,其中会通过pushcall插入syncPreempt函数调用
1 2 3 4 5 6 7 TEXT ·asyncPreempt(SB),NOSPLIT|NOFRAME,$0 -0 ... CALL ·asyncPreempt2(SB) ...
syncPreempt最终调用了asyncPreempt2()函数
1 2 3 4 5 6 7 8 9 10 11 func asyncPreempt2 () { gp := getg() gp.asyncSafePoint = true if gp.preemptStop { mcall(preemptPark) } else { mcall(gopreempt_m) } gp.asyncSafePoint = false }
可见,兜兜转转,最终跟基于协作的信号抢占一样,执行preemptPark或gopreempt_m函数来执行schedule
信号抢占的整体逻辑如下:
M 注册一个 SIGURG 信号的处理函数:sighandler。
sysmon 线程检测到执行时间过长的 goroutine、GC stw 时,会向相应的 M(或者说线程,每个线程对应一个 M)发送 SIGURG 信号。
收到信号后,内核执行 sighandler 函数,通过 pushCall 插入 asyncPreempt 函数调用。
回到当前 goroutine 执行 asyncPreempt 函数,通过 mcall 切到 g0 栈执行 gopreempt_m。
将当前 goroutine 插入到全局可运行队列,M 则继续寻找其他 goroutine 来运行。
被抢占的 goroutine 再次调度过来执行时,会继续原来的执行流。