diff --git a/integration/streaming/test.pb.go b/integration/streaming/test.pb.go index bd7075e4b..de76ad596 100644 --- a/integration/streaming/test.pb.go +++ b/integration/streaming/test.pb.go @@ -211,7 +211,7 @@ var file_github_com_containerd_ttrpc_integration_streaming_test_proto_rawDesc = 0x52, 0x03, 0x61, 0x64, 0x64, 0x22, 0x29, 0x0a, 0x03, 0x53, 0x75, 0x6d, 0x12, 0x10, 0x0a, 0x03, 0x73, 0x75, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x73, 0x75, 0x6d, 0x12, 0x10, 0x0a, 0x03, 0x6e, 0x75, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x6e, 0x75, 0x6d, - 0x32, 0xa0, 0x04, 0x0a, 0x09, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x12, 0x5a, + 0x32, 0xfa, 0x04, 0x0a, 0x09, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x12, 0x5a, 0x0a, 0x04, 0x45, 0x63, 0x68, 0x6f, 0x12, 0x28, 0x2e, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x45, 0x63, 0x68, 0x6f, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, @@ -245,11 +245,17 @@ var file_github_com_containerd_ttrpc_integration_streaming_test_proto_rawDesc = 0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, 0x45, 0x63, 0x68, 0x6f, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x28, - 0x01, 0x30, 0x01, 0x42, 0x3d, 0x5a, 0x3b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2f, 0x74, 0x74, 0x72, - 0x70, 0x63, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x73, - 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x3b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, - 0x6e, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x01, 0x30, 0x01, 0x12, 0x58, 0x0a, 0x12, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x50, 0x61, 0x79, 0x6c, + 0x6f, 0x61, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, + 0x79, 0x1a, 0x28, 0x2e, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x2e, + 0x45, 0x63, 0x68, 0x6f, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x30, 0x01, 0x42, 0x3d, 0x5a, + 0x3b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x74, + 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2f, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2f, 0x69, 0x6e, 0x74, + 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, + 0x6e, 0x67, 0x3b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -278,14 +284,16 @@ var file_github_com_containerd_ttrpc_integration_streaming_test_proto_depIdxs = 2, // 3: ttrpc.integration.streaming.Streaming.DivideStream:input_type -> ttrpc.integration.streaming.Sum 0, // 4: ttrpc.integration.streaming.Streaming.EchoNull:input_type -> ttrpc.integration.streaming.EchoPayload 0, // 5: ttrpc.integration.streaming.Streaming.EchoNullStream:input_type -> ttrpc.integration.streaming.EchoPayload - 0, // 6: ttrpc.integration.streaming.Streaming.Echo:output_type -> ttrpc.integration.streaming.EchoPayload - 0, // 7: ttrpc.integration.streaming.Streaming.EchoStream:output_type -> ttrpc.integration.streaming.EchoPayload - 2, // 8: ttrpc.integration.streaming.Streaming.SumStream:output_type -> ttrpc.integration.streaming.Sum - 1, // 9: ttrpc.integration.streaming.Streaming.DivideStream:output_type -> ttrpc.integration.streaming.Part - 3, // 10: ttrpc.integration.streaming.Streaming.EchoNull:output_type -> google.protobuf.Empty - 3, // 11: ttrpc.integration.streaming.Streaming.EchoNullStream:output_type -> google.protobuf.Empty - 6, // [6:12] is the sub-list for method output_type - 0, // [0:6] is the sub-list for method input_type + 3, // 6: ttrpc.integration.streaming.Streaming.EmptyPayloadStream:input_type -> google.protobuf.Empty + 0, // 7: ttrpc.integration.streaming.Streaming.Echo:output_type -> ttrpc.integration.streaming.EchoPayload + 0, // 8: ttrpc.integration.streaming.Streaming.EchoStream:output_type -> ttrpc.integration.streaming.EchoPayload + 2, // 9: ttrpc.integration.streaming.Streaming.SumStream:output_type -> ttrpc.integration.streaming.Sum + 1, // 10: ttrpc.integration.streaming.Streaming.DivideStream:output_type -> ttrpc.integration.streaming.Part + 3, // 11: ttrpc.integration.streaming.Streaming.EchoNull:output_type -> google.protobuf.Empty + 3, // 12: ttrpc.integration.streaming.Streaming.EchoNullStream:output_type -> google.protobuf.Empty + 0, // 13: ttrpc.integration.streaming.Streaming.EmptyPayloadStream:output_type -> ttrpc.integration.streaming.EchoPayload + 7, // [7:14] is the sub-list for method output_type + 0, // [0:7] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name diff --git a/integration/streaming/test.proto b/integration/streaming/test.proto index 3f46ccf73..474a81722 100644 --- a/integration/streaming/test.proto +++ b/integration/streaming/test.proto @@ -34,6 +34,7 @@ service Streaming { rpc DivideStream(Sum) returns (stream Part); rpc EchoNull(stream EchoPayload) returns (google.protobuf.Empty); rpc EchoNullStream(stream EchoPayload) returns (stream google.protobuf.Empty); + rpc EmptyPayloadStream(google.protobuf.Empty) returns (stream EchoPayload); } message EchoPayload { diff --git a/integration/streaming/test_ttrpc.pb.go b/integration/streaming/test_ttrpc.pb.go index 5a0675cc4..d3fbfce25 100644 --- a/integration/streaming/test_ttrpc.pb.go +++ b/integration/streaming/test_ttrpc.pb.go @@ -15,6 +15,7 @@ type TTRPCStreamingService interface { DivideStream(context.Context, *Sum, TTRPCStreaming_DivideStreamServer) error EchoNull(context.Context, TTRPCStreaming_EchoNullServer) (*emptypb.Empty, error) EchoNullStream(context.Context, TTRPCStreaming_EchoNullStreamServer) error + EmptyPayloadStream(context.Context, *emptypb.Empty, TTRPCStreaming_EmptyPayloadStreamServer) error } type TTRPCStreaming_EchoStreamServer interface { @@ -108,6 +109,19 @@ func (x *ttrpcstreamingEchoNullStreamServer) Recv() (*EchoPayload, error) { return m, nil } +type TTRPCStreaming_EmptyPayloadStreamServer interface { + Send(*EchoPayload) error + ttrpc.StreamServer +} + +type ttrpcstreamingEmptyPayloadStreamServer struct { + ttrpc.StreamServer +} + +func (x *ttrpcstreamingEmptyPayloadStreamServer) Send(m *EchoPayload) error { + return x.StreamServer.SendMsg(m) +} + func RegisterTTRPCStreamingService(srv *ttrpc.Server, svc TTRPCStreamingService) { srv.RegisterService("ttrpc.integration.streaming.Streaming", &ttrpc.ServiceDesc{ Methods: map[string]ttrpc.Method{ @@ -159,6 +173,17 @@ func RegisterTTRPCStreamingService(srv *ttrpc.Server, svc TTRPCStreamingService) StreamingClient: true, StreamingServer: true, }, + "EmptyPayloadStream": { + Handler: func(ctx context.Context, stream ttrpc.StreamServer) (interface{}, error) { + m := new(emptypb.Empty) + if err := stream.RecvMsg(m); err != nil { + return nil, err + } + return nil, svc.EmptyPayloadStream(ctx, m, &ttrpcstreamingEmptyPayloadStreamServer{stream}) + }, + StreamingClient: false, + StreamingServer: true, + }, }, }) } @@ -170,6 +195,7 @@ type TTRPCStreamingClient interface { DivideStream(context.Context, *Sum) (TTRPCStreaming_DivideStreamClient, error) EchoNull(context.Context) (TTRPCStreaming_EchoNullClient, error) EchoNullStream(context.Context) (TTRPCStreaming_EchoNullStreamClient, error) + EmptyPayloadStream(context.Context, *emptypb.Empty) (TTRPCStreaming_EmptyPayloadStreamClient, error) } type ttrpcstreamingClient struct { @@ -360,3 +386,32 @@ func (x *ttrpcstreamingEchoNullStreamClient) Recv() (*emptypb.Empty, error) { } return m, nil } + +func (c *ttrpcstreamingClient) EmptyPayloadStream(ctx context.Context, req *emptypb.Empty) (TTRPCStreaming_EmptyPayloadStreamClient, error) { + stream, err := c.client.NewStream(ctx, &ttrpc.StreamDesc{ + StreamingClient: false, + StreamingServer: true, + }, "ttrpc.integration.streaming.Streaming", "EmptyPayloadStream", req) + if err != nil { + return nil, err + } + x := &ttrpcstreamingEmptyPayloadStreamClient{stream} + return x, nil +} + +type TTRPCStreaming_EmptyPayloadStreamClient interface { + Recv() (*EchoPayload, error) + ttrpc.ClientStream +} + +type ttrpcstreamingEmptyPayloadStreamClient struct { + ttrpc.ClientStream +} + +func (x *ttrpcstreamingEmptyPayloadStreamClient) Recv() (*EchoPayload, error) { + m := new(EchoPayload) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} diff --git a/integration/streaming_test.go b/integration/streaming_test.go index 159b13c4f..ac062585a 100644 --- a/integration/streaming_test.go +++ b/integration/streaming_test.go @@ -31,6 +31,7 @@ import ( "github.com/containerd/ttrpc" "github.com/containerd/ttrpc/integration/streaming" "github.com/golang/protobuf/ptypes/empty" + "google.golang.org/protobuf/types/known/emptypb" ) func runService(ctx context.Context, t testing.TB, service streaming.TTRPCStreamingService) (streaming.TTRPCStreamingClient, func()) { @@ -190,6 +191,14 @@ func (tss *testStreamingService) EchoNullStream(_ context.Context, es streaming. return sendErr } +func (tss *testStreamingService) EmptyPayloadStream(_ context.Context, _ *emptypb.Empty, streamer streaming.TTRPCStreaming_EmptyPayloadStreamServer) error { + if err := streamer.Send(&streaming.EchoPayload{Seq: 1}); err != nil { + return err + } + + return streamer.Send(&streaming.EchoPayload{Seq: 2}) +} + func TestStreamingService(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -203,6 +212,7 @@ func TestStreamingService(t *testing.T) { t.Run("DivideStream", divideStreamTest(ctx, client)) t.Run("EchoNull", echoNullTest(ctx, client)) t.Run("EchoNullStream", echoNullStreamTest(ctx, client)) + t.Run("EmptyPayloadStream", emptyPayloadStream(ctx, client)) } func echoTest(ctx context.Context, client streaming.TTRPCStreamingClient) func(t *testing.T) { @@ -385,6 +395,33 @@ func echoNullStreamTest(ctx context.Context, client streaming.TTRPCStreamingClie } } +func emptyPayloadStream(ctx context.Context, client streaming.TTRPCStreamingClient) func(t *testing.T) { + return func(t *testing.T) { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + stream, err := client.EmptyPayloadStream(ctx, nil) + if err != nil { + t.Fatal(err) + } + + for i := uint32(1); i < 3; i++ { + first, err := stream.Recv() + if err != nil { + t.Fatal(err) + } + + if first.Seq != i { + t.Fatalf("unexpected seq: %d != %d", first.Seq, i) + } + } + + if _, err := stream.Recv(); err != io.EOF { + t.Fatalf("Expected io.EOF, got %v", err) + } + } +} + func assertNextEcho(t testing.TB, a, b *streaming.EchoPayload) { t.Helper() if a.Msg != b.Msg { diff --git a/services.go b/services.go index 6aabfbb4d..6d092bf95 100644 --- a/services.go +++ b/services.go @@ -140,7 +140,11 @@ func (s *serviceSet) handle(ctx context.Context, req *Request, respond func(*sta respond(st, p, stream.StreamingServer, true) }() - if req.Payload != nil { + // Empty proto messages serialized to 0 payloads, + // so signatures like: rpc Stream(google.protobuf.Empty) returns (stream Data); + // don't get invoked here, which causes hang on client side. + // See https://github.com/containerd/ttrpc/issues/126 + if req.Payload != nil || !info.StreamingClient { unmarshal := func(obj interface{}) error { return protoUnmarshal(req.Payload, obj) }