Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1340,10 +1340,10 @@ func (t *http2Server) closeStream(s *ServerStream, rst bool, rstCode http2.ErrCo
// called to interrupt the potential blocking on other goroutines.
s.cancel()

oldState := s.swapState(streamDone)
if oldState == streamDone {
return
}
// We can't return early even if the stream's state is "done" as the state
// might have been set by the `finishStream` method. Deleting the stream via
// `finishStream` can get blocked on flow control.
s.swapState(streamDone)
t.deleteStream(s, eosReceived)

t.controlBuf.put(&cleanupStream{
Expand Down
73 changes: 73 additions & 0 deletions test/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,76 @@ func (s) TestRSTDuringMessageRead(t *testing.T) {
t.Fatalf("client.EmptyCall() returned %v; want status with code %v", err, codes.Canceled)
}
}

// Test verifies that a client-side cancellation correctly frees up resources on
// the server. The test setup is designed to simulate a scenario where a server
// is blocked from sending a large message due to a full client-side flow
// control window. The client-side cancellation of this blocked RPC then frees
// up the max concurrent streams quota on the server, allowing a new RPC to be
// created successfully.
func (s) TestCancelWhileServerWaitingForFlowControl(t *testing.T) {
serverDoneCh := make(chan struct{}, 2)
const flowControlWindowSize = 65535
ss := &stubserver.StubServer{
StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {
// Send a large message to exhaust the client's flow control window.
stream.Send(&testpb.StreamingOutputCallResponse{
Payload: &testpb.Payload{
Body: make([]byte, flowControlWindowSize+1),
},
})
serverDoneCh <- struct{}{}
return nil
},
}

// Create a server that allows only 1 stream at a time.
ss = stubserver.StartTestService(t, ss, grpc.MaxConcurrentStreams(1))
defer ss.Stop()
// Use a static flow control window.
if err := ss.StartClient(grpc.WithInitialWindowSize(flowControlWindowSize)); err != nil {
t.Fatalf("Error while start test service client: %v", err)
}
client := ss.Client
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

streamCtx, streamCancel := context.WithCancel(ctx)
defer streamCancel()

if _, err := client.StreamingOutputCall(streamCtx, &testpb.StreamingOutputCallRequest{}); err != nil {
t.Fatalf("Failed to create server streaming RPC: %v", err)
}

// Wait for the server handler to return. This should cause the trailers to
// be buffered on the server, waiting for flow control quota to first send
// the data frame.
select {
case <-ctx.Done():
t.Fatal("Context timed out waiting for server handler to return.")
case <-serverDoneCh:
}

// Attempt to create a stream. It should fail since the previous stream is
// still blocked.
shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer shortCancel()
_, err := client.StreamingOutputCall(shortCtx, &testpb.StreamingOutputCallRequest{})
if status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("Server stream creation returned error with unexpected status code: %v, want code: %v", err, codes.DeadlineExceeded)
}

// Cancel the RPC, this should free up concurrent stream quota on the
// server.
streamCancel()

// Attempt to create another stream.
stream, err := client.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{})
if err != nil {
t.Fatalf("Failed to create server streaming RPC: %v", err)
}
_, err = stream.Recv()
if err != nil {
t.Fatalf("Failed to read from the stream: %v", err)
}
}