diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go
index 7e53eb1735ef..64e47dd3ac1a 100644
--- a/internal/transport/http2_server.go
+++ b/internal/transport/http2_server.go
@@ -1340,10 +1340,10 @@ func (t *http2Server) closeStream(s *ServerStream, rst bool, rstCode http2.ErrCo
 	// called to interrupt the potential blocking on other goroutines.
 	s.cancel()
 
-	oldState := s.swapState(streamDone)
-	if oldState == streamDone {
-		return
-	}
+	// We can't return early even if the stream's state is "done" as the state
+	// might have been set by the `finishStream` method. Deleting the stream via
+	// `finishStream` can get blocked on flow control.
+	s.swapState(streamDone)
 	t.deleteStream(s, eosReceived)
 
 	t.controlBuf.put(&cleanupStream{
diff --git a/test/transport_test.go b/test/transport_test.go
index 10e9ab57de30..cc591c7c7037 100644
--- a/test/transport_test.go
+++ b/test/transport_test.go
@@ -229,3 +229,76 @@ func (s) TestRSTDuringMessageRead(t *testing.T) {
 		t.Fatalf("client.EmptyCall() returned %v; want status with code %v", err, codes.Canceled)
 	}
 }
+
+// Test verifies that a client-side cancellation correctly frees up resources on
+// the server. The test setup is designed to simulate a scenario where a server
+// is blocked from sending a large message due to a full client-side flow
+// control window. The client-side cancellation of this blocked RPC then frees
+// up the max concurrent streams quota on the server, allowing a new RPC to be
+// created successfully.
+func (s) TestCancelWhileServerWaitingForFlowControl(t *testing.T) {
+	serverDoneCh := make(chan struct{}, 2)
+	const flowControlWindowSize = 65535
+	ss := &stubserver.StubServer{
+		StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {
+			// Send a large message to exhaust the client's flow control window.
+			stream.Send(&testpb.StreamingOutputCallResponse{
+				Payload: &testpb.Payload{
+					Body: make([]byte, flowControlWindowSize+1),
+				},
+			})
+			serverDoneCh <- struct{}{}
+			return nil
+		},
+	}
+
+	// Create a server that allows only 1 stream at a time.
+	ss = stubserver.StartTestService(t, ss, grpc.MaxConcurrentStreams(1))
+	defer ss.Stop()
+	// Use a static flow control window.
+	if err := ss.StartClient(grpc.WithInitialWindowSize(flowControlWindowSize)); err != nil {
+		t.Fatalf("Error starting test service client: %v", err)
+	}
+	client := ss.Client
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	streamCtx, streamCancel := context.WithCancel(ctx)
+	defer streamCancel()
+
+	if _, err := client.StreamingOutputCall(streamCtx, &testpb.StreamingOutputCallRequest{}); err != nil {
+		t.Fatalf("Failed to create server streaming RPC: %v", err)
+	}
+
+	// Wait for the server handler to return. This should cause the trailers to
+	// be buffered on the server, waiting for flow control quota to first send
+	// the data frame.
+	select {
+	case <-ctx.Done():
+		t.Fatal("Context timed out waiting for server handler to return.")
+	case <-serverDoneCh:
+	}
+
+	// Attempt to create a stream. It should fail since the previous stream is
+	// still blocked.
+	shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer shortCancel()
+	_, err := client.StreamingOutputCall(shortCtx, &testpb.StreamingOutputCallRequest{})
+	if status.Code(err) != codes.DeadlineExceeded {
+		t.Fatalf("Server stream creation returned error with unexpected status code: %v, want code: %v", err, codes.DeadlineExceeded)
+	}
+
+	// Cancel the RPC. This should free up the concurrent stream quota on the
+	// server.
+	streamCancel()
+
+	// Attempt to create another stream.
+	stream, err := client.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{})
+	if err != nil {
+		t.Fatalf("Failed to create server streaming RPC: %v", err)
+	}
+	_, err = stream.Recv()
+	if err != nil {
+		t.Fatalf("Failed to read from the stream: %v", err)
+	}
+}
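
For reference, a minimal, self-contained sketch (not the real transport code) of the quota leak this change addresses: finishStream marks the stream done and defers the actual deletion until a flow-control-blocked write completes, so an early return in closeStream skips deleteStream and the MaxConcurrentStreams slot is never released. The `transport` and `stream` types and the `flowControlUnblocked` channel below are invented for illustration; only the early-return shape mirrors the diff.

package main

import (
	"fmt"
	"sync/atomic"
)

type streamState int32

const (
	streamActive streamState = iota
	streamDone
)

type stream struct {
	state int32 // holds a streamState, accessed atomically
}

func (s *stream) swapState(st streamState) streamState {
	return streamState(atomic.SwapInt32(&s.state, int32(st)))
}

type transport struct {
	activeStreams int // what the MaxConcurrentStreams limit effectively counts
}

func (t *transport) deleteStream(s *stream) { t.activeStreams-- }

// finishStream models the handler-done path: mark the stream done and defer
// deletion until the (possibly flow-control-blocked) write completes.
func (t *transport) finishStream(s *stream, flowControlUnblocked <-chan struct{}) {
	s.swapState(streamDone)
	go func() {
		<-flowControlUnblocked // may never fire if the peer stops reading
		t.deleteStream(s)
	}()
}

// closeStream models the cancellation path (e.g. RST_STREAM from the client).
// With returnEarly=true it reproduces the old behavior: if finishStream
// already set the state, deleteStream is skipped and the stream quota leaks.
func (t *transport) closeStream(s *stream, returnEarly bool) {
	old := s.swapState(streamDone)
	if returnEarly && old == streamDone {
		return
	}
	t.deleteStream(s)
}

func main() {
	for _, returnEarly := range []bool{true, false} {
		tr := &transport{activeStreams: 1}
		st := &stream{}
		blocked := make(chan struct{}) // never closed: simulates a full window
		tr.finishStream(st, blocked)
		tr.closeStream(st, returnEarly)
		fmt.Printf("returnEarly=%v -> activeStreams=%d\n", returnEarly, tr.activeStreams)
	}
}

Running the sketch prints activeStreams=1 for the early-return variant (the quota is stuck until the blocked write drains) and activeStreams=0 once closeStream always deletes the stream, which is the behavior the test above exercises end to end.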