Skip to content

Commit cf92bf8

Browse files
committed
Update ack_timer to RFC 9260
1 parent a69c6a2 commit cf92bf8

File tree

2 files changed

+95
-53
lines changed

2 files changed

+95
-53
lines changed

ack_timer.go

Lines changed: 73 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,18 @@
44
package sctp
55

66
import (
7-
"math"
87
"sync"
98
"time"
109
)
1110

1211
const (
12+
// RFC 9260 section 6.2.1: the delayed SACK timer SHOULD be 200ms or less. Max 500ms.
1313
ackInterval time.Duration = 200 * time.Millisecond
1414
)
1515

16-
// ackTimerObserver is the inteface to an ack timer observer.
16+
// ackTimerObserver is the interface to an ack timer observer.
1717
type ackTimerObserver interface {
18-
onAckTimeout()
18+
onAckTimeout() // called to send a SACK now (either timer fired or 2nd packet rule)
1919
}
2020

2121
type ackTimerState uint8
@@ -26,74 +26,119 @@ const (
2626
ackTimerClosed
2727
)
2828

29-
// ackTimer provides the retnransmission timer conforms with RFC 4960 Sec 6.3.1.
29+
// ackTimer provides the delayed-ACK behavior per RFC 9260 section 6.2.1.
3030
type ackTimer struct {
3131
timer *time.Timer
3232
observer ackTimerObserver
33-
mutex sync.Mutex
34-
state ackTimerState
35-
pending uint8
33+
34+
mutex sync.Mutex
35+
state ackTimerState
36+
packets uint8 // number of packets observed since last SACK trigger (0 or 1 while running)
3637
}
3738

3839
// newAckTimer creates a new acknowledgement timer used to enable delayed ack.
3940
func newAckTimer(observer ackTimerObserver) *ackTimer {
4041
t := &ackTimer{observer: observer}
41-
t.timer = time.AfterFunc(math.MaxInt64, t.timeout)
42-
t.timer.Stop()
42+
t.timer = time.NewTimer(time.Hour)
43+
44+
if !t.timer.Stop() {
45+
<-t.timer.C
46+
}
4347

4448
return t
4549
}
4650

4751
func (t *ackTimer) timeout() {
4852
t.mutex.Lock()
49-
if t.pending--; t.pending == 0 && t.state == ackTimerStarted {
50-
t.state = ackTimerStopped
51-
defer t.observer.onAckTimeout()
53+
54+
if t.state != ackTimerStarted {
55+
t.mutex.Unlock()
56+
57+
return
5258
}
59+
60+
t.state = ackTimerStopped
61+
t.packets = 0
62+
obs := t.observer
63+
5364
t.mutex.Unlock()
65+
66+
// Trigger SACK on timer expiry.
67+
obs.onAckTimeout()
5468
}
5569

56-
// start starts the timer.
70+
// start notes an arriving packet eligible for delayed SACK logic.
71+
// 1st call while stopped: arm timer.
72+
// 2nd call while timer running: send ACK immediately and stop timer.
5773
func (t *ackTimer) start() bool {
5874
t.mutex.Lock()
5975
defer t.mutex.Unlock()
6076

61-
// this timer is already closed or already running
62-
if t.state != ackTimerStopped {
77+
if t.state == ackTimerClosed {
6378
return false
6479
}
6580

66-
t.state = ackTimerStarted
67-
t.pending++
68-
t.timer.Reset(ackInterval)
81+
switch t.state {
82+
case ackTimerStopped:
83+
t.state = ackTimerStarted
84+
t.packets = 1
85+
86+
// Arm timer to ackInterval and hook timeout.
87+
t.timer.Reset(ackInterval)
88+
89+
go func() { // isolate timer callback
90+
<-t.timer.C
91+
t.timeout()
92+
}()
93+
94+
return true
6995

70-
return true
96+
case ackTimerStarted:
97+
// Second packet rule: send SACK immediately.
98+
t.packets++
99+
100+
if t.packets >= 2 {
101+
t.timer.Stop()
102+
103+
t.state = ackTimerStopped
104+
t.packets = 0
105+
obs := t.observer
106+
107+
// Fire outside the lock to avoid re-entrancy issues.
108+
go obs.onAckTimeout()
109+
}
110+
111+
return true
112+
113+
default: // closed
114+
return false
115+
}
71116
}
72117

73-
// stops the timer. this is similar to stop() but subsequent start() call
74-
// will fail (the timer is no longer usable).
118+
// stop cancels a running delayed-ACK cycle (no immediate ACK).
75119
func (t *ackTimer) stop() {
76120
t.mutex.Lock()
77121
defer t.mutex.Unlock()
78122

79123
if t.state == ackTimerStarted {
80-
if t.timer.Stop() {
81-
t.pending--
82-
}
124+
t.timer.Stop()
125+
83126
t.state = ackTimerStopped
127+
t.packets = 0
84128
}
85129
}
86130

87-
// closes the timer. this is similar to stop() but subsequent start() call
88-
// will fail (the timer is no longer usable).
131+
// close permanently disables the timer; subsequent start() calls are ignored.
89132
func (t *ackTimer) close() {
90133
t.mutex.Lock()
91134
defer t.mutex.Unlock()
92135

93-
if t.state == ackTimerStarted && t.timer.Stop() {
94-
t.pending--
136+
if t.state == ackTimerStarted {
137+
t.timer.Stop()
95138
}
139+
96140
t.state = ackTimerClosed
141+
t.packets = 0
97142
}
98143

99144
// isRunning tests if the timer is running.

ack_timer_test.go

Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,50 +22,48 @@ func (o *testAckTimerObserver) onAckTimeout() {
2222
}
2323

2424
func TestAckTimer(t *testing.T) {
25-
t.Run("start and close", func(t *testing.T) {
25+
t.Run("start and close (second packet rule)", func(t *testing.T) {
2626
var nCbs uint32
2727
rt := newAckTimer(&testAckTimerObserver{
2828
onAckTO: func() {
29-
t.Log("ack timed out")
3029
atomic.AddUint32(&nCbs, 1)
3130
},
3231
})
3332

3433
for i := 0; i < 2; i++ {
35-
// should start ok
34+
// First eligible packet -> arm delayed SACK timer
3635
ok := rt.start()
37-
assert.True(t, ok, "start() should succeed")
38-
assert.True(t, rt.isRunning(), "should be running")
36+
assert.True(t, ok, "first start() should succeed")
37+
assert.True(t, rt.isRunning(), "timer should be running after first start")
3938

40-
// subsequent start is a noop
39+
// Second eligible packet -> immediate SACK, timer stops (RFC 9260 section 6.2.1)
4140
ok = rt.start()
42-
assert.False(t, ok, "start() should NOT succeed once closed")
43-
assert.True(t, rt.isRunning(), "should be running")
41+
assert.True(t, ok, "second start() should be accepted and trigger immediate ACK")
4442

45-
// Sleep more than 2 * 200msec interval to test if it times out only once
46-
time.Sleep(ackInterval*2 + 50*time.Millisecond)
43+
// Give the async callback time to fire.
44+
time.Sleep(20 * time.Millisecond)
4745

4846
assert.Equalf(t, uint32(1), atomic.LoadUint32(&nCbs),
49-
"should be called once (actual: %d)", atomic.LoadUint32(&nCbs))
47+
"immediate ACK should have fired exactly once (iter %d)", i)
48+
assert.False(t, rt.isRunning(), "timer should have stopped after immediate ACK")
5049

50+
// Reset for next iteration
5151
atomic.StoreUint32(&nCbs, 0)
5252
}
5353

54-
// should close ok
54+
// Close disables future starts
5555
rt.close()
56-
assert.False(t, rt.isRunning(), "should not be running")
56+
assert.False(t, rt.isRunning(), "timer should not be running after close")
5757

58-
// once closed, it cannot start
5958
ok := rt.start()
6059
assert.False(t, ok, "start() should NOT succeed once closed")
61-
assert.False(t, rt.isRunning(), "should not be running")
60+
assert.False(t, rt.isRunning(), "still not running after refused start on closed timer")
6261
})
6362

64-
t.Run("start and stop", func(t *testing.T) {
63+
t.Run("start and stop (no timeout)", func(t *testing.T) {
6564
var nCbs uint32
6665
rt := newAckTimer(&testAckTimerObserver{
6766
onAckTO: func() {
68-
t.Log("ack timed out")
6967
atomic.AddUint32(&nCbs, 1)
7068
},
7169
})
@@ -74,26 +72,25 @@ func TestAckTimer(t *testing.T) {
7472
// should start ok
7573
ok := rt.start()
7674
assert.True(t, ok, "start() should succeed")
77-
assert.True(t, rt.isRunning(), "should be running")
75+
assert.True(t, rt.isRunning(), "timer should be running")
7876

7977
// stop immedidately
8078
rt.stop()
81-
assert.False(t, rt.isRunning(), "should not be running")
79+
assert.False(t, rt.isRunning(), "timer should not be running after stop()")
8280
}
8381

84-
// Sleep more than 200msec of interval to test if it never times out
82+
// Wait > interval to ensure no stray timeout happened.
8583
time.Sleep(ackInterval + 50*time.Millisecond)
86-
8784
assert.Equalf(t, uint32(0), atomic.LoadUint32(&nCbs),
88-
"should not be timed out (actual: %d)", atomic.LoadUint32(&nCbs))
85+
"no ACK should have fired after stop() (got %d)", atomic.LoadUint32(&nCbs))
8986

9087
// can start again
9188
ok := rt.start()
9289
assert.True(t, ok, "start() should succeed again")
93-
assert.True(t, rt.isRunning(), "should be running")
90+
assert.True(t, rt.isRunning(), "timer should be running")
9491

95-
// should close ok
92+
// Close disables and stops
9693
rt.close()
97-
assert.False(t, rt.isRunning(), "should not be running")
94+
assert.False(t, rt.isRunning(), "timer should not be running after close")
9895
})
9996
}

0 commit comments

Comments
 (0)