Skip to content

Commit

Permalink
Compactor should skip retry without failure if there is permission de…
Browse files Browse the repository at this point in the history
…nied error (cortexproject#5727)

* Compactor should skip retry without failure if there is permission denied error

Signed-off-by: Alex Le <[email protected]>

* factored code

Signed-off-by: Alex Le <[email protected]>

---------

Signed-off-by: Alex Le <[email protected]>
  • Loading branch information
alexqyle authored Jan 16, 2024
1 parent 8282a2f commit 289c71e
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 3 deletions.
34 changes: 34 additions & 0 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/errutil"
"github.com/thanos-io/thanos/pkg/extprom"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/storage/bucket"
Expand Down Expand Up @@ -789,6 +792,10 @@ func (c *Compactor) compactUserWithRetries(ctx context.Context, userID string) e
if lastErr == nil {
return nil
}
if c.isCausedByPermissionDenied(lastErr) {
level.Warn(c.logger).Log("msg", "skipping compactUser due to PermissionDenied", "user", userID, "err", lastErr)
return nil
}

retries.Wait()
}
Expand Down Expand Up @@ -1024,3 +1031,30 @@ func (c *Compactor) listTenantsWithMetaSyncDirectories() map[string]struct{} {

return result
}

func (c *Compactor) isCausedByPermissionDenied(err error) bool {
cause := errors.Cause(err)
if compact.IsRetryError(cause) || compact.IsHaltError(cause) {
cause = errors.Unwrap(cause)
}
if multiErr, ok := cause.(errutil.NonNilMultiRootError); ok {
for _, err := range multiErr {
if c.isPermissionDeniedErr(err) {
return true
}
}
return false
}
return c.isPermissionDeniedErr(cause)
}

func (c *Compactor) isPermissionDeniedErr(err error) bool {
if c.bucketClient.IsAccessDeniedErr(err) {
return true
}
s, ok := status.FromError(err)
if !ok {
return false
}
return s.Code() == codes.PermissionDenied
}
100 changes: 100 additions & 0 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2028,3 +2028,103 @@ func TestCompactor_ShouldNotTreatInterruptionsAsErrors(t *testing.T) {
require.Contains(t, lines, `level=info component=compactor msg="interrupting compaction of user blocks" user=user-1`)
require.NotContains(t, logs.String(), `level=error`)
}

func TestCompactor_ShouldNotFailCompactionIfAccessDeniedErrDuringMetaSync(t *testing.T) {
t.Parallel()

ss := bucketindex.Status{Status: bucketindex.Ok, Version: bucketindex.SyncStatusFileVersion}
content, err := json.Marshal(ss)
require.NoError(t, err)

bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("__markers__", []string{}, nil)
bucketClient.MockIter("", []string{"user-1"}, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json"}, nil)
bucketClient.MockIter("user-1/markers/", nil, nil)
bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil)
bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil)
bucketClient.MockDelete("user-1/markers/cleaner-visit-marker.json", nil)
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), bucket.ErrKeyPermissionDenied)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", bucket.ErrKeyPermissionDenied)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", bucket.ErrKeyPermissionDenied)
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), bucket.ErrKeyPermissionDenied)
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", "", bucket.ErrKeyPermissionDenied)
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/no-compact-mark.json", "", bucket.ErrKeyPermissionDenied)
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", string(content), nil)
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)

ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

cfg := prepareConfig()
cfg.ShardingEnabled = true
cfg.ShardingRing.InstanceID = "compactor-1"
cfg.ShardingRing.InstanceAddr = "1.2.3.4"
cfg.ShardingRing.KVStore.Mock = ringStore

c, _, tsdbPlanner, _, _ := prepare(t, cfg, bucketClient, nil)
tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)

require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

// Wait until a run has completed.
cortex_testutil.Poll(t, 20*time.Second, 1.0, func() interface{} {
return prom_testutil.ToFloat64(c.compactionRunsCompleted)
})

require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
}

func TestCompactor_ShouldNotFailCompactionIfAccessDeniedErrReturnedFromBucket(t *testing.T) {
t.Parallel()

ss := bucketindex.Status{Status: bucketindex.Ok, Version: bucketindex.SyncStatusFileVersion}
content, err := json.Marshal(ss)
require.NoError(t, err)

bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("__markers__", []string{}, nil)
bucketClient.MockIter("", []string{"user-1"}, nil)
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json"}, nil)
bucketClient.MockIter("user-1/markers/", nil, nil)
bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil)
bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil)
bucketClient.MockDelete("user-1/markers/cleaner-visit-marker.json", nil)
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil)
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil)
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", "", nil)
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/no-compact-mark.json", "", nil)
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", string(content), nil)
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)

ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

cfg := prepareConfig()
cfg.ShardingEnabled = true
cfg.ShardingRing.InstanceID = "compactor-1"
cfg.ShardingRing.InstanceAddr = "1.2.3.4"
cfg.ShardingRing.KVStore.Mock = ringStore

c, _, tsdbPlanner, _, _ := prepare(t, cfg, bucketClient, nil)
tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, bucket.ErrKeyPermissionDenied)

require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

// Wait until a run has completed.
cortex_testutil.Poll(t, 20*time.Second, 1.0, func() interface{} {
return prom_testutil.ToFloat64(c.compactionRunsCompleted)
})

require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
}
4 changes: 2 additions & 2 deletions pkg/storage/bucket/client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

var (
errObjectDoesNotExist = errors.New("object does not exist")
errKeyPermissionDenied = errors.New("object key permission denied")
ErrKeyPermissionDenied = errors.New("object key permission denied")
)

// ClientMock mocks objstore.Bucket
Expand Down Expand Up @@ -188,7 +188,7 @@ func (m *ClientMock) IsObjNotFoundErr(err error) bool {

// IsAccessDeniedErr mocks objstore.Bucket.IsAccessDeniedErr()
func (m *ClientMock) IsAccessDeniedErr(err error) bool {
return err == errKeyPermissionDenied
return err == ErrKeyPermissionDenied
}

// ObjectSize mocks objstore.Bucket.Attributes()
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/bucket/sse_bucket_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func Test_shouldWrapSSeErrors(t *testing.T) {

bkt := &ClientMock{}

bkt.MockGet("Test", "someContent", errKeyPermissionDenied)
bkt.MockGet("Test", "someContent", ErrKeyPermissionDenied)

sseBkt := NewSSEBucketClient("user-1", bkt, cfgProvider)

Expand Down

0 comments on commit 289c71e

Please sign in to comment.