Skip to content

Commit 655e193

Browse files
committed
Update ack_timer to use goroutines and RFC 9260
1 parent a69c6a2 commit 655e193

File tree

2 files changed

+211
-63
lines changed

2 files changed

+211
-63
lines changed

ack_timer.go

Lines changed: 145 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -4,103 +4,204 @@
44
package sctp
55

66
import (
7-
"math"
87
"sync"
98
"time"
109
)
1110

1211
const (
12+
// RFC 9260 delayed SACK timer recommendation: 200 ms. It must not exceed 500ms.
13+
// This timer only provides the max wait bound.
1314
ackInterval time.Duration = 200 * time.Millisecond
1415
)
1516

16-
// ackTimerObserver is the inteface to an ack timer observer.
1717
type ackTimerObserver interface {
1818
onAckTimeout()
1919
}
2020

21-
type ackTimerState uint8
21+
type timerCmdKind uint8
2222

2323
const (
24-
ackTimerStopped ackTimerState = iota
25-
ackTimerStarted
26-
ackTimerClosed
24+
cmdStart timerCmdKind = iota
25+
cmdStop
26+
cmdClose
27+
cmdIsRunning
2728
)
2829

29-
// ackTimer provides the retnransmission timer conforms with RFC 4960 Sec 6.3.1.
30+
type timerCmd struct {
31+
kind timerCmdKind
32+
resp chan bool // for start/stop/isRunning result, nil for close
33+
}
34+
35+
// ackTimer: single goroutine + reusable time.Timer, controlled by commands.
3036
type ackTimer struct {
31-
timer *time.Timer
3237
observer ackTimerObserver
33-
mutex sync.Mutex
34-
state ackTimerState
35-
pending uint8
38+
39+
mutex sync.Mutex
40+
closed bool
41+
42+
cmdCh chan timerCmd
43+
done chan struct{}
3644
}
3745

38-
// newAckTimer creates a new acknowledgement timer used to enable delayed ack.
3946
func newAckTimer(observer ackTimerObserver) *ackTimer {
40-
t := &ackTimer{observer: observer}
41-
t.timer = time.AfterFunc(math.MaxInt64, t.timeout)
42-
t.timer.Stop()
47+
t := &ackTimer{
48+
observer: observer,
49+
cmdCh: make(chan timerCmd),
50+
done: make(chan struct{}),
51+
}
52+
53+
go t.run()
4354

4455
return t
4556
}
4657

47-
func (t *ackTimer) timeout() {
48-
t.mutex.Lock()
49-
if t.pending--; t.pending == 0 && t.state == ackTimerStarted {
50-
t.state = ackTimerStopped
51-
defer t.observer.onAckTimeout()
58+
func (t *ackTimer) run() { //nolint:gocognit,cyclop
59+
timer := time.NewTimer(time.Hour)
60+
61+
// Ensure timer is fully stopped and channel drained.
62+
if !timer.Stop() {
63+
select {
64+
case <-timer.C:
65+
default:
66+
}
67+
}
68+
69+
var (
70+
running bool
71+
timerC <-chan time.Time // nil when not running
72+
)
73+
74+
for {
75+
select {
76+
case <-timerC:
77+
// timer fired, disarm then notify.
78+
running = false
79+
timerC = nil
80+
t.observer.onAckTimeout()
81+
82+
case cmd := <-t.cmdCh:
83+
switch cmd.kind {
84+
case cmdStart:
85+
if t.closed || running {
86+
if cmd.resp != nil {
87+
cmd.resp <- false
88+
}
89+
90+
break
91+
}
92+
93+
// Reset reusable timer to ackInterval.
94+
if !timer.Stop() {
95+
select {
96+
case <-timer.C:
97+
default:
98+
}
99+
}
100+
101+
timer.Reset(ackInterval)
102+
timerC = timer.C
103+
running = true
104+
105+
if cmd.resp != nil {
106+
cmd.resp <- true
107+
}
108+
109+
case cmdStop:
110+
if running {
111+
// Stop and drain if necessary.
112+
if !timer.Stop() {
113+
select {
114+
case <-timer.C:
115+
default:
116+
}
117+
}
118+
119+
timerC = nil
120+
running = false
121+
}
122+
123+
if cmd.resp != nil {
124+
cmd.resp <- true
125+
}
126+
127+
case cmdIsRunning:
128+
if cmd.resp != nil {
129+
cmd.resp <- running
130+
}
131+
132+
case cmdClose:
133+
// Stop and drain once, exit loop.
134+
if running && !timer.Stop() {
135+
select {
136+
case <-timer.C:
137+
default:
138+
}
139+
}
140+
141+
close(t.done)
142+
143+
return
144+
}
145+
}
52146
}
53-
t.mutex.Unlock()
54147
}
55148

56-
// start starts the timer.
149+
// start arms the timer and returns false if already started or closed.
57150
func (t *ackTimer) start() bool {
58151
t.mutex.Lock()
59152
defer t.mutex.Unlock()
60153

61-
// this timer is already closed or already running
62-
if t.state != ackTimerStopped {
154+
if t.closed {
63155
return false
64156
}
65157

66-
t.state = ackTimerStarted
67-
t.pending++
68-
t.timer.Reset(ackInterval)
158+
resp := make(chan bool, 1)
159+
t.cmdCh <- timerCmd{kind: cmdStart, resp: resp}
69160

70-
return true
161+
return <-resp
71162
}
72163

73-
// stops the timer. this is similar to stop() but subsequent start() call
74-
// will fail (the timer is no longer usable).
164+
// stop disarms the timer if running.
75165
func (t *ackTimer) stop() {
76166
t.mutex.Lock()
77167
defer t.mutex.Unlock()
78168

79-
if t.state == ackTimerStarted {
80-
if t.timer.Stop() {
81-
t.pending--
82-
}
83-
t.state = ackTimerStopped
169+
if t.closed {
170+
return
84171
}
172+
173+
resp := make(chan bool, 1)
174+
t.cmdCh <- timerCmd{kind: cmdStop, resp: resp}
175+
<-resp
85176
}
86177

87-
// closes the timer. this is similar to stop() but subsequent start() call
88-
// will fail (the timer is no longer usable).
178+
// close permanently disables the timer and cannot be started again.
89179
func (t *ackTimer) close() {
90180
t.mutex.Lock()
91-
defer t.mutex.Unlock()
181+
if t.closed {
182+
t.mutex.Unlock()
92183

93-
if t.state == ackTimerStarted && t.timer.Stop() {
94-
t.pending--
184+
return
95185
}
96-
t.state = ackTimerClosed
186+
187+
t.closed = true
188+
t.mutex.Unlock()
189+
190+
t.cmdCh <- timerCmd{kind: cmdClose}
191+
<-t.done
97192
}
98193

99-
// isRunning tests if the timer is running.
100-
// Debug purpose only.
194+
// isRunning is for debugging/tests.
101195
func (t *ackTimer) isRunning() bool {
102196
t.mutex.Lock()
103197
defer t.mutex.Unlock()
104198

105-
return t.state == ackTimerStarted
199+
if t.closed {
200+
return false
201+
}
202+
203+
resp := make(chan bool, 1)
204+
t.cmdCh <- timerCmd{kind: cmdIsRunning, resp: resp}
205+
206+
return <-resp
106207
}

ack_timer_test.go

Lines changed: 66 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,23 +26,21 @@ func TestAckTimer(t *testing.T) {
2626
var nCbs uint32
2727
rt := newAckTimer(&testAckTimerObserver{
2828
onAckTO: func() {
29-
t.Log("ack timed out")
3029
atomic.AddUint32(&nCbs, 1)
3130
},
3231
})
3332

3433
for i := 0; i < 2; i++ {
35-
// should start ok
3634
ok := rt.start()
3735
assert.True(t, ok, "start() should succeed")
3836
assert.True(t, rt.isRunning(), "should be running")
3937

4038
// subsequent start is a noop
4139
ok = rt.start()
42-
assert.False(t, ok, "start() should NOT succeed once closed")
43-
assert.True(t, rt.isRunning(), "should be running")
40+
assert.False(t, ok, "start() should NOT succeed while already running")
41+
assert.True(t, rt.isRunning(), "should still be running")
4442

45-
// Sleep more than 2 * 200msec interval to test if it times out only once
43+
// Sleep more than 2 * ackInterval to ensure only one timeout occurs.
4644
time.Sleep(ackInterval*2 + 50*time.Millisecond)
4745

4846
assert.Equalf(t, uint32(1), atomic.LoadUint32(&nCbs),
@@ -51,11 +49,9 @@ func TestAckTimer(t *testing.T) {
5149
atomic.StoreUint32(&nCbs, 0)
5250
}
5351

54-
// should close ok
52+
// close prevents further starts
5553
rt.close()
56-
assert.False(t, rt.isRunning(), "should not be running")
57-
58-
// once closed, it cannot start
54+
assert.False(t, rt.isRunning(), "should not be running after close")
5955
ok := rt.start()
6056
assert.False(t, ok, "start() should NOT succeed once closed")
6157
assert.False(t, rt.isRunning(), "should not be running")
@@ -65,35 +61,86 @@ func TestAckTimer(t *testing.T) {
6561
var nCbs uint32
6662
rt := newAckTimer(&testAckTimerObserver{
6763
onAckTO: func() {
68-
t.Log("ack timed out")
6964
atomic.AddUint32(&nCbs, 1)
7065
},
7166
})
7267

7368
for i := 0; i < 2; i++ {
74-
// should start ok
7569
ok := rt.start()
7670
assert.True(t, ok, "start() should succeed")
7771
assert.True(t, rt.isRunning(), "should be running")
7872

79-
// stop immedidately
73+
// stop immediately
8074
rt.stop()
81-
assert.False(t, rt.isRunning(), "should not be running")
75+
assert.False(t, rt.isRunning(), "should not be running after stop()")
8276
}
8377

84-
// Sleep more than 200msec of interval to test if it never times out
78+
// ensure no callbacks after stops
8579
time.Sleep(ackInterval + 50*time.Millisecond)
86-
8780
assert.Equalf(t, uint32(0), atomic.LoadUint32(&nCbs),
88-
"should not be timed out (actual: %d)", atomic.LoadUint32(&nCbs))
81+
"should not time out (actual: %d)", atomic.LoadUint32(&nCbs))
8982

90-
// can start again
83+
// can start again after stop
9184
ok := rt.start()
9285
assert.True(t, ok, "start() should succeed again")
9386
assert.True(t, rt.isRunning(), "should be running")
9487

95-
// should close ok
9688
rt.close()
97-
assert.False(t, rt.isRunning(), "should not be running")
89+
assert.False(t, rt.isRunning(), "should not be running after close")
90+
})
91+
92+
t.Run("stop shortly before deadline prevents fire", func(t *testing.T) {
93+
var nCbs uint32
94+
rt := newAckTimer(&testAckTimerObserver{
95+
onAckTO: func() {
96+
atomic.AddUint32(&nCbs, 1)
97+
},
98+
})
99+
100+
ok := rt.start()
101+
assert.True(t, ok)
102+
assert.True(t, rt.isRunning())
103+
104+
// Stop just before the deadline; leave a little margin.
105+
time.Sleep(ackInterval/2 + 10*time.Millisecond)
106+
rt.stop()
107+
assert.False(t, rt.isRunning())
108+
109+
// Wait past the original deadline; must not fire.
110+
time.Sleep(ackInterval)
111+
assert.Equal(t, uint32(0), atomic.LoadUint32(&nCbs))
112+
})
113+
114+
t.Run("light start/stop stress (no leaks, no stray fires)", func(t *testing.T) {
115+
var nCbs uint32
116+
rt := newAckTimer(&testAckTimerObserver{
117+
onAckTO: func() {
118+
atomic.AddUint32(&nCbs, 1)
119+
},
120+
})
121+
122+
// Several quick start/stop cycles; none should fire.
123+
for i := 0; i < 10; i++ {
124+
ok := rt.start()
125+
// It's possible a previous iteration is still running if logic regressed; guard expectations.
126+
if ok {
127+
assert.True(t, rt.isRunning())
128+
}
129+
130+
// Keep well under the interval.
131+
time.Sleep(ackInterval / 10)
132+
rt.stop()
133+
assert.False(t, rt.isRunning())
134+
}
135+
136+
time.Sleep(ackInterval + 50*time.Millisecond)
137+
assert.Equal(t, uint32(0), atomic.LoadUint32(&nCbs), "no callbacks expected in stress loop")
138+
139+
// Final sanity: a clean start should fire exactly once.
140+
atomic.StoreUint32(&nCbs, 0)
141+
ok := rt.start()
142+
assert.True(t, ok)
143+
time.Sleep(ackInterval*2 + 50*time.Millisecond)
144+
assert.Equal(t, uint32(1), atomic.LoadUint32(&nCbs))
98145
})
99146
}

0 commit comments

Comments
 (0)