From 6792a42615fcf71bcce639cee7a2ac1f20bbce35 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Tue, 19 Aug 2025 18:06:23 +0000 Subject: [PATCH 1/7] add extra state to track calls to client.recvmsg --- stream.go | 17 ++++- test/end2end_test.go | 165 +++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 175 insertions(+), 7 deletions(-) diff --git a/stream.go b/stream.go index d9bbd4c57cf6..81f4ed4cadc1 100644 --- a/stream.go +++ b/stream.go @@ -549,6 +549,8 @@ type clientStream struct { sentLast bool // sent an end stream + recvFirstMsg bool // set after the first message is received + methodConfig *MethodConfig ctx context.Context // the application's context, wrapped by stats/tracing @@ -1144,11 +1146,16 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { if statusErr := a.transportStream.Status().Err(); statusErr != nil { return statusErr } + // Received no msg and status OK for non-server streaming rpcs. + if !cs.desc.ServerStreams && !cs.recvFirstMsg { + return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC") + } return io.EOF // indicates successful end of stream. } return toRPCErr(err) } + cs.recvFirstMsg = true if a.trInfo != nil { a.mu.Lock() if a.trInfo.tr != nil { @@ -1177,7 +1184,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { } else if err != nil { return toRPCErr(err) } - return status.Errorf(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but received another message") + return status.Error(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but received another message") } func (a *csAttempt) finish(err error) { @@ -1359,6 +1366,7 @@ type addrConnStream struct { transport transport.ClientTransport ctx context.Context sentLast bool + recvFirstMsg bool desc *StreamDesc codec baseCodec sendCompressorV0 Compressor @@ -1484,10 +1492,15 @@ func (as *addrConnStream) RecvMsg(m any) (err error) { if statusErr := as.transportStream.Status().Err(); statusErr != nil { return statusErr } + // Received no msg and status OK for non-server streaming rpcs. + if !as.desc.ServerStreams && !as.recvFirstMsg { + return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC") + } return io.EOF // indicates successful end of stream. } return toRPCErr(err) } + as.recvFirstMsg = true if as.desc.ServerStreams { // Subsequent messages should be received by subsequent RecvMsg calls. @@ -1501,7 +1514,7 @@ func (as *addrConnStream) RecvMsg(m any) (err error) { } else if err != nil { return toRPCErr(err) } - return status.Errorf(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but received another message") + return status.Error(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but received another message") } func (as *addrConnStream) finish(err error) { diff --git a/test/end2end_test.go b/test/end2end_test.go index b2f503990bc1..b983e6935de5 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3590,9 +3590,6 @@ func testClientStreamingError(t *testing.T, e env) { // Tests that a client receives a cardinality violation error for client-streaming // RPCs if the server doesn't send a message before returning status OK. func (s) TestClientStreamingCardinalityViolation_ServerHandlerMissingSendAndClose(t *testing.T) { - // TODO : https://github.com/grpc/grpc-go/issues/8119 - remove `t.Skip()` - // after this is fixed. - t.Skip() ss := &stubserver.StubServer{ StreamingInputCallF: func(_ testgrpc.TestService_StreamingInputCallServer) error { // Returning status OK without sending a response message.This is a @@ -3741,6 +3738,165 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { } } +// Tests that client receives a cardinality violation error for unary +// RPCs if the server doesn't send a message before returning status OK. +func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatal(err) + } + defer lis.Close() + + ss := grpc.UnknownServiceHandler(func(any, grpc.ServerStream) error { + return nil + }) + + s := grpc.NewServer(ss) + go s.Serve(lis) + defer s.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + if _, err = client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Internal { + t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) + } +} + +// Tests the behavior for unary RPC when client calls RecvMsg() twice. +// Second call to RecvMsg should fail with io.EOF. +func (s) TestUnaryRPC_ClientCallRecvMsgTwice(t *testing.T) { + e := tcpTLSEnv + te := newTest(t, e) + defer te.tearDown() + + te.startServer(&testServer{security: e.security}) + + cc := te.clientConn() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + desc := &grpc.StreamDesc{ + StreamName: "UnaryCall", + ServerStreams: false, + ClientStreams: false, + } + stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/UnaryCall") + if err != nil { + t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) + } + + if err := stream.SendMsg(&testpb.SimpleRequest{}); err != nil { + t.Fatalf("stream.SendMsg(_) = %v, want ", err) + } + + resp := &testpb.SimpleResponse{} + if err := stream.RecvMsg(resp); err != nil { + t.Fatalf("stream.RecvMsg() = %v , want ", err) + } + + if err = stream.RecvMsg(resp); err != io.EOF { + t.Errorf("stream.RecvMsg() = %v, want error %v", err, io.EOF) + } +} + +// Tests the behavior for unary RPC when server calls SendMsg() twice. +// Client should fail with cardinality violation error. +func (s) TestUnaryRPC_ServerCallSendMsgTwice(t *testing.T) { + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatal(err) + } + defer lis.Close() + + s := grpc.NewServer() + serviceDesc := grpc.ServiceDesc{ + ServiceName: "grpc.testing.TestService", + HandlerType: (*any)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "UnaryCall", + Handler: func(_ any, stream grpc.ServerStream) error { + if err := stream.RecvMsg(&testpb.Empty{}); err != nil { + t.Errorf("stream.RecvMsg() = %v, want ", err) + } + + if err = stream.SendMsg(&testpb.Empty{}); err != nil { + t.Errorf("stream.SendMsg() = %v, want ", err) + } + + if err = stream.SendMsg(&testpb.Empty{}); err != nil { + t.Errorf("stream.SendMsg() = %v, want ", err) + } + return nil + }, + ClientStreams: false, + ServerStreams: false, + }, + }, + } + s.RegisterService(&serviceDesc, &testServer{}) + go s.Serve(lis) + defer s.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + if _, err = client.UnaryCall(ctx, &testpb.SimpleRequest{}); status.Code(err) != codes.Internal { + t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) + } +} + +// Tests the behavior for client-streaming RPC when client calls RecvMsg() twice. +// Second call to RecvMsg should fail with io.EOF. +func (s) TestClientStreaming_ClientCallRecvMsgTwice(t *testing.T) { + ss := stubserver.StubServer{ + StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { + if err := stream.SendAndClose(&testpb.StreamingInputCallResponse{}); err != nil { + t.Errorf("stream.SendAndClose(_) = %v, want ", err) + } + return nil + }, + } + if err := ss.Start(nil); err != nil { + t.Fatal("Error starting server:", err) + } + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := ss.Client.StreamingInputCall(ctx) + if err != nil { + t.Fatalf(".StreamingInputCall(_) = _, %v, want ", err) + } + if err := stream.Send(&testpb.StreamingInputCallRequest{}); err != nil { + t.Fatalf("stream.Send(_) = %v, want ", err) + } + if err := stream.CloseSend(); err != nil { + t.Fatalf("stream.CloseSend() = %v, want ", err) + } + resp := new(testpb.StreamingInputCallResponse) + if err := stream.RecvMsg(resp); err != nil { + t.Fatalf("stream.RecvMsg() = %v , want ", err) + } + if err = stream.RecvMsg(resp); err != io.EOF { + t.Errorf("stream.RecvMsg() = %v, want error %v", err, io.EOF) + } +} + // Tests the behavior for server-side streaming when client calls SendMsg twice. // Second call to SendMsg should fail with Internal error and result in closing // the connection with a RST_STREAM. @@ -3802,7 +3958,6 @@ func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { <-handlerDone } -// TODO(i/7286) : Add tests to check server-side behavior for Unary RPC. // Tests the behavior for unary RPC when client calls SendMsg twice. Second call // to SendMsg should fail with Internal error. func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { @@ -3981,7 +4136,7 @@ func (s) TestServerStreaming_ClientSendsZeroRequests(t *testing.T) { } // Tests that a client receives a cardinality violation error for client-streaming -// RPCs if the server call SendMsg multiple times. +// RPCs if the server call SendMsg() multiple times. func (s) TestClientStreaming_ServerHandlerSendMsgAfterSendMsg(t *testing.T) { ss := stubserver.StubServer{ StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { From 206ddf8b117fef7e587e5434433ee12d75e0120b Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Fri, 22 Aug 2025 07:03:05 +0000 Subject: [PATCH 2/7] Revert "add extra state to track calls to client.recvmsg" This reverts commit 6792a42615fcf71bcce639cee7a2ac1f20bbce35. --- stream.go | 17 +---- test/end2end_test.go | 165 ++----------------------------------------- 2 files changed, 7 insertions(+), 175 deletions(-) diff --git a/stream.go b/stream.go index 81f4ed4cadc1..d9bbd4c57cf6 100644 --- a/stream.go +++ b/stream.go @@ -549,8 +549,6 @@ type clientStream struct { sentLast bool // sent an end stream - recvFirstMsg bool // set after the first message is received - methodConfig *MethodConfig ctx context.Context // the application's context, wrapped by stats/tracing @@ -1146,16 +1144,11 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { if statusErr := a.transportStream.Status().Err(); statusErr != nil { return statusErr } - // Received no msg and status OK for non-server streaming rpcs. - if !cs.desc.ServerStreams && !cs.recvFirstMsg { - return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC") - } return io.EOF // indicates successful end of stream. } return toRPCErr(err) } - cs.recvFirstMsg = true if a.trInfo != nil { a.mu.Lock() if a.trInfo.tr != nil { @@ -1184,7 +1177,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { } else if err != nil { return toRPCErr(err) } - return status.Error(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but received another message") + return status.Errorf(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but received another message") } func (a *csAttempt) finish(err error) { @@ -1366,7 +1359,6 @@ type addrConnStream struct { transport transport.ClientTransport ctx context.Context sentLast bool - recvFirstMsg bool desc *StreamDesc codec baseCodec sendCompressorV0 Compressor @@ -1492,15 +1484,10 @@ func (as *addrConnStream) RecvMsg(m any) (err error) { if statusErr := as.transportStream.Status().Err(); statusErr != nil { return statusErr } - // Received no msg and status OK for non-server streaming rpcs. - if !as.desc.ServerStreams && !as.recvFirstMsg { - return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC") - } return io.EOF // indicates successful end of stream. } return toRPCErr(err) } - as.recvFirstMsg = true if as.desc.ServerStreams { // Subsequent messages should be received by subsequent RecvMsg calls. @@ -1514,7 +1501,7 @@ func (as *addrConnStream) RecvMsg(m any) (err error) { } else if err != nil { return toRPCErr(err) } - return status.Error(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but received another message") + return status.Errorf(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but received another message") } func (as *addrConnStream) finish(err error) { diff --git a/test/end2end_test.go b/test/end2end_test.go index b983e6935de5..b2f503990bc1 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3590,6 +3590,9 @@ func testClientStreamingError(t *testing.T, e env) { // Tests that a client receives a cardinality violation error for client-streaming // RPCs if the server doesn't send a message before returning status OK. func (s) TestClientStreamingCardinalityViolation_ServerHandlerMissingSendAndClose(t *testing.T) { + // TODO : https://github.com/grpc/grpc-go/issues/8119 - remove `t.Skip()` + // after this is fixed. + t.Skip() ss := &stubserver.StubServer{ StreamingInputCallF: func(_ testgrpc.TestService_StreamingInputCallServer) error { // Returning status OK without sending a response message.This is a @@ -3738,165 +3741,6 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { } } -// Tests that client receives a cardinality violation error for unary -// RPCs if the server doesn't send a message before returning status OK. -func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { - lis, err := testutils.LocalTCPListener() - if err != nil { - t.Fatal(err) - } - defer lis.Close() - - ss := grpc.UnknownServiceHandler(func(any, grpc.ServerStream) error { - return nil - }) - - s := grpc.NewServer(ss) - go s.Serve(lis) - defer s.Stop() - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err) - } - defer cc.Close() - - client := testgrpc.NewTestServiceClient(cc) - if _, err = client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Internal { - t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) - } -} - -// Tests the behavior for unary RPC when client calls RecvMsg() twice. -// Second call to RecvMsg should fail with io.EOF. -func (s) TestUnaryRPC_ClientCallRecvMsgTwice(t *testing.T) { - e := tcpTLSEnv - te := newTest(t, e) - defer te.tearDown() - - te.startServer(&testServer{security: e.security}) - - cc := te.clientConn() - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - - desc := &grpc.StreamDesc{ - StreamName: "UnaryCall", - ServerStreams: false, - ClientStreams: false, - } - stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/UnaryCall") - if err != nil { - t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) - } - - if err := stream.SendMsg(&testpb.SimpleRequest{}); err != nil { - t.Fatalf("stream.SendMsg(_) = %v, want ", err) - } - - resp := &testpb.SimpleResponse{} - if err := stream.RecvMsg(resp); err != nil { - t.Fatalf("stream.RecvMsg() = %v , want ", err) - } - - if err = stream.RecvMsg(resp); err != io.EOF { - t.Errorf("stream.RecvMsg() = %v, want error %v", err, io.EOF) - } -} - -// Tests the behavior for unary RPC when server calls SendMsg() twice. -// Client should fail with cardinality violation error. -func (s) TestUnaryRPC_ServerCallSendMsgTwice(t *testing.T) { - lis, err := testutils.LocalTCPListener() - if err != nil { - t.Fatal(err) - } - defer lis.Close() - - s := grpc.NewServer() - serviceDesc := grpc.ServiceDesc{ - ServiceName: "grpc.testing.TestService", - HandlerType: (*any)(nil), - Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{ - { - StreamName: "UnaryCall", - Handler: func(_ any, stream grpc.ServerStream) error { - if err := stream.RecvMsg(&testpb.Empty{}); err != nil { - t.Errorf("stream.RecvMsg() = %v, want ", err) - } - - if err = stream.SendMsg(&testpb.Empty{}); err != nil { - t.Errorf("stream.SendMsg() = %v, want ", err) - } - - if err = stream.SendMsg(&testpb.Empty{}); err != nil { - t.Errorf("stream.SendMsg() = %v, want ", err) - } - return nil - }, - ClientStreams: false, - ServerStreams: false, - }, - }, - } - s.RegisterService(&serviceDesc, &testServer{}) - go s.Serve(lis) - defer s.Stop() - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err) - } - defer cc.Close() - - client := testgrpc.NewTestServiceClient(cc) - if _, err = client.UnaryCall(ctx, &testpb.SimpleRequest{}); status.Code(err) != codes.Internal { - t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) - } -} - -// Tests the behavior for client-streaming RPC when client calls RecvMsg() twice. -// Second call to RecvMsg should fail with io.EOF. -func (s) TestClientStreaming_ClientCallRecvMsgTwice(t *testing.T) { - ss := stubserver.StubServer{ - StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { - if err := stream.SendAndClose(&testpb.StreamingInputCallResponse{}); err != nil { - t.Errorf("stream.SendAndClose(_) = %v, want ", err) - } - return nil - }, - } - if err := ss.Start(nil); err != nil { - t.Fatal("Error starting server:", err) - } - defer ss.Stop() - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - stream, err := ss.Client.StreamingInputCall(ctx) - if err != nil { - t.Fatalf(".StreamingInputCall(_) = _, %v, want ", err) - } - if err := stream.Send(&testpb.StreamingInputCallRequest{}); err != nil { - t.Fatalf("stream.Send(_) = %v, want ", err) - } - if err := stream.CloseSend(); err != nil { - t.Fatalf("stream.CloseSend() = %v, want ", err) - } - resp := new(testpb.StreamingInputCallResponse) - if err := stream.RecvMsg(resp); err != nil { - t.Fatalf("stream.RecvMsg() = %v , want ", err) - } - if err = stream.RecvMsg(resp); err != io.EOF { - t.Errorf("stream.RecvMsg() = %v, want error %v", err, io.EOF) - } -} - // Tests the behavior for server-side streaming when client calls SendMsg twice. // Second call to SendMsg should fail with Internal error and result in closing // the connection with a RST_STREAM. @@ -3958,6 +3802,7 @@ func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { <-handlerDone } +// TODO(i/7286) : Add tests to check server-side behavior for Unary RPC. // Tests the behavior for unary RPC when client calls SendMsg twice. Second call // to SendMsg should fail with Internal error. func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { @@ -4136,7 +3981,7 @@ func (s) TestServerStreaming_ClientSendsZeroRequests(t *testing.T) { } // Tests that a client receives a cardinality violation error for client-streaming -// RPCs if the server call SendMsg() multiple times. +// RPCs if the server call SendMsg multiple times. func (s) TestClientStreaming_ServerHandlerSendMsgAfterSendMsg(t *testing.T) { ss := stubserver.StubServer{ StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { From 42a184d860bcb51a0ed36c68264d38179f11eed7 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Fri, 22 Aug 2025 07:27:59 +0000 Subject: [PATCH 3/7] revert commit 20bd1e7 --- stream.go | 12 ++++- test/end2end_test.go | 110 +++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 116 insertions(+), 6 deletions(-) diff --git a/stream.go b/stream.go index d9bbd4c57cf6..3323588cf1d4 100644 --- a/stream.go +++ b/stream.go @@ -1144,6 +1144,10 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { if statusErr := a.transportStream.Status().Err(); statusErr != nil { return statusErr } + // Received no msg and status OK for non-server streaming rpcs. + if !cs.desc.ServerStreams { + return status.Error(codes.Internal, "cardinality violation: received no response message from non-streaming RPC") + } return io.EOF // indicates successful end of stream. } @@ -1177,7 +1181,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { } else if err != nil { return toRPCErr(err) } - return status.Errorf(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but received another message") + return status.Error(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but received another message") } func (a *csAttempt) finish(err error) { @@ -1484,6 +1488,10 @@ func (as *addrConnStream) RecvMsg(m any) (err error) { if statusErr := as.transportStream.Status().Err(); statusErr != nil { return statusErr } + // Received no msg and status OK for non-server streaming rpcs. + if !as.desc.ServerStreams { + return status.Error(codes.Internal, "cardinality violation: received no response message from non-streaming RPC") + } return io.EOF // indicates successful end of stream. } return toRPCErr(err) @@ -1501,7 +1509,7 @@ func (as *addrConnStream) RecvMsg(m any) (err error) { } else if err != nil { return toRPCErr(err) } - return status.Errorf(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but received another message") + return status.Error(codes.Internal, "cardinality violation: expected for non server-streaming RPCs, but received another message") } func (as *addrConnStream) finish(err error) { diff --git a/test/end2end_test.go b/test/end2end_test.go index b2f503990bc1..7e926b1e7500 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3590,9 +3590,6 @@ func testClientStreamingError(t *testing.T, e env) { // Tests that a client receives a cardinality violation error for client-streaming // RPCs if the server doesn't send a message before returning status OK. func (s) TestClientStreamingCardinalityViolation_ServerHandlerMissingSendAndClose(t *testing.T) { - // TODO : https://github.com/grpc/grpc-go/issues/8119 - remove `t.Skip()` - // after this is fixed. - t.Skip() ss := &stubserver.StubServer{ StreamingInputCallF: func(_ testgrpc.TestService_StreamingInputCallServer) error { // Returning status OK without sending a response message.This is a @@ -3741,6 +3738,111 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { } } +// Tests that a client receives a cardinality violation error for unary +// RPCs if the server doesn't send a message before returning status OK. +func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatal(err) + } + defer lis.Close() + + ss := grpc.UnknownServiceHandler(func(any, grpc.ServerStream) error { + return nil + }) + + s := grpc.NewServer(ss) + go s.Serve(lis) + defer s.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + if _, err = client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Internal { + t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) + } +} + +// Tests that client will receive cardinality violations when calling +// RecvMsg() multiple times for non-streaming response streams. +func (s) TestUnaryRPC_ClientCallRecvMsgTwice(t *testing.T) { + e := tcpTLSEnv + te := newTest(t, e) + defer te.tearDown() + + te.startServer(&testServer{security: e.security}) + + cc := te.clientConn() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + desc := &grpc.StreamDesc{ + StreamName: "UnaryCall", + ServerStreams: false, + ClientStreams: false, + } + stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/UnaryCall") + if err != nil { + t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) + } + + if err := stream.SendMsg(&testpb.SimpleRequest{}); err != nil { + t.Fatalf("stream.SendMsg(_) = %v, want ", err) + } + + resp := &testpb.SimpleResponse{} + if err := stream.RecvMsg(resp); err != nil { + t.Fatalf("stream.RecvMsg() = %v , want ", err) + } + + if err = stream.RecvMsg(resp); status.Code(err) != codes.Internal { + t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) + } +} + +// Tests that client will receive cardinality violations when calling +// RecvMsg() multiple times for non-streaming response streams. +func (s) TestClientStreaming_ClientCallRecvMsgTwice(t *testing.T) { + ss := stubserver.StubServer{ + StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { + if err := stream.SendAndClose(&testpb.StreamingInputCallResponse{}); err != nil { + t.Errorf("stream.SendAndClose(_) = %v, want ", err) + } + return nil + }, + } + if err := ss.Start(nil); err != nil { + t.Fatal("Error starting server:", err) + } + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := ss.Client.StreamingInputCall(ctx) + if err != nil { + t.Fatalf(".StreamingInputCall(_) = _, %v, want ", err) + } + if err := stream.Send(&testpb.StreamingInputCallRequest{}); err != nil { + t.Fatalf("stream.Send(_) = %v, want ", err) + } + if err := stream.CloseSend(); err != nil { + t.Fatalf("stream.CloseSend() = %v, want ", err) + } + resp := new(testpb.StreamingInputCallResponse) + if err := stream.RecvMsg(resp); err != nil { + t.Fatalf("stream.RecvMsg() = %v , want ", err) + } + if err = stream.RecvMsg(resp); status.Code(err) != codes.Internal { + t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) + } +} + // Tests the behavior for server-side streaming when client calls SendMsg twice. // Second call to SendMsg should fail with Internal error and result in closing // the connection with a RST_STREAM. @@ -3981,7 +4083,7 @@ func (s) TestServerStreaming_ClientSendsZeroRequests(t *testing.T) { } // Tests that a client receives a cardinality violation error for client-streaming -// RPCs if the server call SendMsg multiple times. +// RPCs if the server call SendMsg() multiple times. func (s) TestClientStreaming_ServerHandlerSendMsgAfterSendMsg(t *testing.T) { ss := stubserver.StubServer{ StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { From 6cb1c5408fae2fdf5344af0165da3e0562b51ece Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Fri, 22 Aug 2025 08:39:42 +0000 Subject: [PATCH 4/7] restore existing behaviour to return io.EOF on repeated client.RecvMsg() calls --- stream.go | 13 +++++--- test/end2end_test.go | 70 +++++++++++++++++++++++++++++++++++++++----- 2 files changed, 71 insertions(+), 12 deletions(-) diff --git a/stream.go b/stream.go index 3323588cf1d4..81f4ed4cadc1 100644 --- a/stream.go +++ b/stream.go @@ -549,6 +549,8 @@ type clientStream struct { sentLast bool // sent an end stream + recvFirstMsg bool // set after the first message is received + methodConfig *MethodConfig ctx context.Context // the application's context, wrapped by stats/tracing @@ -1145,14 +1147,15 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { return statusErr } // Received no msg and status OK for non-server streaming rpcs. - if !cs.desc.ServerStreams { - return status.Error(codes.Internal, "cardinality violation: received no response message from non-streaming RPC") + if !cs.desc.ServerStreams && !cs.recvFirstMsg { + return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC") } return io.EOF // indicates successful end of stream. } return toRPCErr(err) } + cs.recvFirstMsg = true if a.trInfo != nil { a.mu.Lock() if a.trInfo.tr != nil { @@ -1363,6 +1366,7 @@ type addrConnStream struct { transport transport.ClientTransport ctx context.Context sentLast bool + recvFirstMsg bool desc *StreamDesc codec baseCodec sendCompressorV0 Compressor @@ -1489,13 +1493,14 @@ func (as *addrConnStream) RecvMsg(m any) (err error) { return statusErr } // Received no msg and status OK for non-server streaming rpcs. - if !as.desc.ServerStreams { - return status.Error(codes.Internal, "cardinality violation: received no response message from non-streaming RPC") + if !as.desc.ServerStreams && !as.recvFirstMsg { + return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC") } return io.EOF // indicates successful end of stream. } return toRPCErr(err) } + as.recvFirstMsg = true if as.desc.ServerStreams { // Subsequent messages should be received by subsequent RecvMsg calls. diff --git a/test/end2end_test.go b/test/end2end_test.go index 7e926b1e7500..e87309bab8f8 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3738,7 +3738,7 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { } } -// Tests that a client receives a cardinality violation error for unary +// Tests that client receives a cardinality violation error for unary // RPCs if the server doesn't send a message before returning status OK. func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { lis, err := testutils.LocalTCPListener() @@ -3769,8 +3769,8 @@ func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { } } -// Tests that client will receive cardinality violations when calling -// RecvMsg() multiple times for non-streaming response streams. +// Tests the behavior for unary RPC when client calls RecvMsg() twice. +// Second call to RecvMsg should fail with io.EOF. func (s) TestUnaryRPC_ClientCallRecvMsgTwice(t *testing.T) { e := tcpTLSEnv te := newTest(t, e) @@ -3801,13 +3801,67 @@ func (s) TestUnaryRPC_ClientCallRecvMsgTwice(t *testing.T) { t.Fatalf("stream.RecvMsg() = %v , want ", err) } - if err = stream.RecvMsg(resp); status.Code(err) != codes.Internal { + if err = stream.RecvMsg(resp); err != io.EOF { + t.Errorf("stream.RecvMsg() = %v, want error %v", err, io.EOF) + } +} + +// Tests the behavior for unary RPC when server calls SendMsg() twice. +// Client should fail with cardinality violation error. +func (s) TestUnaryRPC_ServerCallSendMsgTwice(t *testing.T) { + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatal(err) + } + defer lis.Close() + + s := grpc.NewServer() + serviceDesc := grpc.ServiceDesc{ + ServiceName: "grpc.testing.TestService", + HandlerType: (*any)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "UnaryCall", + Handler: func(_ any, stream grpc.ServerStream) error { + if err := stream.RecvMsg(&testpb.Empty{}); err != nil { + t.Errorf("stream.RecvMsg() = %v, want ", err) + } + + if err = stream.SendMsg(&testpb.Empty{}); err != nil { + t.Errorf("stream.SendMsg() = %v, want ", err) + } + + if err = stream.SendMsg(&testpb.Empty{}); err != nil { + t.Errorf("stream.SendMsg() = %v, want ", err) + } + return nil + }, + ClientStreams: false, + ServerStreams: false, + }, + }, + } + s.RegisterService(&serviceDesc, &testServer{}) + go s.Serve(lis) + defer s.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + if _, err = client.UnaryCall(ctx, &testpb.SimpleRequest{}); status.Code(err) != codes.Internal { t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) } } -// Tests that client will receive cardinality violations when calling -// RecvMsg() multiple times for non-streaming response streams. +// Tests the behavior for client-streaming RPC when client calls RecvMsg() twice. +// Second call to RecvMsg should fail with io.EOF. func (s) TestClientStreaming_ClientCallRecvMsgTwice(t *testing.T) { ss := stubserver.StubServer{ StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { @@ -3838,8 +3892,8 @@ func (s) TestClientStreaming_ClientCallRecvMsgTwice(t *testing.T) { if err := stream.RecvMsg(resp); err != nil { t.Fatalf("stream.RecvMsg() = %v , want ", err) } - if err = stream.RecvMsg(resp); status.Code(err) != codes.Internal { - t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) + if err = stream.RecvMsg(resp); err != io.EOF { + t.Errorf("stream.RecvMsg() = %v, want error %v", err, io.EOF) } } From 6d1b015729b0666376859bdd75888011f88b1980 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Fri, 22 Aug 2025 08:59:36 +0000 Subject: [PATCH 5/7] removing TODO --- test/end2end_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index e87309bab8f8..b983e6935de5 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3958,7 +3958,6 @@ func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { <-handlerDone } -// TODO(i/7286) : Add tests to check server-side behavior for Unary RPC. // Tests the behavior for unary RPC when client calls SendMsg twice. Second call // to SendMsg should fail with Internal error. func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { From 451d1cd7a16cda0da82477c62283357149ade00e Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Mon, 25 Aug 2025 15:56:36 +0000 Subject: [PATCH 6/7] resolving nit --- test/end2end_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index b983e6935de5..9157c525c094 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -4136,7 +4136,7 @@ func (s) TestServerStreaming_ClientSendsZeroRequests(t *testing.T) { } // Tests that a client receives a cardinality violation error for client-streaming -// RPCs if the server call SendMsg() multiple times. +// RPCs if the server calls SendMsg() multiple times. func (s) TestClientStreaming_ServerHandlerSendMsgAfterSendMsg(t *testing.T) { ss := stubserver.StubServer{ StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { From 6fc6a02a602674052187715e996e4592ec12c9a3 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Mon, 25 Aug 2025 19:15:29 +0000 Subject: [PATCH 7/7] change variable name --- stream.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/stream.go b/stream.go index 81f4ed4cadc1..0a0af8961f05 100644 --- a/stream.go +++ b/stream.go @@ -549,7 +549,7 @@ type clientStream struct { sentLast bool // sent an end stream - recvFirstMsg bool // set after the first message is received + receivedFirstMsg bool // set after the first message is received methodConfig *MethodConfig @@ -1147,7 +1147,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { return statusErr } // Received no msg and status OK for non-server streaming rpcs. - if !cs.desc.ServerStreams && !cs.recvFirstMsg { + if !cs.desc.ServerStreams && !cs.receivedFirstMsg { return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC") } return io.EOF // indicates successful end of stream. @@ -1155,7 +1155,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { return toRPCErr(err) } - cs.recvFirstMsg = true + cs.receivedFirstMsg = true if a.trInfo != nil { a.mu.Lock() if a.trInfo.tr != nil { @@ -1366,7 +1366,7 @@ type addrConnStream struct { transport transport.ClientTransport ctx context.Context sentLast bool - recvFirstMsg bool + receivedFirstMsg bool desc *StreamDesc codec baseCodec sendCompressorV0 Compressor @@ -1493,14 +1493,14 @@ func (as *addrConnStream) RecvMsg(m any) (err error) { return statusErr } // Received no msg and status OK for non-server streaming rpcs. - if !as.desc.ServerStreams && !as.recvFirstMsg { + if !as.desc.ServerStreams && !as.receivedFirstMsg { return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC") } return io.EOF // indicates successful end of stream. } return toRPCErr(err) } - as.recvFirstMsg = true + as.receivedFirstMsg = true if as.desc.ServerStreams { // Subsequent messages should be received by subsequent RecvMsg calls.