Skip to content

Commit 877b27d

Browse files
committed
Rough implementation of RACK
1 parent a04388e commit 877b27d

File tree

1 file changed

+301
-5
lines changed

1 file changed

+301
-5
lines changed

association.go

Lines changed: 301 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,21 @@ type Association struct {
228228
tReconfig *rtxTimer
229229
ackTimer *ackTimer
230230

231+
// RACK / TLP state
232+
rackEnabled bool
233+
rackReoWnd time.Duration // dynamic reordering window
234+
rackMinRTT time.Duration // min observed RTT
235+
rackDeliveredTime time.Time // send time of most recently delivered original chunk
236+
rackHighestDeliveredOrigTSN uint32
237+
rackReorderingSeen bool // ever observed reordering for this association
238+
rackKeepInflatedRecoveries int // keep inflated reoWnd for 16 loss recoveries
239+
rackTimerMu sync.Mutex
240+
rackTimer *time.Timer // arms when outstanding but not (yet) overdue
241+
ptoTimerMu sync.Mutex
242+
ptoTimer *time.Timer // Tail Loss Probe timer
243+
rackWCDelAck time.Duration // 200ms default
244+
rackReoWndFloor time.Duration
245+
231246
// Chunks stored for retransmission
232247
storedInit *chunkInit
233248
storedCookieEcho *chunkCookieEcho
@@ -283,6 +298,13 @@ type Config struct {
283298
FastRtxWnd uint32
284299
// Step of congestion window increase at Congestion Avoidance
285300
CwndCAStep uint32
301+
302+
// RACK loss detection: disabled by default (consider enabling by default once the implementation matures)
303+
EnableRACK bool
304+
// Optional: cap the minimum reordering window: 0 = use quarter-RTT
305+
RACKReoWndFloor time.Duration
306+
// Optional: receiver worst-case delayed-ACK for PTO when only one packet is in flight
307+
RACKWCDelAck time.Duration
286308
}
287309

288310
// Server accepts a SCTP stream over a conn.
@@ -392,6 +414,18 @@ func createAssociation(config Config) *Association {
392414
writeNotify: make(chan struct{}, 1),
393415
}
394416

417+
// RACK defaults
418+
assoc.rackEnabled = config.EnableRACK
419+
if assoc.rackEnabled {
420+
assoc.rackWCDelAck = config.RACKWCDelAck
421+
if assoc.rackWCDelAck == 0 {
422+
assoc.rackWCDelAck = 200 * time.Millisecond // WCDelAckT, RACK for SCTP section 2C
423+
}
424+
425+
assoc.rackReoWndFloor = config.RACKReoWndFloor // optional floor; usually 0
426+
assoc.rackKeepInflatedRecoveries = 0
427+
}
428+
395429
if assoc.name == "" {
396430
assoc.name = fmt.Sprintf("%p", assoc)
397431
}
@@ -592,6 +626,8 @@ func (a *Association) closeAllTimers() {
592626
a.t3RTX.close()
593627
a.tReconfig.close()
594628
a.ackTimer.close()
629+
a.stopRackTimer()
630+
a.stopPTOTimer()
595631
}
596632

597633
func (a *Association) readLoop() {
@@ -1667,16 +1703,19 @@ func (a *Association) getOrCreateStream(
16671703
// The caller should hold the lock.
16681704
//
16691705
//nolint:gocognit,cyclop
1670-
func (a *Association) processSelectiveAck(selectiveAckChunk *chunkSelectiveAck) (map[uint16]int, uint32, error) {
1706+
func (a *Association) processSelectiveAck(selectiveAckChunk *chunkSelectiveAck) (map[uint16]int, uint32, time.Time, uint32, bool, error) {
16711707
bytesAckedPerStream := map[uint16]int{}
1708+
var newestDeliveredSendTime time.Time // send time of most recently delivered original chunk
1709+
var newestDeliveredOrigTSN uint32
1710+
var deliveredFound bool
16721711

16731712
// New ack point, so pop all ACKed packets from inflightQueue
16741713
// We add 1 because the "currentAckPoint" has already been popped from the inflight queue
16751714
// For the first SACK we take care of this by setting the ackpoint to cumAck - 1
16761715
for i := a.cumulativeTSNAckPoint + 1; sna32LTE(i, selectiveAckChunk.cumulativeTSNAck); i++ {
16771716
chunkPayload, ok := a.inflightQueue.pop(i)
16781717
if !ok {
1679-
return nil, 0, fmt.Errorf("%w: %v", ErrInflightQueueTSNPop, i)
1718+
return nil, 0, time.Time{}, 0, false, fmt.Errorf("%w: %v", ErrInflightQueueTSNPop, i)
16801719
}
16811720

16821721
if !chunkPayload.acked {
@@ -1713,6 +1752,20 @@ func (a *Association) processSelectiveAck(selectiveAckChunk *chunkSelectiveAck)
17131752
rtt := time.Since(chunkPayload.since).Seconds() * 1000.0
17141753
srtt := a.rtoMgr.setNewRTT(rtt)
17151754
a.srtt.Store(srtt)
1755+
1756+
// RACK (NOT RFC 4960): track minRTT and latest delivered *original* send time
1757+
if a.rackEnabled {
1758+
if a.rackMinRTT == 0 || time.Duration(rtt*1e6) < a.rackMinRTT {
1759+
a.rackMinRTT = time.Duration(rtt * 1e6)
1760+
}
1761+
1762+
if chunkPayload.since.After(newestDeliveredSendTime) {
1763+
newestDeliveredSendTime = chunkPayload.since
1764+
newestDeliveredOrigTSN = chunkPayload.tsn
1765+
deliveredFound = true
1766+
}
1767+
}
1768+
17161769
a.log.Tracef("[%s] SACK: measured-rtt=%f srtt=%f new-rto=%f",
17171770
a.name, rtt, srtt, a.rtoMgr.getRTO())
17181771
}
@@ -1732,7 +1785,7 @@ func (a *Association) processSelectiveAck(selectiveAckChunk *chunkSelectiveAck)
17321785
tsn := selectiveAckChunk.cumulativeTSNAck + uint32(i)
17331786
chunkPayload, ok := a.inflightQueue.get(tsn)
17341787
if !ok {
1735-
return nil, 0, fmt.Errorf("%w: %v", ErrTSNRequestNotExist, tsn)
1788+
return nil, 0, time.Time{}, 0, false, fmt.Errorf("%w: %v", ErrTSNRequestNotExist, tsn)
17361789
}
17371790

17381791
if !chunkPayload.acked {
@@ -1752,6 +1805,20 @@ func (a *Association) processSelectiveAck(selectiveAckChunk *chunkSelectiveAck)
17521805
rtt := time.Since(chunkPayload.since).Seconds() * 1000.0
17531806
srtt := a.rtoMgr.setNewRTT(rtt)
17541807
a.srtt.Store(srtt)
1808+
1809+
// RACK
1810+
if a.rackEnabled {
1811+
if a.rackMinRTT == 0 || time.Duration(rtt*1e6) < a.rackMinRTT {
1812+
a.rackMinRTT = time.Duration(rtt * 1e6)
1813+
}
1814+
1815+
if chunkPayload.since.After(newestDeliveredSendTime) {
1816+
newestDeliveredSendTime = chunkPayload.since
1817+
newestDeliveredOrigTSN = chunkPayload.tsn
1818+
deliveredFound = true
1819+
}
1820+
}
1821+
17551822
a.log.Tracef("[%s] SACK: measured-rtt=%f srtt=%f new-rto=%f",
17561823
a.name, rtt, srtt, a.rtoMgr.getRTO())
17571824
}
@@ -1763,7 +1830,7 @@ func (a *Association) processSelectiveAck(selectiveAckChunk *chunkSelectiveAck)
17631830
}
17641831
}
17651832

1766-
return bytesAckedPerStream, htna, nil
1833+
return bytesAckedPerStream, htna, newestDeliveredSendTime, newestDeliveredOrigTSN, deliveredFound, nil
17671834
}
17681835

17691836
// The caller should hold the lock.
@@ -1926,7 +1993,7 @@ func (a *Association) handleSack(selectiveAckChunk *chunkSelectiveAck) error {
19261993
}
19271994

19281995
// Process selective ack
1929-
bytesAckedPerStream, htna, err := a.processSelectiveAck(selectiveAckChunk)
1996+
bytesAckedPerStream, htna, newestDeliveredSendTime, newestDeliveredOrigTSN, deliveredFound, err := a.processSelectiveAck(selectiveAckChunk)
19301997
if err != nil {
19311998
return err
19321999
}
@@ -2005,6 +2072,11 @@ func (a *Association) handleSack(selectiveAckChunk *chunkSelectiveAck) error {
20052072

20062073
a.postprocessSack(state, cumTSNAckPointAdvanced)
20072074

2075+
// RACK
2076+
if a.rackEnabled {
2077+
a.onRackAfterSACK(deliveredFound, newestDeliveredSendTime, newestDeliveredOrigTSN, selectiveAckChunk)
2078+
}
2079+
20082080
return nil
20092081
}
20102082

@@ -2949,3 +3021,227 @@ func (a *Association) completeHandshake(handshakeErr error) bool {
29493021

29503022
return false
29513023
}
3024+
3025+
func (a *Association) startRackTimer(d time.Duration) {
3026+
if !a.rackEnabled || d <= 0 {
3027+
return
3028+
}
3029+
3030+
a.rackTimerMu.Lock()
3031+
defer a.rackTimerMu.Unlock()
3032+
3033+
if a.rackTimer != nil {
3034+
a.rackTimer.Stop()
3035+
}
3036+
3037+
a.rackTimer = time.AfterFunc(d, a.onRackTimeout)
3038+
}
3039+
3040+
func (a *Association) stopRackTimer() {
3041+
a.rackTimerMu.Lock()
3042+
3043+
if a.rackTimer != nil {
3044+
a.rackTimer.Stop()
3045+
}
3046+
3047+
a.rackTimerMu.Unlock()
3048+
}
3049+
3050+
func (a *Association) startPTOTimer(d time.Duration) {
3051+
if !a.rackEnabled || d <= 0 {
3052+
return
3053+
}
3054+
3055+
a.ptoTimerMu.Lock()
3056+
defer a.ptoTimerMu.Unlock()
3057+
3058+
if a.ptoTimer != nil {
3059+
a.ptoTimer.Stop()
3060+
}
3061+
3062+
a.ptoTimer = time.AfterFunc(d, a.onPTOTimer)
3063+
}
3064+
3065+
func (a *Association) stopPTOTimer() {
3066+
a.ptoTimerMu.Lock()
3067+
3068+
if a.ptoTimer != nil {
3069+
a.ptoTimer.Stop()
3070+
}
3071+
3072+
a.ptoTimerMu.Unlock()
3073+
}
3074+
3075+
// onRackAfterSACK implements the RACK logic (RACK for SCTP section 2A/B, section 3) and TLP scheduling (section 2C).
3076+
func (a *Association) onRackAfterSACK(deliveredFound bool, newestDeliveredSendTime time.Time, newestDeliveredOrigTSN uint32, sack *chunkSelectiveAck) {
3077+
now := time.Now()
3078+
3079+
// 1) Update highest delivered original TSN for reordering detection (section 2B)
3080+
if deliveredFound {
3081+
if sna32LT(a.rackHighestDeliveredOrigTSN, newestDeliveredOrigTSN) {
3082+
a.rackHighestDeliveredOrigTSN = newestDeliveredOrigTSN
3083+
} else {
3084+
// subsequent ACK acknowledges an original TSN below the recorded high-watermark ⇒ reordering observed
3085+
a.rackReorderingSeen = true
3086+
}
3087+
3088+
if newestDeliveredSendTime.After(a.rackDeliveredTime) {
3089+
a.rackDeliveredTime = newestDeliveredSendTime
3090+
}
3091+
}
3092+
3093+
// 2) Maintain ReoWND (RACK for SCTP section 2B)
3094+
if a.rackMinRTT == 0 {
3095+
// no RTT signal yet; leave as zero until we have an RTT
3096+
} else {
3097+
base := max(a.rackMinRTT/4, a.rackReoWndFloor)
3098+
// if we have never seen reordering for this connection, set to zero *during loss recovery* (RACK for SCTP section 2B)
3099+
// we approximate “during loss recovery” with inFastRecovery or T3-Rtx pending. Outside recovery keep base.
3100+
if !a.rackReorderingSeen && (a.inFastRecovery || a.t3RTX.isRunning()) {
3101+
a.rackReoWnd = 0
3102+
} else if a.rackReoWnd == 0 {
3103+
a.rackReoWnd = base
3104+
}
3105+
}
3106+
3107+
// DSACK-style inflation using SCTP duplicate TSNs (RACK for SCTP section 3 noting SCTP natively reports duplicates + RACK for SCTP section 2B policy)
3108+
if len(sack.duplicateTSN) > 0 && a.rackMinRTT > 0 {
3109+
a.rackReoWnd += max(a.rackMinRTT/4, a.rackReoWndFloor)
3110+
// keep inflated for 16 loss recoveries before reset
3111+
a.rackKeepInflatedRecoveries = 16
3112+
a.log.Tracef("[%s] RACK: DSACK/dupTSN seen, inflate reoWnd to %v", a.name, a.rackReoWnd)
3113+
}
3114+
3115+
// decrement the keep inflated counter when we leave recovery
3116+
if !a.inFastRecovery && a.rackKeepInflatedRecoveries > 0 {
3117+
a.rackKeepInflatedRecoveries--
3118+
if a.rackKeepInflatedRecoveries == 0 && a.rackMinRTT > 0 {
3119+
a.rackReoWnd = a.rackMinRTT / 4
3120+
}
3121+
}
3122+
3123+
// 3) Loss marking on ACK: any outstanding chunk whose (send_time + reoWnd) < newestDeliveredSendTime is lost (RACK for SCTP section 2A)
3124+
if !a.rackDeliveredTime.IsZero() {
3125+
for i := a.cumulativeTSNAckPoint + 1; ; i++ {
3126+
chunk, ok := a.inflightQueue.get(i)
3127+
if !ok {
3128+
break
3129+
}
3130+
3131+
if chunk.acked || chunk.abandoned() || chunk.retransmit {
3132+
continue
3133+
}
3134+
3135+
// Only consider original transmissions
3136+
if chunk.nSent > 1 {
3137+
continue
3138+
}
3139+
3140+
if chunk.since.Add(a.rackReoWnd).Before(a.rackDeliveredTime) {
3141+
chunk.retransmit = true
3142+
a.log.Tracef("[%s] RACK: mark lost tsn=%d (sent=%v, delivered=%v, reoWnd=%v)",
3143+
a.name, chunk.tsn, chunk.since, a.rackDeliveredTime, a.rackReoWnd)
3144+
}
3145+
}
3146+
// if we marked anything, kick the writer
3147+
a.awakeWriteLoop()
3148+
}
3149+
3150+
// 4) Arm the RACK timer if there are still outstanding but not-yet-overdue chunks (RACK for SCTP section 2A)
3151+
if a.inflightQueue.size() > 0 && !a.rackDeliveredTime.IsZero() {
3152+
// RackRTT = RTT of the most recently delivered packet
3153+
rackRTT := max(now.Sub(a.rackDeliveredTime), 0)
3154+
a.startRackTimer(rackRTT + a.rackReoWnd) // RACK for SCTP section 2A
3155+
} else {
3156+
a.stopRackTimer()
3157+
}
3158+
3159+
// 5) Re/schedule Tail Loss Probe (PTO) (RACK for SCTP section 2C)
3160+
// Triggered when new data is sent or cum-ack advances; we approximate by scheduling on every SACK that advanced
3161+
var pto time.Duration
3162+
srttMs := a.SRTT()
3163+
if srttMs > 0 {
3164+
srtt := time.Duration(srttMs * 1e6)
3165+
extra := 2 * time.Millisecond
3166+
3167+
if a.inflightQueue.size() == 1 {
3168+
extra = a.rackWCDelAck // 200ms for single outstanding, else 2ms
3169+
}
3170+
3171+
pto = 2*srtt + extra
3172+
} else {
3173+
pto = time.Second // no RTT yet
3174+
}
3175+
3176+
a.startPTOTimer(pto)
3177+
}
3178+
3179+
// onRackTimeout is fired to avoid waiting for the next ACK.
3180+
func (a *Association) onRackTimeout() {
3181+
a.lock.Lock()
3182+
defer a.lock.Unlock()
3183+
3184+
if !a.rackEnabled || a.rackDeliveredTime.IsZero() {
3185+
return
3186+
}
3187+
3188+
marked := false
3189+
for i := a.cumulativeTSNAckPoint + 1; ; i++ {
3190+
chunk, ok := a.inflightQueue.get(i)
3191+
3192+
if !ok {
3193+
break
3194+
}
3195+
3196+
if chunk.acked || chunk.abandoned() || chunk.retransmit || chunk.nSent > 1 {
3197+
continue
3198+
}
3199+
3200+
if chunk.since.Add(a.rackReoWnd).Before(a.rackDeliveredTime) {
3201+
chunk.retransmit = true
3202+
marked = true
3203+
a.log.Tracef("[%s] RACK timer: mark lost tsn=%d", a.name, chunk.tsn)
3204+
}
3205+
}
3206+
3207+
if marked {
3208+
a.awakeWriteLoop()
3209+
}
3210+
}
3211+
3212+
func (a *Association) onPTOTimer() {
3213+
a.lock.Lock()
3214+
defer a.lock.Unlock()
3215+
3216+
if !a.rackEnabled {
3217+
return
3218+
}
3219+
3220+
// Prefer unsent data if any
3221+
if a.pendingQueue.size() > 0 {
3222+
a.awakeWriteLoop()
3223+
3224+
return
3225+
}
3226+
3227+
// Otherwise retransmit the most recently sent in-flight DATA (highest TSN not acked/abandoned)
3228+
var latest *chunkPayloadData
3229+
for i := uint32(0); ; i++ {
3230+
c, ok := a.inflightQueue.get(a.cumulativeTSNAckPoint + i + 1)
3231+
if !ok {
3232+
break
3233+
}
3234+
3235+
if c.acked || c.abandoned() {
3236+
continue
3237+
}
3238+
3239+
latest = c
3240+
}
3241+
3242+
if latest != nil && !latest.retransmit {
3243+
latest.retransmit = true
3244+
a.log.Tracef("[%s] PTO fired: probe tsn=%d", a.name, latest.tsn)
3245+
a.awakeWriteLoop()
3246+
}
3247+
}

0 commit comments

Comments
 (0)