Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,8 @@ type clientStream struct {

sentLast bool // sent an end stream

recvFirstMsg bool // set after the first message is received
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Please expand recv to received. The abbreviation is a bit ambiguous and could be interpreted as either receive or received.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


methodConfig *MethodConfig

ctx context.Context // the application's context, wrapped by stats/tracing
Expand Down Expand Up @@ -1144,11 +1146,16 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
if statusErr := a.transportStream.Status().Err(); statusErr != nil {
return statusErr
}
// Received no msg and status OK for non-server streaming rpcs.
if !cs.desc.ServerStreams && !cs.recvFirstMsg {
return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC")
}
return io.EOF // indicates successful end of stream.
}

return toRPCErr(err)
}
cs.recvFirstMsg = true
if a.trInfo != nil {
a.mu.Lock()
if a.trInfo.tr != nil {
Expand Down Expand Up @@ -1177,7 +1184,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
} else if err != nil {
return toRPCErr(err)
}
return status.Errorf(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
}

func (a *csAttempt) finish(err error) {
Expand Down Expand Up @@ -1359,6 +1366,7 @@ type addrConnStream struct {
transport transport.ClientTransport
ctx context.Context
sentLast bool
recvFirstMsg bool
desc *StreamDesc
codec baseCodec
sendCompressorV0 Compressor
Expand Down Expand Up @@ -1484,10 +1492,15 @@ func (as *addrConnStream) RecvMsg(m any) (err error) {
if statusErr := as.transportStream.Status().Err(); statusErr != nil {
return statusErr
}
// Received no msg and status OK for non-server streaming rpcs.
if !as.desc.ServerStreams && !as.recvFirstMsg {
return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC")
}
return io.EOF // indicates successful end of stream.
}
return toRPCErr(err)
}
as.recvFirstMsg = true

if as.desc.ServerStreams {
// Subsequent messages should be received by subsequent RecvMsg calls.
Expand All @@ -1501,7 +1514,7 @@ func (as *addrConnStream) RecvMsg(m any) (err error) {
} else if err != nil {
return toRPCErr(err)
}
return status.Errorf(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
}

func (as *addrConnStream) finish(err error) {
Expand Down
164 changes: 160 additions & 4 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3590,9 +3590,6 @@ func testClientStreamingError(t *testing.T, e env) {
// Tests that a client receives a cardinality violation error for client-streaming
// RPCs if the server doesn't send a message before returning status OK.
func (s) TestClientStreamingCardinalityViolation_ServerHandlerMissingSendAndClose(t *testing.T) {
// TODO : https://github.com/grpc/grpc-go/issues/8119 - remove `t.Skip()`
// after this is fixed.
t.Skip()
ss := &stubserver.StubServer{
StreamingInputCallF: func(_ testgrpc.TestService_StreamingInputCallServer) error {
// Returning status OK without sending a response message.This is a
Expand Down Expand Up @@ -3741,6 +3738,165 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) {
}
}

// Tests that client receives a cardinality violation error for unary
// RPCs if the server doesn't send a message before returning status OK.
func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) {
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatal(err)
}
defer lis.Close()

ss := grpc.UnknownServiceHandler(func(any, grpc.ServerStream) error {
return nil
})

s := grpc.NewServer(ss)
go s.Serve(lis)
defer s.Stop()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err)
}
defer cc.Close()

client := testgrpc.NewTestServiceClient(cc)
if _, err = client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Internal {
t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal)
}
}

// Tests the behavior for unary RPC when client calls RecvMsg() twice.
// Second call to RecvMsg should fail with io.EOF.
func (s) TestUnaryRPC_ClientCallRecvMsgTwice(t *testing.T) {
e := tcpTLSEnv
te := newTest(t, e)
defer te.tearDown()

te.startServer(&testServer{security: e.security})

cc := te.clientConn()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

desc := &grpc.StreamDesc{
StreamName: "UnaryCall",
ServerStreams: false,
ClientStreams: false,
}
stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/UnaryCall")
if err != nil {
t.Fatalf("cc.NewStream() failed unexpectedly: %v", err)
}

if err := stream.SendMsg(&testpb.SimpleRequest{}); err != nil {
t.Fatalf("stream.SendMsg(_) = %v, want <nil>", err)
}

resp := &testpb.SimpleResponse{}
if err := stream.RecvMsg(resp); err != nil {
t.Fatalf("stream.RecvMsg() = %v , want <nil>", err)
}

if err = stream.RecvMsg(resp); err != io.EOF {
t.Errorf("stream.RecvMsg() = %v, want error %v", err, io.EOF)
}
}

// Tests the behavior for unary RPC when server calls SendMsg() twice.
// Client should fail with cardinality violation error.
func (s) TestUnaryRPC_ServerCallSendMsgTwice(t *testing.T) {
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatal(err)
}
defer lis.Close()

s := grpc.NewServer()
serviceDesc := grpc.ServiceDesc{
ServiceName: "grpc.testing.TestService",
HandlerType: (*any)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "UnaryCall",
Handler: func(_ any, stream grpc.ServerStream) error {
if err := stream.RecvMsg(&testpb.Empty{}); err != nil {
t.Errorf("stream.RecvMsg() = %v, want <nil>", err)
}

if err = stream.SendMsg(&testpb.Empty{}); err != nil {
t.Errorf("stream.SendMsg() = %v, want <nil>", err)
}

if err = stream.SendMsg(&testpb.Empty{}); err != nil {
t.Errorf("stream.SendMsg() = %v, want <nil>", err)
}
return nil
},
ClientStreams: false,
ServerStreams: false,
},
},
}
s.RegisterService(&serviceDesc, &testServer{})
go s.Serve(lis)
defer s.Stop()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err)
}
defer cc.Close()

client := testgrpc.NewTestServiceClient(cc)
if _, err = client.UnaryCall(ctx, &testpb.SimpleRequest{}); status.Code(err) != codes.Internal {
t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal)
}
}

// Tests the behavior for client-streaming RPC when client calls RecvMsg() twice.
// Second call to RecvMsg should fail with io.EOF.
func (s) TestClientStreaming_ClientCallRecvMsgTwice(t *testing.T) {
ss := stubserver.StubServer{
StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error {
if err := stream.SendAndClose(&testpb.StreamingInputCallResponse{}); err != nil {
t.Errorf("stream.SendAndClose(_) = %v, want <nil>", err)
}
return nil
},
}
if err := ss.Start(nil); err != nil {
t.Fatal("Error starting server:", err)
}
defer ss.Stop()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
stream, err := ss.Client.StreamingInputCall(ctx)
if err != nil {
t.Fatalf(".StreamingInputCall(_) = _, %v, want <nil>", err)
}
if err := stream.Send(&testpb.StreamingInputCallRequest{}); err != nil {
t.Fatalf("stream.Send(_) = %v, want <nil>", err)
}
if err := stream.CloseSend(); err != nil {
t.Fatalf("stream.CloseSend() = %v, want <nil>", err)
}
resp := new(testpb.StreamingInputCallResponse)
if err := stream.RecvMsg(resp); err != nil {
t.Fatalf("stream.RecvMsg() = %v , want <nil>", err)
}
if err = stream.RecvMsg(resp); err != io.EOF {
t.Errorf("stream.RecvMsg() = %v, want error %v", err, io.EOF)
}
}

// Tests the behavior for server-side streaming when client calls SendMsg twice.
// Second call to SendMsg should fail with Internal error and result in closing
// the connection with a RST_STREAM.
Expand Down Expand Up @@ -3981,7 +4137,7 @@ func (s) TestServerStreaming_ClientSendsZeroRequests(t *testing.T) {
}

// Tests that a client receives a cardinality violation error for client-streaming
// RPCs if the server call SendMsg multiple times.
// RPCs if the server call SendMsg() multiple times.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// RPCs if the server call SendMsg() multiple times.
// RPCs if the server calls SendMsg() multiple times.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

func (s) TestClientStreaming_ServerHandlerSendMsgAfterSendMsg(t *testing.T) {
ss := stubserver.StubServer{
StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error {
Expand Down