Commit eb3bd88

Clean up smells
1 parent ba92453 commit eb3bd88

File tree

2 files changed: +245 -8 lines changed

association.go

Lines changed: 26 additions & 8 deletions
@@ -1708,7 +1708,14 @@ func (a *Association) getOrCreateStream(
 // The caller should hold the lock.
 //
 //nolint:gocognit,cyclop
-func (a *Association) processSelectiveAck(selectiveAckChunk *chunkSelectiveAck) (map[uint16]int, uint32, time.Time, uint32, bool, error) {
+func (a *Association) processSelectiveAck(selectiveAckChunk *chunkSelectiveAck) (
+	map[uint16]int,
+	uint32,
+	time.Time,
+	uint32,
+	bool,
+	error,
+) {
 	bytesAckedPerStream := map[uint16]int{}
 	var newestDeliveredSendTime time.Time // send time of most recently delivered original chunk
 	var newestDeliveredOrigTSN uint32
@@ -1723,7 +1730,7 @@ func (a *Association) processSelectiveAck(selectiveAckChunk *chunkSelectiveAck)
 			return nil, 0, time.Time{}, 0, false, fmt.Errorf("%w: %v", ErrInflightQueueTSNPop, i)
 		}
 
-		if !chunkPayload.acked {
+		if !chunkPayload.acked { //nolint:nestif
 			// RFC 4960 sec 6.3.2. Retransmission Timer Rules
 			// R3) Whenever a SACK is received that acknowledges the DATA chunk
 			// with the earliest outstanding TSN for that address, restart the
@@ -1793,7 +1800,7 @@ func (a *Association) processSelectiveAck(selectiveAckChunk *chunkSelectiveAck)
 			return nil, 0, time.Time{}, 0, false, fmt.Errorf("%w: %v", ErrTSNRequestNotExist, tsn)
 		}
 
-		if !chunkPayload.acked {
+		if !chunkPayload.acked { //nolint:nestif
 			nBytesAcked := a.inflightQueue.markAsAcked(tsn)
 
 			// Sum the number of bytes acknowledged per stream
@@ -1998,7 +2005,9 @@ func (a *Association) handleSack(selectiveAckChunk *chunkSelectiveAck) error {
 	}
 
 	// Process selective ack
-	bytesAckedPerStream, htna, newestDeliveredSendTime, newestDeliveredOrigTSN, deliveredFound, err := a.processSelectiveAck(selectiveAckChunk)
+	bytesAckedPerStream, htna,
+		newestDeliveredSendTime, newestDeliveredOrigTSN,
+		deliveredFound, err := a.processSelectiveAck(selectiveAckChunk)
 	if err != nil {
 		return err
 	}
@@ -3078,7 +3087,14 @@ func (a *Association) stopPTOTimer() {
 }
 
 // onRackAfterSACK implements the RACK logic (RACK for SCTP section 2A/B, section 3) and TLP scheduling (section 2C).
-func (a *Association) onRackAfterSACK(deliveredFound bool, newestDeliveredSendTime time.Time, newestDeliveredOrigTSN uint32, sack *chunkSelectiveAck) {
+//
+//nolint:gocognit,cyclop
+func (a *Association) onRackAfterSACK(
+	deliveredFound bool,
+	newestDeliveredSendTime time.Time,
+	newestDeliveredOrigTSN uint32,
+	sack *chunkSelectiveAck,
+) {
 	now := time.Now()
 
 	// 1) Update highest delivered original TSN for reordering detection (section 2B)
@@ -3109,7 +3125,8 @@ func (a *Association) onRackAfterSACK(deliveredFound bool, newestDeliveredSendTi
 		}
 	}
 
-	// DSACK-style inflation using SCTP duplicate TSNs (RACK for SCTP section 3 noting SCTP natively reports duplicates + RACK for SCTP section 2B policy)
+	// DSACK-style inflation using SCTP duplicate TSNs (RACK for SCTP section 3 noting SCTP
+	// natively reports duplicates + RACK for SCTP section 2B policy)
 	if len(sack.duplicateTSN) > 0 && a.rackMinRTT > 0 {
 		a.rackReoWnd += max(a.rackMinRTT/4, a.rackReoWndFloor)
 		// keep inflated for 16 loss recoveries before reset
@@ -3125,7 +3142,8 @@ func (a *Association) onRackAfterSACK(deliveredFound bool, newestDeliveredSendTi
 		}
 	}
 
-	// 3) Loss marking on ACK: any outstanding chunk whose (send_time + reoWnd) < newestDeliveredSendTime is lost (RACK for SCTP section 2A)
+	// 3) Loss marking on ACK: any outstanding chunk whose (send_time + reoWnd) < newestDeliveredSendTime
+	// is lost (RACK for SCTP section 2A)
 	if !a.rackDeliveredTime.IsZero() {
 		for i := a.cumulativeTSNAckPoint + 1; ; i++ {
 			chunk, ok := a.inflightQueue.get(i)
@@ -3214,7 +3232,7 @@ func (a *Association) schedulePTOAfterSendLocked() {
 }
 
 // onRackTimeout is fired to avoid waiting for the next ACK.
-func (a *Association) onRackTimeout() {
+func (a *Association) onRackTimeout() { //nolint:cyclop
 	a.lock.Lock()
 	defer a.lock.Unlock()
 
association_test.go

Lines changed: 219 additions & 0 deletions
@@ -3796,3 +3796,222 @@ func TestConfigMTU(t *testing.T) {
 	require.NoError(t, a2.Close())
 	require.NoError(t, conn2.Close())
 }
+
+// makes an Association without starting read/write loops, skips init(), just the minimal state.
+func newRackTestAssoc(t *testing.T) *Association {
+	t.Helper()
+
+	lg := logging.NewDefaultLoggerFactory()
+	assoc := createAssociation(Config{
+		LoggerFactory: lg,
+	})
+
+	// Put the association into a sane "established" state with fresh queues.
+	assoc.setState(established)
+	assoc.peerVerificationTag = 1
+	assoc.sourcePort = defaultSCTPSrcDstPort
+	assoc.destinationPort = defaultSCTPSrcDstPort
+
+	// Deterministic TSN base.
+	assoc.initialTSN = 100
+	assoc.myNextTSN = 102 // we'll populate TSN=100,101 manually below
+	assoc.cumulativeTSNAckPoint = 99
+	assoc.advancedPeerTSNAckPoint = 99
+
+	// fresh queues
+	assoc.inflightQueue = newPayloadQueue()
+	assoc.payloadQueue = newReceivePayloadQueue(getMaxTSNOffset(assoc.maxReceiveBufferSize))
+
+	// RACK defaults for tests
+	assoc.rackEnabled = true
+	assoc.rackReorderingSeen = false
+	assoc.rackReoWndFloor = 0
+
+	// Have a non-zero SRTT so SRTT-bounding code runs deterministically.
+	assoc.srtt.Store(float64(100.0)) // 100 ms
+
+	return assoc
+}
+
+func mkChunk(tsn uint32, since time.Time) *chunkPayloadData {
+	return &chunkPayloadData{
+		streamIdentifier:     1,
+		streamSequenceNumber: 1,
+		beginningFragment:    true,
+		endingFragment:       true,
+		userData:             []byte("x"),
+		tsn:                  tsn,
+		since:                since,
+		nSent:                1, // original transmission
+	}
+}
+
+func TestRACK_EnabledDefaultAndDisableOption(t *testing.T) {
+	// default -> enabled
+	a := createAssociation(Config{LoggerFactory: logging.NewDefaultLoggerFactory()})
+	assert.True(t, a.rackEnabled, "RACK should be enabled by default")
+
+	// DisableRACK -> disabled
+	b := createAssociation(Config{
+		LoggerFactory: logging.NewDefaultLoggerFactory(),
+		DisableRACK:   true,
+	})
+	assert.False(t, b.rackEnabled, "RACK should be disabled when DisableRACK is set")
+}
+
+func TestRACK_MarkLossOnACK(t *testing.T) {
+	assoc := newRackTestAssoc(t)
+
+	// MinRTT=40ms => base reoWnd = 10ms
+	assoc.rackMinRTT = 40 * time.Millisecond
+	assoc.rackReoWnd = 0 // let onRackAfterSACK initialize to base
+
+	now := time.Now()
+	// Outstanding: TSN=100 (older), TSN=101 (newer, will be SACKed as gap)
+	cA := mkChunk(100, now.Add(-50*time.Millisecond))
+	cB := mkChunk(101, now) // most recently delivered send-time
+
+	assoc.inflightQueue.pushNoCheck(cA)
+	assoc.inflightQueue.pushNoCheck(cB)
+
+	// cumulativeTSNAck=99, gap block [2..2] => 99+2=101.
+	sack := &chunkSelectiveAck{
+		cumulativeTSNAck:               99,
+		advertisedReceiverWindowCredit: 65535,
+		gapAckBlocks:                   []gapAckBlock{{start: 2, end: 2}},
+		duplicateTSN:                   nil,
+	}
+
+	err := assoc.handleSack(sack)
+	require.NoError(t, err)
+
+	// RACK should have marked TSN=100 lost (since + reoWnd < deliveredTime).
+	gotA, _ := assoc.inflightQueue.get(100)
+	require.NotNil(t, gotA)
+	assert.True(t, gotA.retransmit, "RACK should mark older TSN lost on ACK")
+}
+
+func TestRACK_TimerMarksLost(t *testing.T) {
+	assoc := newRackTestAssoc(t)
+
+	assoc.rackEnabled = true
+	assoc.rackMinRTT = 80 * time.Millisecond
+	assoc.rackReoWnd = 20 * time.Millisecond
+	assoc.rackDeliveredTime = time.Now() // pretend we just delivered something
+
+	// One outstanding original transmission far in the past.
+	c := mkChunk(100, time.Now().Add(-200*time.Millisecond))
+	assoc.inflightQueue.pushNoCheck(c)
+
+	// Simulate timer.
+	assoc.onRackTimeout()
+
+	got, _ := assoc.inflightQueue.get(100)
+	require.NotNil(t, got)
+	assert.True(t, got.retransmit, "RACK timer should mark overdue original as lost")
+}
+
+func TestRACK_DSACKInflatesAndDecays(t *testing.T) {
+	assoc := newRackTestAssoc(t)
+
+	assoc.rackMinRTT = 100 * time.Millisecond
+	assoc.rackReoWnd = 25 * time.Millisecond // base is 25ms; will inflate by +25ms
+	assoc.rackKeepInflatedRecoveries = 0
+
+	// DSACK (duplicate TSN) present -> inflate by max(minRTT/4, floor) and set counter=16
+	sack := &chunkSelectiveAck{
+		cumulativeTSNAck: 99,
+		duplicateTSN:     []uint32{123},
+	}
+
+	// Note that we're checking for 15 and 14 instead of 16 and 15 because it immediately
+	// decrements when not in fast recovery.
+	assoc.onRackAfterSACK(false, time.Time{}, 0, sack)
+	assert.Equal(t, 50*time.Millisecond, assoc.rackReoWnd, "reoWnd should inflate on DSACK")
+	assert.Equal(t, 15, assoc.rackKeepInflatedRecoveries, "keep-inflated counter should be 15")
+
+	// When not in fast recovery, the counter decays each pass.
+	assoc.inFastRecovery = false
+	assoc.onRackAfterSACK(false, time.Time{}, 0, &chunkSelectiveAck{})
+	assert.Equal(t, 14, assoc.rackKeepInflatedRecoveries)
+
+	// Drive counter to zero and ensure reoWnd resets to base (minRTT/4).
+	assoc.rackKeepInflatedRecoveries = 1
+	assoc.onRackAfterSACK(false, time.Time{}, 0, &chunkSelectiveAck{})
+	assert.Equal(t, 0, assoc.rackKeepInflatedRecoveries)
+	assert.Equal(t, 25*time.Millisecond, assoc.rackReoWnd, "reoWnd should reset to base after decay")
+}
+
+func TestRACK_SuppressReoWndDuringRecovery_NoReorderingSeen(t *testing.T) {
+	assoc := newRackTestAssoc(t)
+
+	assoc.rackMinRTT = 120 * time.Millisecond
+	assoc.rackReoWnd = 40 * time.Millisecond
+	assoc.rackReorderingSeen = false
+	assoc.inFastRecovery = true
+
+	// During recovery with no reordering observed, reoWnd must go to zero.
+	assoc.onRackAfterSACK(false, time.Time{}, 0, &chunkSelectiveAck{})
+	assert.Equal(t, time.Duration(0), assoc.rackReoWnd, "reoWnd should be suppressed during recovery w/o reordering")
+
+	// After recovery ends, if reoWnd==0, it should re-initialize to base.
+	assoc.inFastRecovery = false
+	assoc.onRackAfterSACK(false, time.Time{}, 0, &chunkSelectiveAck{})
+	assert.Equal(t, 30*time.Millisecond, assoc.rackReoWnd, "reoWnd should re-initialize to base (minRTT/4) after recovery")
+}
+
+func TestRACK_ReoWndBoundedBySRTT(t *testing.T) {
+	a := newRackTestAssoc(t)
+
+	// Set a very large reoWnd, and a small SRTT (10ms).
+	a.rackReoWnd = 200 * time.Millisecond
+	a.srtt.Store(float64(10.0))
+
+	// Any onRackAfterSACK pass should bound reoWnd by SRTT.
+	a.onRackAfterSACK(false, time.Time{}, 0, &chunkSelectiveAck{})
+	assert.Equal(t, 10*time.Millisecond, a.rackReoWnd, "reoWnd must be bounded by SRTT")
+}
+
+func TestRACK_PTO_ProbesLatestOutstanding_WhenNoPending(t *testing.T) {
+	assoc := newRackTestAssoc(t)
+
+	// Two outstanding, none acked/abandoned.
+	now := time.Now()
+	c0 := mkChunk(100, now.Add(-10*time.Millisecond))
+	c1 := mkChunk(101, now)
+	assoc.inflightQueue.pushNoCheck(c0)
+	assoc.inflightQueue.pushNoCheck(c1)
+
+	// No pending -> PTO should mark latest outstanding for retransmit.
+	assoc.onPTOTimer()
+
+	got0, _ := assoc.inflightQueue.get(100)
+	got1, _ := assoc.inflightQueue.get(101)
+	require.NotNil(t, got0)
+	require.NotNil(t, got1)
+
+	assert.False(t, got0.retransmit, "older TSN should not be probed by PTO")
+	assert.True(t, got1.retransmit, "latest outstanding should be probed by PTO")
+}
+
+func TestRACK_PTO_DoesNotProbe_WhenPendingExists(t *testing.T) {
+	assoc := newRackTestAssoc(t)
+
+	// One outstanding
+	assoc.inflightQueue.pushNoCheck(mkChunk(100, time.Now()))
+
+	// Add something pending (generic non-nil chunk).
+	assoc.pendingQueue.push(&chunkPayloadData{
+		streamIdentifier:  2,
+		beginningFragment: true,
+		endingFragment:    true,
+		userData:          []byte("pending"),
+	})
+
+	// With pending data, PTO should NOT mark retransmit and simply wake sender.
+	assoc.onPTOTimer()
+
+	got, _ := assoc.inflightQueue.get(100)
+	require.NotNil(t, got)
+	assert.False(t, got.retransmit, "PTO must prefer sending pending data over probing")
+}
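
For reference, the DisableRACK knob exercised by TestRACK_EnabledDefaultAndDisableOption lives on the same Config that callers pass when dialing an association. A minimal sketch of opting out of RACK-based loss detection, assuming DisableRACK is exposed on the public sctp.Config alongside the existing NetConn and LoggerFactory fields (the helper name and the pre-established net.Conn are placeholders, not part of this commit):

package main

import (
	"net"

	"github.com/pion/logging"
	"github.com/pion/sctp"
)

// dialWithoutRACK is a hypothetical helper: it dials an SCTP association over an
// already-connected net.Conn with RACK disabled. RACK defaults to enabled.
func dialWithoutRACK(conn net.Conn) (*sctp.Association, error) {
	return sctp.Client(sctp.Config{
		NetConn:       conn,
		LoggerFactory: logging.NewDefaultLoggerFactory(),
		DisableRACK:   true, // assumption: mirrors the internal Config field used in the tests above
	})
}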
