The Go runtime implements resource synchronization with Semacquire (a goroutine tries to grab a semaphore; if it fails, it records its state in a sudog and parks) and Semrelease (after the semaphore is released, a waiting goroutine is woken, marked runnable, and handed back to the scheduler). Along the way it relies on the g0 stack-switching helpers (mcall/systemstack) to do this work safely, and this machinery is what makes lightweight goroutine scheduling efficient.
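To see where this machinery gets exercised from ordinary code, here is a minimal, illustrative demo (timing and names are mine, not part of the runtime): when readers block behind a writer, sync.RWMutex.RLock ends up in the runtime semaphore path examined below.
package main

import (
	"sync"
	"time"
)

func main() {
	var mu sync.RWMutex
	var wg sync.WaitGroup

	// Hold the write lock so that the readers below have to block.
	mu.Lock()
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.RLock() // blocks -> semacquire path -> goroutine is parked
			defer mu.RUnlock()
		}()
	}
	time.Sleep(10 * time.Millisecond)
	mu.Unlock() // semrelease path -> the parked readers are woken
	wg.Wait()
}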
Semacquire
Taking the RWMutex read-lock path as the entry point, sync_runtime_SemacquireRWMutexR (linknamed to sync.runtime_SemacquireRWMutexR) lives in runtime/sema.go:
func sync_runtime_SemacquireRWMutexR(addr *uint32, lifo bool, skipframes int) {
semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes, waitReasonSyncRWMutexRLock)
}
semacquire1 roughly works as follows:
- Get the current goroutine.
- Easy case: if the semaphore value is greater than 0, no other goroutine is holding it; decrement the value and return (see the cansemacquire sketch below).
- Acquire a sudog from the per-P cache.
- Loop: add the sudog to the semaphore's wait queue and park the goroutine so the M can go run other goroutines; when the parked goroutine is woken it re-checks the semaphore, and if the value is greater than 0 it decrements it and breaks out to continue with the rest of the function.
- Release the sudog.
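The easy case hinges on cansemacquire: load the counter and, if it is non-zero, try to CAS it down by one. Roughly as in runtime/sema.go (atomic here is the runtime-internal atomic package, comments mine):
func cansemacquire(addr *uint32) bool {
	for {
		v := atomic.Load(addr)
		if v == 0 {
			return false // nothing to take; the caller must queue and park
		}
		if atomic.Cas(addr, v, v-1) {
			return true // took one unit of the semaphore
		}
		// lost the race against another acquirer; retry
	}
}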
sudog is the key structure for tracking a goroutine's wait state (in semaphores, locks, channels and other synchronization). Its typical life cycle is:
- When a goroutine needs to wait for a resource, it obtains a sudog from a cache pool via acquireSudog().
- The sudog records the wait information (the associated goroutine, the address of the resource being waited on, its position in the wait queue, and so on).
- Once the goroutine is woken (has acquired the resource), the sudog is no longer needed and releaseSudog(s) returns it to the cache pool.
An abridged view of the fields involved is shown below.
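Abridged from runtime/runtime2.go, keeping only the fields that matter on the semaphore path (the real struct has more):
type sudog struct {
	g *g // the waiting goroutine

	next *sudog // links within the wait queue
	prev *sudog
	elem unsafe.Pointer // for semaphores: the semaphore address being waited on

	acquiretime int64  // used for mutex contention profiling
	releasetime int64  // used for block profiling
	ticket      uint32 // non-zero means the semaphore was handed off directly

	waitlink *sudog // next sudog waiting on the same address
	waiters  uint16 // number of waiters queued behind the head of the list
}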
for {
	lockWithRank(&root.lock, lockRankRoot)
	// Add ourselves to nwait to disable "easy case" in semrelease.
	root.nwait.Add(1)
	// Check cansemacquire to avoid missed wakeup.
	if cansemacquire(addr) {
		root.nwait.Add(-1)
		unlock(&root.lock)
		break
	}
	// Any semrelease after the cansemacquire knows we're waiting
	// (we set nwait above), so go to sleep.
	root.queue(addr, s, lifo)
	goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes)
	if s.ticket != 0 || cansemacquire(addr) {
		break
	}
}
Inside this loop, root.nwait.Add(1) is what cooperates with the semrelease logic: semrelease consults root.nwait to decide whether it has to wake a waiter. When another goroutine releases the semaphore (semrelease), it checks root.nwait:
- If root.nwait == 0 (no waiters), semrelease only needs to bump the semaphore counter (fast release).
- If root.nwait > 0 (there are waiters), semrelease must dequeue and wake one waiter (slow release) and decrement root.nwait.
The ordering is what prevents missed wakeups: the waiter bumps nwait before re-checking the counter, and the releaser bumps the counter before reading nwait, so a release can never slip unnoticed between the waiter's check and its sleep. A userspace analogue of this protocol is sketched below.
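A minimal userspace analogue of the same protocol, built on sync/atomic and sync.Cond (illustrative only, not the runtime code):
package toysema

import (
	"sync"
	"sync/atomic"
)

// toySema mimics the runtime's "bump nwait, re-check, then sleep" protocol.
type toySema struct {
	count uint32 // semaphore value
	nwait uint32 // number of waiters

	mu sync.Mutex
	cv *sync.Cond
}

func newToySema(n uint32) *toySema {
	s := &toySema{count: n}
	s.cv = sync.NewCond(&s.mu)
	return s
}

func (s *toySema) tryAcquire() bool {
	for {
		v := atomic.LoadUint32(&s.count)
		if v == 0 {
			return false
		}
		if atomic.CompareAndSwapUint32(&s.count, v, v-1) {
			return true
		}
	}
}

func (s *toySema) Acquire() {
	if s.tryAcquire() { // easy case
		return
	}
	for {
		s.mu.Lock()
		atomic.AddUint32(&s.nwait, 1) // disable the releaser's easy case
		if s.tryAcquire() {           // re-check to avoid a missed wakeup
			atomic.AddUint32(&s.nwait, ^uint32(0)) // decrement
			s.mu.Unlock()
			return
		}
		s.cv.Wait() // any Release after our re-check sees nwait > 0
		s.mu.Unlock()
		if s.tryAcquire() {
			return
		}
	}
}

func (s *toySema) Release() {
	atomic.AddUint32(&s.count, 1)
	if atomic.LoadUint32(&s.nwait) == 0 {
		return // easy case: nobody is waiting
	}
	s.mu.Lock()
	if atomic.LoadUint32(&s.nwait) > 0 {
		atomic.AddUint32(&s.nwait, ^uint32(0)) // this waiter is being woken
		s.cv.Signal()
	}
	s.mu.Unlock()
}
For completeness, here is the full semacquire1: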
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason) {
gp := getg()
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}
// Easy case.
if cansemacquire(addr) {
return
}
// Harder case:
// increment waiter count
// try cansemacquire one more time, return if succeeded
// enqueue itself as a waiter
// sleep
// (waiter descriptor is dequeued by signaler)
s := acquireSudog()
root := semtable.rootFor(addr)
t0 := int64(0)
s.releasetime = 0
s.acquiretime = 0
s.ticket = 0
if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
if t0 == 0 {
t0 = cputicks()
}
s.acquiretime = t0
}
for {
lockWithRank(&root.lock, lockRankRoot)
// Add ourselves to nwait to disable "easy case" in semrelease.
root.nwait.Add(1)
// Check cansemacquire to avoid missed wakeup.
if cansemacquire(addr) {
root.nwait.Add(-1)
unlock(&root.lock)
break
}
// Any semrelease after the cansemacquire knows we're waiting
// (we set nwait above), so go to sleep.
root.queue(addr, s, lifo)
goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes)
if s.ticket != 0 || cansemacquire(addr) {
break
}
}
if s.releasetime > 0 {
blockevent(s.releasetime-t0, 3+skipframes)
}
releaseSudog(s)
}
goparkunlock switches the current goroutine into the waiting state and, as part of parking, releases (unlocks) the lock it was holding (here, the semaphore wait queue's lock).
// Puts the current goroutine into a waiting state and unlocks the lock.
// The goroutine can be made runnable again by calling goready(gp).
func goparkunlock(lock *mutex, reason waitReason, traceReason traceBlockReason, traceskip int) {
gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceReason, traceskip)
}
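The unlock callback it installs, parkunlock_c, simply unlocks the runtime mutex and reports success (roughly as in runtime/proc.go); it runs later on g0, inside park_m:
func parkunlock_c(gp *g, lock unsafe.Pointer) bool {
	unlock((*mutex)(lock))
	return true
}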
Here reason is the wait reason passed down from semacquire1 (waitReasonSyncRWMutexRLock on the RWMutex read-lock path; the plain Semacquire path uses waitReasonSemacquire). gopark first pins the M the g is running on (acquirem, which disables preemption), records what has to be unlocked later (the semaphore wait queue's lock) along with the unlock callback, then releases the M (releasem) and finally calls mcall. mcall is a special function that switches from the G's user stack to the M's g0 stack and runs the given function (park_m) there.
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceReason traceBlockReason, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
}
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waitTraceBlockReason = traceReason
mp.waitTraceSkip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
mcall(park_m)
}
mcall is implemented in assembly (see src/runtime/asm_amd64.s). Its job is to switch the execution stack from the current goroutine's (G's) user stack to the system thread's (M's) g0 stack and run the given function fn on that stack; here fn is park_m and its argument is the current goroutine (G).
// func mcall(fn func(*g))
// Switch to m->g0's stack, call fn(g).
// Fn must never return. It should gogo(&g->sched)
// to keep running g.
TEXT runtime·mcall<ABIInternal>(SB), NOSPLIT, $0-8
MOVQ AX, DX // DX = fn
// Save state in g->sched. The caller's SP and PC are restored by gogo to
// resume execution in the caller's frame (implicit return). The caller's BP
// is also restored to support frame pointer unwinding.
MOVQ SP, BX // hide (SP) reads from vet
MOVQ 8(BX), BX // caller's PC
MOVQ BX, (g_sched+gobuf_pc)(R14)
LEAQ fn+0(FP), BX // caller's SP
MOVQ BX, (g_sched+gobuf_sp)(R14)
// Get the caller's frame pointer by dereferencing BP. Storing BP as it is
// can cause a frame pointer cycle, see CL 476235.
MOVQ (BP), BX // caller's BP
MOVQ BX, (g_sched+gobuf_bp)(R14)
// switch to m->g0 & its stack, call fn
MOVQ g_m(R14), BX
MOVQ m_g0(BX), SI // SI = g.m.g0
CMPQ SI, R14 // if g == m->g0 call badmcall
JNE goodm
JMP runtime·badmcall(SB)
goodm:
MOVQ R14, AX // AX (and arg 0) = g
MOVQ SI, R14 // g = g.m.g0
get_tls(CX) // Set G in TLS
MOVQ R14, g(CX)
MOVQ (g_sched+gobuf_sp)(R14), SP // sp = g0.sched.sp
MOVQ $0, BP // clear frame pointer, as caller may execute on another M
PUSHQ AX // open up space for fn's arg spill slot
MOVQ 0(DX), R12
CALL R12 // fn(g)
// The Windows native stack unwinder incorrectly classifies the next instruction
// as part of the function epilogue, producing a wrong call stack.
// Add a NOP to work around this issue. See go.dev/issue/67007.
BYTE $0x90
POPQ AX
JMP runtime·badmcall2(SB)
RET
In park_m:
- The g's status is first switched to _Gwaiting, and dropg() detaches the M from the current g.
- Then waitunlockf (the callback that unlocks the semaphore wait queue's lock) is invoked; if the callback returns false, execute(gp, true) resumes the g right away.
- Otherwise schedule() picks another g to run.
// park continuation on g0.
func park_m(gp *g) {
mp := getg().m
trace := traceAcquire()
// If g is in a synctest group, we don't want to let the group
// become idle until after the waitunlockf (if any) has confirmed
// that the park is happening.
// We need to record gp.bubble here, since waitunlockf can change it.
bubble := gp.bubble
if bubble != nil {
bubble.incActive()
}
if trace.ok() {
// Trace the event before the transition. It may take a
// stack trace, but we won't own the stack after the
// transition anymore.
trace.GoPark(mp.waitTraceBlockReason, mp.waitTraceSkip)
}
// N.B. Not using casGToWaiting here because the waitreason is
// set by park_m's caller.
casgstatus(gp, _Grunning, _Gwaiting)
if trace.ok() {
traceRelease(trace)
}
dropg()
if fn := mp.waitunlockf; fn != nil {
ok := fn(gp, mp.waitlock)
mp.waitunlockf = nil
mp.waitlock = nil
if !ok {
trace := traceAcquire()
casgstatus(gp, _Gwaiting, _Grunnable)
if bubble != nil {
bubble.decActive()
}
if trace.ok() {
trace.GoUnpark(gp, 2)
traceRelease(trace)
}
execute(gp, true) // Schedule it back, never returns.
}
}
if bubble != nil {
bubble.decActive()
}
schedule()
}
Let's first look at execute:
- Bind the M and the G to each other.
- Switch the G's status from runnable to running.
- Initialize the state needed for running (stack guard, scheduling tick, and so on).
- Switch to the G's execution context via gogo, at which point the G actually starts running.
func execute(gp *g, inheritTime bool) {
// This code runs on the M's g0 stack rather than on the gp being scheduled, so getg() yields g0 and the M can be fetched from it.
mp := getg().m
if goroutineProfile.active {
// Make sure that gp has had its stack written out to the goroutine
// profile, exactly as it was when the goroutine profiler first stopped
// the world.
tryRecordGoroutineProfile(gp, nil, osyield)
}
// Assign gp.m before entering _Grunning so running Gs have an M.
mp.curg = gp
gp.m = mp
gp.syncSafePoint = false // Clear the flag, which may have been set by morestack.
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + stackGuard
if !inheritTime {
mp.p.ptr().schedtick++
}
// Check whether the profiler needs to be turned on or off.
hz := sched.profilehz
if mp.profilehz != hz {
setThreadCPUProfiler(hz)
}
trace := traceAcquire()
if trace.ok() {
trace.GoStart()
traceRelease(trace)
}
gogo(&gp.sched)
}
gogo is the core routine with which the Go runtime restores a goroutine's state: it reads the saved context (PC, SP, BP, ...) out of the gobuf structure and resumes the G exactly where it was suspended. It is the low-level mechanism underlying goroutine blocking/waking and scheduler switches.
// func gogo(buf *gobuf)
// restore state from Gobuf; longjmp
TEXT runtime·gogo(SB), NOSPLIT, $0-8
MOVQ buf+0(FP), BX // gobuf
MOVQ gobuf_g(BX), DX
MOVQ 0(DX), CX // make sure g != nil
JMP gogo<>(SB)
TEXT gogo<>(SB), NOSPLIT, $0
get_tls(CX)
MOVQ DX, g(CX)
MOVQ DX, R14 // set the g register
MOVQ gobuf_sp(BX), SP // restore SP
MOVQ gobuf_ctxt(BX), DX
MOVQ gobuf_bp(BX), BP
MOVQ $0, gobuf_sp(BX) // clear to help garbage collector
MOVQ $0, gobuf_ctxt(BX)
MOVQ $0, gobuf_bp(BX)
MOVQ gobuf_pc(BX), BX
JMP BX
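For reference, the context that gogo restores lives in gobuf (abridged from runtime/runtime2.go):
type gobuf struct {
	sp   uintptr        // saved stack pointer
	pc   uintptr        // saved program counter (where to resume)
	g    guintptr       // the g this context belongs to
	ctxt unsafe.Pointer // closure context, if any
	ret  uintptr        // saved return value register
	lr   uintptr        // link register (unused on amd64)
	bp   uintptr        // saved frame pointer
}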
Finally, let's look at schedule(). In short, it eventually finds a runnable g and hands it to execute(). (A simplified sketch of the search order used by findRunnable follows the listing.)
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
mp := getg().m
if mp.locks != 0 {
throw("schedule: holding locks")
}
if mp.lockedg != 0 {
stoplockedm()
execute(mp.lockedg.ptr(), false) // Never returns.
}
// We should not schedule away from a g that is executing a cgo call,
// since the cgo call is using the m's g0 stack.
if mp.incgo {
throw("schedule: in cgo")
}
top:
pp := mp.p.ptr()
pp.preempt = false
// Safety check: if we are spinning, the run queue should be empty.
// Check this before calling checkTimers, as that might call
// goready to put a ready goroutine on the local run queue.
if mp.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
throw("schedule: spinning with local work")
}
gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available
// findRunnable may have collected an allp snapshot. The snapshot is
// only required within findRunnable. Clear it to all GC to collect the
// slice.
mp.clearAllpSnapshot()
if debug.dontfreezetheworld > 0 && freezing.Load() {
// See comment in freezetheworld. We don't want to perturb
// scheduler state, so we didn't gcstopm in findRunnable, but
// also don't want to allow new goroutines to run.
//
// Deadlock here rather than in the findRunnable loop so if
// findRunnable is stuck in a loop we don't perturb that
// either.
lock(&deadlock)
lock(&deadlock)
}
// This thread is going to run a goroutine and is not spinning anymore,
// so if it was marked as spinning we need to reset it now and potentially
// start a new spinning M.
if mp.spinning {
resetspinning()
}
if sched.disable.user && !schedEnabled(gp) {
// Scheduling of this goroutine is disabled. Put it on
// the list of pending runnable goroutines for when we
// re-enable user scheduling and look again.
lock(&sched.lock)
if schedEnabled(gp) {
// Something re-enabled scheduling while we
// were acquiring the lock.
unlock(&sched.lock)
} else {
sched.disable.runnable.pushBack(gp)
unlock(&sched.lock)
goto top
}
}
// If about to schedule a not-normal goroutine (a GCworker or tracereader),
// wake a P if there is one.
if tryWakeP {
wakep()
}
if gp.lockedm != 0 {
// Hands off own p to the locked m,
// then blocks waiting for a new p.
startlockedm(gp)
goto top
}
execute(gp, inheritTime)
}
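findRunnable is where schedule blocks until it has work. A heavily simplified model of its search order (illustrative only; the real function in runtime/proc.go also deals with GC workers, timers, tracing and spinning Ms), with toy types standing in for the runtime's P and scheduler state:
type toyG struct{}

type toyP interface {
	Tick() uint32    // this P's schedtick
	LocalPop() *toyG // runnext slot first, then the local ring buffer
}

type toySched interface {
	GlobalPop() *toyG
	Netpoll() *toyG        // goroutines made runnable by network I/O
	Steal(from toyP) *toyG // work stealing from other Ps
	ParkM()                // stop this M until new work shows up
}

func findRunnableSketch(p toyP, s toySched) *toyG {
	for {
		// Fairness: occasionally look at the global queue first so that
		// goroutines there are not starved by a busy local queue.
		if p.Tick()%61 == 0 {
			if g := s.GlobalPop(); g != nil {
				return g
			}
		}
		if g := p.LocalPop(); g != nil {
			return g
		}
		if g := s.GlobalPop(); g != nil {
			return g
		}
		if g := s.Netpoll(); g != nil {
			return g
		}
		if g := s.Steal(p); g != nil {
			return g
		}
		s.ParkM() // nothing runnable anywhere: park until woken
	}
}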
Semrelease
runtime_Semrelease is a thin wrapper around semrelease1:
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
semrelease1(addr, handoff, skipframes)
}
semrelease1 mirrors semacquire1:
- Locate the semaphore's wait-queue root.
- Atomically increment the semaphore value.
- If there are no waiters, return immediately.
- Otherwise lock the root, dequeue the waiter at the head of the queue, and unlock.
- With handoff == false the "direct handoff" mode is not taken.
- readyWithTime(s, 5+skipframes) switches the g recorded in the sudog to runnable.
func semrelease1(addr *uint32, handoff bool, skipframes int) {
root := semtable.rootFor(addr)
atomic.Xadd(addr, 1)
// Easy case: no waiters?
// This check must happen after the xadd, to avoid a missed wakeup
// (see loop in semacquire).
if root.nwait.Load() == 0 {
return
}
// Harder case: search for a waiter and wake it.
lockWithRank(&root.lock, lockRankRoot)
if root.nwait.Load() == 0 {
// The count is already consumed by another goroutine,
// so no need to wake up another goroutine.
unlock(&root.lock)
return
}
s, t0, tailtime := root.dequeue(addr)
if s != nil {
root.nwait.Add(-1)
}
unlock(&root.lock)
if s != nil { // May be slow or even yield, so unlock first
acquiretime := s.acquiretime
if acquiretime != 0 {
// Charge contention that this (delayed) unlock caused.
// If there are N more goroutines waiting beyond the
// one that's waking up, charge their delay as well, so that
// contention holding up many goroutines shows up as
// more costly than contention holding up a single goroutine.
// It would take O(N) time to calculate how long each goroutine
// has been waiting, so instead we charge avg(head-wait, tail-wait)*N.
// head-wait is the longest wait and tail-wait is the shortest.
// (When we do a lifo insertion, we preserve this property by
// copying the old head's acquiretime into the inserted new head.
// In that case the overall average may be slightly high, but that's fine:
// the average of the ends is only an approximation to the actual
// average anyway.)
// The root.dequeue above changed the head and tail acquiretime
// to the current time, so the next unlock will not re-count this contention.
dt0 := t0 - acquiretime
dt := dt0
if s.waiters != 0 {
dtail := t0 - tailtime
dt += (dtail + dt0) / 2 * int64(s.waiters)
}
mutexevent(dt, 3+skipframes)
}
if s.ticket != 0 {
throw("corrupted semaphore ticket")
}
if handoff && cansemacquire(addr) {
s.ticket = 1
}
readyWithTime(s, 5+skipframes)
if s.ticket == 1 && getg().m.locks == 0 && getg() != getg().m.g0 {
// Direct G handoff
//
// readyWithTime has added the waiter G as runnext in the
// current P; we now call the scheduler so that we start running
// the waiter G immediately.
//
// Note that waiter inherits our time slice: this is desirable
// to avoid having a highly contended semaphore hog the P
// indefinitely. goyield is like Gosched, but it emits a
// "preempted" trace event instead and, more importantly, puts
// the current G on the local runq instead of the global one.
// We only do this in the starving regime (handoff=true), as in
// the non-starving case it is possible for a different waiter
// to acquire the semaphore while we are yielding/scheduling,
// and this would be wasteful. We wait instead to enter starving
// regime, and then we start to do direct handoffs of ticket and P.
//
// See issue 33747 for discussion.
//
// We don't handoff directly if we're holding locks or on the
// system stack, since it's not safe to enter the scheduler.
goyield()
}
}
}
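Where does handoff == true come from? In the standard library, sync.Mutex passes it when unlocking in starvation mode, which is what enables the direct-handoff branch above. Roughly paraphrased from sync.(*Mutex).unlockSlow (not the exact source):
if new&mutexStarving == 0 {
	// Normal mode: wake a waiter, but let it compete for the lock.
	runtime_Semrelease(&m.sema, false, 1)
} else {
	// Starvation mode: hand the mutex (and the time slice) straight to
	// the waiter at the front of the queue via direct handoff.
	runtime_Semrelease(&m.sema, true, 1)
}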
Eventually goready is called:
func readyWithTime(s *sudog, traceskip int) {
if s.releasetime != 0 {
s.releasetime = cputicks()
}
goready(s.g, traceskip)
}
goready calls ready through systemstack, which switches to the g0 stack to run ready:
func goready(gp *g, traceskip int) {
systemstack(func() {
ready(gp, traceskip, true)
})
}
ready switches the g's status to _Grunnable, uses runqput to put it onto the current P's run queue (with next == true it goes into the runnext slot, so it will be the next g this P runs, rather than being appended to the tail), and then calls wakep to wake or start another P/M if needed. acquirem() and releasem(mp) are paired operations that pin the current M (disable preemption) for the duration. A sketch of runqput's behavior follows the listing.
// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
status := readgstatus(gp)
// Mark runnable.
mp := acquirem() // disable preemption because it can be holding p in a local var
if status&^_Gscan != _Gwaiting {
dumpgstatus(gp)
throw("bad g->status in ready")
}
// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
trace := traceAcquire()
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.ok() {
trace.GoUnpark(gp, traceskip)
traceRelease(trace)
}
runqput(mp.p.ptr(), gp, next)
wakep()
releasem(mp)
}
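A simplified model of what runqput does (illustrative only; the real code in runtime/proc.go uses atomics and a fixed-size 256-entry ring per P):
type gref struct{}

type toyRunq struct {
	runnext *gref   // the "run this one next" slot
	local   []*gref // this P's local run queue
	global  *[]*gref
}

func (p *toyRunq) put(gp *gref, next bool) {
	if next {
		old := p.runnext
		p.runnext = gp // gp becomes the very next goroutine this P runs
		if old == nil {
			return
		}
		gp = old // demote the previous runnext into the local queue
	}
	if len(p.local) < 256 {
		p.local = append(p.local, gp)
		return
	}
	// Local queue full: move half of it (plus gp) to the global run queue
	// so other Ps can pick up or steal the work.
	half := len(p.local) / 2
	*p.global = append(*p.global, p.local[:half]...)
	*p.global = append(*p.global, gp)
	p.local = p.local[half:]
}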
systemstack and mcall
Both systemstack and mcall are runtime functions that switch execution onto the system (g0) stack, but they differ:
- Who may call them: mcall can only be called from a goroutine other than g0. It switches to the g0 stack to run the given function, and that function must never return to the caller; it either resumes some goroutine via gogo or enters the scheduler, so control does not simply fall back to the point after the mcall.
- systemstack can be called from an ordinary goroutine or from g0 itself; once the closure it runs has finished, execution switches back to the original goroutine's stack and continues.