Skip to content

Commit

Permalink
add example for context propagation in a server with goroutines.
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Strobusch <[email protected]>
  • Loading branch information
dastrobu committed Nov 14, 2024
1 parent 77899b4 commit a9a5167
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 1 deletion.
12 changes: 12 additions & 0 deletions examples/features/context/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Context

This example shows how servers can process requests in separate goroutines and
handle context cancellation.

```
go run server/main.go
```

```
go run client/main.go
```
161 changes: 161 additions & 0 deletions examples/features/context/client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Binary client demonstrates how to cancel in-flight RPCs by canceling the
// context passed to the RPC.
package main

import (
"context"
"flag"
"fmt"
"log"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
pb "google.golang.org/grpc/examples/features/proto/echo"
"google.golang.org/grpc/status"
)

var addr = flag.String("addr", "localhost:50051", "the address to connect to")

func sendMessage(stream pb.Echo_BidirectionalStreamingEchoClient, msg string) error {
fmt.Printf("sending message %q\n", msg)
return stream.Send(&pb.EchoRequest{Message: msg})
}

func recvMessage(stream pb.Echo_BidirectionalStreamingEchoClient, wantErrCode codes.Code) {
res, err := stream.Recv()
if status.Code(err) != wantErrCode {
log.Fatalf("stream.Recv() = %v, %v; want _, status.Code(err)=%v", res, err, wantErrCode)
}
if err != nil {
fmt.Printf("stream.Recv() returned expected error %v\n", err)
return
}
fmt.Printf("received message %q\n", res.GetMessage())
}

func cancelStream() {
fmt.Println("sending two messages and then canceling")
conn, err := grpc.NewClient(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()

c := pb.NewEchoClient(conn)

// Initiate the stream with a context that supports cancellation.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
stream, err := c.BidirectionalStreamingEcho(ctx)
if err != nil {
log.Fatalf("error creating stream: %v", err)
}

// Send some test messages.
if err := sendMessage(stream, "hello"); err != nil {
log.Fatalf("error sending on stream: %v", err)
}
if err := sendMessage(stream, "world"); err != nil {
log.Fatalf("error sending on stream: %v", err)
}

// Ensure the RPC is working.
recvMessage(stream, codes.OK)
recvMessage(stream, codes.OK)

fmt.Println("canceling context")
cancel()
}

func closeConnection() {
fmt.Println("sending two messages and then closing connection")
conn, err := grpc.NewClient(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer func() {
fmt.Println("closing connection")
conn.Close()
}()

c := pb.NewEchoClient(conn)

stream, err := c.BidirectionalStreamingEcho(context.Background())
if err != nil {
log.Fatalf("error creating stream: %v", err)
}

// Send some test messages.
if err := sendMessage(stream, "hello"); err != nil {
log.Fatalf("error sending on stream: %v", err)
}
if err := sendMessage(stream, "world"); err != nil {
log.Fatalf("error sending on stream: %v", err)
}

// Ensure the RPC is working.
recvMessage(stream, codes.OK)
recvMessage(stream, codes.OK)
}

func timout() {
fmt.Println("sending two messages and then waiting for timeout")
conn, err := grpc.NewClient(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()

c := pb.NewEchoClient(conn)

// Initiate the stream with a context that supports cancellation.
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

stream, err := c.BidirectionalStreamingEcho(ctx)
if err != nil {
log.Fatalf("error creating stream: %v", err)
}

// Send some test messages.
if err := sendMessage(stream, "hello"); err != nil {
log.Fatalf("error sending on stream: %v", err)
}
if err := sendMessage(stream, "world"); err != nil {
log.Fatalf("error sending on stream: %v", err)
}

recvMessage(stream, codes.OK)
recvMessage(stream, codes.OK)

fmt.Println("waiting for timeout")
<-ctx.Done()
}

func main() {
flag.Parse()

// simulate some client behaviors which terminate with different reasons
cancelStream()
closeConnection()
timout()
}
96 changes: 96 additions & 0 deletions examples/features/context/server/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Binary server demonstrates how to handle canceled contexts when a client
// cancels an in-flight RPC.
package main

import (
"context"
"errors"
"flag"
"fmt"
"io"
"log"
"net"

"golang.org/x/net/http2"
"google.golang.org/grpc"
"google.golang.org/grpc/internal/transport"

pb "google.golang.org/grpc/examples/features/proto/echo"
)

var port = flag.Int("port", 50051, "the port to serve on")

type server struct {
pb.UnimplementedEchoServer
}

func workOnMessage(ctx context.Context, msg string) {
fmt.Printf("starting work on message: %q\n", msg)
i := 0
for {
if err := ctx.Err(); err != nil {
cause := context.Cause(ctx)
fmt.Printf("'%v' with cause '%v', message worker for message %q stopping.\n", err, cause, msg)
var httpErr *transport.HTTP2CodeError
if errors.As(cause, &httpErr) {
switch httpErr.Code {
case http2.ErrCodeNo:
return
default:
fmt.Printf("unexpected HTTP/2 error: %v", httpErr)
}
}
return
}
// simulate work on message but don't flood the logs
i++
}
}

func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {
ctx := stream.Context()
for {
recv, err := stream.Recv()
if err != nil {
fmt.Printf("server: error receiving from stream: %v\n", err)
if err == io.EOF {
return nil
}
return err
}
msg := recv.Message
go workOnMessage(ctx, msg)
stream.Send(&pb.EchoResponse{Message: msg})
}
}

func main() {
flag.Parse()

lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
fmt.Printf("server listening at port %v\n", lis.Addr())
s := grpc.NewServer()
pb.RegisterEchoServer(s, &server{})
s.Serve(lis)
}
2 changes: 1 addition & 1 deletion examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/prometheus/client_golang v1.20.5
go.opentelemetry.io/otel/exporters/prometheus v0.53.0
go.opentelemetry.io/otel/sdk/metric v1.31.0
golang.org/x/net v0.30.0
golang.org/x/oauth2 v0.23.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53
google.golang.org/grpc v1.67.1
Expand Down Expand Up @@ -69,7 +70,6 @@ require (
go.opentelemetry.io/otel/sdk v1.31.0 // indirect
go.opentelemetry.io/otel/trace v1.31.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.19.0 // indirect
Expand Down

0 comments on commit a9a5167

Please sign in to comment.