Skip to content

Commit d7f2b58

Browse files
committed
Fix rare RACK race condition
1 parent 88415cd commit d7f2b58

File tree

1 file changed

+50
-18
lines changed

1 file changed

+50
-18
lines changed

association.go

Lines changed: 50 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -238,8 +238,10 @@ type Association struct {
238238
rackKeepInflatedRecoveries int // keep inflated reoWnd for 16 loss recoveries
239239
rackTimerMu sync.Mutex
240240
rackTimer *time.Timer // arms when outstanding but not (yet) overdue
241+
rackTimerGen uint32
241242
ptoTimerMu sync.Mutex
242-
ptoTimer *time.Timer // Tail Loss Probe timer
243+
ptoTimer *time.Timer // Tail Loss Probe timer
244+
ptoTimerGen uint32
243245
rackWCDelAck time.Duration // 200ms default
244246
rackReoWndFloor time.Duration
245247

@@ -299,7 +301,7 @@ type Config struct {
299301
// Step of congestion window increase at Congestion Avoidance
300302
CwndCAStep uint32
301303

302-
// RACK loss detection: DISABLED by default (although it should probably be enabled by default..?)
304+
// Whether RACK loss detection is disabled (default: false, which means RACK is enabled)
303305
DisableRACK bool
304306
// Optional: cap the minimum reordering window: 0 = use quarter-RTT
305307
RACKReoWndFloor time.Duration
@@ -3030,60 +3032,84 @@ func (a *Association) completeHandshake(handshakeErr error) bool {
30303032
return false
30313033
}
30323034

3033-
func (a *Association) startRackTimer(d time.Duration) {
3034-
if !a.rackEnabled || d <= 0 {
3035+
func (a *Association) startRackTimer(dur time.Duration) {
3036+
if !a.rackEnabled || dur <= 0 {
30353037
return
30363038
}
30373039

30383040
a.rackTimerMu.Lock()
30393041
defer a.rackTimerMu.Unlock()
30403042

3043+
gen := atomic.AddUint32(&a.rackTimerGen, 1)
3044+
30413045
if a.rackTimer != nil {
30423046
a.rackTimer.Stop()
30433047
}
30443048

3045-
a.rackTimer = time.AfterFunc(d, a.onRackTimeout)
3049+
a.rackTimer = time.AfterFunc(dur, func() {
3050+
// re-check after acquiring the state lock.
3051+
a.lock.Lock()
3052+
defer a.lock.Unlock()
3053+
3054+
if atomic.LoadUint32(&a.rackTimerGen) != gen {
3055+
return
3056+
}
3057+
3058+
a.onRackTimeoutLocked()
3059+
})
30463060
}
30473061

30483062
func (a *Association) stopRackTimer() {
30493063
a.rackTimerMu.Lock()
3064+
defer a.rackTimerMu.Unlock()
3065+
3066+
atomic.AddUint32(&a.rackTimerGen, 1)
30503067

30513068
if a.rackTimer != nil {
30523069
a.rackTimer.Stop()
30533070
}
3054-
3055-
a.rackTimerMu.Unlock()
30563071
}
30573072

3058-
func (a *Association) startPTOTimer(d time.Duration) {
3059-
if !a.rackEnabled || d <= 0 {
3073+
func (a *Association) startPTOTimer(dur time.Duration) {
3074+
if !a.rackEnabled || dur <= 0 {
30603075
return
30613076
}
30623077

30633078
a.ptoTimerMu.Lock()
30643079
defer a.ptoTimerMu.Unlock()
30653080

3081+
gen := atomic.AddUint32(&a.ptoTimerGen, 1)
3082+
30663083
if a.ptoTimer != nil {
30673084
a.ptoTimer.Stop()
30683085
}
30693086

3070-
a.ptoTimer = time.AfterFunc(d, a.onPTOTimer)
3087+
a.ptoTimer = time.AfterFunc(dur, func() {
3088+
// re-check after acquiring the state lock.
3089+
a.lock.Lock()
3090+
defer a.lock.Unlock()
3091+
3092+
if atomic.LoadUint32(&a.ptoTimerGen) != gen {
3093+
return
3094+
}
3095+
3096+
a.onPTOTimerLocked()
3097+
})
30713098
}
30723099

30733100
func (a *Association) stopPTOTimer() {
30743101
a.ptoTimerMu.Lock()
3102+
defer a.ptoTimerMu.Unlock()
3103+
3104+
atomic.AddUint32(&a.ptoTimerGen, 1)
30753105

30763106
if a.ptoTimer != nil {
30773107
a.ptoTimer.Stop()
30783108
}
3079-
3080-
a.ptoTimerMu.Unlock()
30813109
}
30823110

30833111
// onRackAfterSACK implements the RACK logic (RACK for SCTP section 2A/B, section 3) and TLP scheduling (section 2C).
3084-
//
3085-
//nolint:gocognit,cyclop
3086-
func (a *Association) onRackAfterSACK(
3112+
func (a *Association) onRackAfterSACK( // nolint:gocognit,cyclop
30873113
deliveredFound bool,
30883114
newestDeliveredSendTime time.Time,
30893115
newestDeliveredOrigTSN uint32,
@@ -3111,7 +3137,7 @@ func (a *Association) onRackAfterSACK(
31113137
} else {
31123138
base := max(a.rackMinRTT/4, a.rackReoWndFloor)
31133139
// if we have never seen reordering for this connection, set to zero *during loss recovery* (RACK for SCTP section 2B)
3114-
// we approximate during loss recovery with inFastRecovery or T3-Rtx pending. Outside recovery keep base.
3140+
// we approximate "during loss recovery" with inFastRecovery or T3-Rtx pending. outside recovery we can keep base.
31153141
if !a.rackReorderingSeen && (a.inFastRecovery || a.t3RTX.isRunning()) {
31163142
a.rackReoWnd = 0
31173143
} else if a.rackReoWnd == 0 {
@@ -3160,7 +3186,7 @@ func (a *Association) onRackAfterSACK(
31603186
a.name, chunk.tsn, chunk.since, a.rackDeliveredTime, a.rackReoWnd)
31613187
}
31623188
}
3163-
// if we marked anything, kick the writer
3189+
// if we marked anything then wake the writer
31643190
a.awakeWriteLoop()
31653191
}
31663192

@@ -3226,10 +3252,13 @@ func (a *Association) schedulePTOAfterSendLocked() {
32263252
}
32273253

32283254
// onRackTimeout is fired to avoid waiting for the next ACK.
3229-
func (a *Association) onRackTimeout() { //nolint:cyclop
3255+
func (a *Association) onRackTimeout() {
32303256
a.lock.Lock()
32313257
defer a.lock.Unlock()
3258+
a.onRackTimeoutLocked()
3259+
}
32323260

3261+
func (a *Association) onRackTimeoutLocked() { //nolint:cyclop
32333262
if !a.rackEnabled || a.rackDeliveredTime.IsZero() {
32343263
return
32353264
}
@@ -3261,7 +3290,10 @@ func (a *Association) onRackTimeout() { //nolint:cyclop
32613290
func (a *Association) onPTOTimer() {
32623291
a.lock.Lock()
32633292
defer a.lock.Unlock()
3293+
a.onPTOTimerLocked()
3294+
}
32643295

3296+
func (a *Association) onPTOTimerLocked() {
32653297
if !a.rackEnabled {
32663298
return
32673299
}

0 commit comments

Comments (0)