
Commit 3709326

Rough implementation of RACK
1 parent a04388e commit 3709326

File tree

1 file changed (+294, -5 lines)

association.go

Lines changed: 294 additions & 5 deletions
@@ -228,6 +228,21 @@ type Association struct {
     tReconfig *rtxTimer
     ackTimer  *ackTimer

+    // RACK / TLP state
+    rackEnabled                 bool
+    rackReoWnd                  time.Duration // dynamic reordering window
+    rackMinRTT                  time.Duration // min observed RTT
+    rackDeliveredTime           time.Time     // send time of most recently delivered original chunk
+    rackHighestDeliveredOrigTSN uint32
+    rackReorderingSeen          bool          // ever observed reordering for this association
+    rackKeepInflatedRecoveries  int           // keep inflated reoWnd for 16 loss recoveries
+    rackTimerMu                 sync.Mutex
+    rackTimer                   *time.Timer   // arms when outstanding but not (yet) overdue
+    ptoTimerMu                  sync.Mutex
+    ptoTimer                    *time.Timer   // Tail Loss Probe timer
+    rackWCDelAck                time.Duration // 200ms default
+    rackReoWndFloor             time.Duration
+
     // Chunks stored for retransmission
     storedInit       *chunkInit
     storedCookieEcho *chunkCookieEcho
@@ -283,6 +298,13 @@ type Config struct {
     FastRtxWnd uint32
     // Step of congestion window increase at Congestion Avoidance
     CwndCAStep uint32
+
+    // RACK loss detection: disabled by default (arguably it should default to enabled).
+    EnableRACK bool
+    // Optional floor for the reordering window; 0 = use a quarter of the minimum RTT.
+    RACKReoWndFloor time.Duration
+    // Optional receiver worst-case delayed ACK, used for the PTO when only one packet is in flight.
+    RACKWCDelAck time.Duration
 }

 // Server accepts a SCTP stream over a conn.
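
For reference, a caller opts in through the new Config fields above. A minimal usage sketch, assuming a pion/sctp-style Client constructor and the pre-existing NetConn/LoggerFactory fields, which are not part of this commit:

// Hypothetical example; only EnableRACK, RACKReoWndFloor and RACKWCDelAck
// are added by this commit, everything else is assumed API.
package example

import (
    "net"
    "time"

    "github.com/pion/logging"
    "github.com/pion/sctp"
)

func dialWithRACK(conn net.Conn) (*sctp.Association, error) {
    return sctp.Client(sctp.Config{
        NetConn:       conn,
        LoggerFactory: logging.NewDefaultLoggerFactory(),
        EnableRACK:    true,                   // RACK/TLP stays off unless explicitly enabled
        RACKWCDelAck:  200 * time.Millisecond, // same value the code falls back to when left at 0
        // RACKReoWndFloor: 0 keeps the quarter-of-min-RTT reordering window
    })
}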
@@ -392,6 +414,18 @@ func createAssociation(config Config) *Association {
         writeNotify: make(chan struct{}, 1),
     }

+    // RACK defaults
+    assoc.rackEnabled = config.EnableRACK
+    if assoc.rackEnabled {
+        assoc.rackWCDelAck = config.RACKWCDelAck
+        if assoc.rackWCDelAck == 0 {
+            assoc.rackWCDelAck = 200 * time.Millisecond // WCDelAckT, RACK for SCTP section 2C
+        }
+
+        assoc.rackReoWndFloor = config.RACKReoWndFloor // optional floor; usually 0
+        assoc.rackKeepInflatedRecoveries = 0
+    }
+
     if assoc.name == "" {
         assoc.name = fmt.Sprintf("%p", assoc)
     }
@@ -592,6 +626,8 @@ func (a *Association) closeAllTimers() {
     a.t3RTX.close()
     a.tReconfig.close()
     a.ackTimer.close()
+    a.stopRackTimer()
+    a.stopPTOTimer()
 }

 func (a *Association) readLoop() {
@@ -1667,16 +1703,19 @@ func (a *Association) getOrCreateStream(
 // The caller should hold the lock.
 //
 //nolint:gocognit,cyclop
-func (a *Association) processSelectiveAck(selectiveAckChunk *chunkSelectiveAck) (map[uint16]int, uint32, error) {
+func (a *Association) processSelectiveAck(selectiveAckChunk *chunkSelectiveAck) (map[uint16]int, uint32, time.Time, uint32, bool, error) {
     bytesAckedPerStream := map[uint16]int{}
+    var newestDeliveredSendTime time.Time // send time of most recently delivered original chunk
+    var newestDeliveredOrigTSN uint32
+    var deliveredFound bool

     // New ack point, so pop all ACKed packets from inflightQueue
     // We add 1 because the "currentAckPoint" has already been popped from the inflight queue
     // For the first SACK we take care of this by setting the ackpoint to cumAck - 1
     for i := a.cumulativeTSNAckPoint + 1; sna32LTE(i, selectiveAckChunk.cumulativeTSNAck); i++ {
         chunkPayload, ok := a.inflightQueue.pop(i)
         if !ok {
-            return nil, 0, fmt.Errorf("%w: %v", ErrInflightQueueTSNPop, i)
+            return nil, 0, time.Time{}, 0, false, fmt.Errorf("%w: %v", ErrInflightQueueTSNPop, i)
         }

         if !chunkPayload.acked {
@@ -1713,6 +1752,18 @@ func (a *Association) processSelectiveAck(selectiveAckChunk *chunkSelectiveAck)
             rtt := time.Since(chunkPayload.since).Seconds() * 1000.0
             srtt := a.rtoMgr.setNewRTT(rtt)
             a.srtt.Store(srtt)
+
+            // RACK (NOT RFC 4960): track minRTT and latest delivered *original* send time
+            if a.rackEnabled {
+                if a.rackMinRTT == 0 || time.Duration(rtt*1e6) < a.rackMinRTT {
+                    a.rackMinRTT = time.Duration(rtt * 1e6)
+                }
+                if chunkPayload.since.After(newestDeliveredSendTime) {
+                    newestDeliveredSendTime = chunkPayload.since
+                    newestDeliveredOrigTSN = chunkPayload.tsn
+                    deliveredFound = true
+                }
+            }
             a.log.Tracef("[%s] SACK: measured-rtt=%f srtt=%f new-rto=%f",
                 a.name, rtt, srtt, a.rtoMgr.getRTO())
         }
@@ -1732,7 +1783,7 @@ func (a *Association) processSelectiveAck(selectiveAckChunk *chunkSelectiveAck)
         tsn := selectiveAckChunk.cumulativeTSNAck + uint32(i)
         chunkPayload, ok := a.inflightQueue.get(tsn)
         if !ok {
-            return nil, 0, fmt.Errorf("%w: %v", ErrTSNRequestNotExist, tsn)
+            return nil, 0, time.Time{}, 0, false, fmt.Errorf("%w: %v", ErrTSNRequestNotExist, tsn)
         }

         if !chunkPayload.acked {
@@ -1752,6 +1803,16 @@ func (a *Association) processSelectiveAck(selectiveAckChunk *chunkSelectiveAck)
             rtt := time.Since(chunkPayload.since).Seconds() * 1000.0
             srtt := a.rtoMgr.setNewRTT(rtt)
             a.srtt.Store(srtt)
+            if a.rackEnabled {
+                if a.rackMinRTT == 0 || time.Duration(rtt*1e6) < a.rackMinRTT {
+                    a.rackMinRTT = time.Duration(rtt * 1e6)
+                }
+                if chunkPayload.since.After(newestDeliveredSendTime) {
+                    newestDeliveredSendTime = chunkPayload.since
+                    newestDeliveredOrigTSN = chunkPayload.tsn
+                    deliveredFound = true
+                }
+            }
             a.log.Tracef("[%s] SACK: measured-rtt=%f srtt=%f new-rto=%f",
                 a.name, rtt, srtt, a.rtoMgr.getRTO())
         }
@@ -1763,7 +1824,7 @@ func (a *Association) processSelectiveAck(selectiveAckChunk *chunkSelectiveAck)
         }
     }

-    return bytesAckedPerStream, htna, nil
+    return bytesAckedPerStream, htna, newestDeliveredSendTime, newestDeliveredOrigTSN, deliveredFound, nil
 }

 // The caller should hold the lock.
@@ -1926,7 +1987,7 @@ func (a *Association) handleSack(selectiveAckChunk *chunkSelectiveAck) error {
     }

     // Process selective ack
-    bytesAckedPerStream, htna, err := a.processSelectiveAck(selectiveAckChunk)
+    bytesAckedPerStream, htna, newestDeliveredSendTime, newestDeliveredOrigTSN, deliveredFound, err := a.processSelectiveAck(selectiveAckChunk)
     if err != nil {
         return err
     }
@@ -2005,6 +2066,11 @@ func (a *Association) handleSack(selectiveAckChunk *chunkSelectiveAck) error {

     a.postprocessSack(state, cumTSNAckPointAdvanced)

+    // RACK
+    if a.rackEnabled {
+        a.onRackAfterSACK(deliveredFound, newestDeliveredSendTime, newestDeliveredOrigTSN, selectiveAckChunk)
+    }
+
     return nil
 }

@@ -2949,3 +3015,226 @@ func (a *Association) completeHandshake(handshakeErr error) bool {

     return false
 }
+
+func (a *Association) startRackTimer(d time.Duration) {
+    if !a.rackEnabled || d <= 0 {
+        return
+    }
+
+    a.rackTimerMu.Lock()
+    defer a.rackTimerMu.Unlock()
+
+    if a.rackTimer != nil {
+        a.rackTimer.Stop()
+    }
+
+    a.rackTimer = time.AfterFunc(d, a.onRackTimeout)
+}
+
+func (a *Association) stopRackTimer() {
+    a.rackTimerMu.Lock()
+
+    if a.rackTimer != nil {
+        a.rackTimer.Stop()
+    }
+
+    a.rackTimerMu.Unlock()
+}
+
+func (a *Association) startPTOTimer(d time.Duration) {
+    if !a.rackEnabled || d <= 0 {
+        return
+    }
+
+    a.ptoTimerMu.Lock()
+    defer a.ptoTimerMu.Unlock()
+
+    if a.ptoTimer != nil {
+        a.ptoTimer.Stop()
+    }
+
+    a.ptoTimer = time.AfterFunc(d, a.onPTOTimer)
+}
+
+func (a *Association) stopPTOTimer() {
+    a.ptoTimerMu.Lock()
+
+    if a.ptoTimer != nil {
+        a.ptoTimer.Stop()
+    }
+
+    a.ptoTimerMu.Unlock()
+}
+
+// onRackAfterSACK implements the RACK logic (RACK for SCTP section 2A/B, section 3) and TLP scheduling (section 2C).
+func (a *Association) onRackAfterSACK(deliveredFound bool, newestDeliveredSendTime time.Time, newestDeliveredOrigTSN uint32, sack *chunkSelectiveAck) {
+    now := time.Now()
+
+    // 1) Update highest delivered original TSN for reordering detection (section 2B)
+    if deliveredFound {
+        if sna32LT(a.rackHighestDeliveredOrigTSN, newestDeliveredOrigTSN) {
+            a.rackHighestDeliveredOrigTSN = newestDeliveredOrigTSN
+        } else {
+            // subsequent ACK acknowledges an original TSN below the recorded high-watermark ⇒ reordering observed
+            a.rackReorderingSeen = true
+        }
+
+        if newestDeliveredSendTime.After(a.rackDeliveredTime) {
+            a.rackDeliveredTime = newestDeliveredSendTime
+        }
+    }
+
+    // 2) Maintain reoWnd (RACK for SCTP section 2B)
+    if a.rackMinRTT == 0 {
+        // no RTT signal yet; leave as zero until we have an RTT
+    } else {
+        base := max(a.rackMinRTT/4, a.rackReoWndFloor)
+        // If we have never seen reordering on this association, set the window to zero *during loss recovery* (RACK for SCTP section 2B);
+        // we approximate "during loss recovery" with inFastRecovery or a pending T3-rtx. Outside recovery, keep base.
+        if !a.rackReorderingSeen && (a.inFastRecovery || a.t3RTX.isRunning()) {
+            a.rackReoWnd = 0
+        } else if a.rackReoWnd == 0 {
+            a.rackReoWnd = base
+        }
+    }
+
+    // DSACK-style inflation using SCTP duplicate TSNs (SCTP natively reports duplicates, RACK for SCTP section 3; inflation policy per section 2B)
+    if len(sack.duplicateTSN) > 0 && a.rackMinRTT > 0 {
+        a.rackReoWnd += max(a.rackMinRTT/4, a.rackReoWndFloor)
+        // keep inflated for 16 loss recoveries before reset
+        a.rackKeepInflatedRecoveries = 16
+        a.log.Tracef("[%s] RACK: DSACK/dupTSN seen, inflate reoWnd to %v", a.name, a.rackReoWnd)
+    }
+
+    // decrement the keep-inflated counter when we leave recovery
+    if !a.inFastRecovery && a.rackKeepInflatedRecoveries > 0 {
+        a.rackKeepInflatedRecoveries--
+        if a.rackKeepInflatedRecoveries == 0 && a.rackMinRTT > 0 {
+            a.rackReoWnd = a.rackMinRTT / 4
+        }
+    }
+
+    // 3) Loss marking on ACK: any outstanding chunk whose (send_time + reoWnd) < newestDeliveredSendTime is lost (RACK for SCTP section 2A)
+    if !a.rackDeliveredTime.IsZero() {
+        for i := a.cumulativeTSNAckPoint + 1; ; i++ {
+            c, ok := a.inflightQueue.get(i)
+            if !ok {
+                break
+            }
+
+            if c.acked || c.abandoned() || c.retransmit {
+                continue
+            }
+
+            // Only consider original transmissions
+            if c.nSent > 1 {
+                continue
+            }
+
+            if c.since.Add(a.rackReoWnd).Before(a.rackDeliveredTime) {
+                c.retransmit = true
+                a.log.Tracef("[%s] RACK: mark lost tsn=%d (sent=%v, delivered=%v, reoWnd=%v)",
+                    a.name, c.tsn, c.since, a.rackDeliveredTime, a.rackReoWnd)
+            }
+        }
+        // if we marked anything, kick the writer
+        a.awakeWriteLoop()
+    }
+
+    // 4) Arm the RACK timer if there are still outstanding but not-yet-overdue chunks (RACK for SCTP section 2A)
+    if a.inflightQueue.size() > 0 && !a.rackDeliveredTime.IsZero() {
+        // RackRTT = RTT of the most recently delivered packet
+        rackRTT := max(now.Sub(a.rackDeliveredTime), 0)
+        a.startRackTimer(rackRTT + a.rackReoWnd) // RACK for SCTP section 2A
+    } else {
+        a.stopRackTimer()
+    }
+
+    // 5) Re/schedule Tail Loss Probe (PTO) (RACK for SCTP section 2C)
+    // Triggered when new data is sent or the cum-ack advances; we approximate this by rescheduling on every SACK.
+    var pto time.Duration
+    srttMs := a.SRTT()
+    if srttMs > 0 {
+        srtt := time.Duration(srttMs * 1e6)
+        extra := 2 * time.Millisecond
+
+        if a.inflightQueue.size() == 1 {
+            extra = a.rackWCDelAck // 200ms when a single chunk is outstanding, else 2ms
+        }
+
+        pto = 2*srtt + extra
+    } else {
+        pto = time.Second // no RTT yet
+    }
+
+    a.startPTOTimer(pto)
+}
+
+// onRackTimeout fires when the RACK reordering timer expires, so overdue chunks can be marked lost without waiting for the next ACK.
+func (a *Association) onRackTimeout() {
+    a.lock.Lock()
+    defer a.lock.Unlock()
+
+    if !a.rackEnabled || a.rackDeliveredTime.IsZero() {
+        return
+    }
+
+    marked := false
+    for i := a.cumulativeTSNAckPoint + 1; ; i++ {
+        c, ok := a.inflightQueue.get(i)
+
+        if !ok {
+            break
+        }
+
+        if c.acked || c.abandoned() || c.retransmit || c.nSent > 1 {
+            continue
+        }
+
+        if c.since.Add(a.rackReoWnd).Before(a.rackDeliveredTime) {
+            c.retransmit = true
+            marked = true
+            a.log.Tracef("[%s] RACK timer: mark lost tsn=%d", a.name, c.tsn)
+        }
+    }
+
+    if marked {
+        a.awakeWriteLoop()
+    }
+}
+
+func (a *Association) onPTOTimer() {
+    a.lock.Lock()
+    defer a.lock.Unlock()
+
+    if !a.rackEnabled {
+        return
+    }
+
+    // Prefer unsent data if any
+    if a.pendingQueue.size() > 0 {
+        a.awakeWriteLoop()
+        return
+    }
+
+    // Otherwise retransmit the most recently sent in-flight DATA (highest TSN not acked/abandoned)
+    var latest *chunkPayloadData
+    for i := uint32(0); ; i++ {
+        c, ok := a.inflightQueue.get(a.cumulativeTSNAckPoint + i + 1)
+        if !ok {
+            break
+        }
+
+        if c.acked || c.abandoned() {
+            continue
+        }
+
+        latest = c
+    }
+
+    if latest != nil && !latest.retransmit {
+        latest.retransmit = true
+        a.log.Tracef("[%s] PTO fired: probe tsn=%d", a.name, latest.tsn)
+        a.awakeWriteLoop()
+    }
+}
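
As a rough sanity check of the timer arithmetic introduced above (reoWnd defaults to a quarter of the minimum RTT, optionally floored; the PTO is 2*SRTT plus 2 ms, or plus the worst-case delayed-ACK time when exactly one chunk is in flight), here is a standalone sketch that mirrors those formulas and is not part of the commit:

// Illustrative only; reproduces the reoWnd and PTO formulas from onRackAfterSACK.
package main

import (
    "fmt"
    "time"
)

func reoWnd(minRTT, floor time.Duration) time.Duration {
    return max(minRTT/4, floor)
}

func pto(srtt, wcDelAck time.Duration, inFlight int) time.Duration {
    extra := 2 * time.Millisecond
    if inFlight == 1 {
        extra = wcDelAck // e.g. the 200ms worst-case delayed ACK
    }
    return 2*srtt + extra
}

func main() {
    fmt.Println(reoWnd(40*time.Millisecond, 0))                    // 10ms
    fmt.Println(pto(50*time.Millisecond, 200*time.Millisecond, 1)) // 300ms
    fmt.Println(pto(50*time.Millisecond, 200*time.Millisecond, 8)) // 102ms
}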
