Skip to content

Commit

Permalink
Report an error on unsupported codec or invalid zstd blob
Browse files Browse the repository at this point in the history
  • Loading branch information
ppluciennik committed Jul 29, 2024
1 parent 1082319 commit 7cd94b1
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 0 deletions.
4 changes: 4 additions & 0 deletions go/pkg/client/cas_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,16 @@ func (c *Client) BatchDownloadBlobsWithStats(ctx context.Context, dgs []digest.D
if err != nil {
errDg = r.Digest
errMsg = err.Error()
numErrs++
allRetriable = false
continue
}
r.Data = b
default:
errDg = r.Digest
errMsg = fmt.Sprintf("blob returned with unsupported compressor %s", r.Compressor)
numErrs++
allRetriable = false
continue
}
bi := CompressedBlobInfo{
Expand Down
120 changes: 120 additions & 0 deletions go/pkg/client/cas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1917,3 +1917,123 @@ func TestBatchDownloadBlobsCompressed(t *testing.T) {
t.Errorf("client.BatchDownloadBlobs(ctx, digests) had diff (want -> got):\n%s", diff)
}
}

type readResponseModifier func(idx int, resp *repb.BatchReadBlobsResponse_Response)

type invalidReadServer struct {
repb.ContentAddressableStorageServer
readResponseModifier
}

func (s *invalidReadServer) setModifier(modifier readResponseModifier) {
s.readResponseModifier = modifier
}

func (s *invalidReadServer) BatchReadBlobs(ctx context.Context, req *repb.BatchReadBlobsRequest) (*repb.BatchReadBlobsResponse, error) {
resp, err := s.ContentAddressableStorageServer.BatchReadBlobs(ctx, req)

if s.readResponseModifier == nil {
return resp, err
}

for idx, r := range resp.GetResponses() {
s.readResponseModifier(idx, r)
}

return resp, err
}

func TestBatchDownloadBlobsBrokenCompression(t *testing.T) {
t.Parallel()
ctx := context.Background()
listener, err := net.Listen("tcp", ":0")

Check failure on line 1949 in go/pkg/client/cas_test.go

View workflow job for this annotation

GitHub Actions / lint

error returned from interface method should be wrapped: sig: func (github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2.ContentAddressableStorageServer).BatchReadBlobs(context.Context, *github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2.BatchReadBlobsRequest) (*github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2.BatchReadBlobsResponse, error) (wrapcheck)
if err != nil {
t.Fatalf("Cannot listen: %v", err)
}
fakeCAS := fakes.NewCAS()
defer listener.Close()
server := grpc.NewServer()

Check failure on line 1956 in go/pkg/client/cas_test.go

View workflow job for this annotation

GitHub Actions / lint

error returned from interface method should be wrapped: sig: func (github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2.ContentAddressableStorageServer).BatchReadBlobs(context.Context, *github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2.BatchReadBlobsRequest) (*github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2.BatchReadBlobsResponse, error) (wrapcheck)
s := invalidReadServer{
ContentAddressableStorageServer: fakeCAS,
}
regrpc.RegisterContentAddressableStorageServer(server, &s)
go server.Serve(listener)
defer server.Stop()
c, err := client.NewClient(ctx, instance, client.DialParams{
Service: listener.Addr().String(),
NoSecurity: true,
}, client.StartupCapabilities(false))
if err != nil {
t.Fatalf("Error connecting to server: %v", err)
}
defer c.Close()

fooDigest := fakeCAS.Put([]byte("foo"))
barDigest := fakeCAS.Put([]byte("bar"))
fakeDigest := fakeCAS.Put([]byte("fake"))

Check failure on line 1974 in go/pkg/client/cas_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `server.Serve` is not checked (errcheck)
digests := []digest.Digest{fooDigest, barDigest, fakeDigest}
client.UseBatchCompression(true).Apply(c)

type testCase struct {
name string
expected map[digest.Digest]client.CompressedBlobInfo
modifier readResponseModifier
errorContains string
}
tests := []testCase{
{
name: "invalid compressor",
modifier: func(idx int, resp *repb.BatchReadBlobsResponse_Response) {
resp.Compressor = repb.Compressor_DEFLATE
},
errorContains: "with unsupported compressor DEFLATE",
},
{
name: "invalid data",
modifier: func(idx int, resp *repb.BatchReadBlobsResponse_Response) {
resp.Data = []byte{1, 3, 5, 9}
},
errorContains: "magic number mismatch",
},
{
name: "mixed errors",
modifier: func(idx int, resp *repb.BatchReadBlobsResponse_Response) {
if idx == 0 {
resp.Data[0] = 1 // Corrupt compressed data
} else if idx == 1 {
resp.Compressor = repb.Compressor_DEFLATE
}
},
expected: map[digest.Digest]client.CompressedBlobInfo{
fakeDigest: {
CompressedSize: 17,
Data: []byte("fake"),
},
},
errorContains: "with unsupported compressor DEFLATE",
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
wantBlobs := test.expected
if wantBlobs == nil {
wantBlobs = map[digest.Digest]client.CompressedBlobInfo{}
}
s.setModifier(test.modifier)
defer s.setModifier(nil)
gotBlobs, err := c.BatchDownloadBlobsWithStats(ctx, digests)
if err == nil {
t.Error("client.BatchDownloadBlobs(ctx, digests) should return download error")
}
errMsg := err.Error()
if !strings.Contains(errMsg, test.errorContains) {
t.Errorf("client.BatchDownloadBlobs(ctx, digests) should report %s: %s", test.errorContains, errMsg)
}
if diff := cmp.Diff(wantBlobs, gotBlobs); diff != "" {
t.Errorf("client.BatchDownloadBlobs(ctx, digests) had diff (want -> got):\n%s", diff)
}
})
}
}

0 comments on commit 7cd94b1

Please sign in to comment.