Skip to content

Commit 778d331

Browse files
committed
fix turn leaks
1 parent 90e44fc commit 778d331

File tree

4 files changed

+146
-17
lines changed

4 files changed

+146
-17
lines changed

internal/pool/pool.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,10 @@ func (p *ConnPool) queuedNewConn(ctx context.Context) (*Conn, error) {
536536
var err error
537537
defer func() {
538538
if err != nil {
539-
w.cancel(ctx, p)
539+
if cn := w.cancel(); cn != nil {
540+
p.putIdleConn(ctx, cn)
541+
p.freeTurn()
542+
}
540543
}
541544
}()
542545

@@ -563,6 +566,7 @@ func (p *ConnPool) queuedNewConn(ctx context.Context) (*Conn, error) {
563566
return
564567
} else if cnErr == nil && !delivered {
565568
p.putIdleConn(dialCtx, cn)
569+
p.freeTurn()
566570
freeTurnCalled = true
567571
} else {
568572
p.freeTurn()
@@ -604,8 +608,6 @@ func (p *ConnPool) putIdleConn(ctx context.Context, cn *Conn) {
604608
// poolSize is increased in newConn
605609
p.idleConns = append(p.idleConns, cn)
606610
p.idleConnsLen.Add(1)
607-
608-
p.freeTurn()
609611
}
610612

611613
func (p *ConnPool) waitTurn(ctx context.Context) error {

internal/pool/pool_test.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -604,6 +604,11 @@ var _ = Describe("queuedNewConn", func() {
604604

605605
// Wait for first request to complete
606606
<-done1
607+
608+
// Verify all turns are released after requests complete
609+
Eventually(func() int {
610+
return testPool.QueueLen()
611+
}, "1s", "50ms").Should(Equal(0), "All turns should be released after requests complete")
607612
})
608613

609614
It("should handle context cancellation while waiting for connection result", func() {
@@ -646,8 +651,19 @@ var _ = Describe("queuedNewConn", func() {
646651
<-done
647652
Expect(err2).To(Equal(context.DeadlineExceeded))
648653

654+
// Verify turn state - the background dial goroutine may still hold a turn at this point
655+
// Note: Background connection creation will complete and release turn
656+
Eventually(func() int {
657+
return testPool.QueueLen()
658+
}, "1s", "50ms").Should(Equal(1), "Only conn1's turn should be held")
659+
649660
// Clean up - release the first connection
650661
testPool.Put(ctx, conn1)
662+
663+
// Verify all turns are released after cleanup
664+
Eventually(func() int {
665+
return testPool.QueueLen()
666+
}, "1s", "50ms").Should(Equal(0), "All turns should be released after cleanup")
651667
})
652668

653669
It("should handle dial failures gracefully", func() {
@@ -668,6 +684,11 @@ var _ = Describe("queuedNewConn", func() {
668684
_, err := testPool.Get(ctx)
669685
Expect(err).To(HaveOccurred())
670686
Expect(err.Error()).To(ContainSubstring("dial failed"))
687+
688+
// Verify turn is released after dial failure
689+
Eventually(func() int {
690+
return testPool.QueueLen()
691+
}, "1s", "50ms").Should(Equal(0), "Turn should be released after dial failure")
671692
})
672693

673694
It("should handle connection creation success with normal delivery", func() {
@@ -909,6 +930,112 @@ var _ = Describe("queuedNewConn", func() {
909930

910931
// Cleanup
911932
testPool.Put(context.Background(), conn2)
933+
934+
// Verify turn is released after putIdleConn path completes
935+
// This is critical: ensures freeTurn() was called in the putIdleConn branch
936+
Eventually(func() int {
937+
return testPool.QueueLen()
938+
}, "1s", "50ms").Should(Equal(0),
939+
"Turn should be released after putIdleConn path completes")
940+
})
941+
942+
It("should not leak turn when delivering connection via putIdleConn", func() {
943+
// This test verifies that freeTurn() is called when putIdleConn successfully
944+
// delivers a connection to another waiting request
945+
//
946+
// Scenario:
947+
// 1. Request A: timeout 150ms, connection creation takes 200ms
948+
// 2. Request B: timeout 500ms, connection creation takes 400ms
949+
// 3. Both requests enter dialsQueue and start async connection creation
950+
// 4. Request A times out at 150ms
951+
// 5. Request A's connection completes at 200ms
952+
// 6. putIdleConn delivers Request A's connection to Request B
953+
// 7. queuedNewConn must call freeTurn()
954+
// 8. Check: QueueLen should be 1 (only B holding turn), not 2 (A's turn leaked)
955+
956+
callCount := int32(0)
957+
958+
controlledDialer := func(ctx context.Context) (net.Conn, error) {
959+
count := atomic.AddInt32(&callCount, 1)
960+
if count == 1 {
961+
// Request A's connection: takes 200ms
962+
time.Sleep(200 * time.Millisecond)
963+
} else {
964+
// Request B's connection: takes 400ms (longer, so A's connection is used)
965+
time.Sleep(400 * time.Millisecond)
966+
}
967+
return newDummyConn(), nil
968+
}
969+
970+
testPool := pool.NewConnPool(&pool.Options{
971+
Dialer: controlledDialer,
972+
PoolSize: 2, // Allows both requests to get turns
973+
MaxConcurrentDials: 2, // Allows both connections to be created simultaneously
974+
DialTimeout: 500 * time.Millisecond,
975+
PoolTimeout: 1 * time.Second,
976+
})
977+
defer testPool.Close()
978+
979+
// Verify initial state
980+
Expect(testPool.QueueLen()).To(Equal(0))
981+
982+
// Request A: Short timeout (150ms), connection takes 200ms
983+
reqADone := make(chan error, 1)
984+
go func() {
985+
defer GinkgoRecover()
986+
shortCtx, cancel := context.WithTimeout(ctx, 150*time.Millisecond)
987+
defer cancel()
988+
_, err := testPool.Get(shortCtx)
989+
reqADone <- err
990+
}()
991+
992+
// Wait for Request A to acquire turn and enter dialsQueue
993+
time.Sleep(50 * time.Millisecond)
994+
Expect(testPool.QueueLen()).To(Equal(1), "Request A should occupy turn")
995+
996+
// Request B: Long timeout (500ms), will receive Request A's connection
997+
reqBDone := make(chan struct{})
998+
var reqBConn *pool.Conn
999+
var reqBErr error
1000+
go func() {
1001+
defer GinkgoRecover()
1002+
longCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
1003+
defer cancel()
1004+
reqBConn, reqBErr = testPool.Get(longCtx)
1005+
close(reqBDone)
1006+
}()
1007+
1008+
// Wait for Request B to acquire turn and enter dialsQueue
1009+
time.Sleep(50 * time.Millisecond)
1010+
Expect(testPool.QueueLen()).To(Equal(2), "Both requests should occupy turns")
1011+
1012+
// Request A times out at 150ms
1013+
reqAErr := <-reqADone
1014+
Expect(reqAErr).To(HaveOccurred(), "Request A should timeout")
1015+
1016+
// Request A's connection completes at 200ms
1017+
// putIdleConn delivers it to Request B via tryDeliver
1018+
// queuedNewConn MUST call freeTurn() to release Request A's turn
1019+
<-reqBDone
1020+
Expect(reqBErr).NotTo(HaveOccurred(), "Request B should receive Request A's connection")
1021+
Expect(reqBConn).NotTo(BeNil())
1022+
1023+
// CRITICAL CHECK: Turn leak detection
1024+
// After Request B receives connection from putIdleConn:
1025+
// - Request A's turn SHOULD be released (via freeTurn)
1026+
// - Request B's turn is still held (will release on Put)
1027+
// Expected QueueLen: 1 (only Request B)
1028+
// If the bug exists (missing freeTurn): QueueLen: 2 (Request A's turn leaked)
1029+
time.Sleep(100 * time.Millisecond) // Allow time for turn release
1030+
currentQueueLen := testPool.QueueLen()
1031+
1032+
Expect(currentQueueLen).To(Equal(1),
1033+
"QueueLen should be 1 (only Request B holding turn). "+
1034+
"If it's 2, Request A's turn leaked due to missing freeTurn()")
1035+
1036+
// Cleanup
1037+
testPool.Put(ctx, reqBConn)
1038+
Eventually(func() int { return testPool.QueueLen() }, "500ms").Should(Equal(0))
9121039
})
9131040
})
9141041

internal/pool/want_conn.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func (w *wantConn) tryDeliver(cn *Conn, err error) bool {
3737
return true
3838
}
3939

40-
func (w *wantConn) cancel(ctx context.Context, p *ConnPool) {
40+
func (w *wantConn) cancel() *Conn {
4141
w.mu.Lock()
4242
var cn *Conn
4343
if w.done {
@@ -54,9 +54,7 @@ func (w *wantConn) cancel(ctx context.Context, p *ConnPool) {
5454
w.ctx = nil
5555
w.mu.Unlock()
5656

57-
if cn != nil {
58-
p.putIdleConn(ctx, cn)
59-
}
57+
return cn
6058
}
6159

6260
type wantConnResult struct {

internal/pool/want_conn_test.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -125,11 +125,13 @@ func TestWantConn_cancel_NotDone(t *testing.T) {
125125
result: make(chan wantConnResult, 1),
126126
}
127127

128-
// Create a mock pool
129-
pool := &ConnPool{}
130-
131128
// Test cancel when not done
132-
w.cancel(context.Background(), pool)
129+
cn := w.cancel()
130+
131+
// Should return nil since no connection was delivered
132+
if cn != nil {
133+
t.Errorf("cancel()= %v, want nil when no connection delivered", cn)
134+
}
133135

134136
// Check that wantConn is marked as done
135137
if !w.done {
@@ -163,13 +165,13 @@ func TestWantConn_cancel_AlreadyDone(t *testing.T) {
163165
testErr := errors.New("test error")
164166
w.result <- wantConnResult{cn: nil, err: testErr}
165167

166-
// Create a mock pool
167-
pool := &ConnPool{
168-
cfg: &Options{},
169-
}
170-
171168
// Test cancel when already done
172-
w.cancel(context.Background(), pool)
169+
cn := w.cancel()
170+
171+
// Should return nil since the result had no connection
172+
if cn != nil {
173+
t.Errorf("cancel()= %v, want nil when result had no connection", cn)
174+
}
173175

174176
// Check that wantConn remains done
175177
if !w.done {

0 commit comments

Comments
 (0)