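The wait.Until method

wait.Until runs a function f repeatedly at the interval given by period. More precisely, Until runs f, waits for period after f returns, and then runs f again. The loop ends once stopCh is closed.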

func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
   JitterUntil(f, period, 0.0, true, stopCh)
}

Usage example: run c.runWorker repeatedly with a 1s wait between runs:

wait.Until(c.runWorker, time.Second, stopCh)
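
To make the calling pattern concrete without pulling in the Kubernetes packages, here is a minimal standard-library sketch. The helper until is a hypothetical, simplified stand-in for wait.Until (no jitter, no crash handling), and runThreeTimes is an illustrative name, not part of any library. It shows f being run, a wait of period after each run, and the loop ending once stopCh is closed.

```go
package main

import (
	"fmt"
	"time"
)

// until is a hypothetical, simplified stand-in for wait.Until.
// It runs f, waits for period, and repeats until stopCh is closed.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		// Check for a stop request before every run of f.
		select {
		case <-stopCh:
			return
		default:
		}

		f()

		// Wait for the period to elapse, or stop early.
		select {
		case <-stopCh:
			return
		case <-time.After(period):
		}
	}
}

// runThreeTimes closes stopCh from inside f on the third call
// and reports how many times f ran in total.
func runThreeTimes() int {
	stopCh := make(chan struct{})
	count := 0
	until(func() {
		count++
		if count == 3 {
			close(stopCh)
		}
	}, 10*time.Millisecond, stopCh)
	return count
}

func main() {
	fmt.Println("f ran", runThreeTimes(), "times")
}
```

In real code the stop channel is usually closed by another goroutine (for example on shutdown), and wait.Until is typically started with the go keyword so that it does not block the caller.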

Until calls JitterUntil with jitterFactor set to 0.0 (no jitter) and sliding set to true:

// JitterUntil loops until stop channel is closed, running f every period.
//
// If jitterFactor is positive, the period is jittered before every run of f.
// If jitterFactor is not positive, the period is unchanged and not jittered.
//
// If sliding is true, the period is computed after f runs. If it is false then
// period includes the runtime for f.
//
// Close stopCh to stop. f may not be invoked if stop channel is already
// closed. Pass NeverStop to if you don't want it stop.
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
   BackoffUntil(f, NewJitteredBackoffManager(period, jitterFactor, &clock.RealClock{}), sliding, stopCh)
}

JitterUntil in turn calls BackoffUntil, passing it the BackoffManager built by NewJitteredBackoffManager (concretely, a jitteredBackoffManagerImpl):

// NewJitteredBackoffManager returns a BackoffManager that backoffs with given duration plus given jitter. If the jitter
// is negative, backoff will not be jittered.
func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager {
   return &jitteredBackoffManagerImpl{
      clock:        c,
      duration:     duration,
      jitter:       jitter,
      backoffTimer: nil,
   }
}

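BackoffUntil runs a for loop. Each iteration first checks whether stopCh has been closed and returns if so. Otherwise it runs f. When sliding is true, backoff.Backoff() is called after f returns, so the timer starts only once f has finished. When sliding is false, backoff.Backoff() is called before f, so the period includes the runtime of f. After f returns, the loop waits for the timer to fire (or for stopCh to be closed) and then starts the next iteration.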

// BackoffUntil loops until stop channel is closed, run f every duration given by BackoffManager.
//
// If sliding is true, the period is computed after f runs. If it is false then
// period includes the runtime for f.
func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
   var t clock.Timer
   for {
      select {
      case <-stopCh:
         return
      default:
      }

      if !sliding {
         t = backoff.Backoff()
      }

      func() {
         defer runtime.HandleCrash()
         f()
      }()

      if sliding {
         t = backoff.Backoff()
      }

      // NOTE: b/c there is no priority selection in golang
      // it is possible for this to race, meaning we could
      // trigger t.C and stopCh, and t.C select falls through.
      // In order to mitigate we re-check stopCh at the beginning
      // of every loop to prevent extra executions of f().
      select {
      case <-stopCh:
         if !t.Stop() {
            <-t.C()
         }
         return
      case <-t.C():
      }
   }
}
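
The effect of sliding on scheduling can be illustrated with a small model. This is only a sketch under simplifying assumptions: f always takes the same time to run, there is no jitter, and timers are ideal. The function startOffsets and its parameters are hypothetical names introduced here. It computes when each run of f would start, measured from the first run.

```go
package main

import (
	"fmt"
	"time"
)

// startOffsets models the start time of each of the first n runs of f,
// relative to the first run, assuming f always takes runDur and the
// backoff is a fixed period with no jitter.
func startOffsets(n int, period, runDur time.Duration, sliding bool) []time.Duration {
	offsets := make([]time.Duration, 0, n)
	var start time.Duration
	for i := 0; i < n; i++ {
		offsets = append(offsets, start)
		switch {
		case sliding:
			// Timer is armed after f returns: gap = runtime of f + period.
			start += runDur + period
		case runDur > period:
			// Timer was armed before f and has already fired when f returns.
			start += runDur
		default:
			// Timer was armed before f: gap = period, which includes f's runtime.
			start += period
		}
	}
	return offsets
}

func main() {
	period := time.Second
	runDur := 300 * time.Millisecond
	fmt.Println("sliding=true: ", startOffsets(3, period, runDur, true))
	fmt.Println("sliding=false:", startOffsets(3, period, runDur, false))
}
```

With a 1s period and a 300ms f, consecutive runs start 1.3s apart when sliding is true and 1s apart when sliding is false. Since wait.Until passes sliding as true, the time f takes is added on top of period.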

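backoff.Backoff() first calls getNextBackoff() to obtain the wait duration. jitter is the jitter factor: if j.jitter > 0.0, the duration is jittered by Jitter, which adds a random number in [0, 1) multiplied by both jitter and duration, so the result lies between duration and duration + jitter*duration. Backoff() then creates jitteredBackoffManagerImpl.backoffTimer on the first call, or resets it on later calls, and returns it.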

type jitteredBackoffManagerImpl struct {
	clock        clock.Clock
	duration     time.Duration
	jitter       float64
	backoffTimer clock.Timer
}

func (j *jitteredBackoffManagerImpl) getNextBackoff() time.Duration {
   jitteredPeriod := j.duration
   if j.jitter > 0.0 {
      jitteredPeriod = Jitter(j.duration, j.jitter)
   }
   return jitteredPeriod
}

// Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for jittered backoff.
// The returned timer must be drained before calling Backoff() the second time
func (j *jitteredBackoffManagerImpl) Backoff() clock.Timer {
   backoff := j.getNextBackoff()
   if j.backoffTimer == nil {
      j.backoffTimer = j.clock.NewTimer(backoff)
   } else {
      j.backoffTimer.Reset(backoff)
   }
   return j.backoffTimer
}

// Jitter returns a time.Duration between duration and duration + maxFactor *
// duration.
//
// This allows clients to avoid converging on periodic behavior. If maxFactor
// is 0.0, a suggested default value will be chosen.
func Jitter(duration time.Duration, maxFactor float64) time.Duration {
	if maxFactor <= 0.0 {
		maxFactor = 1.0
	}
	wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
	return wait
}
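
As a quick check of the range described above, the following standard-library sketch reproduces the jitter arithmetic. The lowercase jitter and nextBackoff are local copies written for illustration (they mirror Jitter and getNextBackoff as quoted in this post, and are not the library functions themselves).

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitter is a local copy of the Jitter logic quoted in this post:
// it returns a value in [duration, duration + maxFactor*duration).
func jitter(duration time.Duration, maxFactor float64) time.Duration {
	if maxFactor <= 0.0 {
		maxFactor = 1.0
	}
	return duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
}

// nextBackoff is a local copy of the getNextBackoff logic:
// jitter is applied only when the factor is positive.
func nextBackoff(duration time.Duration, factor float64) time.Duration {
	if factor > 0.0 {
		return jitter(duration, factor)
	}
	return duration
}

func main() {
	// With factor 0.5 every value falls in [1s, 1.5s).
	for i := 0; i < 3; i++ {
		fmt.Println(nextBackoff(time.Second, 0.5))
	}
	// With factor 0.0 (what wait.Until passes) the period is unchanged.
	fmt.Println(nextBackoff(time.Second, 0.0))
}
```

Note that the maxFactor <= 0.0 branch inside Jitter is never reached through getNextBackoff, because getNextBackoff only calls Jitter when the factor is positive.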
Updated: 2024-11-01