2.6 抢占式调度 
2.6.0 sysmon监听抢占时机 
sysmon是一个Go里面的一个特殊的线程,不与任何P绑定,不参与调度,主要用于监控整个Go进程,主要有如下作用:
释放闲置超过5分钟的span物理内存 
超过2分钟没有垃圾回收,强制启动垃圾回收 
将长时间没有处理的netpoll结果添加到任务队列 
向长时间执行的G任务发起抢占调度 
收回因syscall而长时间阻塞的P 
 
sysmon线程在runtime.main函数里面创建:
1 2 3 4 5 6 7 8 9 10 11 12 func  main ()   {    ...       if  GOARCH != "wasm"  {                      atomic.Store(&sched.sysmonStarting, 1 )         systemstack(func ()   {           newm(sysmon, nil , -1 )       })     }   ... } 
 
在线程M的创建过程 中有提到newm函数,该函数用来创建一个内核线程m,设置启动函数为首个参数。执行流程为newm–>newm1–>newosporc->pthread_create–>runtime.mstart–>runtime.mstart1–>mstartfn–>schedule;此处mstartfn便是sysmon函数。由于sysmon函数是死循环的,所以监控线程永远不会执行到后面的调度程序schedule 
以下为sysmon函数循环检查Go进程的过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 func  sysmon ()   {    lasttrace := int64 (0 )     idle := 0       delay := uint32 (0 )     for  {                           usleep(delay)                  lastpoll := sched.lastpoll.Load()         if  netpollinited() && lastpoll != 0  && lastpoll+10 *1000 *1000  < now {             sched.lastpoll.CompareAndSwap(lastpoll, now)             list := netpoll(0 )              if  !list.empty() {                 injectglist(&list)             }         }                  if  retake(now) != 0  {             idle = 0           } else  {             idle++         }                  if  t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && forcegc.idle.Load() {             lock(&forcegc.lock)             forcegc.idle.Store(false )             var  list gList             list.push(forcegc.g)             injectglist(&list)             unlock(&forcegc.lock)         }     } } 
 
sysmon监控线程判断是否需要抢占主要通过retake函数进行检查,遍历所有的P,如果某个P经过10ms没有切换都没有协程,那么就需要被抢占了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 const  forcePreemptNS = 10  * 1000  * 1000  func  retake (now int64 )   uint32  {         for  i := 0 ; i < len (allp); i++ {         pp := allp[i]         pd := &pp.sysmontick         s := pp.status         sysretake := false          if  s == _Prunning || s == _Psyscall {                          t := int64 (pp.schedtick)             if  int64 (pd.schedtick) != t {                 pd.schedtick = uint32 (t)                 pd.schedwhen = now             } else  if  pd.schedwhen+forcePreemptNS <= now {                                  preemptone(pp)                 sysretake = true              }         }              } } 
 
找到需要抢占的P后,调用preemptone(pp)对P当前运行的协程进行抢占。抢占的方式有两种,一种是基于协作的抢占,一种是基于信号的抢占
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 const (         stackPreempt = 0xfffffade   ) func  preemptone (pp *p)   bool  {    mp := pp.m.ptr()     gp := mp.curg     gp.preempt = true                gp.stackguard0 = stackPreempt     if  preemptMSupported && debug.asyncpreemptoff == 0  {         pp.preempt = true                   preemptM(mp)     }     return  true  } 
 
2.6.1 基于协作的抢占式调度 
在1.14版本之前,只有基于协作的抢占式调度,即preemptone函数中只有设置gp.stackguard0 = stackPreempt,而没有后面的preemptM(mp)过程。 
由于goroutine初始栈桢很小(2kb),为了避免栈溢出,go语言编译期会在函数头部加上栈增长检测代码,如果在函数调用时编译器发现栈不够用了或者g.stackguard0 = stackPreempt,将会调用runtime.morestack来进行栈增长,而runtime.morestack是个汇编函数,又会调用runtime.newstack。 
再morestack中首先要保存好当前协程的上下文,之后该协程继续从这个位置执行。保存完成之后调用newstack
1 2 3 4 5 6 7 8 9 10 11 TEXT runtime·morestack(SB),NOSPLIT,$0 -0      ...     MOVQ    0 (SP), AX      MOVQ    AX, (g_sched+gobuf_pc)(SI)     MOVQ    SI, (g_sched+gobuf_g)(SI)     LEAQ    8 (SP), AX      MOVQ    AX, (g_sched+gobuf_sp)(SI)     MOVQ    BP, (g_sched+gobuf_bp)(SI)     MOVQ    DX, (g_sched+gobuf_ctxt)(SI)     ...     CALL    runtime·newstack(SB) 
 
newstack函数执行的栈扩张逻辑,在扩张之前,首先会检查是否是要协程抢占
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 func  newstack ()   {    thisg := getg()     ...     gp := thisg.m.curg     ...          preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt               if  preempt {         if  !canPreemptM(thisg.m) {                          gp.stackguard0 = gp.stack.lo + _StackGuard             gogo(&gp.sched)          }     }     ...          if  preempt {         ...         if  gp.preemptShrink {                          gp.preemptShrink = false              shrinkstack(gp)         }         if  gp.preemptStop {             preemptPark(gp)          }         ...                  gopreempt_m(gp)      }          ... } func  gopreempt_m (gp *g)   {    goschedImpl(gp) } 
 
当newstack判断是发生了抢占时,会调用到goschedImpl函数,可以看到,会先把当前的g放到全局队列,然后开始调度
1 2 3 4 5 6 7 8 9 10 func  goschedImpl (gp *g)   {    casgstatus(gp, _Grunning, _Grunnable)     dropg()     lock(&sched.lock)          globrunqput(gp)     unlock(&sched.lock)          schedule() } 
 
2.6.2 基于信号的抢占式调度 
一个不参与任何函数调用的函数,直到执行完毕之前, 是不会被抢占的。如协程里面就一个for{}循环,将无法被抢占 
1.14版本增加了基于信号的抢占式调度,preemptM(mp)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 func  preemptM (mp *m)   {    if  GOOS == "darwin"  || GOOS == "ios"  {         execLock.rlock()     }     if  atomic.Cas(&mp.signalPending, 0 , 1 ) {         if  GOOS == "darwin"  || GOOS == "ios"  {             atomic.Xadd(&pendingPreemptSignals, 1 )         }                  signalM(mp, sigPreempt)     }     if  GOOS == "darwin"  || GOOS == "ios"  {         execLock.runlock()     } } func  signalM (mp *m, sig int )   {         pthread_kill(pthread(mp.procid), uint32 (sig)) } func  signalM (mp *m, sig int )   {         tgkill(getpid(), int (mp.procid), sig) } 
 
可见,基于信号的抢占式调度是通过监控线程sysmon发现有10ms以上未调度的P时,通过执行signalM对Go进程发送抢占信号(0x17) 
Go进程收到该信号之后是如何执行抢占的呢,我们先来看信号是如何注册的
在初始化时注册信号 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 func  initsig (preinit bool )   {         if  !preinit {         signalsOK = true      }          for  i := uint32 (0 ); i < _NSIG; i++ {         t := &sigtable[i]                  if  t.flags == 0  || t.flags&_SigDefault != 0  {             continue          }         ...           setsig(i, funcPC(sighandler))     } } func  setsig (i uint32 , fn uintptr )   {    var  sa sigactiont     sa.sa_flags = _SA_SIGINFO | _SA_ONSTACK | _SA_RESTORER | _SA_RESTART     sigfillset(&sa.sa_mask)     if  fn == abi.FuncPCABIInternal(sighandler) {          if  iscgo {                          fn = abi.FuncPCABI0(cgoSigtramp)         } else  {             fn = abi.FuncPCABI0(sigtramp)         }     }     sa.sa_handler = fn     sigaction(i, &sa, nil ) } 
 
在initsig中先遍历信号数组调用setsig进行注册,setsig中会执行系统调用来安装信号和信号处理函数。我们继续看信号处理函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 func  sighandler (sig uint32 , info *siginfo, ctxt unsafe.Pointer, gp *g)   {              gsignal := getg()     mp := gsignal.m     c := &sigctxt{info, ctxt}          if  sig == sigPreempt {                  doSigPreempt(gp, c)     }     ... } func  doSigPreempt (gp *g, ctxt *sigctxt)   {    if  wantAsyncPreempt(gp) {         if  ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {                          ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc)         }     }          gp.m.preemptGen.Add(1 )     gp.m.signalPending.Store(0 ) } 
 
在信号处理函数sighandler中,对于抢占信号,会执行doSigPreempt函数,其中会通过pushcall插入syncPreempt函数调用
1 2 3 4 5 6 7 TEXT ·asyncPreempt(SB),NOSPLIT|NOFRAME,$0 -0      ...     CALL ·asyncPreempt2(SB)     ... 
 
syncPreempt最终调用了asyncPreempt2()函数
1 2 3 4 5 6 7 8 9 10 11 func  asyncPreempt2 ()   {    gp := getg()     gp.asyncSafePoint = true      if  gp.preemptStop {         mcall(preemptPark)     } else  {         mcall(gopreempt_m)     }     gp.asyncSafePoint = false  } 
 
可见,兜兜转转,最终跟基于协作的信号抢占一样,执行preemptPark或gopreempt_m函数来执行schedule
信号抢占的整体逻辑如下:
M 注册一个 SIGURG 信号的处理函数:sighandler。 
sysmon 线程检测到执行时间过长的 goroutine、GC stw 时,会向相应的 M(或者说线程,每个线程对应一个 M)发送 SIGURG 信号。 
收到信号后,内核执行 sighandler 函数,通过 pushCall 插入 asyncPreempt 函数调用。 
回到当前 goroutine 执行 asyncPreempt 函数,通过 mcall 切到 g0 栈执行 gopreempt_m。 
将当前 goroutine 插入到全局可运行队列,M 则继续寻找其他 goroutine 来运行。 
被抢占的 goroutine 再次调度过来执行时,会继续原来的执行流。