Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix timeout in stream calls #569

Merged
merged 1 commit into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 35 additions & 6 deletions go/pkg/client/bytestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,20 @@
return writtenBytes, nil
}

// withCtx makes the niladic function f behaves like one that accepts a ctx.
func withCtx(ctx context.Context, f func() error) error {
errChan := make(chan error, 1)
go func() {
errChan <- f()
}()
select {
case <-ctx.Done():
return ctx.Err()

Check failure on line 53 in go/pkg/client/bytestream.go

View workflow job for this annotation

GitHub Actions / lint

error returned from interface method should be wrapped: sig: func (context.Context).Err() error (wrapcheck)
case err := <-errChan:
return err
}
}

// writeChunked uploads chunked data with a given resource name to the CAS.
func (c *Client) writeChunked(ctx context.Context, name string, ch *chunker.Chunker, doNotFinalize bool, initialOffset int64) (int64, error) {
var totalBytes int64
Expand All @@ -54,6 +68,8 @@
// TODO(olaola): implement resumable uploads. initialOffset passed in allows to
// start writing data at an arbitrary offset, but retries still restart from initialOffset.

ctx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := c.Write(ctx)
if err != nil {
return err
Expand All @@ -70,7 +86,11 @@
if !ch.HasNext() && !doNotFinalize {
req.FinishWrite = true
}
err = c.CallWithTimeout(ctx, "Write", func(_ context.Context) error { return stream.Send(req) })
err = c.CallWithTimeout(ctx, "Write", func(ctx context.Context) error {
return withCtx(ctx, func() error {
return stream.Send(req)
})
})
if err == io.EOF {
break
}
Expand All @@ -79,7 +99,12 @@
}
totalBytes += int64(len(req.Data))
}
if _, err := stream.CloseAndRecv(); err != nil {
if err := c.CallWithTimeout(ctx, "Write", func(ctx context.Context) error {
return withCtx(ctx, func() error {
_, err := stream.CloseAndRecv()
return err

Check failure on line 105 in go/pkg/client/bytestream.go

View workflow job for this annotation

GitHub Actions / lint

error returned from interface method should be wrapped: sig: func (google.golang.org/genproto/googleapis/bytestream.ByteStream_WriteClient).CloseAndRecv() (*google.golang.org/genproto/googleapis/bytestream.WriteResponse, error) (wrapcheck)
})
}); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -132,6 +157,8 @@
// stream. The limit must be non-negative, although offset+limit may exceed the length of the
// stream.
func (c *Client) readStreamed(ctx context.Context, name string, offset, limit int64, w io.Writer) (int64, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := c.Read(ctx, &bspb.ReadRequest{
ResourceName: name,
ReadOffset: offset,
Expand All @@ -144,10 +171,12 @@
var n int64
for {
var resp *bspb.ReadResponse
err := c.CallWithTimeout(ctx, "Read", func(_ context.Context) error {
r, err := stream.Recv()
resp = r
return err
err := c.CallWithTimeout(ctx, "Read", func(ctx context.Context) error {
return withCtx(ctx, func() error {
r, err := stream.Recv()
resp = r
return err

Check failure on line 178 in go/pkg/client/bytestream.go

View workflow job for this annotation

GitHub Actions / lint

error returned from interface method should be wrapped: sig: func (google.golang.org/genproto/googleapis/bytestream.ByteStream_ReadClient).Recv() (*google.golang.org/genproto/googleapis/bytestream.ReadResponse, error) (wrapcheck)
})
})
if err == io.EOF {
break
Expand Down
52 changes: 50 additions & 2 deletions go/pkg/client/bytestream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import (
"context"
"errors"
"fmt"
"net"
"testing"
"time"

"google.golang.org/grpc"

Expand All @@ -21,6 +23,40 @@
finalized bool
}

func TestReadTimeout(t *testing.T) {
s := newServer(t)
defer s.shutDown()

s.client.Retrier = nil
s.client.rpcTimeouts["Read"] = 100 * time.Millisecond
s.fake.read = func(req *bspb.ReadRequest, stream bsgrpc.ByteStream_ReadServer) error {
time.Sleep(1 * time.Second)
return stream.Send(&bspb.ReadResponse{})
}

_, err := s.client.ReadBytes(context.Background(), "test")
if !errors.Is(err, context.DeadlineExceeded) {
t.Errorf("expected error %v, but got %v", context.DeadlineExceeded, err)
}
}

func TestWriteTimeout(t *testing.T) {
s := newServer(t)
defer s.shutDown()

s.client.Retrier = nil
s.client.rpcTimeouts["Write"] = 100 * time.Millisecond
s.fake.write = func(stream bsgrpc.ByteStream_WriteServer) error {
time.Sleep(1 * time.Second)
return fmt.Errorf("write should have timed out")
}

err := s.client.WriteBytes(context.Background(), "test", []byte("hello"))
if !errors.Is(err, context.DeadlineExceeded) {
t.Errorf("expected error %v, but got %v", context.DeadlineExceeded, err)
}
}

func TestWriteBytesAtRemoteOffsetSuccess_LogStream(t *testing.T) {
tests := []struct {
description string
Expand Down Expand Up @@ -117,6 +153,9 @@
}

func TestWriteBytesAtRemoteOffsetErrors_LogStream(t *testing.T) {
if testing.Short() {
t.Skip("skipping slow test because short is set")
}
tests := []struct {
description string
ls *logStream
Expand Down Expand Up @@ -180,6 +219,8 @@

type ByteStream struct {
logStreams map[string]*logStream
read func(req *bspb.ReadRequest, stream bsgrpc.ByteStream_ReadServer) error
write func(stream bsgrpc.ByteStream_WriteServer) error
}

type Server struct {
Expand All @@ -202,7 +243,7 @@
bsgrpc.RegisterByteStreamServer(s.server, s.fake)

go s.server.Serve(s.listener)
s.client, err = NewClient(s.ctx, instance, DialParams{
s.client, err = NewClient(s.ctx, "test", DialParams{
Service: s.listener.Addr().String(),
NoSecurity: true,
}, StartupCapabilities(false), ChunkMaxSize(2))
Expand All @@ -223,11 +264,18 @@
}

func (b *ByteStream) Read(req *bspb.ReadRequest, stream bsgrpc.ByteStream_ReadServer) error {
return nil
if b.read != nil {
return b.read(req, stream)
}
return stream.Send(&bspb.ReadResponse{Data: logStreamData})

Check failure on line 270 in go/pkg/client/bytestream_test.go

View workflow job for this annotation

GitHub Actions / lint

error returned from interface method should be wrapped: sig: func (google.golang.org/genproto/googleapis/bytestream.ByteStream_ReadServer).Send(*google.golang.org/genproto/googleapis/bytestream.ReadResponse) error (wrapcheck)
}

// Write implements the write operation for LogStream Write API.
func (b *ByteStream) Write(stream bsgrpc.ByteStream_WriteServer) error {
if b.write != nil {
return b.write(stream)
}

defer stream.SendAndClose(&bspb.WriteResponse{})
req, err := stream.Recv()
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions go/pkg/client/cas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,9 @@ func TestMissingBlobs(t *testing.T) {
}

func TestUploadConcurrent(t *testing.T) {
if testing.Short() {
t.Skip("skipping slow test because short is set")
}
t.Parallel()
blobs := make([][]byte, 50)
for i := range blobs {
Expand Down
6 changes: 6 additions & 0 deletions go/pkg/client/retries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,9 @@ func TestWriteRetries(t *testing.T) {
}

func TestRetryWriteBytesAtRemoteOffset(t *testing.T) {
if testing.Short() {
t.Skip("skipping slow test because short is set")
}
tests := []struct {
description string
initialOffset int64
Expand Down Expand Up @@ -441,6 +444,9 @@ func TestBatchWriteBlobsRpcRetriesExhausted(t *testing.T) {
}

func TestGetTreeRetries(t *testing.T) {
if testing.Short() {
t.Skip("skipping slow test because short is set")
}
t.Parallel()
f := setup(t)
defer f.shutDown()
Expand Down
Loading