mcp/stream: prevent potential goroutine leak on blocked incoming channel #123

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Status: Closed (wants to merge 5 commits)
48 changes: 48 additions & 0 deletions mcp/streamable_test.go
@@ -14,10 +14,12 @@ import (
"net/http/cookiejar"
"net/http/httptest"
"net/url"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
@@ -597,3 +599,49 @@ func TestEventID(t *testing.T) {
})
}
}

func TestStreamableClientConnSSEGoroutineLeak(t *testing.T) {
	// Initialize a streamableClientConn instance with channels.
	conn := &streamableClientConn{
		incoming: make(chan []byte, 1),
		done:     make(chan struct{}),
	}

	// Construct mock SSE response data.
	var builder strings.Builder
	for range 3 {
		builder.WriteString("data: hello world\n\n")
	}
	resp := &http.Response{
		Body: io.NopCloser(strings.NewReader(builder.String())),
	}

	// Start the handleSSE goroutine manually.
	go conn.handleSSE(resp)

	// Wait until the incoming channel is full.
	deadlineCtx, cancelFunc := context.WithTimeout(context.Background(), 30*time.Millisecond)
	defer cancelFunc()
	for len(conn.incoming) < cap(conn.incoming) { // Ensure enough events are written.
		select {
		case <-deadlineCtx.Done():
			t.Skipf("timeout waiting for streamableClientConn.incoming to fill, test skipped: %v", deadlineCtx.Err())
		default:
			// Keep polling until the channel is full.
		}
	}

	// Now simulate calling Close() and blocking the goroutine.
	close(conn.done)
	time.Sleep(50 * time.Millisecond) // Allow the goroutine to exit.
Contributor:

Can we use a sync.WaitGroup instead of a sleep to remove flakiness?

Contributor Author:

🤔, I agree that sync.WaitGroup is a great tool for waiting on goroutines, but in this case, since scanEvents is launched within handleSSE and doesn't expose a separate wait condition, sync.WaitGroup wouldn't directly help in waiting for scanEvents to exit.

Contributor:

Ack, we could wrap the call to handleSSE:

go func() {
    conn.handleSSE(resp)
    wg.Done()
}()

Contributor Author (@cryo-zd, Jul 15, 2025):

👋, sorry for my delayed response.

I’ve implemented the proposed approach using a sync.WaitGroup around handleSSE, and replaced the time.Sleep with wg.Wait().

However, the test fails intermittently — although the leak has been correctly fixed. My understanding is that the scanEvents goroutine, which is launched inside handleSSE, might not observe the closed done channel and exit before handleSSE returns and wg.Wait() completes due to scheduling variance. So the test catches it as a false positive leak, even though the fix is correct.

Thanks again for the helpful suggestions and patience!
[attachment: testFail screenshot]

Contributor:

Can you rebase with upstream and upload the test?

I added the waitgroup and I am not able to reproduce the failures.

Contributor Author:

Thanks for the quick response!

I actually noticed two days ago that the leak issue had been fixed — great to see that resolved.

I've rebased the branch and pushed the test update using sync.WaitGroup as suggested. On my local runs, the test sometimes passes, but occasionally fails due to a detected goroutine leak, which I believe is caused by scheduling variance.

This test currently relies on timing in two places to simulate the blocked goroutine scenario, and I personally feel this makes it a bit less elegant and deterministic. Given the potential for flakiness in CI, I'm also happy to close this PR if you'd prefer not to include this regression test.

[attachment: test3 screenshot]

Contributor:

I'm not sure why it's failing on your machine, it seems to work on mine after running it 10000 times 🤔 :

:~/w/go-sdk/mcp$ go test -count=10000 -race -run TestStreamableClientConnSSEGoroutineLeak
PASS
ok      github.com/modelcontextprotocol/go-sdk/mcp      12.092s

Contributor Author:

👋, sorry for the delay and thank you so much for your patience and continued feedback!

To help validate the issue further, I added a dedicated step to my GitHub Actions workflow in my personal fork using the following command:

- name: Test
  run: go test -v ./...
- name: TestGoroutineLeak
  run: go test -count=1000 -v -race -run TestStreamableClientConnSSEGoroutineLeak ./mcp

This directly targets the streamable_test.go file in the mcp package, where the leak test is defined. I noticed a failure in this setup using the same environment as upstream CI.

Here's the link to the failing run: link

When you have a moment, could you take a look and confirm whether it aligns with what you’d expect?

Contributor:

Thanks for that!

I understand now that the sleep is necessary to allow for scheduling variance. We can keep the sync.WaitGroup and add back the small delay to allow scanEvents to shut down. Then it should be good to submit. Thanks!

Contributor Author:

Thanks again for all the helpful discussion and feedback.

While preparing to update the PR, I noticed that the latest version of the codebase has significantly changed the SSE handling logic (#133). The handleSSE function no longer starts a separate scanEvents goroutine. Instead, it now processes the stream directly through processStream in a blocking loop.

As a result, the potential scanEvents goroutine leak scenario we were testing for no longer exists in this implementation. The regression test now always passes, but it no longer provides meaningful protection or value.

To avoid unnecessary test complexity and maintenance cost, I'm going to go ahead and close this PR and the related issue. Please feel free to reopen if there's still interest in preserving a version of the test for historical reference.

Thanks again for your time and thoughtful review!


	// Check whether the "scanEvents" goroutine is still running by
	// scanning a full stack dump for its name.
	leakKey := "scanEvents"
	buf := make([]byte, 1024*1024)
	n := runtime.Stack(buf, true)
	stack := string(buf[:n])

	if strings.Contains(stack, leakKey) {
		t.Fatalf("goroutine leak detected: %s still active", leakKey)
	}
}
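The leak mode this test targets (a send to incoming blocking forever after Close) is conventionally avoided by guarding the send with a select on the done channel. A minimal sketch under assumed names; `send` is illustrative, not the SDK's code:

```go
package main

import (
	"fmt"
	"time"
)

// send delivers data to incoming unless done is closed first. The
// select prevents the sending goroutine from blocking forever once
// the receiver has gone away. (Field names mirror the
// streamableClientConn fields from the diff above.)
func send(incoming chan<- []byte, done <-chan struct{}, data []byte) bool {
	select {
	case incoming <- data:
		return true
	case <-done:
		return false
	}
}

func main() {
	incoming := make(chan []byte, 1)
	done := make(chan struct{})

	// First send fits in the buffer.
	fmt.Println(send(incoming, done, []byte("a"))) // true

	// Buffer is now full; closing done lets the second send abort
	// instead of blocking forever.
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(done)
	}()
	fmt.Println(send(incoming, done, []byte("b"))) // false
}
```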