Skip to content

Commit bb8671d

Browse files
committed
Improve RACK and PTO timers
1 parent 244b97f commit bb8671d

File tree

1 file changed

+157
-51
lines changed

1 file changed

+157
-51
lines changed

association.go

Lines changed: 157 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -241,14 +241,16 @@ type Association struct {
241241
rackHighestDeliveredOrigTSN uint32
242242
rackReorderingSeen bool // ever observed reordering for this association
243243
rackKeepInflatedRecoveries int // keep inflated reoWnd for 16 loss recoveries
244-
rackTimerMu sync.Mutex
245-
rackTimer *time.Timer // arms when outstanding but not (yet) overdue
246-
rackTimerGen uint32
247-
ptoTimerMu sync.Mutex
248-
ptoTimer *time.Timer // Tail Loss Probe timer
249-
ptoTimerGen uint32
250-
rackWCDelAck time.Duration // 200ms default
251-
rackReoWndFloor time.Duration
244+
245+
// Unified timer for RACK and PTO driven by a single goroutine.
246+
// Deadlines are protected with timerMu.
247+
timerMu sync.Mutex
248+
timerUpdateCh chan struct{}
249+
rackDeadline time.Time
250+
ptoDeadline time.Time
251+
252+
rackWCDelAck time.Duration // 200ms default
253+
rackReoWndFloor time.Duration
252254

253255
// Chunks stored for retransmission
254256
storedInit *chunkInit
@@ -440,6 +442,9 @@ func createAssociation(config Config) *Association {
440442
assoc.rackMinRTTWnd = 30 * time.Second // default 30s window to determine minRTT
441443
}
442444

445+
assoc.timerUpdateCh = make(chan struct{}, 1)
446+
go assoc.timerLoop()
447+
443448
assoc.rackReoWndFloor = config.RACKReoWndFloor // optional floor; usually 0
444449
assoc.rackKeepInflatedRecoveries = 0
445450
}
@@ -3046,78 +3051,179 @@ func (a *Association) completeHandshake(handshakeErr error) bool {
30463051
}
30473052

30483053
func (a *Association) startRackTimer(dur time.Duration) {
3049-
if !a.rackEnabled || dur <= 0 {
3054+
if !a.rackEnabled {
30503055
return
30513056
}
30523057

3053-
a.rackTimerMu.Lock()
3054-
defer a.rackTimerMu.Unlock()
3055-
3056-
gen := atomic.AddUint32(&a.rackTimerGen, 1)
3058+
a.timerMu.Lock()
30573059

3058-
if a.rackTimer != nil {
3059-
a.rackTimer.Stop()
3060+
if dur <= 0 {
3061+
a.rackDeadline = time.Time{}
3062+
} else {
3063+
a.rackDeadline = time.Now().Add(dur)
30603064
}
30613065

3062-
a.rackTimer = time.AfterFunc(dur, func() {
3063-
// re-check after acquiring the state lock.
3064-
a.lock.Lock()
3065-
defer a.lock.Unlock()
3066-
3067-
if atomic.LoadUint32(&a.rackTimerGen) != gen {
3068-
return
3069-
}
3066+
a.timerMu.Unlock()
30703067

3071-
a.onRackTimeoutLocked()
3072-
})
3068+
// poke the loop
3069+
select {
3070+
case a.timerUpdateCh <- struct{}{}:
3071+
default:
3072+
}
30733073
}
30743074

30753075
func (a *Association) stopRackTimer() {
3076-
a.rackTimerMu.Lock()
3077-
defer a.rackTimerMu.Unlock()
3076+
if !a.rackEnabled {
3077+
return
3078+
}
30783079

3079-
atomic.AddUint32(&a.rackTimerGen, 1)
3080+
a.timerMu.Lock()
3081+
a.rackDeadline = time.Time{}
3082+
a.timerMu.Unlock()
30803083

3081-
if a.rackTimer != nil {
3082-
a.rackTimer.Stop()
3084+
select {
3085+
case a.timerUpdateCh <- struct{}{}:
3086+
default:
30833087
}
30843088
}
30853089

30863090
func (a *Association) startPTOTimer(dur time.Duration) {
3087-
if !a.rackEnabled || dur <= 0 {
3091+
if !a.rackEnabled {
30883092
return
30893093
}
30903094

3091-
a.ptoTimerMu.Lock()
3092-
defer a.ptoTimerMu.Unlock()
3095+
a.timerMu.Lock()
30933096

3094-
gen := atomic.AddUint32(&a.ptoTimerGen, 1)
3097+
if dur <= 0 {
3098+
a.ptoDeadline = time.Time{}
3099+
} else {
3100+
a.ptoDeadline = time.Now().Add(dur)
3101+
}
3102+
3103+
a.timerMu.Unlock()
30953104

3096-
if a.ptoTimer != nil {
3097-
a.ptoTimer.Stop()
3105+
// poke the loop
3106+
select {
3107+
case a.timerUpdateCh <- struct{}{}:
3108+
default:
30983109
}
3110+
}
30993111

3100-
a.ptoTimer = time.AfterFunc(dur, func() {
3101-
// re-check after acquiring the state lock.
3102-
a.lock.Lock()
3103-
defer a.lock.Unlock()
3112+
func (a *Association) stopPTOTimer() {
3113+
if !a.rackEnabled {
3114+
return
3115+
}
31043116

3105-
if atomic.LoadUint32(&a.ptoTimerGen) != gen {
3106-
return
3107-
}
3117+
a.timerMu.Lock()
3118+
a.ptoDeadline = time.Time{}
3119+
a.timerMu.Unlock()
31083120

3109-
a.onPTOTimerLocked()
3110-
})
3121+
select {
3122+
case a.timerUpdateCh <- struct{}{}:
3123+
default:
3124+
}
31113125
}
31123126

3113-
func (a *Association) stopPTOTimer() {
3114-
a.ptoTimerMu.Lock()
3115-
defer a.ptoTimerMu.Unlock()
3127+
// drainTimer safely stops a timer and drains its channel if needed.
3128+
func drainTimer(t *time.Timer) {
3129+
if !t.Stop() {
3130+
select {
3131+
case <-t.C:
3132+
default:
3133+
}
3134+
}
3135+
}
3136+
3137+
// timerLoop runs one goroutine per association for RACK and PTO deadlines.
3138+
// this only runs if RACK is enabled.
3139+
func (a *Association) timerLoop() { //nolint:gocognit,cyclop
3140+
// begin with a disarmed timer.
3141+
timer := time.NewTimer(time.Hour)
3142+
drainTimer(timer)
3143+
armed := false
3144+
3145+
for {
3146+
// compute the earliest non-zero deadline.
3147+
a.timerMu.Lock()
3148+
rackDeadline := a.rackDeadline
3149+
ptoDeadline := a.ptoDeadline
3150+
a.timerMu.Unlock()
3151+
3152+
var next time.Time
3153+
switch {
3154+
case rackDeadline.IsZero():
3155+
next = ptoDeadline
3156+
case ptoDeadline.IsZero():
3157+
next = rackDeadline
3158+
default:
3159+
if rackDeadline.Before(ptoDeadline) {
3160+
next = rackDeadline
3161+
} else {
3162+
next = ptoDeadline
3163+
}
3164+
}
3165+
3166+
if next.IsZero() {
3167+
if armed {
3168+
drainTimer(timer)
3169+
armed = false
3170+
}
3171+
} else {
3172+
d := time.Until(next)
3173+
3174+
if d <= 0 {
3175+
d = time.Nanosecond
3176+
}
3177+
3178+
if armed {
3179+
drainTimer(timer)
3180+
}
3181+
3182+
timer.Reset(d)
3183+
armed = true
3184+
}
3185+
3186+
select {
3187+
case <-a.closeWriteLoopCh:
3188+
if armed {
3189+
drainTimer(timer)
3190+
}
3191+
3192+
return
3193+
3194+
case <-a.timerUpdateCh:
3195+
// re-compute deadlines and (re)arm in next loop iteration.
3196+
3197+
case <-timer.C:
3198+
armed = false
3199+
3200+
// snapshot & clear due deadlines before firing to avoid races with re-arms.
3201+
now := time.Now()
3202+
var fireRack, firePTO bool
31163203

3117-
atomic.AddUint32(&a.ptoTimerGen, 1)
3204+
a.timerMu.Lock()
31183205

3119-
if a.ptoTimer != nil {
3120-
a.ptoTimer.Stop()
3206+
if !a.rackDeadline.IsZero() && !now.Before(a.rackDeadline) {
3207+
fireRack = true
3208+
a.rackDeadline = time.Time{}
3209+
}
3210+
3211+
if !a.ptoDeadline.IsZero() && !now.Before(a.ptoDeadline) {
3212+
firePTO = true
3213+
a.ptoDeadline = time.Time{}
3214+
}
3215+
3216+
a.timerMu.Unlock()
3217+
3218+
// fire callbacks without holding timerMu.
3219+
if fireRack {
3220+
a.onRackTimeout()
3221+
}
3222+
3223+
if firePTO {
3224+
a.onPTOTimer()
3225+
}
3226+
}
31213227
}
31223228
}
31233229

0 commit comments

Comments
 (0)