Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proposal: add cause to canceled context when Client sending a RST_STREAM due to an error on their end #7778

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions examples/features/context/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Context

This example shows how servers can process requests in separate goroutines and
handle context cancellation.

```
go run server/main.go
```

```
go run client/main.go
```
161 changes: 161 additions & 0 deletions examples/features/context/client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Binary client demonstrates how to cancel in-flight RPCs by canceling the
// context passed to the RPC.
package main

import (
"context"
"flag"
"fmt"
"log"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
pb "google.golang.org/grpc/examples/features/proto/echo"
"google.golang.org/grpc/status"
)

var addr = flag.String("addr", "localhost:50051", "the address to connect to")

func sendMessage(stream pb.Echo_BidirectionalStreamingEchoClient, msg string) error {
fmt.Printf("sending message %q\n", msg)
return stream.Send(&pb.EchoRequest{Message: msg})
}

func recvMessage(stream pb.Echo_BidirectionalStreamingEchoClient, wantErrCode codes.Code) {
res, err := stream.Recv()
if status.Code(err) != wantErrCode {
log.Fatalf("stream.Recv() = %v, %v; want _, status.Code(err)=%v", res, err, wantErrCode)
}
if err != nil {
fmt.Printf("stream.Recv() returned expected error %v\n", err)
return
}
fmt.Printf("received message %q\n", res.GetMessage())
}

func cancelStream() {
fmt.Println("sending two messages and then canceling")
conn, err := grpc.NewClient(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()

c := pb.NewEchoClient(conn)

// Initiate the stream with a context that supports cancellation.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
stream, err := c.BidirectionalStreamingEcho(ctx)
if err != nil {
log.Fatalf("error creating stream: %v", err)
}

// Send some test messages.
if err := sendMessage(stream, "hello"); err != nil {
log.Fatalf("error sending on stream: %v", err)
}
if err := sendMessage(stream, "world"); err != nil {
log.Fatalf("error sending on stream: %v", err)
}

// Ensure the RPC is working.
recvMessage(stream, codes.OK)
recvMessage(stream, codes.OK)

fmt.Println("canceling context")
cancel()
}

func closeConnection() {
fmt.Println("sending two messages and then closing connection")
conn, err := grpc.NewClient(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer func() {
fmt.Println("closing connection")
conn.Close()
}()

c := pb.NewEchoClient(conn)

stream, err := c.BidirectionalStreamingEcho(context.Background())
if err != nil {
log.Fatalf("error creating stream: %v", err)
}

// Send some test messages.
if err := sendMessage(stream, "hello"); err != nil {
log.Fatalf("error sending on stream: %v", err)
}
if err := sendMessage(stream, "world"); err != nil {
log.Fatalf("error sending on stream: %v", err)
}

// Ensure the RPC is working.
recvMessage(stream, codes.OK)
recvMessage(stream, codes.OK)
}

func timout() {
fmt.Println("sending two messages and then waiting for timeout")
conn, err := grpc.NewClient(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()

c := pb.NewEchoClient(conn)

// Initiate the stream with a context that supports cancellation.
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

stream, err := c.BidirectionalStreamingEcho(ctx)
if err != nil {
log.Fatalf("error creating stream: %v", err)
}

// Send some test messages.
if err := sendMessage(stream, "hello"); err != nil {
log.Fatalf("error sending on stream: %v", err)
}
if err := sendMessage(stream, "world"); err != nil {
log.Fatalf("error sending on stream: %v", err)
}

recvMessage(stream, codes.OK)
recvMessage(stream, codes.OK)

fmt.Println("waiting for timeout")
<-ctx.Done()
}

func main() {
flag.Parse()

// simulate some client behaviors which terminate with different reasons
cancelStream()
closeConnection()
timout()
}
96 changes: 96 additions & 0 deletions examples/features/context/server/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Binary server demonstrates how to handle canceled contexts when a client
// cancels an in-flight RPC.
package main

import (
"context"
"errors"
"flag"
"fmt"
"io"
"log"
"net"

"golang.org/x/net/http2"
"google.golang.org/grpc"
"google.golang.org/grpc/internal/transport"

pb "google.golang.org/grpc/examples/features/proto/echo"
)

var port = flag.Int("port", 50051, "the port to serve on")

type server struct {
pb.UnimplementedEchoServer
}

func workOnMessage(ctx context.Context, msg string) {
fmt.Printf("starting work on message: %q\n", msg)
i := 0
for {
if err := ctx.Err(); err != nil {
cause := context.Cause(ctx)
fmt.Printf("'%v' with cause '%v', message worker for message %q stopping.\n", err, cause, msg)
var httpErr *transport.HTTP2CodeError
if errors.As(cause, &httpErr) {
switch httpErr.Code {
case http2.ErrCodeNo:
return
default:
fmt.Printf("unexpected HTTP/2 error: %v", httpErr)
}
}
return
}
// simulate work on message but don't flood the logs
i++
}
}

func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {
ctx := stream.Context()
for {
recv, err := stream.Recv()
if err != nil {
fmt.Printf("server: error receiving from stream: %v\n", err)
if err == io.EOF {
return nil
}
return err
}
msg := recv.Message
go workOnMessage(ctx, msg)
stream.Send(&pb.EchoResponse{Message: msg})
}
}

func main() {
flag.Parse()

lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
fmt.Printf("server listening at port %v\n", lis.Addr())
s := grpc.NewServer()
pb.RegisterEchoServer(s, &server{})
s.Serve(lis)
}
2 changes: 1 addition & 1 deletion examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/prometheus/client_golang v1.20.5
go.opentelemetry.io/otel/exporters/prometheus v0.53.0
go.opentelemetry.io/otel/sdk/metric v1.31.0
golang.org/x/net v0.30.0
golang.org/x/oauth2 v0.23.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53
google.golang.org/grpc v1.67.1
Expand Down Expand Up @@ -69,7 +70,6 @@ require (
go.opentelemetry.io/otel/sdk v1.31.0 // indirect
go.opentelemetry.io/otel/trace v1.31.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.19.0 // indirect
Expand Down
36 changes: 36 additions & 0 deletions internal/transport/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package transport

import (
"context"
"time"
)

func createContextWithTimeout(ctx context.Context, timeoutSet bool, timeout time.Duration) (context.Context, context.CancelCauseFunc) {
var timoutCancel context.CancelFunc
if timeoutSet {
ctx, timoutCancel = context.WithTimeout(ctx, timeout)
}
ctx, cancel := context.WithCancelCause(ctx)
if timoutCancel != nil {
context.AfterFunc(ctx, timoutCancel)
}
return ctx, cancel
}
Loading