wait.Until 方法可以按period频率执行 f 函数或方法,具体来说Until 会执行f 并在f 执行完成后等待period 然后继续执行f。直到stopCh 接受到消息。
func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
JitterUntil(f, period, 0.0, true, stopCh)
}
使用实例: 间隔1s执行一次 c.runWorker
wait.Until(c.runWorker, time.Second, stopCh)
Until 调用了 JitterUntil,sliding 设为'true'
// JitterUntil loops until stop channel is closed, running f every period.
//
// If jitterFactor is positive, the period is jittered before every run of f.
// If jitterFactor is not positive, the period is unchanged and not jittered.
//
// If sliding is true, the period is computed after f runs. If it is false then
// period includes the runtime for f.
//
// Close stopCh to stop. f may not be invoked if stop channel is already
// closed. Pass NeverStop to if you don't want it stop.
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
BackoffUntil(f, NewJitteredBackoffManager(period, jitterFactor, &clock.RealClock{}), sliding, stopCh)
}
即执行BackoffUntil,其中根据NewJitteredBackoffManager 生成一个BackoffManagerImpl
// NewJitteredBackoffManager returns a BackoffManager that backoffs with given duration plus given jitter. If the jitter
// is negative, backoff will not be jittered.
func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager {
return &jitteredBackoffManagerImpl{
clock: c,
duration: duration,
jitter: jitter,
backoffTimer: nil,
}
}
BackoffUntil会在一个for 循环中首先判断是否接收到了stopCh,如果没有接收到则执行f (sliding 为true,如果sliding 为 false 则在执行f 前给timer 赋值),然后执行backoff.Backoff() 给timer赋值。执行完成f 以后,等待timer 到期,然后继续执行。
// BackoffUntil loops until stop channel is closed, run f every duration given by BackoffManager.
//
// If sliding is true, the period is computed after f runs. If it is false then
// period includes the runtime for f.
func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
var t clock.Timer
for {
select {
case <-stopCh:
return
default:
}
if !sliding {
t = backoff.Backoff()
}
func() {
defer runtime.HandleCrash()
f()
}()
if sliding {
t = backoff.Backoff()
}
// NOTE: b/c there is no priority selection in golang
// it is possible for this to race, meaning we could
// trigger t.C and stopCh, and t.C select falls through.
// In order to mitigate we re-check stopCh at the beginning
// of every loop to prevent extra executions of f().
select {
case <-stopCh:
if !t.Stop() {
<-t.C()
}
return
case <-t.C():
}
}
}
backoff.Backoff() 首先需要获得 getNextBackoff(),jitter 是抖动参数,j.jitter > 0.0 则需要将duration 抖动,duration+=随机的一个零到一的数乘jitter。然后更新jitteredBackoffManagerImpl.backoffTimer 并返回。
type jitteredBackoffManagerImpl struct {
clock clock.Clock
duration time.Duration
jitter float64
backoffTimer clock.Timer
}
func (j *jitteredBackoffManagerImpl) getNextBackoff() time.Duration {
jitteredPeriod := j.duration
if j.jitter > 0.0 {
jitteredPeriod = Jitter(j.duration, j.jitter)
}
return jitteredPeriod
}
// Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for jittered backoff.
// The returned timer must be drained before calling Backoff() the second time
func (j *jitteredBackoffManagerImpl) Backoff() clock.Timer {
backoff := j.getNextBackoff()
if j.backoffTimer == nil {
j.backoffTimer = j.clock.NewTimer(backoff)
} else {
j.backoffTimer.Reset(backoff)
}
return j.backoffTimer
}
// Jitter returns a time.Duration between duration and duration + maxFactor *
// duration.
//
// This allows clients to avoid converging on periodic behavior. If maxFactor
// is 0.0, a suggested default value will be chosen.
func Jitter(duration time.Duration, maxFactor float64) time.Duration {
if maxFactor <= 0.0 {
maxFactor = 1.0
}
wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
return wait
}