// gobuf holds the register state saved/restored when the scheduler switches
// goroutines (see gogo/gosave).
type gobuf struct {
	sp   uintptr        // stack pointer: always points at the current top of this g's stack
	pc   uintptr        // saved program counter
	g    guintptr       // the g this gobuf belongs to
	ctxt unsafe.Pointer // saved context; used during GC
	ret  uintptr        // used to hold a system-call return value
	lr   uintptr        // saved link register (return address) on LR architectures
	bp   uintptr        // base pointer: together with sp delimits the frame at the moment of save
}
funcnewproc1(fn *funcval, callergp *g, callerpc uintptr) *g { if fn == nil { fatal("go of nil func value") }
mp := acquirem() // disable preemption because we hold M and P in local vars. pp := mp.p.ptr() // 从当前P的gFree协程队列里面取一个来复用,当前P的gFree如果为空,就从全局gFree队列里面取 newg := gfget(pp) if newg == nil { // 新建一个栈大小为2k的g对象,将栈顶和栈底保存到newg.stack里面 newg = malg(_StackMin) casgstatus(newg, _Gidle, _Gdead) // 将新创建的g的指针保存到全局的allgs数组中,新创建的g未防止被gc扫描,将g的状态改为Gdead allgadd(newg) }
totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize) // extra space in case of reads slightly beyond frame totalSize = alignUp(totalSize, sys.StackAlign) sp := newg.stack.hi - totalSize spArg := sp if usesLR { // caller's LR *(*uintptr)(unsafe.Pointer(sp)) = 0 prepGoExitFrame(sp) spArg += sys.MinFrameSize } // 初始化g对象的gobuf,调度用的sp、pc、任务函数等 memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched)) newg.sched.sp = sp newg.stktopsp = sp // 这里先将pc指向goexit地址,后续这个pc值会在gostartcallfn函数中塞在sp中,也就是栈底位置,而这个位置是go函数调用的return addr,所以协程任务函数执行完成后会执行goexit函数 newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function newg.sched.g = guintptr(unsafe.Pointer(newg)) // 填充sched这个gobuf结构体,设置newg.sched的堆栈指针sp,pc指向协程的任务函数fn gostartcallfn(&newg.sched, fn) newg.gopc = callerpc newg.ancestors = saveAncestors(callergp) newg.startpc = fn.fn //...... casgstatus(newg, _Gdead, _Grunnable) gcController.addScannableStack(pp, int64(newg.stack.hi-newg.stack.lo)) // 分配goid if pp.goidcache == pp.goidcacheend { pp.goidcache = sched.goidgen.Add(_GoidCacheBatch) pp.goidcache -= _GoidCacheBatch - 1 pp.goidcacheend = pp.goidcache + _GoidCacheBatch } newg.goid = pp.goidcache pp.goidcache++ //...... releasem(mp)
funcschedinit() { lock(&sched.lock) sched.lastpoll.Store(nanotime()) procs := ncpu if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 { procs = n } if procresize(procs) != nil { throw("unknown runnable goroutine during bootstrap") } unlock(&sched.lock) }
// scheinit里面的全局变量 ncpu=getncpu() funcgetncpu()int32 { mib := [2]uint32{_CTL_HW, _HW_NCPU} out := uint32(0) nout := unsafe.Sizeof(out) ret := sysctl(&mib[0], 2, (*byte)(unsafe.Pointer(&out)), &nout, nil, 0) if ret >= 0 { returnint32(out) } return1 }
	// NOTE(review): fragment — the enclosing function header (presumably newm;
	// source of pp, fn, id) is not shown in this excerpt.
	gp := getg()
	if gp.m.p == 0 {
		acquirep(pp) // temporarily borrow p for mallocs in this function
	}
	// Allocate a new m and record fn as the function the new thread runs first.
	mp := new(m)
	mp.mstartfn = fn
	mcommoninit(mp, id)
	// Initialize an attribute object.
	var attr pthreadattr
	var err int32
	err = pthread_attr_init(&attr)
	if err != 0 {
		// Cannot even build thread attributes: write a fixed message to
		// stderr (fd 2) and abort the process.
		write(2, unsafe.Pointer(&failthreadcreate[0]), int32(len(failthreadcreate)))
		exit(1)
	}
	// Find out OS stack size for our own stack guard.
	// Read the OS thread stack size and record it as g0's stack bound.
	var stacksize uintptr
	if pthread_attr_getstacksize(&attr, &stacksize) != 0 {
		write(2, unsafe.Pointer(&failthreadcreate[0]), int32(len(failthreadcreate)))
		exit(1)
	}
	mp.g0.stack.hi = stacksize // set hi to the stack size, for mstart
	// Create the OS thread via pthread_create, starting at mstart_stub,
	// which ultimately calls runtime.mstart.
	// Finally, create the thread. It starts at mstart_stub, which does some low-level
	// setup and then calls mstart.
	var oset sigset
	sigprocmask(_SIG_SETMASK, &sigset_all, &oset) // block all signals during thread creation
	err = pthread_create(&attr, abi.FuncPCABI0(mstart_stub), unsafe.Pointer(mp))
	// NOTE(review): the err check and the sigprocmask restore that follow in
	// the full source are elided from this excerpt.
}
// schedt is the global scheduler state; there is a single instance, sched.
type schedt struct {
	goidgen   atomic.Uint64 // monotonic goid source; handed out to Ps in batches
	lastpoll  atomic.Int64  // time of last network poll, 0 if currently polling
	pollUntil atomic.Int64  // time to which current poll is sleeping

	lock mutex

	midle        muintptr // list of idle Ms waiting for work
	nmidle       int32    // number of idle Ms on midle
	nmidlelocked int32    // number of locked m's waiting for work
	mnext        int64    // number of m's that have been created and next M ID
	maxmcount    int32    // maximum number of worker threads (Ms) allowed
	nmsys        int32    // number of system m's not counted for deadlock
	nmfreed      int64    // cumulative number of freed m's

	ngsys atomic.Int32 // number of system goroutines

	pidle  puintptr     // linked list of idle p structs
	npidle atomic.Int32 // number of idle p structs

	nmspinning   atomic.Int32  // See "Worker thread parking/unparking" comment in proc.go.
	needspinning atomic.Uint32 // See "Delicate dance" comment in proc.go. Boolean. Must hold sched.lock to set to 1.

	// Global runnable-goroutine queue.
	runq     gQueue
	runqsize int32

	disable struct {
		// user disables scheduling of user goroutines.
		user     bool
		runnable gQueue // pending runnable Gs
		n        int32  // length of runnable
	}

	// gFree caches the g structs of dead goroutines so that creating a new
	// goroutine can reuse one instead of allocating a fresh g every time.
	gFree struct {
		lock    mutex
		stack   gList // Gs with stacks
		noStack gList // Gs without stacks
		n       int32
	}

	// Central cache of sudog structs.
	sudoglock  mutex
	sudogcache *sudog

	// freem is the list of exited m's waiting to be freed.
	freem *m

	gcwaiting  atomic.Bool // gc is waiting to run
	stopwait   int32
	stopnote   note
	sysmonwait atomic.Bool
	sysmonnote note
	// ......
}
funcschedinit() { lockInit(&sched.lock, lockRankSched) // lockInit...... gp := getg() // 系统线程M最多10000个 sched.maxmcount = 10000 // The world starts stopped. worldStopped()
moduledataverify() stackinit() mallocinit() cpuinit() // must run before alginit alginit() // maps, hash, fastrand must not be used before this call fastrandinit() // must run before mcommoninit mcommoninit(gp.m, -1) gcinit()
lock(&sched.lock) sched.lastpoll.Store(nanotime()) procs := ncpu if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 { procs = n } // 调度器初始化时会创建cpu核数相同或者环境变量GOMAXPROCS个处理器P if procresize(procs) != nil { throw("unknown runnable goroutine during bootstrap") } unlock(&sched.lock)
// World is effectively started now, as P's can run. worldStarted() }
	// Scheduling policy: block until a runnable g is found, preferring the
	// local P's run queue with periodic fallbacks to the global queue.
	gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available
	// Run g on the current M; execute does not return.
	execute(gp, inheritTime)
}
// 返回参数 // gp: 返回可运行的协程g,可能是P的本地队列里面的,也可能是从别的P窃取过来的,也可能是全局获取的,还可能是poll network唤醒的 // inheriTime: 是否要增加当前P的调度计数tick // tryWakeP: 如果是GC或者trace的协程,需要去唤醒一个P来执行 funcfindRunnable() (gp *g, inheritTime, tryWakeP bool) { mp := getg().m // 每隔61次从全局队列上获取一个可执行的G,P的本地队列都忙,全局协程也能执行。https://github.com/golang/go/issues/20168 if pp.schedtick%61 == 0 && sched.runqsize > 0 { lock(&sched.lock) gp := globrunqget(pp, 1) unlock(&sched.lock) if gp != nil { return gp, false, false } } // local runq if gp, inheritTime := runqget(pp); gp != nil { return gp, inheritTime, false } // global runq if sched.runqsize != 0 { lock(&sched.lock) gp := globrunqget(pp, 0) unlock(&sched.lock) if gp != nil { return gp, false, false } } // Poll network. if netpollinited() && netpollWaiters.Load() > 0 && sched.lastpoll.Load() != 0 { if list := netpoll(0); !list.empty() { // non-blocking gp := list.pop() injectglist(&list) casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false, false } }
// Spinning Ms: steal work from other Ps. if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() { if !mp.spinning { mp.becomeSpinning() }
gp, inheritTime, tnow, w, newWork := stealWork(now) if gp != nil { // Successfully stole. return gp, inheritTime, false } if newWork { // There may be new timer or GC work; restart to // discover. goto top }
now = tnow if w != 0 && (pollUntil == 0 || w < pollUntil) { // Earlier timer to wait for. pollUntil = w } }