Skip to content

Commit 914a271

Browse files
committed
Extract minRTT window for RACK
1 parent 10c2088 commit 914a271

File tree

4 files changed

+158
-84
lines changed

4 files changed

+158
-84
lines changed

association.go

Lines changed: 19 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -229,15 +229,11 @@ type Association struct {
229229
ackTimer *ackTimer
230230

231231
// RACK / TLP state
232-
rackEnabled bool
233-
rackReoWnd time.Duration // dynamic reordering window
234-
rackMinRTT time.Duration // min observed RTT
235-
rackMinRTTWnd time.Duration // the window used to determine minRTT, defaults to 30s
236-
rackRTTDeque []struct { // sliding window of RTT samples within rackMinRTTWnd seconds
237-
time time.Time
238-
rtt time.Duration
239-
}
240-
rackDeliveredTime time.Time // send time of most recently delivered original chunk
232+
rackEnabled bool
233+
rackReoWnd time.Duration // dynamic reordering window
234+
rackMinRTT time.Duration // min observed RTT
235+
rackMinRTTWnd *windowedMin // the window used to determine minRTT, defaults to 30s
236+
rackDeliveredTime time.Time // send time of most recently delivered original chunk
241237
rackHighestDeliveredOrigTSN uint32
242238
rackReorderingSeen bool // ever observed reordering for this association
243239
rackKeepInflatedRecoveries int // keep inflated reoWnd for 16 loss recoveries
@@ -436,11 +432,8 @@ func createAssociation(config Config) *Association {
436432
assoc.rackWCDelAck = 200 * time.Millisecond // WCDelAckT, RACK for SCTP section 2C
437433
}
438434

439-
if config.RACKMinRTTWnd != 0 {
440-
assoc.rackMinRTTWnd = config.RACKMinRTTWnd
441-
} else {
442-
assoc.rackMinRTTWnd = 30 * time.Second // default 30s window to determine minRTT
443-
}
435+
// defaults to 30s window to determine minRTT
436+
assoc.rackMinRTTWnd = newWindowedMin(config.RACKMinRTTWnd)
444437

445438
assoc.timerUpdateCh = make(chan struct{}, 1)
446439
go assoc.timerLoop()
@@ -1791,7 +1784,7 @@ func (a *Association) processSelectiveAck(selectiveAckChunk *chunkSelectiveAck)
17911784
// use a window to determine minRtt instead of a global min
17921785
// as the RTT can fluctuate, which can cause problems if going from a
17931786
// high RTT to a low RTT.
1794-
a.rackPushRTT(now, now.Sub(chunkPayload.since))
1787+
a.rackMinRTTWnd.Push(now, now.Sub(chunkPayload.since))
17951788

17961789
if chunkPayload.since.After(newestDeliveredSendTime) {
17971790
newestDeliveredSendTime = chunkPayload.since
@@ -1842,7 +1835,7 @@ func (a *Association) processSelectiveAck(selectiveAckChunk *chunkSelectiveAck)
18421835

18431836
// RACK
18441837
if a.rackEnabled {
1845-
a.rackPushRTT(now, now.Sub(chunkPayload.since))
1838+
a.rackMinRTTWnd.Push(now, now.Sub(chunkPayload.since))
18461839

18471840
if chunkPayload.since.After(newestDeliveredSendTime) {
18481841
newestDeliveredSendTime = chunkPayload.since
@@ -3065,11 +3058,7 @@ func (a *Association) startRackTimer(dur time.Duration) {
30653058

30663059
a.timerMu.Unlock()
30673060

3068-
// poke the loop
3069-
select {
3070-
case a.timerUpdateCh <- struct{}{}:
3071-
default:
3072-
}
3061+
a.pokeTimerLoop()
30733062
}
30743063

30753064
func (a *Association) stopRackTimer() {
@@ -3081,10 +3070,7 @@ func (a *Association) stopRackTimer() {
30813070
a.rackDeadline = time.Time{}
30823071
a.timerMu.Unlock()
30833072

3084-
select {
3085-
case a.timerUpdateCh <- struct{}{}:
3086-
default:
3087-
}
3073+
a.pokeTimerLoop()
30883074
}
30893075

30903076
func (a *Association) startPTOTimer(dur time.Duration) {
@@ -3102,11 +3088,7 @@ func (a *Association) startPTOTimer(dur time.Duration) {
31023088

31033089
a.timerMu.Unlock()
31043090

3105-
// poke the loop
3106-
select {
3107-
case a.timerUpdateCh <- struct{}{}:
3108-
default:
3109-
}
3091+
a.pokeTimerLoop()
31103092
}
31113093

31123094
func (a *Association) stopPTOTimer() {
@@ -3118,10 +3100,7 @@ func (a *Association) stopPTOTimer() {
31183100
a.ptoDeadline = time.Time{}
31193101
a.timerMu.Unlock()
31203102

3121-
select {
3122-
case a.timerUpdateCh <- struct{}{}:
3123-
default:
3124-
}
3103+
a.pokeTimerLoop()
31253104
}
31263105

31273106
// drainTimer safely stops a timer and drains its channel if needed.
@@ -3250,7 +3229,7 @@ func (a *Association) onRackAfterSACK( // nolint:gocognit,cyclop,gocyclo
32503229
}
32513230

32523231
// 2) Maintain ReoWND (RACK for SCTP section 2B)
3253-
if minRTT := a.rackWindowMin(now); minRTT > 0 {
3232+
if minRTT := a.rackMinRTTWnd.Min(now); minRTT > 0 {
32543233
a.rackMinRTT = minRTT
32553234
}
32563235

@@ -3442,53 +3421,10 @@ func (a *Association) onPTOTimerLocked() {
34423421
}
34433422
}
34443423

3445-
// push a new RTT sample and keep a deque within [now - rackMinRTTWnd, now].
3446-
func (a *Association) rackPushRTT(now time.Time, rtt time.Duration) {
3447-
cutoff := now.Add(-a.rackMinRTTWnd)
3448-
3449-
// remove the expired rtts
3450-
deque := a.rackRTTDeque
3451-
i := 0
3452-
for i < len(deque) && deque[i].time.Before(cutoff) {
3453-
i++
3454-
}
3455-
3456-
if i > 0 {
3457-
deque = deque[i:]
3458-
}
3459-
3460-
// drop tails with >= rtt to keep nondecreasing by rtt
3461-
for len(deque) > 0 && deque[len(deque)-1].rtt >= rtt {
3462-
deque = deque[:len(deque)-1]
3463-
}
3464-
3465-
deque = append(deque, struct {
3466-
time time.Time
3467-
rtt time.Duration
3468-
}{time: now, rtt: rtt})
3469-
3470-
a.rackRTTDeque = deque
3471-
}
3472-
3473-
// return current windowed MinRTT (or 0 if no recent samples).
3474-
func (a *Association) rackWindowMin(now time.Time) time.Duration {
3475-
cutoff := now.Add(-a.rackMinRTTWnd)
3476-
deque := a.rackRTTDeque
3477-
3478-
// prune expired
3479-
i := 0
3480-
for i < len(deque) && deque[i].time.Before(cutoff) {
3481-
i++
3482-
}
3483-
3484-
if i > 0 {
3485-
deque = deque[i:]
3486-
a.rackRTTDeque = deque
3487-
}
3488-
3489-
if len(deque) == 0 {
3490-
return 0
3424+
func (a *Association) pokeTimerLoop() {
3425+
// enqueue a single wake-up without blocking.
3426+
select {
3427+
case a.timerUpdateCh <- struct{}{}:
3428+
default:
34913429
}
3492-
3493-
return deque[0].rtt
34943430
}

association_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3964,7 +3964,7 @@ func TestRACK_SuppressReoWndDuringRecovery_NoReorderingSeen(t *testing.T) {
39643964
assert.Equal(t, time.Duration(0), assoc.rackReoWnd, "reoWnd should stay 0 until a minRTT sample exists")
39653965

39663966
now := time.Now()
3967-
assoc.rackPushRTT(now, 120*time.Millisecond)
3967+
assoc.rackMinRTTWnd.Push(now, 120*time.Millisecond)
39683968

39693969
assoc.onRackAfterSACK(false, time.Time{}, 0, &chunkSelectiveAck{})
39703970
assert.Equal(

windowedmin.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
2+
// SPDX-License-Identifier: MIT
3+
4+
package sctp
5+
6+
import "time"
7+
8+
// windowedMin maintains a monotonic deque of (time,value) to answer
9+
// the minimum over a sliding window efficiently.
10+
// Not thread-safe; caller must synchronize (Association already does).
11+
type windowedMin struct {
12+
rackMinRTTWnd time.Duration
13+
deque []entry
14+
}
15+
16+
type entry struct {
17+
t time.Time
18+
v time.Duration
19+
}
20+
21+
func newWindowedMin(window time.Duration) *windowedMin {
22+
if window <= 0 {
23+
window = 30 * time.Second
24+
}
25+
26+
return &windowedMin{rackMinRTTWnd: window}
27+
}
28+
29+
// prune removes elements older than (now - wnd).
30+
func (window *windowedMin) prune(now time.Time) {
31+
cutoff := now.Add(-window.rackMinRTTWnd)
32+
i := 0
33+
for i < len(window.deque) && window.deque[i].t.Before(cutoff) {
34+
i++
35+
}
36+
37+
if i > 0 {
38+
window.deque = window.deque[i:]
39+
}
40+
}
41+
42+
// Push inserts a new sample and preserves monotonic non-decreasing values.
43+
func (window *windowedMin) Push(now time.Time, v time.Duration) {
44+
window.prune(now)
45+
46+
for n := len(window.deque); n > 0 && window.deque[n-1].v >= v; n-- {
47+
window.deque = window.deque[:n-1]
48+
}
49+
50+
window.deque = append(
51+
window.deque,
52+
entry{
53+
t: now,
54+
v: v,
55+
},
56+
)
57+
}
58+
59+
// Min returns the minimum value in the current window or 0 if empty.
60+
func (window *windowedMin) Min(now time.Time) time.Duration {
61+
window.prune(now)
62+
63+
if len(window.deque) == 0 {
64+
return 0
65+
}
66+
67+
return window.deque[0].v
68+
}
69+
70+
// Len is only for tests/diagnostics.
71+
func (window *windowedMin) Len() int {
72+
return len(window.deque)
73+
}

windowedmin_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
2+
// SPDX-License-Identifier: MIT
3+
4+
package sctp
5+
6+
import (
7+
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
func TestWindowedMin_Basic(t *testing.T) {
14+
window := newWindowedMin(100 * time.Millisecond)
15+
base := time.Unix(0, 0)
16+
17+
window.Push(base, 30*time.Millisecond)
18+
assert.Equal(t, 30*time.Millisecond, window.Min(base))
19+
20+
window.Push(base.Add(10*time.Millisecond), 20*time.Millisecond)
21+
assert.Equal(t, 20*time.Millisecond, window.Min(base.Add(10*time.Millisecond)))
22+
23+
// larger value shouldn't change min
24+
window.Push(base.Add(20*time.Millisecond), 40*time.Millisecond)
25+
assert.Equal(t, 20*time.Millisecond, window.Min(base.Add(20*time.Millisecond)))
26+
27+
// decreasing again shrinks min
28+
window.Push(base.Add(30*time.Millisecond), 10*time.Millisecond)
29+
assert.Equal(t, 10*time.Millisecond, window.Min(base.Add(30*time.Millisecond)))
30+
}
31+
32+
func TestWindowedMin_WindowExpiry(t *testing.T) {
33+
window := newWindowedMin(50 * time.Millisecond)
34+
base := time.Unix(0, 0)
35+
36+
window.Push(base, 10*time.Millisecond) // t=0
37+
window.Push(base.Add(10*time.Millisecond), 20*time.Millisecond) // t=10ms
38+
39+
// at t=60ms, first sample is expired, m becomes 20ms
40+
m := window.Min(base.Add(60 * time.Millisecond))
41+
assert.Equal(t, 20*time.Millisecond, m)
42+
43+
// at t=200ms, all are expired -> 0
44+
m = window.Min(base.Add(200 * time.Millisecond))
45+
assert.Zero(t, m)
46+
}
47+
48+
func TestWindowedMin_EqualValues(t *testing.T) {
49+
window := newWindowedMin(1 * time.Second)
50+
base := time.Unix(0, 0)
51+
52+
window.Push(base, 15*time.Millisecond)
53+
window.Push(base.Add(1*time.Millisecond), 15*time.Millisecond)
54+
55+
assert.Equal(t, 1, window.Len())
56+
assert.Equal(t, 15*time.Millisecond, window.Min(base.Add(2*time.Millisecond)))
57+
}
58+
59+
func TestWindowedMin_DefaultWindow30s(t *testing.T) {
60+
zeroWnd := newWindowedMin(0)
61+
negativeWnd := newWindowedMin(-5 * time.Second)
62+
63+
assert.Equal(t, 30*time.Second, zeroWnd.rackMinRTTWnd)
64+
assert.Equal(t, 30*time.Second, negativeWnd.rackMinRTTWnd)
65+
}

0 commit comments

Comments
 (0)