diff --git a/stream.go b/stream.go index d9bbd4c57cf6..0a0af8961f05 100644 --- a/stream.go +++ b/stream.go @@ -549,6 +549,8 @@ type clientStream struct { sentLast bool // sent an end stream + receivedFirstMsg bool // set after the first message is received + methodConfig *MethodConfig ctx context.Context // the application's context, wrapped by stats/tracing @@ -1144,11 +1146,16 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { if statusErr := a.transportStream.Status().Err(); statusErr != nil { return statusErr } + // Received no msg and status OK for non-server streaming rpcs. + if !cs.desc.ServerStreams && !cs.receivedFirstMsg { + return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC") + } return io.EOF // indicates successful end of stream. } return toRPCErr(err) } + cs.receivedFirstMsg = true if a.trInfo != nil { a.mu.Lock() if a.trInfo.tr != nil { @@ -1177,7 +1184,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { } else if err != nil { return toRPCErr(err) } - return status.Errorf(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but received another message") + return status.Error(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but received another message") } func (a *csAttempt) finish(err error) { @@ -1359,6 +1366,7 @@ type addrConnStream struct { transport transport.ClientTransport ctx context.Context sentLast bool + receivedFirstMsg bool desc *StreamDesc codec baseCodec sendCompressorV0 Compressor @@ -1484,10 +1492,15 @@ func (as *addrConnStream) RecvMsg(m any) (err error) { if statusErr := as.transportStream.Status().Err(); statusErr != nil { return statusErr } + // Received no msg and status OK for non-server streaming rpcs. + if !as.desc.ServerStreams && !as.receivedFirstMsg { + return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC") + } return io.EOF // indicates successful end of stream. } return toRPCErr(err) } + as.receivedFirstMsg = true if as.desc.ServerStreams { // Subsequent messages should be received by subsequent RecvMsg calls. @@ -1501,7 +1514,7 @@ func (as *addrConnStream) RecvMsg(m any) (err error) { } else if err != nil { return toRPCErr(err) } - return status.Errorf(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but received another message") + return status.Error(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but received another message") } func (as *addrConnStream) finish(err error) { diff --git a/test/end2end_test.go b/test/end2end_test.go index b2f503990bc1..9157c525c094 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3590,9 +3590,6 @@ func testClientStreamingError(t *testing.T, e env) { // Tests that a client receives a cardinality violation error for client-streaming // RPCs if the server doesn't send a message before returning status OK. func (s) TestClientStreamingCardinalityViolation_ServerHandlerMissingSendAndClose(t *testing.T) { - // TODO : https://github.com/grpc/grpc-go/issues/8119 - remove `t.Skip()` - // after this is fixed. - t.Skip() ss := &stubserver.StubServer{ StreamingInputCallF: func(_ testgrpc.TestService_StreamingInputCallServer) error { // Returning status OK without sending a response message.This is a @@ -3741,6 +3738,165 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { } } +// Tests that client receives a cardinality violation error for unary +// RPCs if the server doesn't send a message before returning status OK. +func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatal(err) + } + defer lis.Close() + + ss := grpc.UnknownServiceHandler(func(any, grpc.ServerStream) error { + return nil + }) + + s := grpc.NewServer(ss) + go s.Serve(lis) + defer s.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + if _, err = client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Internal { + t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) + } +} + +// Tests the behavior for unary RPC when client calls RecvMsg() twice. +// Second call to RecvMsg should fail with io.EOF. +func (s) TestUnaryRPC_ClientCallRecvMsgTwice(t *testing.T) { + e := tcpTLSEnv + te := newTest(t, e) + defer te.tearDown() + + te.startServer(&testServer{security: e.security}) + + cc := te.clientConn() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + desc := &grpc.StreamDesc{ + StreamName: "UnaryCall", + ServerStreams: false, + ClientStreams: false, + } + stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/UnaryCall") + if err != nil { + t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) + } + + if err := stream.SendMsg(&testpb.SimpleRequest{}); err != nil { + t.Fatalf("stream.SendMsg(_) = %v, want ", err) + } + + resp := &testpb.SimpleResponse{} + if err := stream.RecvMsg(resp); err != nil { + t.Fatalf("stream.RecvMsg() = %v , want ", err) + } + + if err = stream.RecvMsg(resp); err != io.EOF { + t.Errorf("stream.RecvMsg() = %v, want error %v", err, io.EOF) + } +} + +// Tests the behavior for unary RPC when server calls SendMsg() twice. +// Client should fail with cardinality violation error. +func (s) TestUnaryRPC_ServerCallSendMsgTwice(t *testing.T) { + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatal(err) + } + defer lis.Close() + + s := grpc.NewServer() + serviceDesc := grpc.ServiceDesc{ + ServiceName: "grpc.testing.TestService", + HandlerType: (*any)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "UnaryCall", + Handler: func(_ any, stream grpc.ServerStream) error { + if err := stream.RecvMsg(&testpb.Empty{}); err != nil { + t.Errorf("stream.RecvMsg() = %v, want ", err) + } + + if err = stream.SendMsg(&testpb.Empty{}); err != nil { + t.Errorf("stream.SendMsg() = %v, want ", err) + } + + if err = stream.SendMsg(&testpb.Empty{}); err != nil { + t.Errorf("stream.SendMsg() = %v, want ", err) + } + return nil + }, + ClientStreams: false, + ServerStreams: false, + }, + }, + } + s.RegisterService(&serviceDesc, &testServer{}) + go s.Serve(lis) + defer s.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + if _, err = client.UnaryCall(ctx, &testpb.SimpleRequest{}); status.Code(err) != codes.Internal { + t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) + } +} + +// Tests the behavior for client-streaming RPC when client calls RecvMsg() twice. +// Second call to RecvMsg should fail with io.EOF. +func (s) TestClientStreaming_ClientCallRecvMsgTwice(t *testing.T) { + ss := stubserver.StubServer{ + StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { + if err := stream.SendAndClose(&testpb.StreamingInputCallResponse{}); err != nil { + t.Errorf("stream.SendAndClose(_) = %v, want ", err) + } + return nil + }, + } + if err := ss.Start(nil); err != nil { + t.Fatal("Error starting server:", err) + } + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := ss.Client.StreamingInputCall(ctx) + if err != nil { + t.Fatalf(".StreamingInputCall(_) = _, %v, want ", err) + } + if err := stream.Send(&testpb.StreamingInputCallRequest{}); err != nil { + t.Fatalf("stream.Send(_) = %v, want ", err) + } + if err := stream.CloseSend(); err != nil { + t.Fatalf("stream.CloseSend() = %v, want ", err) + } + resp := new(testpb.StreamingInputCallResponse) + if err := stream.RecvMsg(resp); err != nil { + t.Fatalf("stream.RecvMsg() = %v , want ", err) + } + if err = stream.RecvMsg(resp); err != io.EOF { + t.Errorf("stream.RecvMsg() = %v, want error %v", err, io.EOF) + } +} + // Tests the behavior for server-side streaming when client calls SendMsg twice. // Second call to SendMsg should fail with Internal error and result in closing // the connection with a RST_STREAM. @@ -3802,7 +3958,6 @@ func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { <-handlerDone } -// TODO(i/7286) : Add tests to check server-side behavior for Unary RPC. // Tests the behavior for unary RPC when client calls SendMsg twice. Second call // to SendMsg should fail with Internal error. func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { @@ -3981,7 +4136,7 @@ func (s) TestServerStreaming_ClientSendsZeroRequests(t *testing.T) { } // Tests that a client receives a cardinality violation error for client-streaming -// RPCs if the server call SendMsg multiple times. +// RPCs if the server calls SendMsg() multiple times. func (s) TestClientStreaming_ServerHandlerSendMsgAfterSendMsg(t *testing.T) { ss := stubserver.StubServer{ StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error {