From 01f4d9d522e957417fef286de38fb267a81f6c49 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Tue, 23 Jul 2024 16:46:40 -0700 Subject: [PATCH] Split cleaner cycle for active and deleted tenants (#6112) * Split cleaner cycle for active and deleted tenants Signed-off-by: Alex Le * update CHANGELOG Signed-off-by: Alex Le * refactor code Signed-off-by: Alex Le * rename label Signed-off-by: Alex Le --------- Signed-off-by: Alex Le --- CHANGELOG.md | 1 + pkg/compactor/blocks_cleaner.go | 257 ++++++++++++++++++++------- pkg/compactor/blocks_cleaner_test.go | 78 +++++--- pkg/compactor/compactor_test.go | 97 ++++------ 4 files changed, 281 insertions(+), 152 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a9c050e98..f04808f267 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ * [ENHANCEMENT] Compactor: Centralize metrics used by compactor and add user label to compactor metrics. #6096 * [ENHANCEMENT] Compactor: Add unique execution ID for each compaction cycle in log for easy debugging. #6097 * [ENHANCEMENT] Ruler: Add support for filtering by `state` and `health` field on Rules API. #6040 +* [ENHANCEMENT] Compactor: Split cleaner cycle for active and deleted tenants. #6112 * [BUGFIX] Configsdb: Fix endline issue in db password. #5920 * [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952 * [BUGFIX] Querier: Enforce max query length check for `/api/v1/series` API even though `ignoreMaxQueryLength` is set to true. #6018 diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index a9c4fa7f3d..a447231b64 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -15,6 +15,7 @@ import ( "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" + "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" @@ -28,6 +29,8 @@ import ( const ( defaultDeleteBlocksConcurrency = 16 reasonValueRetention = "retention" + activeStatus = "active" + deletedStatus = "deleted" ) type BlocksCleanerConfig struct { @@ -51,10 +54,10 @@ type BlocksCleaner struct { lastOwnedUsers []string // Metrics. - runsStarted prometheus.Counter - runsCompleted prometheus.Counter - runsFailed prometheus.Counter - runsLastSuccess prometheus.Gauge + runsStarted *prometheus.CounterVec + runsCompleted *prometheus.CounterVec + runsFailed *prometheus.CounterVec + runsLastSuccess *prometheus.GaugeVec blocksCleanedTotal prometheus.Counter blocksFailedTotal prometheus.Counter blocksMarkedForDeletion *prometheus.CounterVec @@ -63,6 +66,8 @@ type BlocksCleaner struct { tenantBlocksMarkedForNoCompaction *prometheus.GaugeVec tenantPartialBlocks *prometheus.GaugeVec tenantBucketIndexLastUpdate *prometheus.GaugeVec + tenantBlocksCleanedTotal *prometheus.CounterVec + tenantCleanDuration *prometheus.GaugeVec } func NewBlocksCleaner( @@ -80,22 +85,22 @@ func NewBlocksCleaner( usersScanner: usersScanner, cfgProvider: cfgProvider, logger: log.With(logger, "component", "cleaner"), - runsStarted: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + runsStarted: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_compactor_block_cleanup_started_total", Help: "Total number of blocks cleanup runs started.", - }), - runsCompleted: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + }, []string{"user_status"}), + runsCompleted: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_compactor_block_cleanup_completed_total", Help: "Total number of blocks cleanup runs successfully completed.", - }), - runsFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + }, []string{"user_status"}), + runsFailed: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_compactor_block_cleanup_failed_total", Help: "Total number of blocks cleanup runs failed.", - }), - runsLastSuccess: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + }, []string{"user_status"}), + runsLastSuccess: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "cortex_compactor_block_cleanup_last_successful_run_timestamp_seconds", Help: "Unix timestamp of the last successful blocks cleanup run.", - }), + }, []string{"user_status"}), blocksCleanedTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "cortex_compactor_blocks_cleaned_total", Help: "Total number of blocks deleted.", @@ -129,54 +134,162 @@ func NewBlocksCleaner( Name: "cortex_bucket_index_last_successful_update_timestamp_seconds", Help: "Timestamp of the last successful update of a tenant's bucket index.", }, []string{"user"}), + tenantBlocksCleanedTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_bucket_blocks_cleaned_total", + Help: "Total number of blocks deleted for a tenant.", + }, commonLabels), + tenantCleanDuration: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_bucket_clean_duration_seconds", + Help: "Duration of cleaner runtime for a tenant in seconds", + }, commonLabels), } - c.Service = services.NewTimerService(cfg.CleanupInterval, c.starting, c.ticker, nil) + c.Service = services.NewBasicService(c.starting, c.loop, nil) return c } +type cleanerJob struct { + users []string + timestamp int64 +} + func (c *BlocksCleaner) starting(ctx context.Context) error { // Run a cleanup so that any other service depending on this service // is guaranteed to start once the initial cleanup has been done. - c.runCleanup(ctx, true) + activeUsers, deletedUsers, err := c.scanUsers(ctx) + if err != nil { + level.Error(c.logger).Log("msg", "failed to scan users on startup", "err", err.Error()) + c.runsFailed.WithLabelValues(deletedStatus).Inc() + c.runsFailed.WithLabelValues(activeStatus).Inc() + return nil + } + err = c.cleanUpActiveUsers(ctx, activeUsers, true) + c.checkRunError(activeStatus, err) + err = c.cleanDeletedUsers(ctx, deletedUsers) + c.checkRunError(deletedStatus, err) return nil } -func (c *BlocksCleaner) ticker(ctx context.Context) error { - c.runCleanup(ctx, false) +func (c *BlocksCleaner) loop(ctx context.Context) error { + t := time.NewTicker(c.cfg.CleanupInterval) + defer t.Stop() - return nil -} + usersChan := make(chan *cleanerJob) + deleteChan := make(chan *cleanerJob) + defer close(usersChan) + defer close(deleteChan) + + go func() { + c.runActiveUserCleanup(ctx, usersChan) + }() + go func() { + c.runDeleteUserCleanup(ctx, deleteChan) + }() + + for { + select { + case <-t.C: + activeUsers, deletedUsers, err := c.scanUsers(ctx) + if err != nil { + level.Error(c.logger).Log("msg", "failed to scan users blocks cleanup and maintenance", "err", err.Error()) + c.runsFailed.WithLabelValues(deletedStatus).Inc() + c.runsFailed.WithLabelValues(activeStatus).Inc() + continue + } + cleanJobTimestamp := time.Now().Unix() + usersChan <- &cleanerJob{ + users: activeUsers, + timestamp: cleanJobTimestamp, + } + deleteChan <- &cleanerJob{ + users: deletedUsers, + timestamp: cleanJobTimestamp, + } -func (c *BlocksCleaner) runCleanup(ctx context.Context, firstRun bool) { - level.Info(c.logger).Log("msg", "started blocks cleanup and maintenance") - c.runsStarted.Inc() + case <-ctx.Done(): + return nil + } + } +} - if err := c.cleanUsers(ctx, firstRun); err == nil { - level.Info(c.logger).Log("msg", "successfully completed blocks cleanup and maintenance") - c.runsCompleted.Inc() - c.runsLastSuccess.SetToCurrentTime() +func (c *BlocksCleaner) checkRunError(runType string, err error) { + if err == nil { + level.Info(c.logger).Log("msg", fmt.Sprintf("successfully completed blocks cleanup and maintenance for %s users", runType)) + c.runsCompleted.WithLabelValues(runType).Inc() + c.runsLastSuccess.WithLabelValues(runType).SetToCurrentTime() } else if errors.Is(err, context.Canceled) { - level.Info(c.logger).Log("msg", "canceled blocks cleanup and maintenance", "err", err) - return + level.Info(c.logger).Log("msg", fmt.Sprintf("canceled blocks cleanup and maintenance for %s users", runType), "err", err) } else { - level.Error(c.logger).Log("msg", "failed to run blocks cleanup and maintenance", "err", err.Error()) - c.runsFailed.Inc() + level.Error(c.logger).Log("msg", fmt.Sprintf("failed to run blocks cleanup and maintenance for %s users", runType), "err", err.Error()) + c.runsFailed.WithLabelValues(runType).Inc() } } -func (c *BlocksCleaner) cleanUsers(ctx context.Context, firstRun bool) error { +func (c *BlocksCleaner) runActiveUserCleanup(ctx context.Context, jobChan chan *cleanerJob) { + for job := range jobChan { + if job.timestamp < time.Now().Add(-c.cfg.CleanupInterval).Unix() { + level.Warn(c.logger).Log("Active user cleaner job too old. Ignoring to get recent data") + continue + } + err := c.cleanUpActiveUsers(ctx, job.users, false) + + c.checkRunError(activeStatus, err) + } +} + +func (c *BlocksCleaner) cleanUpActiveUsers(ctx context.Context, users []string, firstRun bool) error { + level.Info(c.logger).Log("msg", "started blocks cleanup and maintenance for active users") + c.runsStarted.WithLabelValues(activeStatus).Inc() + + return concurrency.ForEachUser(ctx, users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error { + userLogger := util_log.WithUserID(userID, c.logger) + userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider) + errChan := make(chan error, 1) + defer func() { + errChan <- nil + }() + return errors.Wrapf(c.cleanUser(ctx, userLogger, userBucket, userID, firstRun), "failed to delete blocks for user: %s", userID) + }) +} + +func (c *BlocksCleaner) runDeleteUserCleanup(ctx context.Context, jobChan chan *cleanerJob) { + for job := range jobChan { + if job.timestamp < time.Now().Add(-c.cfg.CleanupInterval).Unix() { + level.Warn(c.logger).Log("Delete users cleaner job too old. Ignoring to get recent data") + continue + } + err := c.cleanDeletedUsers(ctx, job.users) + + c.checkRunError(deletedStatus, err) + } +} + +func (c *BlocksCleaner) cleanDeletedUsers(ctx context.Context, users []string) error { + level.Info(c.logger).Log("msg", "started blocks cleanup and maintenance for deleted users") + c.runsStarted.WithLabelValues(deletedStatus).Inc() + + return concurrency.ForEachUser(ctx, users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error { + userLogger := util_log.WithUserID(userID, c.logger) + userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider) + errChan := make(chan error, 1) + defer func() { + errChan <- nil + }() + return errors.Wrapf(c.deleteUserMarkedForDeletion(ctx, userLogger, userBucket, userID), "failed to delete user marked for deletion: %s", userID) + }) +} + +func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, error) { users, deleted, err := c.usersScanner.ScanUsers(ctx) if err != nil { - return errors.Wrap(err, "failed to discover users from bucket") + return nil, nil, errors.Wrap(err, "failed to discover users from bucket") } isActive := util.StringsMap(users) isDeleted := util.StringsMap(deleted) allUsers := append(users, deleted...) - // Delete per-tenant metrics for all tenants not belonging anymore to this shard. // Such tenants have been moved to a different shard, so their updated metrics will // be exported by the new shard. @@ -191,18 +304,11 @@ func (c *BlocksCleaner) cleanUsers(ctx context.Context, firstRun bool) error { } c.lastOwnedUsers = allUsers - return concurrency.ForEachUser(ctx, allUsers, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error { - if isDeleted[userID] { - return errors.Wrapf(c.deleteUserMarkedForDeletion(ctx, userID), "failed to delete user marked for deletion: %s", userID) - } - return errors.Wrapf(c.cleanUser(ctx, userID, firstRun), "failed to delete blocks for user: %s", userID) - }) + return users, deleted, nil } // Remove blocks and remaining data for tenant marked for deletion. -func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID string) error { - userLogger := util_log.WithUserID(userID, c.logger) - userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider) +func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) error { level.Info(userLogger).Log("msg", "deleting blocks for tenant marked for deletion") @@ -211,51 +317,58 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID if err := bucketindex.DeleteIndex(ctx, c.bucketClient, userID, c.cfgProvider); err != nil { return err } - // Delete the bucket sync status if err := bucketindex.DeleteIndexSyncStatus(ctx, c.bucketClient, userID); err != nil { return err } c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID) - var deletedBlocks, failed int + var blocksToDelete []interface{} err := userBucket.Iter(ctx, "", func(name string) error { if err := ctx.Err(); err != nil { return err } - id, ok := block.IsBlockDir(name) if !ok { return nil } + blocksToDelete = append(blocksToDelete, id) + return nil + }) + if err != nil { + return err + } - err := block.Delete(ctx, userLogger, userBucket, id) + var deletedBlocks, failed atomic.Int64 + err = concurrency.ForEach(ctx, blocksToDelete, defaultDeleteBlocksConcurrency, func(ctx context.Context, job interface{}) error { + blockID := job.(ulid.ULID) + err := block.Delete(ctx, userLogger, userBucket, blockID) if err != nil { - failed++ + failed.Add(1) c.blocksFailedTotal.Inc() - level.Warn(userLogger).Log("msg", "failed to delete block", "block", id, "err", err) + level.Warn(userLogger).Log("msg", "failed to delete block", "block", blockID, "err", err) return nil // Continue with other blocks. } - deletedBlocks++ + deletedBlocks.Add(1) c.blocksCleanedTotal.Inc() - level.Info(userLogger).Log("msg", "deleted block", "block", id) + c.tenantBlocksCleanedTotal.WithLabelValues(userID).Inc() + level.Info(userLogger).Log("msg", "deleted block", "block", blockID) return nil }) - if err != nil { return err } - if failed > 0 { + if failed.Load() > 0 { // The number of blocks left in the storage is equal to the number of blocks we failed // to delete. We also consider them all marked for deletion given the next run will try // to delete them again. - c.tenantBlocks.WithLabelValues(userID).Set(float64(failed)) - c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(failed)) + c.tenantBlocks.WithLabelValues(userID).Set(float64(failed.Load())) + c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(failed.Load())) c.tenantPartialBlocks.WithLabelValues(userID).Set(0) - return errors.Errorf("failed to delete %d blocks", failed) + return errors.Errorf("failed to delete %d blocks", failed.Load()) } // Given all blocks have been deleted, we can also remove the metrics. @@ -264,8 +377,8 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID) c.tenantPartialBlocks.DeleteLabelValues(userID) - if deletedBlocks > 0 { - level.Info(userLogger).Log("msg", "deleted blocks for tenant marked for deletion", "deletedBlocks", deletedBlocks) + if deletedBlocks.Load() > 0 { + level.Info(userLogger).Log("msg", "deleted blocks for tenant marked for deletion", "deletedBlocks", deletedBlocks.Load()) } mark, err := cortex_tsdb.ReadTenantDeletionMark(ctx, c.bucketClient, userID) @@ -275,22 +388,18 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID if mark == nil { return errors.Wrap(err, "cannot find tenant deletion mark anymore") } - // If we have just deleted some blocks, update "finished" time. Also update "finished" time if it wasn't set yet, but there are no blocks. // Note: this UPDATES the tenant deletion mark. Components that use caching bucket will NOT SEE this update, // but that is fine -- they only check whether tenant deletion marker exists or not. - if deletedBlocks > 0 || mark.FinishedTime == 0 { + if deletedBlocks.Load() > 0 || mark.FinishedTime == 0 { level.Info(userLogger).Log("msg", "updating finished time in tenant deletion mark") mark.FinishedTime = time.Now().Unix() return errors.Wrap(cortex_tsdb.WriteTenantDeletionMark(ctx, c.bucketClient, userID, mark), "failed to update tenant deletion mark") } - if time.Since(time.Unix(mark.FinishedTime, 0)) < c.cfg.TenantCleanupDelay { return nil } - level.Info(userLogger).Log("msg", "cleaning up remaining blocks data for tenant marked for deletion") - // Let's do final cleanup of tenant. if deleted, err := bucket.DeletePrefix(ctx, userBucket, block.DebugMetas, userLogger); err != nil { return errors.Wrap(err, "failed to delete "+block.DebugMetas) @@ -303,17 +412,14 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID } else if deleted > 0 { level.Info(userLogger).Log("msg", "deleted marker files for tenant marked for deletion", "count", deleted) } - if err := cortex_tsdb.DeleteTenantDeletionMark(ctx, c.bucketClient, userID); err != nil { return errors.Wrap(err, "failed to delete tenant deletion mark") } - return nil } -func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun bool) (returnErr error) { - userLogger := util_log.WithUserID(userID, c.logger) - userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider) +func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string, firstRun bool) (returnErr error) { + c.blocksMarkedForDeletion.WithLabelValues(userID, reasonValueRetention) startTime := time.Now() level.Info(userLogger).Log("msg", "started blocks cleanup and maintenance") @@ -323,6 +429,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b } else { level.Info(userLogger).Log("msg", "completed blocks cleanup and maintenance", "duration", time.Since(startTime)) } + c.tenantCleanDuration.WithLabelValues(userID).Set(time.Since(startTime).Seconds()) }() // Migrate block deletion marks to the global markers location. This operation is a best-effort. @@ -346,6 +453,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b idxs.SyncTime = time.Now().Unix() // Read the bucket index. + begin := time.Now() idx, err := bucketindex.ReadIndex(ctx, c.bucketClient, userID, c.cfgProvider, c.logger) defer func() { @@ -370,6 +478,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b idxs.Status = bucketindex.GenericError return err } + level.Info(userLogger).Log("msg", "finish reading index", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) // Mark blocks for future deletion based on the retention period for the user. // Note doing this before UpdateIndex, so it reads in the deletion marks. @@ -383,15 +492,18 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b } // Generate an updated in-memory version of the bucket index. + begin = time.Now() w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger) idx, partials, totalBlocksBlocksMarkedForNoCompaction, err := w.UpdateIndex(ctx, idx) if err != nil { idxs.Status = bucketindex.GenericError return err } + level.Info(userLogger).Log("msg", "finish updating index", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) // Delete blocks marked for deletion. We iterate over a copy of deletion marks because // we'll need to manipulate the index (removing blocks which get deleted). + begin = time.Now() blocksToDelete := make([]interface{}, 0, len(idx.BlockDeletionMarks)) var mux sync.Mutex for _, mark := range idx.BlockDeletionMarks.Clone() { @@ -400,8 +512,10 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b } blocksToDelete = append(blocksToDelete, mark.ID) } + level.Info(userLogger).Log("msg", "finish getting blocks to be deleted", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) // Concurrently deletes blocks marked for deletion, and removes blocks from index. + begin = time.Now() _ = concurrency.ForEach(ctx, blocksToDelete, defaultDeleteBlocksConcurrency, func(ctx context.Context, job interface{}) error { blockID := job.(ulid.ULID) @@ -417,20 +531,26 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b mux.Unlock() c.blocksCleanedTotal.Inc() + c.tenantBlocksCleanedTotal.WithLabelValues(userID).Inc() level.Info(userLogger).Log("msg", "deleted block marked for deletion", "block", blockID) return nil }) + level.Info(userLogger).Log("msg", "finish deleting blocks", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) // Partial blocks with a deletion mark can be cleaned up. This is a best effort, so we don't return // error if the cleanup of partial blocks fail. if len(partials) > 0 { - c.cleanUserPartialBlocks(ctx, partials, idx, userBucket, userLogger) + begin = time.Now() + c.cleanUserPartialBlocks(ctx, userID, partials, idx, userBucket, userLogger) + level.Info(userLogger).Log("msg", "finish cleaning partial blocks", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) } // Upload the updated index to the storage. + begin = time.Now() if err := bucketindex.WriteIndex(ctx, c.bucketClient, userID, c.cfgProvider, idx); err != nil { return err } + level.Info(userLogger).Log("msg", "finish writing new index", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) c.tenantBlocks.WithLabelValues(userID).Set(float64(len(idx.Blocks))) c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks))) @@ -442,7 +562,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b // cleanUserPartialBlocks delete partial blocks which are safe to be deleted. The provided partials map // and index are updated accordingly. -func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket objstore.InstrumentedBucket, userLogger log.Logger) { +func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, userID string, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket objstore.InstrumentedBucket, userLogger log.Logger) { // Collect all blocks with missing meta.json into buffered channel. blocks := make([]interface{}, 0, len(partials)) @@ -497,6 +617,7 @@ func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map mux.Unlock() c.blocksCleanedTotal.Inc() + c.tenantBlocksCleanedTotal.WithLabelValues(userID).Inc() level.Info(userLogger).Log("msg", "deleted partial block marked for deletion", "block", blockID) return nil }) diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index e33994df8b..b582fceb7c 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -20,9 +20,11 @@ import ( "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" + util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -89,7 +91,9 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) { // Clean User with no error cleaner.bucketClient = bkt - err := cleaner.cleanUser(ctx, userID, false) + userLogger := util_log.WithUserID(userID, cleaner.logger) + userBucket := bucket.NewUserBucketClient(userID, cleaner.bucketClient, cleaner.cfgProvider) + err := cleaner.cleanUser(ctx, userLogger, userBucket, userID, false) require.NoError(t, err) s, err := bucketindex.ReadSyncStatus(ctx, bkt, userID, logger) require.NoError(t, err) @@ -98,7 +102,9 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) { // Clean with cmk error cleaner.bucketClient = mbucket - err = cleaner.cleanUser(ctx, userID, false) + userLogger = util_log.WithUserID(userID, cleaner.logger) + userBucket = bucket.NewUserBucketClient(userID, cleaner.bucketClient, cleaner.cfgProvider) + err = cleaner.cleanUser(ctx, userLogger, userBucket, userID, false) require.NoError(t, err) s, err = bucketindex.ReadSyncStatus(ctx, bkt, userID, logger) require.NoError(t, err) @@ -107,7 +113,9 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) { // Re grant access to the key cleaner.bucketClient = bkt - err = cleaner.cleanUser(ctx, userID, false) + userLogger = util_log.WithUserID(userID, cleaner.logger) + userBucket = bucket.NewUserBucketClient(userID, cleaner.bucketClient, cleaner.cfgProvider) + err = cleaner.cleanUser(ctx, userLogger, userBucket, userID, false) require.NoError(t, err) s, err = bucketindex.ReadSyncStatus(ctx, bkt, userID, logger) require.NoError(t, err) @@ -238,9 +246,9 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions assert.Equal(t, tc.expectedExists, exists, tc.user) } - assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.runsStarted)) - assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.runsCompleted)) - assert.Equal(t, float64(0), testutil.ToFloat64(cleaner.runsFailed)) + assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.runsStarted.WithLabelValues(activeStatus))) + assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.runsCompleted.WithLabelValues(activeStatus))) + assert.Equal(t, float64(0), testutil.ToFloat64(cleaner.runsFailed.WithLabelValues(activeStatus))) assert.Equal(t, float64(7), testutil.ToFloat64(cleaner.blocksCleanedTotal)) assert.Equal(t, float64(0), testutil.ToFloat64(cleaner.blocksFailedTotal)) @@ -365,9 +373,9 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) { assert.Equal(t, tc.expectedExists, exists, tc.path) } - assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.runsStarted)) - assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.runsCompleted)) - assert.Equal(t, float64(0), testutil.ToFloat64(cleaner.runsFailed)) + assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.runsStarted.WithLabelValues(activeStatus))) + assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.runsCompleted.WithLabelValues(activeStatus))) + assert.Equal(t, float64(0), testutil.ToFloat64(cleaner.runsFailed.WithLabelValues(activeStatus))) assert.Equal(t, float64(2), testutil.ToFloat64(cleaner.blocksCleanedTotal)) assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.blocksFailedTotal)) @@ -428,9 +436,9 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) { assert.Equal(t, tc.expectedExists, exists, tc.path) } - assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.runsStarted)) - assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.runsCompleted)) - assert.Equal(t, float64(0), testutil.ToFloat64(cleaner.runsFailed)) + assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.runsStarted.WithLabelValues(activeStatus))) + assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.runsCompleted.WithLabelValues(activeStatus))) + assert.Equal(t, float64(0), testutil.ToFloat64(cleaner.runsFailed.WithLabelValues(activeStatus))) assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.blocksCleanedTotal)) assert.Equal(t, float64(0), testutil.ToFloat64(cleaner.blocksFailedTotal)) @@ -470,7 +478,10 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar }, append(commonLabels, ReasonLabelName)) cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion) - require.NoError(t, cleaner.cleanUsers(ctx, true)) + activeUsers, deleteUsers, err := cleaner.scanUsers(ctx) + require.NoError(t, err) + require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, true)) + require.NoError(t, cleaner.cleanDeletedUsers(ctx, deleteUsers)) assert.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(` # HELP cortex_bucket_blocks_count Total number of blocks in the bucket. Includes blocks marked for deletion, but not partial blocks. @@ -498,7 +509,10 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar createTSDBBlock(t, bucketClient, "user-1", 40, 50, nil) createTSDBBlock(t, bucketClient, "user-2", 50, 60, nil) - require.NoError(t, cleaner.cleanUsers(ctx, false)) + activeUsers, deleteUsers, err = cleaner.scanUsers(ctx) + require.NoError(t, err) + require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, false)) + require.NoError(t, cleaner.cleanDeletedUsers(ctx, deleteUsers)) assert.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(` # HELP cortex_bucket_blocks_count Total number of blocks in the bucket. Includes blocks marked for deletion, but not partial blocks. @@ -617,7 +631,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { cfgProvider.userRetentionPeriods["user-1"] = 0 cfgProvider.userRetentionPeriods["user-2"] = 0 - require.NoError(t, cleaner.cleanUsers(ctx, true)) + activeUsers, deleteUsers, err := cleaner.scanUsers(ctx) + require.NoError(t, err) + require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, true)) + require.NoError(t, cleaner.cleanDeletedUsers(ctx, deleteUsers)) assertBlockExists("user-1", block1, true) assertBlockExists("user-1", block2, true) assertBlockExists("user-2", block3, true) @@ -632,6 +649,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { # TYPE cortex_bucket_blocks_marked_for_deletion_count gauge cortex_bucket_blocks_marked_for_deletion_count{user="user-1"} 0 cortex_bucket_blocks_marked_for_deletion_count{user="user-2"} 0 + # HELP cortex_compactor_blocks_marked_for_deletion_total Total number of blocks marked for deletion in compactor. + # TYPE cortex_compactor_blocks_marked_for_deletion_total counter + cortex_compactor_blocks_marked_for_deletion_total{reason="retention",user="user-1"} 0 + cortex_compactor_blocks_marked_for_deletion_total{reason="retention",user="user-2"} 0 `), "cortex_bucket_blocks_count", "cortex_bucket_blocks_marked_for_deletion_count", @@ -643,7 +664,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { { cfgProvider.userRetentionPeriods["user-1"] = 9 * time.Hour - require.NoError(t, cleaner.cleanUsers(ctx, false)) + activeUsers, deleteUsers, err := cleaner.scanUsers(ctx) + require.NoError(t, err) + require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, false)) + require.NoError(t, cleaner.cleanDeletedUsers(ctx, deleteUsers)) assertBlockExists("user-1", block1, true) assertBlockExists("user-1", block2, true) assertBlockExists("user-2", block3, true) @@ -655,7 +679,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { { cfgProvider.userRetentionPeriods["user-1"] = 7 * time.Hour - require.NoError(t, cleaner.cleanUsers(ctx, false)) + activeUsers, deleteUsers, err := cleaner.scanUsers(ctx) + require.NoError(t, err) + require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, false)) + require.NoError(t, cleaner.cleanDeletedUsers(ctx, deleteUsers)) assertBlockExists("user-1", block1, true) assertBlockExists("user-1", block2, true) assertBlockExists("user-2", block3, true) @@ -673,6 +700,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { # HELP cortex_compactor_blocks_marked_for_deletion_total Total number of blocks marked for deletion in compactor. # TYPE cortex_compactor_blocks_marked_for_deletion_total counter cortex_compactor_blocks_marked_for_deletion_total{reason="retention",user="user-1"} 1 + cortex_compactor_blocks_marked_for_deletion_total{reason="retention",user="user-2"} 0 `), "cortex_bucket_blocks_count", "cortex_bucket_blocks_marked_for_deletion_count", @@ -682,7 +710,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { // Marking the block again, before the deletion occurs, should not cause an error. { - require.NoError(t, cleaner.cleanUsers(ctx, false)) + activeUsers, deleteUsers, err := cleaner.scanUsers(ctx) + require.NoError(t, err) + require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, false)) + require.NoError(t, cleaner.cleanDeletedUsers(ctx, deleteUsers)) assertBlockExists("user-1", block1, true) assertBlockExists("user-1", block2, true) assertBlockExists("user-2", block3, true) @@ -693,7 +724,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { { cleaner.cfg.DeletionDelay = 0 - require.NoError(t, cleaner.cleanUsers(ctx, false)) + activeUsers, deleteUsers, err := cleaner.scanUsers(ctx) + require.NoError(t, err) + require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, false)) + require.NoError(t, cleaner.cleanDeletedUsers(ctx, deleteUsers)) assertBlockExists("user-1", block1, false) assertBlockExists("user-1", block2, true) assertBlockExists("user-2", block3, true) @@ -711,6 +745,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { # HELP cortex_compactor_blocks_marked_for_deletion_total Total number of blocks marked for deletion in compactor. # TYPE cortex_compactor_blocks_marked_for_deletion_total counter cortex_compactor_blocks_marked_for_deletion_total{reason="retention",user="user-1"} 1 + cortex_compactor_blocks_marked_for_deletion_total{reason="retention",user="user-2"} 0 `), "cortex_bucket_blocks_count", "cortex_bucket_blocks_marked_for_deletion_count", @@ -722,7 +757,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { { cfgProvider.userRetentionPeriods["user-2"] = 5 * time.Hour - require.NoError(t, cleaner.cleanUsers(ctx, false)) + activeUsers, deleteUsers, err := cleaner.scanUsers(ctx) + require.NoError(t, err) + require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, false)) + require.NoError(t, cleaner.cleanDeletedUsers(ctx, deleteUsers)) assertBlockExists("user-1", block1, false) assertBlockExists("user-1", block2, true) assertBlockExists("user-2", block3, false) diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 6015adff86..8a77b6b551 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -214,8 +214,6 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) { assert.Equal(t, prom_testutil.ToFloat64(c.CompactionRunInterval), cfg.CompactionInterval.Seconds()) assert.ElementsMatch(t, []string{ - `level=info component=cleaner msg="started blocks cleanup and maintenance"`, - `level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`, `level=info component=compactor msg="compactor started"`, `level=info component=compactor msg="discovering users from bucket"`, `level=info component=compactor msg="discovered users from bucket" users=0`, @@ -252,15 +250,13 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) { # TYPE cortex_compactor_block_cleanup_started_total counter # HELP cortex_compactor_block_cleanup_started_total Total number of blocks cleanup runs started. - cortex_compactor_block_cleanup_started_total 1 + cortex_compactor_block_cleanup_started_total{user_status="active"} 1 + cortex_compactor_block_cleanup_started_total{user_status="deleted"} 1 # TYPE cortex_compactor_block_cleanup_completed_total counter # HELP cortex_compactor_block_cleanup_completed_total Total number of blocks cleanup runs successfully completed. - cortex_compactor_block_cleanup_completed_total 1 - - # TYPE cortex_compactor_block_cleanup_failed_total counter - # HELP cortex_compactor_block_cleanup_failed_total Total number of blocks cleanup runs failed. - cortex_compactor_block_cleanup_failed_total 0 + cortex_compactor_block_cleanup_completed_total{user_status="active"} 1 + cortex_compactor_block_cleanup_completed_total{user_status="deleted"} 1 `), "cortex_compactor_runs_started_total", "cortex_compactor_runs_completed_total", @@ -310,8 +306,7 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket bucketClient.AssertNumberOfCalls(t, "Iter", 1+3) assert.ElementsMatch(t, []string{ - `level=info component=cleaner msg="started blocks cleanup and maintenance"`, - `level=error component=cleaner msg="failed to run blocks cleanup and maintenance" err="failed to discover users from bucket: failed to iterate the bucket"`, + `level=error component=cleaner msg="failed to scan users on startup" err="failed to discover users from bucket: failed to iterate the bucket"`, `level=info component=compactor msg="compactor started"`, `level=info component=compactor msg="discovering users from bucket"`, `level=error component=compactor msg="failed to discover users from bucket" err="failed to iterate the bucket"`, @@ -338,10 +333,6 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket # TYPE cortex_compactor_blocks_cleaned_total counter cortex_compactor_blocks_cleaned_total 0 - # TYPE cortex_compactor_block_cleanup_started_total counter - # HELP cortex_compactor_block_cleanup_started_total Total number of blocks cleanup runs started. - cortex_compactor_block_cleanup_started_total 1 - # HELP cortex_compactor_blocks_marked_for_no_compaction_total Total number of blocks marked for no compact during a compaction run. # TYPE cortex_compactor_blocks_marked_for_no_compaction_total counter cortex_compactor_blocks_marked_for_no_compaction_total 0 @@ -350,13 +341,10 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket # TYPE cortex_compactor_meta_sync_consistency_delay_seconds gauge cortex_compactor_meta_sync_consistency_delay_seconds 0 - # TYPE cortex_compactor_block_cleanup_completed_total counter - # HELP cortex_compactor_block_cleanup_completed_total Total number of blocks cleanup runs successfully completed. - cortex_compactor_block_cleanup_completed_total 0 - # TYPE cortex_compactor_block_cleanup_failed_total counter # HELP cortex_compactor_block_cleanup_failed_total Total number of blocks cleanup runs failed. - cortex_compactor_block_cleanup_failed_total 1 + cortex_compactor_block_cleanup_failed_total{user_status="active"} 1 + cortex_compactor_block_cleanup_failed_total{user_status="deleted"} 1 `), "cortex_compactor_runs_started_total", "cortex_compactor_runs_completed_total", @@ -549,12 +537,6 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { assert.Len(t, tsdbPlanner.getNoCompactBlocks(), 0) assert.ElementsMatch(t, []string{ - `level=info component=cleaner msg="started blocks cleanup and maintenance"`, - `level=info component=cleaner org_id=user-1 msg="started blocks cleanup and maintenance"`, - `level=info component=cleaner org_id=user-1 msg="completed blocks cleanup and maintenance"`, - `level=info component=cleaner org_id=user-2 msg="started blocks cleanup and maintenance"`, - `level=info component=cleaner org_id=user-2 msg="completed blocks cleanup and maintenance"`, - `level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`, `level=info component=compactor msg="compactor started"`, `level=info component=compactor msg="discovering users from bucket"`, `level=info component=compactor msg="discovered users from bucket" users=2`, @@ -605,18 +587,18 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { # TYPE cortex_compactor_blocks_marked_for_deletion_total counter cortex_compactor_blocks_marked_for_deletion_total{reason="compaction",user="user-1"} 0 cortex_compactor_blocks_marked_for_deletion_total{reason="compaction",user="user-2"} 0 + cortex_compactor_blocks_marked_for_deletion_total{reason="retention",user="user-1"} 0 + cortex_compactor_blocks_marked_for_deletion_total{reason="retention",user="user-2"} 0 # TYPE cortex_compactor_block_cleanup_started_total counter # HELP cortex_compactor_block_cleanup_started_total Total number of blocks cleanup runs started. - cortex_compactor_block_cleanup_started_total 1 + cortex_compactor_block_cleanup_started_total{user_status="active"} 1 + cortex_compactor_block_cleanup_started_total{user_status="deleted"} 1 # TYPE cortex_compactor_block_cleanup_completed_total counter # HELP cortex_compactor_block_cleanup_completed_total Total number of blocks cleanup runs successfully completed. - cortex_compactor_block_cleanup_completed_total 1 - - # TYPE cortex_compactor_block_cleanup_failed_total counter - # HELP cortex_compactor_block_cleanup_failed_total Total number of blocks cleanup runs failed. - cortex_compactor_block_cleanup_failed_total 0 + cortex_compactor_block_cleanup_completed_total{user_status="active"} 1 + cortex_compactor_block_cleanup_completed_total{user_status="deleted"} 1 # HELP cortex_compactor_blocks_marked_for_no_compaction_total Total number of blocks marked for no compact during a compaction run. # TYPE cortex_compactor_blocks_marked_for_no_compaction_total counter @@ -683,13 +665,6 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) { tsdbPlanner.AssertNumberOfCalls(t, "Plan", 0) assert.ElementsMatch(t, []string{ - `level=info component=cleaner msg="started blocks cleanup and maintenance"`, - `level=info component=cleaner org_id=user-1 msg="started blocks cleanup and maintenance"`, - `level=debug component=cleaner org_id=user-1 msg="deleted file" file=01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json bucket=mock`, - `level=debug component=cleaner org_id=user-1 msg="deleted file" file=01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json bucket=mock`, - `level=info component=cleaner org_id=user-1 msg="deleted block marked for deletion" block=01DTW0ZCPDDNV4BV83Q2SV4QAZ`, - `level=info component=cleaner org_id=user-1 msg="completed blocks cleanup and maintenance"`, - `level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`, `level=info component=compactor msg="compactor started"`, `level=info component=compactor msg="discovering users from bucket"`, `level=info component=compactor msg="discovered users from bucket" users=1`, @@ -733,18 +708,17 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) { # HELP cortex_compactor_blocks_marked_for_deletion_total Total number of blocks marked for deletion in compactor. # TYPE cortex_compactor_blocks_marked_for_deletion_total counter cortex_compactor_blocks_marked_for_deletion_total{reason="compaction",user="user-1"} 0 + cortex_compactor_blocks_marked_for_deletion_total{reason="retention",user="user-1"} 0 # TYPE cortex_compactor_block_cleanup_started_total counter # HELP cortex_compactor_block_cleanup_started_total Total number of blocks cleanup runs started. - cortex_compactor_block_cleanup_started_total 1 + cortex_compactor_block_cleanup_started_total{user_status="active"} 1 + cortex_compactor_block_cleanup_started_total{user_status="deleted"} 1 # TYPE cortex_compactor_block_cleanup_completed_total counter # HELP cortex_compactor_block_cleanup_completed_total Total number of blocks cleanup runs successfully completed. - cortex_compactor_block_cleanup_completed_total 1 - - # TYPE cortex_compactor_block_cleanup_failed_total counter - # HELP cortex_compactor_block_cleanup_failed_total Total number of blocks cleanup runs failed. - cortex_compactor_block_cleanup_failed_total 0 + cortex_compactor_block_cleanup_completed_total{user_status="active"} 1 + cortex_compactor_block_cleanup_completed_total{user_status="deleted"} 1 # HELP cortex_compactor_blocks_marked_for_no_compaction_total Total number of blocks marked for no compact during a compaction run. # TYPE cortex_compactor_blocks_marked_for_no_compaction_total counter @@ -876,14 +850,6 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T) tsdbPlanner.AssertNumberOfCalls(t, "Plan", 0) assert.ElementsMatch(t, []string{ - `level=info component=cleaner msg="started blocks cleanup and maintenance"`, - `level=info component=cleaner org_id=user-1 msg="deleting blocks for tenant marked for deletion"`, - `level=debug component=cleaner org_id=user-1 msg="deleted file" file=01DTVP434PA9VFXSW2JKB3392D/meta.json bucket=mock`, - `level=debug component=cleaner org_id=user-1 msg="deleted file" file=01DTVP434PA9VFXSW2JKB3392D/index bucket=mock`, - `level=info component=cleaner org_id=user-1 msg="deleted block" block=01DTVP434PA9VFXSW2JKB3392D`, - `level=info component=cleaner org_id=user-1 msg="deleted blocks for tenant marked for deletion" deletedBlocks=1`, - `level=info component=cleaner org_id=user-1 msg="updating finished time in tenant deletion mark"`, - `level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`, `level=info component=compactor msg="compactor started"`, `level=info component=compactor msg="discovering users from bucket"`, `level=info component=compactor msg="discovered users from bucket" users=1`, @@ -922,15 +888,13 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T) # TYPE cortex_compactor_block_cleanup_started_total counter # HELP cortex_compactor_block_cleanup_started_total Total number of blocks cleanup runs started. - cortex_compactor_block_cleanup_started_total 1 + cortex_compactor_block_cleanup_started_total{user_status="active"} 1 + cortex_compactor_block_cleanup_started_total{user_status="deleted"} 1 # TYPE cortex_compactor_block_cleanup_completed_total counter # HELP cortex_compactor_block_cleanup_completed_total Total number of blocks cleanup runs successfully completed. - cortex_compactor_block_cleanup_completed_total 1 - - # TYPE cortex_compactor_block_cleanup_failed_total counter - # HELP cortex_compactor_block_cleanup_failed_total Total number of blocks cleanup runs failed. - cortex_compactor_block_cleanup_failed_total 0 + cortex_compactor_block_cleanup_completed_total{user_status="active"} 1 + cortex_compactor_block_cleanup_completed_total{user_status="deleted"} 1 # HELP cortex_compactor_blocks_marked_for_no_compaction_total Total number of blocks marked for no compact during a compaction run. # TYPE cortex_compactor_blocks_marked_for_no_compaction_total counter @@ -1078,12 +1042,6 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni assert.ElementsMatch(t, []string{ `level=info component=compactor msg="waiting until compactor is ACTIVE in the ring"`, `level=info component=compactor msg="compactor is ACTIVE in the ring"`, - `level=info component=cleaner msg="started blocks cleanup and maintenance"`, - `level=info component=cleaner org_id=user-1 msg="started blocks cleanup and maintenance"`, - `level=info component=cleaner org_id=user-1 msg="completed blocks cleanup and maintenance"`, - `level=info component=cleaner org_id=user-2 msg="started blocks cleanup and maintenance"`, - `level=info component=cleaner org_id=user-2 msg="completed blocks cleanup and maintenance"`, - `level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`, `level=info component=compactor msg="compactor started"`, `level=info component=compactor msg="discovering users from bucket"`, `level=info component=compactor msg="discovered users from bucket" users=2`, @@ -1507,10 +1465,15 @@ func removeIgnoredLogs(input []string) []string { `level=info component=compactor msg="compactor stopped"`: {}, } + ignoredLogStringsRegexList := []*regexp.Regexp{ + regexp.MustCompile(`^level=(info|debug|warn) component=cleaner .+$`), + } + out := make([]string, 0, len(input)) durationRe := regexp.MustCompile(`\s?duration(_ms)?=\S+`) executionIDRe := regexp.MustCompile(`\s?execution_id=\S+`) +main: for i := 0; i < len(input); i++ { log := input[i] @@ -1528,6 +1491,12 @@ func removeIgnoredLogs(input []string) []string { continue } + for _, ignoreRegex := range ignoredLogStringsRegexList { + if ignoreRegex.MatchString(log) { + continue main + } + } + out = append(out, log) }