goroutine 阻塞和唤醒

Go 运行时通过 Semacquire(goroutine 尝试获取信号量,失败则用 sudog 记录状态并挂起)和 Semrelease(释放信号量后唤醒等待的 goroutine,改状态为可运行并加入调度)实现资源同步,过程中依赖 g0 栈切换工具(mcall/systemstack)保障安全操作,最终支撑 goroutine 轻量级并发的高效调度。

Semacquire

sync.runtime_SemacquireRWMutexR 的实现位于 runtime/sema.go,在运行时中对应的函数名为 sync_runtime_SemacquireRWMutexR:

// sync_runtime_SemacquireRWMutexR forwards to semacquire1 with both the block
// and the mutex profiling flags set and with the wait reason
// waitReasonSyncRWMutexRLock. addr, lifo and skipframes are passed through
// unchanged.
func sync_runtime_SemacquireRWMutexR(addr *uint32, lifo bool, skipframes int) {
    semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes, waitReasonSyncRWMutexRLock)
}

semacquire1 大致流程如下:

  1. 获取当前 goroutine

  2. 如果信号量的值大于 0,则说明可以直接获取,将值 -1 并返回

  3. 从 P 中获取一个sudog 对象

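  4. 执行 for 循环:1)对信号量的等待队列根节点加锁,并将 root.nwait 加 1;2)再次尝试获取信号量,若此时信号量的值大于 0,则将值 -1、将 root.nwait 减 1、解锁并跳出循环;3)否则将当前 sudog 加入信号量的等待队列,并通过 goparkunlock 挂起当前 g(同时解锁),该 m 会去调度其他 g 执行,这个 g 则等待信号量释放时被唤醒;4)g 被唤醒后,若 s.ticket != 0(信号量已被直接交接)或再次获取信号量成功,则跳出循环,否则进入下一轮循环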

  5. 释放sudog

sudog 是用于管理 goroutine 等待状态的关键结构体(例如在信号量、锁、channel 等同步操作中),其生命周期通常是:

  1. 当 goroutine 需要等待资源时,通过 acquireSudog() 从缓存池获取一个 sudog
  2. 使用 sudog 记录等待信息(如关联的 goroutine、等待的资源地址、在等待队列中的位置等);
  3. 当 goroutine 被唤醒(获取到资源)后,sudog 不再需要,通过 releaseSudog(s) 将其归还到缓存池。
// Wait loop of semacquire1 (excerpt): repeat until the semaphore is acquired
// directly or a releaser has handed it over by setting s.ticket.
for {
    lockWithRank(&root.lock, lockRankRoot)
    // Add ourselves to nwait to disable "easy case" in semrelease.
    root.nwait.Add(1)
    // Check cansemacquire to avoid missed wakeup.
    if cansemacquire(addr) {
       // Acquired without sleeping: undo the waiter count and leave.
       root.nwait.Add(-1)
       unlock(&root.lock)
       break
    }
    // Any semrelease after the cansemacquire knows we're waiting
    // (we set nwait above), so go to sleep.
    root.queue(addr, s, lifo)
    // Parks this goroutine and releases root.lock as part of parking.
    goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes)
    // Woken up: done if a ticket was handed over or the semaphore can be
    // acquired now; otherwise loop and wait again.
    if s.ticket != 0 || cansemacquire(addr) {
       break
    }
 }

这段循环中会root.nwait.Add(1), 这是与semrelease逻辑配合使用的, semrelease依据 root.nwait 决定是否唤醒等待者

当其他 goroutine 释放信号量(semrelease)时,会检查 root.nwait 的值:

  • root.nwait == 0(无等待者),semrelease 只需简单增加信号量计数(快速释放);
  • root.nwait > 0(有等待者),semrelease 必须从等待队列中唤醒一个等待者(慢速释放),并减少 root.nwait 计数。
// semacquire1 acquires the semaphore at addr, parking the calling goroutine
// while it cannot be acquired.
//
// lifo is forwarded to root.queue when the goroutine enqueues itself.
// profile selects block and/or mutex profiling of the wait. skipframes is
// added to the frame-skip counts passed to goparkunlock and blockevent.
// reason is the wait reason passed to goparkunlock.
//
// It throws if it is not running on the M's current user goroutine.
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason) {
    gp := getg()
    if gp != gp.m.curg {
       throw("semacquire not on the G stack")
    }

    // Easy case.
    if cansemacquire(addr) {
       return
    }

    // Harder case:
    // increment waiter count
    // try cansemacquire one more time, return if succeeded
    // enqueue itself as a waiter
    // sleep
    // (waiter descriptor is dequeued by signaler)
    s := acquireSudog()
    root := semtable.rootFor(addr)
    t0 := int64(0)
    s.releasetime = 0
    s.acquiretime = 0
    s.ticket = 0
    // Block profiling: remember the start time and mark releasetime as
    // non-zero so that readyWithTime stamps the wake-up time into it.
    if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
       t0 = cputicks()
       s.releasetime = -1
    }
    // Mutex profiling: record when the wait started; semrelease1 reads
    // s.acquiretime to charge contention. t0 is reused if already taken.
    if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
       if t0 == 0 {
          t0 = cputicks()
       }
       s.acquiretime = t0
    }
    for {
       lockWithRank(&root.lock, lockRankRoot)
       // Add ourselves to nwait to disable "easy case" in semrelease.
       root.nwait.Add(1)
       // Check cansemacquire to avoid missed wakeup.
       if cansemacquire(addr) {
          root.nwait.Add(-1)
          unlock(&root.lock)
          break
       }
       // Any semrelease after the cansemacquire knows we're waiting
       // (we set nwait above), so go to sleep.
       root.queue(addr, s, lifo)
       goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes)
       // Woken up: a non-zero ticket means semrelease1 handed the
       // semaphore over; otherwise try to acquire it again.
       if s.ticket != 0 || cansemacquire(addr) {
          break
       }
    }
    // A positive releasetime means a wake-up time was recorded; report the
    // blocked duration to the block profiler.
    if s.releasetime > 0 {
       blockevent(s.releasetime-t0, 3+skipframes)
    }
    releaseSudog(s)
}

goparkunlock会将当前 goroutine 切换到 "等待状态",同时会释放(解锁)之前持有的锁(对应信号量等待队列的锁)

// Puts the current goroutine into a waiting state and unlocks the lock.
// The goroutine can be made runnable again by calling goready(gp).
//
// This is gopark with parkunlock_c as the unlock callback and lock as the
// callback's argument; reason, traceReason and traceskip are passed through.
func goparkunlock(lock *mutex, reason waitReason, traceReason traceBlockReason, traceskip int) {
    gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceReason, traceskip)
}

在本例中 reason 为 waitReasonSyncRWMutexRLock(由 sync_runtime_SemacquireRWMutexR 传入)。 gopark 会先通过 acquirem() 锁定该 g 对应的 m(期间禁止抢占),并且在 m 上保存需要解锁的资源(对应信号量等待队列的锁),以及解锁的回调函数。然后通过 releasem(mp) 释放 m。最后执行 mcall 函数,mcall 是一个特殊函数,用于从 G 的用户栈切换到 M 的 g0 栈执行指定函数(park_m)。

// gopark parks the current goroutine. It records unlockf, lock and the trace
// parameters on the current M and the wait reason on the G, then calls
// mcall(park_m) to continue on the g0 stack.
//
// unlockf and lock are stored as mp.waitunlockf and mp.waitlock; park_m
// invokes the callback with the parked G and the lock. It throws if the
// current G is not in state _Grunning or _Gscanrunning.
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceReason traceBlockReason, traceskip int) {
    if reason != waitReasonSleep {
       checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
    }
    mp := acquirem()
    gp := mp.curg
    status := readgstatus(gp)
    if status != _Grunning && status != _Gscanrunning {
       throw("gopark: bad g status")
    }
    // Stash the park parameters where park_m (running on g0) can find them.
    mp.waitlock = lock
    mp.waitunlockf = unlockf
    gp.waitreason = reason
    mp.waitTraceBlockReason = traceReason
    mp.waitTraceSkip = traceskip
    releasem(mp)
    // can't do anything that might move the G between Ms here.
    mcall(park_m)
}

mcall 是通过汇编代码实现,具体位置可参考 src/runtime/asm_amd64.s,用处是将执行栈从当前 goroutine(G)的用户栈切换到系统线程(M)的 g0 栈,并在 g0 栈上执行指定函数 fn。fn 此处为 park_m,参数为当前 goroutine(G)。

// func mcall(fn func(*g))
// Switch to m->g0's stack, call fn(g).
// Fn must never return. It should gogo(&g->sched)
// to keep running g.
//
// On entry AX holds fn and R14 holds the current g (see the register
// comments below). The caller's PC, SP and BP are saved in g->sched before
// the switch.
TEXT runtime·mcall<ABIInternal>(SB), NOSPLIT, $0-8
    MOVQ   AX, DX // DX = fn

    // Save state in g->sched. The caller's SP and PC are restored by gogo to
    // resume execution in the caller's frame (implicit return). The caller's BP
    // is also restored to support frame pointer unwinding.
    MOVQ   SP, BX // hide (SP) reads from vet
    MOVQ   8(BX), BX  // caller's PC
    MOVQ   BX, (g_sched+gobuf_pc)(R14)
    LEAQ   fn+0(FP), BX   // caller's SP
    MOVQ   BX, (g_sched+gobuf_sp)(R14)
    // Get the caller's frame pointer by dereferencing BP. Storing BP as it is
    // can cause a frame pointer cycle, see CL 476235.
    MOVQ   (BP), BX // caller's BP
    MOVQ   BX, (g_sched+gobuf_bp)(R14)

    // switch to m->g0 & its stack, call fn
    MOVQ   g_m(R14), BX
    MOVQ   m_g0(BX), SI   // SI = g.m.g0
    CMPQ   SI, R14    // if g == m->g0 call badmcall
    JNE    goodm
    JMP    runtime·badmcall(SB)
goodm:
    MOVQ   R14, AX       // AX (and arg 0) = g
    MOVQ   SI, R14       // g = g.m.g0
    get_tls(CX)       // Set G in TLS
    MOVQ   R14, g(CX)
    MOVQ   (g_sched+gobuf_sp)(R14), SP    // sp = g0.sched.sp
    MOVQ   $0, BP // clear frame pointer, as caller may execute on another M
    PUSHQ  AX // open up space for fn's arg spill slot
    MOVQ   0(DX), R12 // R12 = code pointer loaded from the func value in DX
    CALL   R12       // fn(g)
    // The Windows native stack unwinder incorrectly classifies the next instruction
    // as part of the function epilogue, producing a wrong call stack.
    // Add a NOP to work around this issue. See go.dev/issue/67007.
    BYTE   $0x90
    POPQ   AX
    // Only reached if fn returned, which it must never do.
    JMP    runtime·badmcall2(SB)
    RET

park_m 中:

  1. 首先将g 的状态转换为_Gwaiting
  2. dropg() 将m与当前 g解绑
  3. 然后会查看 waitunlockf(信号量等待队列的锁解锁的回调函数),如果回调函数返回 false(表示放弃挂起),则将 g 的状态改回 _Grunnable 并通过 execute(gp, true) 继续执行该 g;
  4. 否则通过 schedule() 去调度新的 g 执行。
// park continuation on g0.
//
// park_m moves gp from _Grunning to _Gwaiting, detaches it via dropg, and
// then runs the unlock callback stored on the M (mp.waitunlockf), if any. If
// the callback returns false the park is abandoned: gp is made runnable
// again and resumed through execute. Otherwise the scheduler is entered to
// run another goroutine.
func park_m(gp *g) {
    mp := getg().m

    trace := traceAcquire()

    // If g is in a synctest group, we don't want to let the group
    // become idle until after the waitunlockf (if any) has confirmed
    // that the park is happening.
    // We need to record gp.bubble here, since waitunlockf can change it.
    bubble := gp.bubble
    if bubble != nil {
       bubble.incActive()
    }

    if trace.ok() {
       // Trace the event before the transition. It may take a
       // stack trace, but we won't own the stack after the
       // transition anymore.
       trace.GoPark(mp.waitTraceBlockReason, mp.waitTraceSkip)
    }
    // N.B. Not using casGToWaiting here because the waitreason is
    // set by park_m's caller.
    casgstatus(gp, _Grunning, _Gwaiting)
    if trace.ok() {
       traceRelease(trace)
    }

    dropg()

    if fn := mp.waitunlockf; fn != nil {
       ok := fn(gp, mp.waitlock)
       // The callback and its argument are one-shot; clear them on the M.
       mp.waitunlockf = nil
       mp.waitlock = nil
       if !ok {
          // The callback returned false: undo the park and keep running gp.
          trace := traceAcquire()
          casgstatus(gp, _Gwaiting, _Grunnable)
          if bubble != nil {
             bubble.decActive()
          }
          if trace.ok() {
             trace.GoUnpark(gp, 2)
             traceRelease(trace)
          }
          execute(gp, true) // Schedule it back, never returns.
       }
    }

    // Park confirmed: balance the incActive above.
    if bubble != nil {
       bubble.decActive()
    }

    schedule()
}

在此我们先介绍一下execute 方法:

  1. 绑定 M 与 G 的关联关系;
  2. G 的状态从 “就绪” 改为 “运行”;
  3. 初始化运行所需的状态(栈检查阈值、时间片等);
  4. 通过 gogo 切换到 G 的执行上下文,正式开始执行。
// execute binds gp to the current M, moves it from _Grunnable to _Grunning,
// resets its per-run state and jumps into it via gogo(&gp.sched).
//
// If inheritTime is false, the P's schedtick counter is incremented; if it
// is true, the counter is left unchanged.
func execute(gp *g, inheritTime bool) {
    // This code runs on the M's g0 stack rather than on the stack of the gp
    // being scheduled, so the M can be obtained through getg().
    mp := getg().m

    if goroutineProfile.active {
       // Make sure that gp has had its stack written out to the goroutine
       // profile, exactly as it was when the goroutine profiler first stopped
       // the world.
       tryRecordGoroutineProfile(gp, nil, osyield)
    }

    // Assign gp.m before entering _Grunning so running Gs have an M.
    mp.curg = gp
    gp.m = mp
    gp.syncSafePoint = false // Clear the flag, which may have been set by morestack.
    casgstatus(gp, _Grunnable, _Grunning)
    gp.waitsince = 0
    gp.preempt = false
    gp.stackguard0 = gp.stack.lo + stackGuard
    if !inheritTime {
       mp.p.ptr().schedtick++
    }

    // Check whether the profiler needs to be turned on or off.
    hz := sched.profilehz
    if mp.profilehz != hz {
       setThreadCPUProfiler(hz)
    }

    trace := traceAcquire()
    if trace.ok() {
       trace.GoStart()
       traceRelease(trace)
    }

    // Restore gp's saved context; control continues in gp from here.
    gogo(&gp.sched)
}

gogo 是Go 运行时实现 “goroutine 状态恢复” 的核心函数,它通过从 gobuf 结构中读取保存的上下文(PC、SP、BP 等),将 G 的执行状态无缝恢复到被挂起前的状态,是实现 goroutine 阻塞与唤醒、调度切换的底层关键机制。

// func gogo(buf *gobuf)
// restore state from Gobuf; longjmp
//
// Loads the g recorded in buf, checks that it is non-nil by dereferencing
// it, then tail-jumps to gogo<> with BX = buf and DX = g.
TEXT runtime·gogo(SB), NOSPLIT, $0-8
    MOVQ   buf+0(FP), BX     // gobuf
    MOVQ   gobuf_g(BX), DX   // DX = buf.g
    MOVQ   0(DX), CX     // make sure g != nil
    JMP    gogo<>(SB)

// gogo<> installs the g in DX as the current g (TLS slot and R14), restores
// SP, the context register and BP from the gobuf in BX, clears those saved
// fields, and jumps to the saved PC.
TEXT gogo<>(SB), NOSPLIT, $0
    get_tls(CX)
    MOVQ   DX, g(CX)
    MOVQ   DX, R14       // set the g register
    MOVQ   gobuf_sp(BX), SP   // restore SP
    MOVQ   gobuf_ctxt(BX), DX
    MOVQ   gobuf_bp(BX), BP
    MOVQ   $0, gobuf_sp(BX)   // clear to help garbage collector
    MOVQ   $0, gobuf_ctxt(BX)
    MOVQ   $0, gobuf_bp(BX)
    MOVQ   gobuf_pc(BX), BX   // BX = saved PC
    JMP    BX                 // resume execution at the saved PC

最后我们再来看一下schedule()方法,总的来说在最终会找到一个待执行的g,并执行execute()

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
//
// It throws if the M holds locks or is inside a cgo call. The goroutine
// returned by findRunnable is passed to execute unless user scheduling is
// disabled for it or it is locked to another M; in both cases the search
// restarts at the top label.
func schedule() {
    mp := getg().m

    if mp.locks != 0 {
       throw("schedule: holding locks")
    }

    // This M is locked to a specific goroutine: run that goroutine and
    // nothing else.
    if mp.lockedg != 0 {
       stoplockedm()
       execute(mp.lockedg.ptr(), false) // Never returns.
    }

    // We should not schedule away from a g that is executing a cgo call,
    // since the cgo call is using the m's g0 stack.
    if mp.incgo {
       throw("schedule: in cgo")
    }

top:
    pp := mp.p.ptr()
    pp.preempt = false

    // Safety check: if we are spinning, the run queue should be empty.
    // Check this before calling checkTimers, as that might call
    // goready to put a ready goroutine on the local run queue.
    if mp.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
       throw("schedule: spinning with local work")
    }

    gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available

    // findRunnable may have collected an allp snapshot. The snapshot is
    // only required within findRunnable. Clear it to allow GC to collect the
    // slice.
    mp.clearAllpSnapshot()

    if debug.dontfreezetheworld > 0 && freezing.Load() {
       // See comment in freezetheworld. We don't want to perturb
       // scheduler state, so we didn't gcstopm in findRunnable, but
       // also don't want to allow new goroutines to run.
       //
       // Deadlock here rather than in the findRunnable loop so if
       // findRunnable is stuck in a loop we don't perturb that
       // either.
       //
       // The same lock is taken twice on purpose to produce that deadlock.
       lock(&deadlock)
       lock(&deadlock)
    }

    // This thread is going to run a goroutine and is not spinning anymore,
    // so if it was marked as spinning we need to reset it now and potentially
    // start a new spinning M.
    if mp.spinning {
       resetspinning()
    }

    if sched.disable.user && !schedEnabled(gp) {
       // Scheduling of this goroutine is disabled. Put it on
       // the list of pending runnable goroutines for when we
       // re-enable user scheduling and look again.
       lock(&sched.lock)
       if schedEnabled(gp) {
          // Something re-enabled scheduling while we
          // were acquiring the lock.
          unlock(&sched.lock)
       } else {
          sched.disable.runnable.pushBack(gp)
          unlock(&sched.lock)
          goto top
       }
    }

    // If about to schedule a not-normal goroutine (a GCworker or tracereader),
    // wake a P if there is one.
    if tryWakeP {
       wakep()
    }
    if gp.lockedm != 0 {
       // Hands off own p to the locked m,
       // then blocks waiting for a new p.
       startlockedm(gp)
       goto top
    }

    execute(gp, inheritTime)
}

Semrelease

runtime_Semrelease 底层调用semrelease1

// sync_runtime_Semrelease is linked to sync.runtime_Semrelease by the
// directive below and forwards all of its arguments to semrelease1.
//
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
    semrelease1(addr, handoff, skipframes)
}

在semrelease1 中,类似semacquire1:

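  1. 获取信号量对应的等待队列根节点(semtable.rootFor),并将信号量的值原子地加 1
  2. 如果 root.nwait 为 0(没有等待者)则直接返回
  3. 对根节点加锁后再次检查 root.nwait,如果为 0 则解锁并返回
  4. 从等待队列中取出一个等待者(sudog),将 root.nwait 减 1 并解锁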
  5. handoff 为false 不考虑"直接交接"模式
  6. readyWithTime(s, 5+skipframes)将元素sudog中的g 状态修改为可运行
// semrelease1 atomically increments the semaphore at addr and, if waiters
// are registered on its root, dequeues one and makes it runnable.
//
// When handoff is true and the semaphore can be acquired on the waiter's
// behalf, the waiter receives a ticket (s.ticket = 1) and, unless this M
// holds locks or runs on g0, the caller yields with goyield right after
// readying the waiter. skipframes is added to the frame-skip counts passed
// to mutexevent and readyWithTime.
func semrelease1(addr *uint32, handoff bool, skipframes int) {
    root := semtable.rootFor(addr)
    atomic.Xadd(addr, 1)

    // Easy case: no waiters?
    // This check must happen after the xadd, to avoid a missed wakeup
    // (see loop in semacquire).
    if root.nwait.Load() == 0 {
       return
    }

    // Harder case: search for a waiter and wake it.
    lockWithRank(&root.lock, lockRankRoot)
    if root.nwait.Load() == 0 {
       // The count is already consumed by another goroutine,
       // so no need to wake up another goroutine.
       unlock(&root.lock)
       return
    }
    s, t0, tailtime := root.dequeue(addr)
    if s != nil {
       root.nwait.Add(-1)
    }
    unlock(&root.lock)
    if s != nil { // May be slow or even yield, so unlock first
       acquiretime := s.acquiretime
       if acquiretime != 0 {
          // Charge contention that this (delayed) unlock caused.
          // If there are N more goroutines waiting beyond the
          // one that's waking up, charge their delay as well, so that
          // contention holding up many goroutines shows up as
          // more costly than contention holding up a single goroutine.
          // It would take O(N) time to calculate how long each goroutine
          // has been waiting, so instead we charge avg(head-wait, tail-wait)*N.
          // head-wait is the longest wait and tail-wait is the shortest.
          // (When we do a lifo insertion, we preserve this property by
          // copying the old head's acquiretime into the inserted new head.
          // In that case the overall average may be slightly high, but that's fine:
          // the average of the ends is only an approximation to the actual
          // average anyway.)
          // The root.dequeue above changed the head and tail acquiretime
          // to the current time, so the next unlock will not re-count this contention.
          dt0 := t0 - acquiretime
          dt := dt0
          if s.waiters != 0 {
             dtail := t0 - tailtime
             dt += (dtail + dt0) / 2 * int64(s.waiters)
          }
          mutexevent(dt, 3+skipframes)
       }
       if s.ticket != 0 {
          throw("corrupted semaphore ticket")
       }
       // Handoff mode: take the semaphore on the waiter's behalf and mark
       // that with a ticket, which the waiter checks after waking up.
       if handoff && cansemacquire(addr) {
          s.ticket = 1
       }
       readyWithTime(s, 5+skipframes)
       if s.ticket == 1 && getg().m.locks == 0 && getg() != getg().m.g0 {
          // Direct G handoff
          //
          // readyWithTime has added the waiter G as runnext in the
          // current P; we now call the scheduler so that we start running
          // the waiter G immediately.
          //
          // Note that waiter inherits our time slice: this is desirable
          // to avoid having a highly contended semaphore hog the P
          // indefinitely. goyield is like Gosched, but it emits a
          // "preempted" trace event instead and, more importantly, puts
          // the current G on the local runq instead of the global one.
          // We only do this in the starving regime (handoff=true), as in
          // the non-starving case it is possible for a different waiter
          // to acquire the semaphore while we are yielding/scheduling,
          // and this would be wasteful. We wait instead to enter starving
          // regime, and then we start to do direct handoffs of ticket and P.
          //
          // See issue 33747 for discussion.
          //
          // We don't handoff directly if we're holding locks or on the
          // system stack, since it's not safe to enter the scheduler.
          goyield()
       }
    }
}

最终调用goready 方法

// readyWithTime makes the goroutine recorded in s runnable via goready.
// If s.releasetime is non-zero (semacquire1 sets it to -1 when block
// profiling is active), it is first overwritten with the current cputicks
// value so the waiter can compute how long it was blocked.
func readyWithTime(s *sudog, traceskip int) {
    if s.releasetime != 0 {
       s.releasetime = cputicks()
    }
    goready(s.g, traceskip)
}

goready 通过 systemstack 调用ready 方法,systemstack会切换到g0栈执行ready 方法

// goready runs ready(gp, traceskip, true) through systemstack. The literal
// true is ready's next argument, which ready forwards to runqput.
func goready(gp *g, traceskip int) {
    systemstack(func() {
       ready(gp, traceskip, true)
    })
}

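ready 中将 g 的状态修改为 _Grunnable,并通过 runqput 将 g 放入当前 P 的本地运行队列(此处 next 为 true,g 会被放到 runnext 位置优先执行,而不是队尾),然后通过 wakep() 尝试唤醒一个空闲的 P 来执行任务。acquirem() 和 releasem(mp) 是成对的操作,用来在这段时间内禁止当前 goroutine 被抢占。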

// Mark gp ready to run.
//
// gp must be in state _Gwaiting (the _Gscan bit is ignored by the check);
// otherwise ready dumps its status and throws. gp is switched to _Grunnable,
// put on the run queue of the current M's P with next forwarded to runqput,
// and wakep is called.
func ready(gp *g, traceskip int, next bool) {
    status := readgstatus(gp)

    // Mark runnable.
    mp := acquirem() // disable preemption because it can be holding p in a local var
    if status&^_Gscan != _Gwaiting {
       dumpgstatus(gp)
       throw("bad g->status in ready")
    }

    // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
    trace := traceAcquire()
    casgstatus(gp, _Gwaiting, _Grunnable)
    if trace.ok() {
       trace.GoUnpark(gp, traceskip)
       traceRelease(trace)
    }
    runqput(mp.p.ptr(), gp, next)
    wakep()
    releasem(mp) // pairs with the acquirem above
}

systemstack和mcall

systemstack 和 mcall 都是 Go 语言运行时中用于切换到系统栈执行代码的函数,但它们存在一些区别,具体如下:

调用发起方限制不同:

mcall 只能由非 g0 的 goroutine 发起,切换到 g0 栈执行函数;传入的函数不能返回,因此执行完后不会直接跳转回原来的 goroutine 栈,原 goroutine 只有在之后被调度器通过 gogo 恢复上下文时才会继续执行。systemstack 可以由普通的 goroutine 或者 g0 自身发起切换,在完成其中闭包函数调用后,会切换回到原本的 goroutine 栈继续执行。

updated: 2025-10-17