diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 30b84c2197..35ab450c7a 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -22,7 +22,10 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/compact/downsample" + "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/extprom" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/storage/bucket" @@ -789,6 +792,10 @@ func (c *Compactor) compactUserWithRetries(ctx context.Context, userID string) e if lastErr == nil { return nil } + if c.isCausedByPermissionDenied(lastErr) { + level.Warn(c.logger).Log("msg", "skipping compactUser due to PermissionDenied", "user", userID, "err", lastErr) + return nil + } retries.Wait() } @@ -1024,3 +1031,30 @@ func (c *Compactor) listTenantsWithMetaSyncDirectories() map[string]struct{} { return result } + +func (c *Compactor) isCausedByPermissionDenied(err error) bool { + cause := errors.Cause(err) + if compact.IsRetryError(cause) || compact.IsHaltError(cause) { + cause = errors.Unwrap(cause) + } + if multiErr, ok := cause.(errutil.NonNilMultiRootError); ok { + for _, err := range multiErr { + if c.isPermissionDeniedErr(err) { + return true + } + } + return false + } + return c.isPermissionDeniedErr(cause) +} + +func (c *Compactor) isPermissionDeniedErr(err error) bool { + if c.bucketClient.IsAccessDeniedErr(err) { + return true + } + s, ok := status.FromError(err) + if !ok { + return false + } + return s.Code() == codes.PermissionDenied +} diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 3a22981379..989bcf9c43 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -2028,3 +2028,103 @@ func TestCompactor_ShouldNotTreatInterruptionsAsErrors(t *testing.T) { require.Contains(t, lines, `level=info component=compactor msg="interrupting compaction of user blocks" user=user-1`) require.NotContains(t, logs.String(), `level=error`) } + +func TestCompactor_ShouldNotFailCompactionIfAccessDeniedErrDuringMetaSync(t *testing.T) { + t.Parallel() + + ss := bucketindex.Status{Status: bucketindex.Ok, Version: bucketindex.SyncStatusFileVersion} + content, err := json.Marshal(ss) + require.NoError(t, err) + + bucketClient := &bucket.ClientMock{} + bucketClient.MockIter("__markers__", []string{}, nil) + bucketClient.MockIter("", []string{"user-1"}, nil) + bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json"}, nil) + bucketClient.MockIter("user-1/markers/", nil, nil) + bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-1/markers/cleaner-visit-marker.json", nil) + bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) + bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), bucket.ErrKeyPermissionDenied) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", bucket.ErrKeyPermissionDenied) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", bucket.ErrKeyPermissionDenied) + bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), bucket.ErrKeyPermissionDenied) + bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", "", bucket.ErrKeyPermissionDenied) + bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/no-compact-mark.json", "", bucket.ErrKeyPermissionDenied) + bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil) + bucketClient.MockGet("user-1/bucket-index-sync-status.json", string(content), nil) + bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) + bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil) + + ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + cfg := prepareConfig() + cfg.ShardingEnabled = true + cfg.ShardingRing.InstanceID = "compactor-1" + cfg.ShardingRing.InstanceAddr = "1.2.3.4" + cfg.ShardingRing.KVStore.Mock = ringStore + + c, _, tsdbPlanner, _, _ := prepare(t, cfg, bucketClient, nil) + tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil) + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) + + // Wait until a run has completed. + cortex_testutil.Poll(t, 20*time.Second, 1.0, func() interface{} { + return prom_testutil.ToFloat64(c.compactionRunsCompleted) + }) + + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) +} + +func TestCompactor_ShouldNotFailCompactionIfAccessDeniedErrReturnedFromBucket(t *testing.T) { + t.Parallel() + + ss := bucketindex.Status{Status: bucketindex.Ok, Version: bucketindex.SyncStatusFileVersion} + content, err := json.Marshal(ss) + require.NoError(t, err) + + bucketClient := &bucket.ClientMock{} + bucketClient.MockIter("__markers__", []string{}, nil) + bucketClient.MockIter("", []string{"user-1"}, nil) + bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json"}, nil) + bucketClient.MockIter("user-1/markers/", nil, nil) + bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil) + bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil) + bucketClient.MockDelete("user-1/markers/cleaner-visit-marker.json", nil) + bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) + bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil) + bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil) + bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", "", nil) + bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/no-compact-mark.json", "", nil) + bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil) + bucketClient.MockGet("user-1/bucket-index-sync-status.json", string(content), nil) + bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) + bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil) + + ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + cfg := prepareConfig() + cfg.ShardingEnabled = true + cfg.ShardingRing.InstanceID = "compactor-1" + cfg.ShardingRing.InstanceAddr = "1.2.3.4" + cfg.ShardingRing.KVStore.Mock = ringStore + + c, _, tsdbPlanner, _, _ := prepare(t, cfg, bucketClient, nil) + tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, bucket.ErrKeyPermissionDenied) + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) + + // Wait until a run has completed. + cortex_testutil.Poll(t, 20*time.Second, 1.0, func() interface{} { + return prom_testutil.ToFloat64(c.compactionRunsCompleted) + }) + + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) +} diff --git a/pkg/storage/bucket/client_mock.go b/pkg/storage/bucket/client_mock.go index ac96370aa9..c5eebbd3f4 100644 --- a/pkg/storage/bucket/client_mock.go +++ b/pkg/storage/bucket/client_mock.go @@ -14,7 +14,7 @@ import ( var ( errObjectDoesNotExist = errors.New("object does not exist") - errKeyPermissionDenied = errors.New("object key permission denied") + ErrKeyPermissionDenied = errors.New("object key permission denied") ) // ClientMock mocks objstore.Bucket @@ -188,7 +188,7 @@ func (m *ClientMock) IsObjNotFoundErr(err error) bool { // IsAccessDeniedErr mocks objstore.Bucket.IsAccessDeniedErr() func (m *ClientMock) IsAccessDeniedErr(err error) bool { - return err == errKeyPermissionDenied + return err == ErrKeyPermissionDenied } // ObjectSize mocks objstore.Bucket.Attributes() diff --git a/pkg/storage/bucket/sse_bucket_client_test.go b/pkg/storage/bucket/sse_bucket_client_test.go index 91f5ba21e0..1b2815982c 100644 --- a/pkg/storage/bucket/sse_bucket_client_test.go +++ b/pkg/storage/bucket/sse_bucket_client_test.go @@ -111,7 +111,7 @@ func Test_shouldWrapSSeErrors(t *testing.T) { bkt := &ClientMock{} - bkt.MockGet("Test", "someContent", errKeyPermissionDenied) + bkt.MockGet("Test", "someContent", ErrKeyPermissionDenied) sseBkt := NewSSEBucketClient("user-1", bkt, cfgProvider)