From a56875c6f2c6e928d47c24a709f55e3b7e90f2db Mon Sep 17 00:00:00 2001 From: cryo Date: Fri, 11 Jul 2025 06:59:55 +0000 Subject: [PATCH 1/3] fix: prevent potential goroutine leak on blocked incoming channel --- mcp/streamable.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/mcp/streamable.go b/mcp/streamable.go index 11d70a3..87b19db 100644 --- a/mcp/streamable.go +++ b/mcp/streamable.go @@ -760,7 +760,12 @@ func (s *streamableClientConn) handleSSE(resp *http.Response) { // TODO: surface this error; possibly break the stream return } - s.incoming <- evt.data + + select { + case s.incoming <- evt.data: + case <-s.done: + return + } } }() From c71f318c60f80456e37acc831e896e2ff4fa0b97 Mon Sep 17 00:00:00 2001 From: cryo Date: Sun, 13 Jul 2025 17:15:57 +0000 Subject: [PATCH 2/3] add test --- mcp/streamable_test.go | 48 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/mcp/streamable_test.go b/mcp/streamable_test.go index e0e85a1..256bfd5 100644 --- a/mcp/streamable_test.go +++ b/mcp/streamable_test.go @@ -14,10 +14,12 @@ import ( "net/http/cookiejar" "net/http/httptest" "net/url" + "runtime" "strings" "sync" "sync/atomic" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" @@ -597,3 +599,49 @@ func TestEventID(t *testing.T) { }) } } + +func TestStreamableClientConnSSEGoroutineLeak(t *testing.T) { + // Initialize a streamableClientConn instance with channels + conn := &streamableClientConn{ + incoming: make(chan []byte, 1), + done: make(chan struct{}), + } + + // Construct mock SSE response data + var builder strings.Builder + for range 3 { + builder.WriteString("data: hello world\n\n") + } + resp := &http.Response{ + Body: io.NopCloser(strings.NewReader(builder.String())), + } + + // Start the handleSSE goroutine manually + go conn.handleSSE(resp) + + // Wait until incoming channel is filled + deadlineCtx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Millisecond) + defer cancelFunc() + for len(conn.incoming) < cap(conn.incoming) { // Ensure enough events are written + select { + case <-deadlineCtx.Done(): + t.Skipf("timeout when waiting for streamableClientConn.incoming to be full, test skipped: %v", deadlineCtx.Err()) + default: + // Continue checking until the channel is full + } + } + + // Now simulate calling Close() and blocking the goroutine + close(conn.done) + time.Sleep(50 * time.Millisecond) // Allow goroutine to exit + + // Check if "scanEvents" goroutine is still running + leakKey := "scanEvents" + buf := make([]byte, 1024*1024) + n := runtime.Stack(buf, true) + stack := string(buf[:n]) + + if idx := strings.Index(stack, leakKey); idx != -1 { + t.Fatalf("goroutine leak detected: %s still active", leakKey) + } +} From e03703c58396931c434e2fbe86d5939ceb7ca096 Mon Sep 17 00:00:00 2001 From: cryo Date: Wed, 16 Jul 2025 03:18:14 +0000 Subject: [PATCH 3/3] replace WaitGroup with Sleep --- mcp/streamable_test.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/mcp/streamable_test.go b/mcp/streamable_test.go index 11cd1d9..a76e68c 100644 --- a/mcp/streamable_test.go +++ b/mcp/streamable_test.go @@ -617,7 +617,12 @@ func TestStreamableClientConnSSEGoroutineLeak(t *testing.T) { } // Start the handleSSE goroutine manually - go conn.handleSSE(resp) + var wg sync.WaitGroup + wg.Add(1) + go func() { + conn.handleSSE(resp) + wg.Done() + }() // Wait until incoming channel is filled deadlineCtx, cancelFunc := context.WithTimeout(context.Background(), 100*time.Millisecond) @@ -633,7 +638,7 @@ func TestStreamableClientConnSSEGoroutineLeak(t *testing.T) { // Now simulate calling Close() and blocking the goroutine close(conn.done) - time.Sleep(50 * time.Millisecond) // Allow goroutine to exit + wg.Wait() // Check if "scanEvents" goroutine is still running leakKey := "scanEvents"