From 3ae12a298dc52ce062e9505ab3b44f1f34e09530 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Tue, 23 Jul 2024 11:54:18 -0700 Subject: [PATCH] Centralize metrics used by compactor and add user label to compactor metrics (#6096) * Centralize metrics used by compactor and add user label to compactor metrics. Signed-off-by: Alex Le * Updated CHANGELOG Signed-off-by: Alex Le * addressed comments Signed-off-by: Alex Le * Added back missing metric Signed-off-by: Alex Le --------- Signed-off-by: Alex Le --- CHANGELOG.md | 1 + pkg/compactor/blocks_cleaner.go | 25 +- pkg/compactor/blocks_cleaner_test.go | 47 +++- pkg/compactor/compactor.go | 173 +++++++------ pkg/compactor/compactor_metrics.go | 199 +++++++++++++++ pkg/compactor/compactor_metrics_test.go | 166 +++++++++++++ pkg/compactor/compactor_test.go | 180 +++----------- pkg/compactor/shuffle_sharding_grouper.go | 95 +++---- .../shuffle_sharding_grouper_test.go | 66 +++-- pkg/compactor/syncer_metrics.go | 124 ---------- pkg/compactor/syncer_metrics_test.go | 232 ------------------ 11 files changed, 596 insertions(+), 712 deletions(-) create mode 100644 pkg/compactor/compactor_metrics.go create mode 100644 pkg/compactor/compactor_metrics_test.go delete mode 100644 pkg/compactor/syncer_metrics.go delete mode 100644 pkg/compactor/syncer_metrics_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index efa0a139f7..1a9c050e98 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ * [ENHANCEMENT] Ruler: Add support for filtering by `state` and `health` field on Rules API. #6040 * [ENHANCEMENT] Ruler: Add support for filtering by `match` field on Rules API. #6083 * [ENHANCEMENT] Distributor: Reduce memory usage when error volume is high. #6095 +* [ENHANCEMENT] Compactor: Centralize metrics used by compactor and add user label to compactor metrics. #6096 * [ENHANCEMENT] Compactor: Add unique execution ID for each compaction cycle in log for easy debugging. #6097 * [ENHANCEMENT] Ruler: Add support for filtering by `state` and `health` field on Rules API. #6040 * [BUGFIX] Configsdb: Fix endline issue in db password. #5920 diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index d1a81f401c..a9c4fa7f3d 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -27,6 +27,7 @@ import ( const ( defaultDeleteBlocksConcurrency = 16 + reasonValueRetention = "retention" ) type BlocksCleanerConfig struct { @@ -56,7 +57,7 @@ type BlocksCleaner struct { runsLastSuccess prometheus.Gauge blocksCleanedTotal prometheus.Counter blocksFailedTotal prometheus.Counter - blocksMarkedForDeletion prometheus.Counter + blocksMarkedForDeletion *prometheus.CounterVec tenantBlocks *prometheus.GaugeVec tenantBlocksMarkedForDelete *prometheus.GaugeVec tenantBlocksMarkedForNoCompaction *prometheus.GaugeVec @@ -64,7 +65,15 @@ type BlocksCleaner struct { tenantBucketIndexLastUpdate *prometheus.GaugeVec } -func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.InstrumentedBucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner { +func NewBlocksCleaner( + cfg BlocksCleanerConfig, + bucketClient objstore.InstrumentedBucket, + usersScanner *cortex_tsdb.UsersScanner, + cfgProvider ConfigProvider, + logger log.Logger, + reg prometheus.Registerer, + blocksMarkedForDeletion *prometheus.CounterVec, +) *BlocksCleaner { c := &BlocksCleaner{ cfg: cfg, bucketClient: bucketClient, @@ -95,11 +104,7 @@ func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Instrumente Name: "cortex_compactor_block_cleanup_failures_total", Help: "Total number of blocks failed to be deleted.", }), - blocksMarkedForDeletion: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: blocksMarkedForDeletionName, - Help: blocksMarkedForDeletionHelp, - ConstLabels: prometheus.Labels{"reason": "retention"}, - }), + blocksMarkedForDeletion: blocksMarkedForDeletion, // The following metrics don't have the "cortex_compactor" prefix because not strictly related to // the compactor. They're just tracked by the compactor because it's the most logical place where these @@ -374,7 +379,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b // We do not want to stop the remaining work in the cleaner if an // error occurs here. Errors are logged in the function. retention := c.cfgProvider.CompactorBlocksRetentionPeriod(userID) - c.applyUserRetentionPeriod(ctx, idx, retention, userBucket, userLogger) + c.applyUserRetentionPeriod(ctx, idx, retention, userBucket, userLogger, userID) } // Generate an updated in-memory version of the bucket index. @@ -498,7 +503,7 @@ func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map } // applyUserRetentionPeriod marks blocks for deletion which have aged past the retention period. -func (c *BlocksCleaner) applyUserRetentionPeriod(ctx context.Context, idx *bucketindex.Index, retention time.Duration, userBucket objstore.Bucket, userLogger log.Logger) { +func (c *BlocksCleaner) applyUserRetentionPeriod(ctx context.Context, idx *bucketindex.Index, retention time.Duration, userBucket objstore.Bucket, userLogger log.Logger, userID string) { // The retention period of zero is a special value indicating to never delete. if retention <= 0 { return @@ -511,7 +516,7 @@ func (c *BlocksCleaner) applyUserRetentionPeriod(ctx context.Context, idx *bucke // the cleaner will retry applying the retention in its next cycle. for _, b := range blocks { level.Info(userLogger).Log("msg", "applied retention: marking block for deletion", "block", b.ID, "maxTime", b.MaxTime) - if err := block.MarkForDeletion(ctx, userLogger, userBucket, b.ID, fmt.Sprintf("block exceeding retention of %v", retention), c.blocksMarkedForDeletion); err != nil { + if err := block.MarkForDeletion(ctx, userLogger, userBucket, b.ID, fmt.Sprintf("block exceeding retention of %v", retention), c.blocksMarkedForDeletion.WithLabelValues(userID, reasonValueRetention)); err != nil { level.Warn(userLogger).Log("msg", "failed to mark block for deletion", "block", b.ID, "err", err) } } diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index 98f9565fd1..e33994df8b 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -12,6 +12,7 @@ import ( "github.com/go-kit/log" "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/testutil" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" @@ -79,8 +80,12 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) { logger := log.NewNopLogger() scanner := tsdb.NewUsersScanner(mbucket, tsdb.AllUsers, logger) cfgProvider := newMockConfigProvider() + blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: blocksMarkedForDeletionName, + Help: blocksMarkedForDeletionHelp, + }, append(commonLabels, ReasonLabelName)) - cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil) + cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion) // Clean User with no error cleaner.bucketClient = bkt @@ -176,8 +181,12 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions logger := log.NewNopLogger() scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) cfgProvider := newMockConfigProvider() + blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: blocksMarkedForDeletionName, + Help: blocksMarkedForDeletionHelp, + }, append(commonLabels, ReasonLabelName)) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck @@ -333,8 +342,12 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) { logger := log.NewNopLogger() scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) cfgProvider := newMockConfigProvider() + blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: blocksMarkedForDeletionName, + Help: blocksMarkedForDeletionHelp, + }, append(commonLabels, ReasonLabelName)) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck @@ -393,8 +406,12 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) { logger := log.NewNopLogger() scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) cfgProvider := newMockConfigProvider() + blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: blocksMarkedForDeletionName, + Help: blocksMarkedForDeletionHelp, + }, append(commonLabels, ReasonLabelName)) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck @@ -447,8 +464,12 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar reg := prometheus.NewPedanticRegistry() scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) cfgProvider := newMockConfigProvider() + blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: blocksMarkedForDeletionName, + Help: blocksMarkedForDeletionHelp, + }, append(commonLabels, ReasonLabelName)) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion) require.NoError(t, cleaner.cleanUsers(ctx, true)) assert.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(` @@ -578,8 +599,12 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { reg := prometheus.NewPedanticRegistry() scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) cfgProvider := newMockConfigProvider() + blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: blocksMarkedForDeletionName, + Help: blocksMarkedForDeletionHelp, + }, append(commonLabels, ReasonLabelName)) - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion) assertBlockExists := func(user string, block ulid.ULID, expectExists bool) { exists, err := bucketClient.Exists(ctx, path.Join(user, block.String(), metadata.MetaFilename)) @@ -607,9 +632,6 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { # TYPE cortex_bucket_blocks_marked_for_deletion_count gauge cortex_bucket_blocks_marked_for_deletion_count{user="user-1"} 0 cortex_bucket_blocks_marked_for_deletion_count{user="user-2"} 0 - # HELP cortex_compactor_blocks_marked_for_deletion_total Total number of blocks marked for deletion in compactor. - # TYPE cortex_compactor_blocks_marked_for_deletion_total counter - cortex_compactor_blocks_marked_for_deletion_total{reason="retention"} 0 `), "cortex_bucket_blocks_count", "cortex_bucket_blocks_marked_for_deletion_count", @@ -650,7 +672,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { cortex_bucket_blocks_marked_for_deletion_count{user="user-2"} 0 # HELP cortex_compactor_blocks_marked_for_deletion_total Total number of blocks marked for deletion in compactor. # TYPE cortex_compactor_blocks_marked_for_deletion_total counter - cortex_compactor_blocks_marked_for_deletion_total{reason="retention"} 1 + cortex_compactor_blocks_marked_for_deletion_total{reason="retention",user="user-1"} 1 `), "cortex_bucket_blocks_count", "cortex_bucket_blocks_marked_for_deletion_count", @@ -688,7 +710,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { cortex_bucket_blocks_marked_for_deletion_count{user="user-2"} 0 # HELP cortex_compactor_blocks_marked_for_deletion_total Total number of blocks marked for deletion in compactor. # TYPE cortex_compactor_blocks_marked_for_deletion_total counter - cortex_compactor_blocks_marked_for_deletion_total{reason="retention"} 1 + cortex_compactor_blocks_marked_for_deletion_total{reason="retention",user="user-1"} 1 `), "cortex_bucket_blocks_count", "cortex_bucket_blocks_marked_for_deletion_count", @@ -717,7 +739,8 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { cortex_bucket_blocks_marked_for_deletion_count{user="user-2"} 0 # HELP cortex_compactor_blocks_marked_for_deletion_total Total number of blocks marked for deletion in compactor. # TYPE cortex_compactor_blocks_marked_for_deletion_total counter - cortex_compactor_blocks_marked_for_deletion_total{reason="retention"} 3 + cortex_compactor_blocks_marked_for_deletion_total{reason="retention",user="user-1"} 1 + cortex_compactor_blocks_marked_for_deletion_total{reason="retention",user="user-2"} 2 `), "cortex_bucket_blocks_count", "cortex_bucket_blocks_marked_for_deletion_count", diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 3bbe673a0b..7eac246c4d 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -57,34 +57,36 @@ var ( errInvalidShardingStrategy = errors.New("invalid sharding strategy") errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") - DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ prometheus.Counter, _ prometheus.Counter, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string, _ *compact.GatherNoCompactionMarkFilter) compact.Grouper { - return compact.NewDefaultGrouper( + DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, _ prometheus.Counter, _ prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string, _ *compact.GatherNoCompactionMarkFilter) compact.Grouper { + return compact.NewDefaultGrouperWithMetrics( logger, bkt, cfg.AcceptMalformedIndex, true, // Enable vertical compaction - reg, - blocksMarkedForDeletion, - garbageCollectedBlocks, + compactorMetrics.compactions, + compactorMetrics.compactionRunsStarted, + compactorMetrics.compactionRunsCompleted, + compactorMetrics.compactionFailures, + compactorMetrics.verticalCompactions, + syncerMetrics.BlocksMarkedForDeletion, + syncerMetrics.GarbageCollectedBlocks, blocksMarkedForNoCompaction, metadata.NoneFunc, cfg.BlockFilesConcurrency, cfg.BlocksFetchConcurrency) } - ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Grouper { + ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Grouper { return NewShuffleShardingGrouper( ctx, logger, bkt, cfg.AcceptMalformedIndex, true, // Enable vertical compaction - reg, - blocksMarkedForDeletion, blocksMarkedForNoCompaction, - garbageCollectedBlocks, - remainingPlannedCompactions, metadata.NoneFunc, + syncerMetrics, + compactorMetrics, cfg, ring, ringLifecycle.Addr, @@ -106,7 +108,7 @@ var ( return nil, nil, err } - plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ prometheus.Counter, _ prometheus.Counter) compact.Planner { + plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ string, _ prometheus.Counter, _ prometheus.Counter, _ *compactorMetrics) compact.Planner { return compact.NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter) } @@ -119,7 +121,7 @@ var ( return nil, nil, err } - plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter) compact.Planner { + plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, userID string, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, compactorMetrics *compactorMetrics) compact.Planner { return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.BlockVisitMarkerTimeout, cfg.BlockVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed) } @@ -133,13 +135,11 @@ type BlocksGrouperFactory func( cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, - reg prometheus.Registerer, - blocksMarkedForDeletion prometheus.Counter, blocksMarkedForNoCompact prometheus.Counter, - garbageCollectedBlocks prometheus.Counter, - remainingPlannedCompactions prometheus.Gauge, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, + syncerMetrics *compact.SyncerMetrics, + compactorMetrics *compactorMetrics, ring *ring.Ring, ringLifecycler *ring.Lifecycler, limit Limits, @@ -162,8 +162,10 @@ type PlannerFactory func( cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, + userID string, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, + compactorMetrics *compactorMetrics, ) compact.Planner // Limits defines limits used by the Compactor. @@ -330,25 +332,22 @@ type Compactor struct { // Metrics. CompactorStartDurationSeconds prometheus.Gauge - compactionRunsStarted prometheus.Counter - compactionRunsInterrupted prometheus.Counter - compactionRunsCompleted prometheus.Counter - compactionRunsFailed prometheus.Counter - compactionRunsLastSuccess prometheus.Gauge - compactionRunDiscoveredTenants prometheus.Gauge - compactionRunSkippedTenants prometheus.Gauge - compactionRunSucceededTenants prometheus.Gauge - compactionRunFailedTenants prometheus.Gauge - compactionRunInterval prometheus.Gauge - blocksMarkedForDeletion prometheus.Counter - blocksMarkedForNoCompaction prometheus.Counter - garbageCollectedBlocks prometheus.Counter - remainingPlannedCompactions prometheus.Gauge + CompactionRunsStarted prometheus.Counter + CompactionRunsInterrupted prometheus.Counter + CompactionRunsCompleted prometheus.Counter + CompactionRunsFailed prometheus.Counter + CompactionRunsLastSuccess prometheus.Gauge + CompactionRunDiscoveredTenants prometheus.Gauge + CompactionRunSkippedTenants prometheus.Gauge + CompactionRunSucceededTenants prometheus.Gauge + CompactionRunFailedTenants prometheus.Gauge + CompactionRunInterval prometheus.Gauge + BlocksMarkedForNoCompaction prometheus.Counter blockVisitMarkerReadFailed prometheus.Counter blockVisitMarkerWriteFailed prometheus.Counter - // TSDB syncer metrics - syncerMetrics *syncerMetrics + // Thanos compactor metrics per user + compactorMetrics *compactorMetrics } // NewCompactor makes a new Compactor. @@ -393,12 +392,11 @@ func newCompactor( blocksCompactorFactory BlocksCompactorFactory, limits *validation.Overrides, ) (*Compactor, error) { - var remainingPlannedCompactions prometheus.Gauge + var compactorMetrics *compactorMetrics if compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle { - remainingPlannedCompactions = promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ - Name: "cortex_compactor_remaining_planned_compactions", - Help: "Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy", - }) + compactorMetrics = newCompactorMetrics(registerer) + } else { + compactorMetrics = newDefaultCompactorMetrics(registerer) } c := &Compactor{ compactorCfg: compactorCfg, @@ -406,7 +404,6 @@ func newCompactor( parentLogger: logger, logger: log.With(logger, "component", "compactor"), registerer: registerer, - syncerMetrics: newSyncerMetrics(registerer), bucketClientFactory: bucketClientFactory, blocksGrouperFactory: blocksGrouperFactory, blocksCompactorFactory: blocksCompactorFactory, @@ -416,59 +413,50 @@ func newCompactor( Name: "cortex_compactor_start_duration_seconds", Help: "Time in seconds spent by compactor running start function", }), - compactionRunsStarted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + CompactionRunsStarted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Name: "cortex_compactor_runs_started_total", Help: "Total number of compaction runs started.", }), - compactionRunsInterrupted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + CompactionRunsInterrupted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Name: "cortex_compactor_runs_interrupted_total", Help: "Total number of compaction runs interrupted.", }), - compactionRunsCompleted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + CompactionRunsCompleted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Name: "cortex_compactor_runs_completed_total", Help: "Total number of compaction runs successfully completed.", }), - compactionRunsFailed: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + CompactionRunsFailed: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Name: "cortex_compactor_runs_failed_total", Help: "Total number of compaction runs failed.", }), - compactionRunsLastSuccess: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ + CompactionRunsLastSuccess: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ Name: "cortex_compactor_last_successful_run_timestamp_seconds", Help: "Unix timestamp of the last successful compaction run.", }), - compactionRunDiscoveredTenants: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ + CompactionRunDiscoveredTenants: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ Name: "cortex_compactor_tenants_discovered", Help: "Number of tenants discovered during the current compaction run. Reset to 0 when compactor is idle.", }), - compactionRunSkippedTenants: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ + CompactionRunSkippedTenants: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ Name: "cortex_compactor_tenants_skipped", Help: "Number of tenants skipped during the current compaction run. Reset to 0 when compactor is idle.", }), - compactionRunSucceededTenants: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ + CompactionRunSucceededTenants: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ Name: "cortex_compactor_tenants_processing_succeeded", Help: "Number of tenants successfully processed during the current compaction run. Reset to 0 when compactor is idle.", }), - compactionRunFailedTenants: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ + CompactionRunFailedTenants: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ Name: "cortex_compactor_tenants_processing_failed", Help: "Number of tenants failed processing during the current compaction run. Reset to 0 when compactor is idle.", }), - compactionRunInterval: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ + CompactionRunInterval: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ Name: "cortex_compactor_compaction_interval_seconds", Help: "The configured interval on which compaction is run in seconds. Useful when compared to the last successful run metric to accurately detect multiple failed compaction runs.", }), - blocksMarkedForDeletion: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ - Name: blocksMarkedForDeletionName, - Help: blocksMarkedForDeletionHelp, - ConstLabels: prometheus.Labels{"reason": "compaction"}, - }), - blocksMarkedForNoCompaction: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + BlocksMarkedForNoCompaction: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Name: "cortex_compactor_blocks_marked_for_no_compaction_total", Help: "Total number of blocks marked for no compact during a compaction run.", }), - garbageCollectedBlocks: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ - Name: "cortex_compactor_garbage_collected_blocks_total", - Help: "Total number of blocks marked for deletion by compactor.", - }), blockVisitMarkerReadFailed: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Name: "cortex_compactor_block_visit_marker_read_failed", Help: "Number of block visit marker file failed to be read.", @@ -477,8 +465,8 @@ func newCompactor( Name: "cortex_compactor_block_visit_marker_write_failed", Help: "Number of block visit marker file failed to be written.", }), - remainingPlannedCompactions: remainingPlannedCompactions, - limits: limits, + limits: limits, + compactorMetrics: compactorMetrics, } if len(compactorCfg.EnabledTenants) > 0 { @@ -490,8 +478,18 @@ func newCompactor( c.Service = services.NewBasicService(c.starting, c.running, c.stopping) + if c.registerer != nil { + // Copied from Thanos, pkg/block/fetcher.go + promauto.With(c.registerer).NewGaugeFunc(prometheus.GaugeOpts{ + Name: "cortex_compactor_meta_sync_consistency_delay_seconds", + Help: "Configured consistency delay in seconds.", + }, func() float64 { + return c.compactorCfg.ConsistencyDelay.Seconds() + }) + } + // The last successful compaction run metric is exposed as seconds since epoch, so we need to use seconds for this metric. - c.compactionRunInterval.Set(c.compactorCfg.CompactionInterval.Seconds()) + c.CompactionRunInterval.Set(c.compactorCfg.CompactionInterval.Seconds()) return c, nil } @@ -531,7 +529,7 @@ func (c *Compactor) starting(ctx context.Context) error { CleanupConcurrency: c.compactorCfg.CleanupConcurrency, BlockDeletionMarksMigrationEnabled: c.compactorCfg.BlockDeletionMarksMigrationEnabled, TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay, - }, c.bucketClient, c.usersScanner, c.limits, c.parentLogger, c.registerer) + }, c.bucketClient, c.usersScanner, c.limits, c.parentLogger, c.registerer, c.compactorMetrics.syncerBlocksMarkedForDeletion) // Initialize the compactors ring if sharding is enabled. if c.compactorCfg.ShardingEnabled { @@ -648,28 +646,28 @@ func (c *Compactor) compactUsers(ctx context.Context) { failed := false interrupted := false - c.compactionRunsStarted.Inc() + c.CompactionRunsStarted.Inc() defer func() { // interruptions and successful runs are considered // mutually exclusive but we consider a run failed if any // tenant runs failed even if later runs are interrupted if !interrupted && !failed { - c.compactionRunsCompleted.Inc() - c.compactionRunsLastSuccess.SetToCurrentTime() + c.CompactionRunsCompleted.Inc() + c.CompactionRunsLastSuccess.SetToCurrentTime() } if interrupted { - c.compactionRunsInterrupted.Inc() + c.CompactionRunsInterrupted.Inc() } if failed { - c.compactionRunsFailed.Inc() + c.CompactionRunsFailed.Inc() } // Reset progress metrics once done. - c.compactionRunDiscoveredTenants.Set(0) - c.compactionRunSkippedTenants.Set(0) - c.compactionRunSucceededTenants.Set(0) - c.compactionRunFailedTenants.Set(0) + c.CompactionRunDiscoveredTenants.Set(0) + c.CompactionRunSkippedTenants.Set(0) + c.CompactionRunSucceededTenants.Set(0) + c.CompactionRunFailedTenants.Set(0) }() level.Info(c.logger).Log("msg", "discovering users from bucket") @@ -681,7 +679,7 @@ func (c *Compactor) compactUsers(ctx context.Context) { } level.Info(c.logger).Log("msg", "discovered users from bucket", "users", len(users)) - c.compactionRunDiscoveredTenants.Set(float64(len(users))) + c.CompactionRunDiscoveredTenants.Set(float64(len(users))) // When starting multiple compactor replicas nearly at the same time, running in a cluster with // a large number of tenants, we may end up in a situation where the 1st user is compacted by @@ -702,11 +700,11 @@ func (c *Compactor) compactUsers(ctx context.Context) { // Ensure the user ID belongs to our shard. if owned, err := c.ownUserForCompaction(userID); err != nil { - c.compactionRunSkippedTenants.Inc() + c.CompactionRunSkippedTenants.Inc() level.Warn(c.logger).Log("msg", "unable to check if user is owned by this shard", "user", userID, "err", err) continue } else if !owned { - c.compactionRunSkippedTenants.Inc() + c.CompactionRunSkippedTenants.Inc() level.Debug(c.logger).Log("msg", "skipping user because it is not owned by this shard", "user", userID) continue } @@ -714,7 +712,7 @@ func (c *Compactor) compactUsers(ctx context.Context) { // Skipping compaction if the bucket index failed to sync due to CMK errors. if idxs, err := bucketindex.ReadSyncStatus(ctx, c.bucketClient, userID, util_log.WithUserID(userID, c.logger)); err == nil { if idxs.Status == bucketindex.CustomerManagedKeyError { - c.compactionRunSkippedTenants.Inc() + c.CompactionRunSkippedTenants.Inc() level.Info(c.logger).Log("msg", "skipping compactUser due CustomerManagedKeyError", "user", userID) continue } @@ -723,11 +721,11 @@ func (c *Compactor) compactUsers(ctx context.Context) { ownedUsers[userID] = struct{}{} if markedForDeletion, err := cortex_tsdb.TenantDeletionMarkExists(ctx, c.bucketClient, userID); err != nil { - c.compactionRunSkippedTenants.Inc() + c.CompactionRunSkippedTenants.Inc() level.Warn(c.logger).Log("msg", "unable to check if user is marked for deletion", "user", userID, "err", err) continue } else if markedForDeletion { - c.compactionRunSkippedTenants.Inc() + c.CompactionRunSkippedTenants.Inc() level.Debug(c.logger).Log("msg", "skipping user because it is marked for deletion", "user", userID) continue } @@ -742,13 +740,13 @@ func (c *Compactor) compactUsers(ctx context.Context) { return } - c.compactionRunFailedTenants.Inc() + c.CompactionRunFailedTenants.Inc() failed = true level.Error(c.logger).Log("msg", "failed to compact user blocks", "user", userID, "err", err) continue } - c.compactionRunSucceededTenants.Inc() + c.CompactionRunSucceededTenants.Inc() level.Info(c.logger).Log("msg", "successfully compacted user blocks", "user", userID) } @@ -809,7 +807,6 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { bucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.limits) reg := prometheus.NewRegistry() - defer c.syncerMetrics.gatherThanosSyncerMetrics(reg) ulogger := util_log.WithUserID(userID, c.logger) ulogger = util_log.WithExecutionID(ulid.MustNew(ulid.Now(), crypto_rand.Reader).String(), ulogger) @@ -845,13 +842,14 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { return cortex_tsdb.ErrBlockDiscoveryStrategy } - fetcher, err := block.NewMetaFetcher( + fetcher, err := block.NewMetaFetcherWithMetrics( ulogger, c.compactorCfg.MetaSyncConcurrency, bucket, blockLister, c.metaSyncDirForUser(userID), - reg, + c.compactorMetrics.getBaseFetcherMetrics(), + c.compactorMetrics.getMetaFetcherMetrics(), // List of filters to apply (order matters). []block.MetadataFilter{ // Remove the ingester ID because we don't shard blocks anymore, while still @@ -867,15 +865,14 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { return err } - syncer, err := compact.NewMetaSyncer( + syncerMetrics := c.compactorMetrics.getSyncerMetrics(userID) + syncer, err := compact.NewMetaSyncerWithMetrics( ulogger, - reg, + syncerMetrics, bucket, fetcher, deduplicateBlocksFilter, ignoreDeletionMarkFilter, - c.blocksMarkedForDeletion, - c.garbageCollectedBlocks, ) if err != nil { return errors.Wrap(err, "failed to create syncer") @@ -886,8 +883,8 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { compactor, err := compact.NewBucketCompactor( ulogger, syncer, - c.blocksGrouperFactory(currentCtx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks, c.remainingPlannedCompactions, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.ring, c.ringLifecycler, c.limits, userID, noCompactMarkerFilter), - c.blocksPlannerFactory(currentCtx, bucket, ulogger, c.compactorCfg, noCompactMarkerFilter, c.ringLifecycler, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed), + c.blocksGrouperFactory(currentCtx, c.compactorCfg, bucket, ulogger, c.BlocksMarkedForNoCompaction, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, syncerMetrics, c.compactorMetrics, c.ring, c.ringLifecycler, c.limits, userID, noCompactMarkerFilter), + c.blocksPlannerFactory(currentCtx, bucket, ulogger, c.compactorCfg, noCompactMarkerFilter, c.ringLifecycler, userID, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.compactorMetrics), c.blocksCompactor, c.compactDirForUser(userID), bucket, diff --git a/pkg/compactor/compactor_metrics.go b/pkg/compactor/compactor_metrics.go new file mode 100644 index 0000000000..3567225919 --- /dev/null +++ b/pkg/compactor/compactor_metrics.go @@ -0,0 +1,199 @@ +package compactor + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/compact" + "github.com/thanos-io/thanos/pkg/extprom" +) + +type compactorMetrics struct { + reg prometheus.Registerer + commonLabels []string + compactionLabels []string + + // block.BaseFetcherMetrics + baseFetcherSyncs *prometheus.CounterVec + + // block.FetcherMetrics + metaFetcherSyncs *prometheus.CounterVec + metaFetcherSyncFailures *prometheus.CounterVec + metaFetcherSyncDuration *prometheus.HistogramVec + metaFetcherSynced *extprom.TxGaugeVec + metaFetcherModified *extprom.TxGaugeVec + + // compact.SyncerMetrics + syncerGarbageCollectedBlocks *prometheus.CounterVec + syncerGarbageCollections *prometheus.CounterVec + syncerGarbageCollectionFailures *prometheus.CounterVec + syncerGarbageCollectionDuration *prometheus.HistogramVec + syncerBlocksMarkedForDeletion *prometheus.CounterVec + + compactions *prometheus.CounterVec + compactionPlanned *prometheus.CounterVec + compactionRunsStarted *prometheus.CounterVec + compactionRunsCompleted *prometheus.CounterVec + compactionFailures *prometheus.CounterVec + verticalCompactions *prometheus.CounterVec + remainingPlannedCompactions *prometheus.GaugeVec +} + +const ( + UserLabelName = "user" + TimeRangeLabelName = "time_range_milliseconds" + ReasonLabelName = "reason" +) + +var ( + commonLabels = []string{UserLabelName} + compactionLabels = []string{TimeRangeLabelName} +) + +func newDefaultCompactorMetrics(reg prometheus.Registerer) *compactorMetrics { + return newCompactorMetricsWithLabels(reg, commonLabels, []string{"resolution"}) +} + +func newCompactorMetrics(reg prometheus.Registerer) *compactorMetrics { + return newCompactorMetricsWithLabels(reg, commonLabels, append(commonLabels, compactionLabels...)) +} + +func newCompactorMetricsWithLabels(reg prometheus.Registerer, commonLabels []string, compactionLabels []string) *compactorMetrics { + var m compactorMetrics + m.reg = reg + m.commonLabels = commonLabels + m.compactionLabels = compactionLabels + + // Copied from Thanos, pkg/block/fetcher.go + m.baseFetcherSyncs = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Subsystem: block.FetcherSubSys, + Name: "cortex_compactor_meta_base_syncs_total", + Help: "Total blocks metadata synchronization attempts by base Fetcher.", + }, nil) + + // Copied from Thanos, pkg/block/fetcher.go + m.metaFetcherSyncs = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Subsystem: block.FetcherSubSys, + Name: "cortex_compactor_meta_syncs_total", + Help: "Total blocks metadata synchronization attempts.", + }, nil) + m.metaFetcherSyncFailures = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Subsystem: block.FetcherSubSys, + Name: "cortex_compactor_meta_sync_failures_total", + Help: "Total blocks metadata synchronization failures.", + }, nil) + m.metaFetcherSyncDuration = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + Subsystem: block.FetcherSubSys, + Name: "cortex_compactor_meta_sync_duration_seconds", + Help: "Duration of the blocks metadata synchronization in seconds.", + Buckets: []float64{0.01, 1, 10, 100, 300, 600, 1000}, + }, nil) + m.metaFetcherSynced = extprom.NewTxGaugeVec( + reg, + prometheus.GaugeOpts{ + Subsystem: block.FetcherSubSys, + Name: "cortex_compactor_meta_synced", + Help: "Number of block metadata synced", + }, + []string{"state"}, + block.DefaultSyncedStateLabelValues()..., + ) + m.metaFetcherModified = extprom.NewTxGaugeVec( + reg, + prometheus.GaugeOpts{ + Subsystem: block.FetcherSubSys, + Name: "cortex_compactor_meta_modified", + Help: "Number of blocks whose metadata changed", + }, + []string{"modified"}, + block.DefaultModifiedLabelValues()..., + ) + + // Copied from Thanos, pkg/compact/compact.go. + m.syncerGarbageCollectedBlocks = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_compactor_garbage_collected_blocks_total", + Help: "Total number of blocks marked for deletion by compactor.", + }, nil) + m.syncerGarbageCollections = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_compactor_garbage_collection_total", + Help: "Total number of garbage collection operations.", + }, nil) + m.syncerGarbageCollectionFailures = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_compactor_garbage_collection_failures_total", + Help: "Total number of failed garbage collection operations.", + }, nil) + m.syncerGarbageCollectionDuration = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + Name: "cortex_compactor_garbage_collection_duration_seconds", + Help: "Time it took to perform garbage collection iteration.", + }, nil) + m.syncerBlocksMarkedForDeletion = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: blocksMarkedForDeletionName, + Help: blocksMarkedForDeletionHelp, + }, append(commonLabels, ReasonLabelName)) + + m.compactions = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_compactor_group_compactions_total", + Help: "Total number of group compaction attempts that resulted in a new block.", + }, compactionLabels) + m.compactionPlanned = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_compact_group_compaction_planned_total", + Help: "Total number of compaction planned.", + }, compactionLabels) + m.compactionRunsStarted = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_compactor_group_compaction_runs_started_total", + Help: "Total number of group compaction attempts.", + }, compactionLabels) + m.compactionRunsCompleted = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_compactor_group_compaction_runs_completed_total", + Help: "Total number of group completed compaction runs. This also includes compactor group runs that resulted with no compaction.", + }, compactionLabels) + m.compactionFailures = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_compactor_group_compactions_failures_total", + Help: "Total number of failed group compactions.", + }, compactionLabels) + m.verticalCompactions = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_compactor_group_vertical_compactions_total", + Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.", + }, compactionLabels) + m.remainingPlannedCompactions = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_compactor_remaining_planned_compactions", + Help: "Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy", + }, commonLabels) + + return &m +} + +func (m *compactorMetrics) getBaseFetcherMetrics() *block.BaseFetcherMetrics { + var baseFetcherMetrics block.BaseFetcherMetrics + baseFetcherMetrics.Syncs = m.baseFetcherSyncs.WithLabelValues() + return &baseFetcherMetrics +} + +func (m *compactorMetrics) getMetaFetcherMetrics() *block.FetcherMetrics { + var fetcherMetrics block.FetcherMetrics + fetcherMetrics.Syncs = m.metaFetcherSyncs.WithLabelValues() + fetcherMetrics.SyncFailures = m.metaFetcherSyncFailures.WithLabelValues() + fetcherMetrics.SyncDuration = m.metaFetcherSyncDuration.WithLabelValues() + fetcherMetrics.Synced = m.metaFetcherSynced + fetcherMetrics.Modified = m.metaFetcherModified + return &fetcherMetrics +} + +func (m *compactorMetrics) getSyncerMetrics(userID string) *compact.SyncerMetrics { + var syncerMetrics compact.SyncerMetrics + labelValues := m.getCommonLabelValues(userID) + syncerMetrics.GarbageCollectedBlocks = m.syncerGarbageCollectedBlocks.WithLabelValues() + syncerMetrics.GarbageCollections = m.syncerGarbageCollections.WithLabelValues() + syncerMetrics.GarbageCollectionFailures = m.syncerGarbageCollectionFailures.WithLabelValues() + syncerMetrics.GarbageCollectionDuration = m.syncerGarbageCollectionDuration.WithLabelValues() + syncerMetrics.BlocksMarkedForDeletion = m.syncerBlocksMarkedForDeletion.WithLabelValues(append(labelValues, "compaction")...) + return &syncerMetrics +} + +func (m *compactorMetrics) getCommonLabelValues(userID string) []string { + var labelValues []string + if len(m.commonLabels) > 0 { + labelValues = append(labelValues, userID) + } + return labelValues +} diff --git a/pkg/compactor/compactor_metrics_test.go b/pkg/compactor/compactor_metrics_test.go new file mode 100644 index 0000000000..8667d892cf --- /dev/null +++ b/pkg/compactor/compactor_metrics_test.go @@ -0,0 +1,166 @@ +package compactor + +import ( + "bytes" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" +) + +func TestSyncerMetrics(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + cm := newCompactorMetricsWithLabels(reg, commonLabels, commonLabels) + + generateTestData(cm, 1234) + generateTestData(cm, 7654) + generateTestData(cm, 2222) + + err := testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP blocks_meta_cortex_compactor_meta_base_syncs_total Total blocks metadata synchronization attempts by base Fetcher. + # TYPE blocks_meta_cortex_compactor_meta_base_syncs_total counter + blocks_meta_cortex_compactor_meta_base_syncs_total 11110 + # HELP blocks_meta_cortex_compactor_meta_modified Number of blocks whose metadata changed + # TYPE blocks_meta_cortex_compactor_meta_modified gauge + blocks_meta_cortex_compactor_meta_modified{modified="replica-label-removed"} 0 + # HELP blocks_meta_cortex_compactor_meta_sync_duration_seconds Duration of the blocks metadata synchronization in seconds. + # TYPE blocks_meta_cortex_compactor_meta_sync_duration_seconds histogram + blocks_meta_cortex_compactor_meta_sync_duration_seconds_bucket{le="0.01"} 0 + blocks_meta_cortex_compactor_meta_sync_duration_seconds_bucket{le="1"} 2 + blocks_meta_cortex_compactor_meta_sync_duration_seconds_bucket{le="10"} 3 + blocks_meta_cortex_compactor_meta_sync_duration_seconds_bucket{le="100"} 3 + blocks_meta_cortex_compactor_meta_sync_duration_seconds_bucket{le="300"} 3 + blocks_meta_cortex_compactor_meta_sync_duration_seconds_bucket{le="600"} 3 + blocks_meta_cortex_compactor_meta_sync_duration_seconds_bucket{le="1000"} 3 + blocks_meta_cortex_compactor_meta_sync_duration_seconds_bucket{le="+Inf"} 3 + blocks_meta_cortex_compactor_meta_sync_duration_seconds_sum 4.444 + blocks_meta_cortex_compactor_meta_sync_duration_seconds_count 3 + # HELP blocks_meta_cortex_compactor_meta_sync_failures_total Total blocks metadata synchronization failures. + # TYPE blocks_meta_cortex_compactor_meta_sync_failures_total counter + blocks_meta_cortex_compactor_meta_sync_failures_total 33330 + # HELP blocks_meta_cortex_compactor_meta_synced Number of block metadata synced + # TYPE blocks_meta_cortex_compactor_meta_synced gauge + blocks_meta_cortex_compactor_meta_synced{state="corrupted-meta-json"} 0 + blocks_meta_cortex_compactor_meta_synced{state="duplicate"} 0 + blocks_meta_cortex_compactor_meta_synced{state="failed"} 0 + blocks_meta_cortex_compactor_meta_synced{state="label-excluded"} 0 + blocks_meta_cortex_compactor_meta_synced{state="loaded"} 0 + blocks_meta_cortex_compactor_meta_synced{state="marked-for-deletion"} 0 + blocks_meta_cortex_compactor_meta_synced{state="marked-for-no-compact"} 0 + blocks_meta_cortex_compactor_meta_synced{state="no-meta-json"} 0 + blocks_meta_cortex_compactor_meta_synced{state="time-excluded"} 0 + blocks_meta_cortex_compactor_meta_synced{state="too-fresh"} 0 + # HELP blocks_meta_cortex_compactor_meta_syncs_total Total blocks metadata synchronization attempts. + # TYPE blocks_meta_cortex_compactor_meta_syncs_total counter + blocks_meta_cortex_compactor_meta_syncs_total 22220 + # HELP cortex_compact_group_compaction_planned_total Total number of compaction planned. + # TYPE cortex_compact_group_compaction_planned_total counter + cortex_compact_group_compaction_planned_total{user="aaa"} 211090 + cortex_compact_group_compaction_planned_total{user="bbb"} 222200 + cortex_compact_group_compaction_planned_total{user="ccc"} 233310 + # HELP cortex_compactor_blocks_marked_for_deletion_total Total number of blocks marked for deletion in compactor. + # TYPE cortex_compactor_blocks_marked_for_deletion_total counter + cortex_compactor_blocks_marked_for_deletion_total{reason="compaction",user="aaa"} 144430 + cortex_compactor_blocks_marked_for_deletion_total{reason="compaction",user="bbb"} 155540 + cortex_compactor_blocks_marked_for_deletion_total{reason="compaction",user="ccc"} 166650 + # HELP cortex_compactor_garbage_collected_blocks_total Total number of blocks marked for deletion by compactor. + # TYPE cortex_compactor_garbage_collected_blocks_total counter + cortex_compactor_garbage_collected_blocks_total 99990 + # HELP cortex_compactor_garbage_collection_duration_seconds Time it took to perform garbage collection iteration. + # TYPE cortex_compactor_garbage_collection_duration_seconds histogram + cortex_compactor_garbage_collection_duration_seconds_bucket{le="0.005"} 0 + cortex_compactor_garbage_collection_duration_seconds_bucket{le="0.01"} 0 + cortex_compactor_garbage_collection_duration_seconds_bucket{le="0.025"} 0 + cortex_compactor_garbage_collection_duration_seconds_bucket{le="0.05"} 0 + cortex_compactor_garbage_collection_duration_seconds_bucket{le="0.1"} 0 + cortex_compactor_garbage_collection_duration_seconds_bucket{le="0.25"} 0 + cortex_compactor_garbage_collection_duration_seconds_bucket{le="0.5"} 0 + cortex_compactor_garbage_collection_duration_seconds_bucket{le="1"} 0 + cortex_compactor_garbage_collection_duration_seconds_bucket{le="2.5"} 1 + cortex_compactor_garbage_collection_duration_seconds_bucket{le="5"} 2 + cortex_compactor_garbage_collection_duration_seconds_bucket{le="10"} 3 + cortex_compactor_garbage_collection_duration_seconds_bucket{le="+Inf"} 3 + cortex_compactor_garbage_collection_duration_seconds_sum 13.331999999999999 + cortex_compactor_garbage_collection_duration_seconds_count 3 + # HELP cortex_compactor_garbage_collection_failures_total Total number of failed garbage collection operations. + # TYPE cortex_compactor_garbage_collection_failures_total counter + cortex_compactor_garbage_collection_failures_total 122210 + # HELP cortex_compactor_garbage_collection_total Total number of garbage collection operations. + # TYPE cortex_compactor_garbage_collection_total counter + cortex_compactor_garbage_collection_total 111100 + # HELP cortex_compactor_group_compaction_runs_completed_total Total number of group completed compaction runs. This also includes compactor group runs that resulted with no compaction. + # TYPE cortex_compactor_group_compaction_runs_completed_total counter + cortex_compactor_group_compaction_runs_completed_total{user="aaa"} 277750 + cortex_compactor_group_compaction_runs_completed_total{user="bbb"} 288860 + cortex_compactor_group_compaction_runs_completed_total{user="ccc"} 299970 + # HELP cortex_compactor_group_compaction_runs_started_total Total number of group compaction attempts. + # TYPE cortex_compactor_group_compaction_runs_started_total counter + cortex_compactor_group_compaction_runs_started_total{user="aaa"} 244420 + cortex_compactor_group_compaction_runs_started_total{user="bbb"} 255530 + cortex_compactor_group_compaction_runs_started_total{user="ccc"} 266640 + # HELP cortex_compactor_group_compactions_failures_total Total number of failed group compactions. + # TYPE cortex_compactor_group_compactions_failures_total counter + cortex_compactor_group_compactions_failures_total{user="aaa"} 311080 + cortex_compactor_group_compactions_failures_total{user="bbb"} 322190 + cortex_compactor_group_compactions_failures_total{user="ccc"} 333300 + # HELP cortex_compactor_group_compactions_total Total number of group compaction attempts that resulted in a new block. + # TYPE cortex_compactor_group_compactions_total counter + cortex_compactor_group_compactions_total{user="aaa"} 177760 + cortex_compactor_group_compactions_total{user="bbb"} 188870 + cortex_compactor_group_compactions_total{user="ccc"} 199980 + # HELP cortex_compactor_group_vertical_compactions_total Total number of group compaction attempts that resulted in a new block based on overlapping blocks. + # TYPE cortex_compactor_group_vertical_compactions_total counter + cortex_compactor_group_vertical_compactions_total{user="aaa"} 344410 + cortex_compactor_group_vertical_compactions_total{user="bbb"} 355520 + cortex_compactor_group_vertical_compactions_total{user="ccc"} 366630 + # HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy + # TYPE cortex_compactor_remaining_planned_compactions gauge + cortex_compactor_remaining_planned_compactions{user="aaa"} 377740 + cortex_compactor_remaining_planned_compactions{user="bbb"} 388850 + cortex_compactor_remaining_planned_compactions{user="ccc"} 399960 + `)) + require.NoError(t, err) + +} + +func generateTestData(cm *compactorMetrics, base float64) { + cm.baseFetcherSyncs.WithLabelValues().Add(1 * base) + cm.metaFetcherSyncs.WithLabelValues().Add(2 * base) + cm.metaFetcherSyncFailures.WithLabelValues().Add(3 * base) + cm.metaFetcherSyncDuration.WithLabelValues().Observe(4 * base / 10000) + cm.metaFetcherSynced.WithLabelValues("loaded").Add(5 * base) + cm.metaFetcherSynced.WithLabelValues("no-meta-json").Add(6 * base) + cm.metaFetcherSynced.WithLabelValues("failed").Add(7 * base) + cm.metaFetcherModified.WithLabelValues("replica-label-removed").Add(8 * base) + + cm.syncerGarbageCollectedBlocks.WithLabelValues().Add(9 * base) + cm.syncerGarbageCollections.WithLabelValues().Add(10 * base) + cm.syncerGarbageCollectionFailures.WithLabelValues().Add(11 * base) + cm.syncerGarbageCollectionDuration.WithLabelValues().Observe(12 * base / 10000) + cm.syncerBlocksMarkedForDeletion.WithLabelValues("aaa", "compaction").Add(13 * base) + cm.syncerBlocksMarkedForDeletion.WithLabelValues("bbb", "compaction").Add(14 * base) + cm.syncerBlocksMarkedForDeletion.WithLabelValues("ccc", "compaction").Add(15 * base) + + cm.compactions.WithLabelValues("aaa").Add(16 * base) + cm.compactions.WithLabelValues("bbb").Add(17 * base) + cm.compactions.WithLabelValues("ccc").Add(18 * base) + cm.compactionPlanned.WithLabelValues("aaa").Add(19 * base) + cm.compactionPlanned.WithLabelValues("bbb").Add(20 * base) + cm.compactionPlanned.WithLabelValues("ccc").Add(21 * base) + cm.compactionRunsStarted.WithLabelValues("aaa").Add(22 * base) + cm.compactionRunsStarted.WithLabelValues("bbb").Add(23 * base) + cm.compactionRunsStarted.WithLabelValues("ccc").Add(24 * base) + cm.compactionRunsCompleted.WithLabelValues("aaa").Add(25 * base) + cm.compactionRunsCompleted.WithLabelValues("bbb").Add(26 * base) + cm.compactionRunsCompleted.WithLabelValues("ccc").Add(27 * base) + cm.compactionFailures.WithLabelValues("aaa").Add(28 * base) + cm.compactionFailures.WithLabelValues("bbb").Add(29 * base) + cm.compactionFailures.WithLabelValues("ccc").Add(30 * base) + cm.verticalCompactions.WithLabelValues("aaa").Add(31 * base) + cm.verticalCompactions.WithLabelValues("bbb").Add(32 * base) + cm.verticalCompactions.WithLabelValues("ccc").Add(33 * base) + cm.remainingPlannedCompactions.WithLabelValues("aaa").Add(34 * base) + cm.remainingPlannedCompactions.WithLabelValues("bbb").Add(35 * base) + cm.remainingPlannedCompactions.WithLabelValues("ccc").Add(36 * base) +} diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 9cc92ae63e..6015adff86 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -186,7 +186,7 @@ func TestCompactor_SkipCompactionWhenCmkError(t *testing.T) { // Wait until a run has completed. cortex_testutil.Poll(t, time.Second, 1.0, func() interface{} { - return prom_testutil.ToFloat64(c.compactionRunsCompleted) + return prom_testutil.ToFloat64(c.CompactionRunsCompleted) }) require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) @@ -206,12 +206,12 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) { // Wait until a run has completed. cortex_testutil.Poll(t, time.Second, 1.0, func() interface{} { - return prom_testutil.ToFloat64(c.compactionRunsCompleted) + return prom_testutil.ToFloat64(c.CompactionRunsCompleted) }) require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) - assert.Equal(t, prom_testutil.ToFloat64(c.compactionRunInterval), cfg.CompactionInterval.Seconds()) + assert.Equal(t, prom_testutil.ToFloat64(c.CompactionRunInterval), cfg.CompactionInterval.Seconds()) assert.ElementsMatch(t, []string{ `level=info component=cleaner msg="started blocks cleanup and maintenance"`, @@ -234,62 +234,6 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) { # HELP cortex_compactor_runs_failed_total Total number of compaction runs failed. cortex_compactor_runs_failed_total 0 - # HELP cortex_compactor_garbage_collected_blocks_total Total number of blocks marked for deletion by compactor. - # TYPE cortex_compactor_garbage_collected_blocks_total counter - cortex_compactor_garbage_collected_blocks_total 0 - - # HELP cortex_compactor_garbage_collection_duration_seconds Time it took to perform garbage collection iteration. - # TYPE cortex_compactor_garbage_collection_duration_seconds histogram - cortex_compactor_garbage_collection_duration_seconds_bucket{le="+Inf"} 0 - cortex_compactor_garbage_collection_duration_seconds_sum 0 - cortex_compactor_garbage_collection_duration_seconds_count 0 - - # HELP cortex_compactor_garbage_collection_failures_total Total number of failed garbage collection operations. - # TYPE cortex_compactor_garbage_collection_failures_total counter - cortex_compactor_garbage_collection_failures_total 0 - - # HELP cortex_compactor_garbage_collection_total Total number of garbage collection operations. - # TYPE cortex_compactor_garbage_collection_total counter - cortex_compactor_garbage_collection_total 0 - - # HELP cortex_compactor_meta_sync_consistency_delay_seconds Configured consistency delay in seconds. - # TYPE cortex_compactor_meta_sync_consistency_delay_seconds gauge - cortex_compactor_meta_sync_consistency_delay_seconds 0 - - # HELP cortex_compactor_meta_sync_duration_seconds Duration of the blocks metadata synchronization in seconds. - # TYPE cortex_compactor_meta_sync_duration_seconds histogram - cortex_compactor_meta_sync_duration_seconds_bucket{le="+Inf"} 0 - cortex_compactor_meta_sync_duration_seconds_sum 0 - cortex_compactor_meta_sync_duration_seconds_count 0 - - # HELP cortex_compactor_meta_sync_failures_total Total blocks metadata synchronization failures. - # TYPE cortex_compactor_meta_sync_failures_total counter - cortex_compactor_meta_sync_failures_total 0 - - # HELP cortex_compactor_meta_syncs_total Total blocks metadata synchronization attempts. - # TYPE cortex_compactor_meta_syncs_total counter - cortex_compactor_meta_syncs_total 0 - - # HELP cortex_compactor_group_compaction_runs_completed_total Total number of group completed compaction runs. This also includes compactor group runs that resulted with no compaction. - # TYPE cortex_compactor_group_compaction_runs_completed_total counter - cortex_compactor_group_compaction_runs_completed_total 0 - - # HELP cortex_compactor_group_compaction_runs_started_total Total number of group compaction attempts. - # TYPE cortex_compactor_group_compaction_runs_started_total counter - cortex_compactor_group_compaction_runs_started_total 0 - - # HELP cortex_compactor_group_compactions_failures_total Total number of failed group compactions. - # TYPE cortex_compactor_group_compactions_failures_total counter - cortex_compactor_group_compactions_failures_total 0 - - # HELP cortex_compactor_group_compactions_total Total number of group compaction attempts that resulted in a new block. - # TYPE cortex_compactor_group_compactions_total counter - cortex_compactor_group_compactions_total 0 - - # HELP cortex_compactor_group_vertical_compactions_total Total number of group compaction attempts that resulted in a new block based on overlapping blocks. - # TYPE cortex_compactor_group_vertical_compactions_total counter - cortex_compactor_group_vertical_compactions_total 0 - # TYPE cortex_compactor_block_cleanup_failures_total counter # HELP cortex_compactor_block_cleanup_failures_total Total number of blocks failed to be deleted. cortex_compactor_block_cleanup_failures_total 0 @@ -298,15 +242,14 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) { # TYPE cortex_compactor_blocks_cleaned_total counter cortex_compactor_blocks_cleaned_total 0 - # HELP cortex_compactor_blocks_marked_for_deletion_total Total number of blocks marked for deletion in compactor. - # TYPE cortex_compactor_blocks_marked_for_deletion_total counter - cortex_compactor_blocks_marked_for_deletion_total{reason="compaction"} 0 - cortex_compactor_blocks_marked_for_deletion_total{reason="retention"} 0 - # HELP cortex_compactor_blocks_marked_for_no_compaction_total Total number of blocks marked for no compact during a compaction run. # TYPE cortex_compactor_blocks_marked_for_no_compaction_total counter cortex_compactor_blocks_marked_for_no_compaction_total 0 + # HELP cortex_compactor_meta_sync_consistency_delay_seconds Configured consistency delay in seconds. + # TYPE cortex_compactor_meta_sync_consistency_delay_seconds gauge + cortex_compactor_meta_sync_consistency_delay_seconds 0 + # TYPE cortex_compactor_block_cleanup_started_total counter # HELP cortex_compactor_block_cleanup_started_total Total number of blocks cleanup runs started. cortex_compactor_block_cleanup_started_total 1 @@ -358,7 +301,7 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket // Wait until all retry attempts have completed. cortex_testutil.Poll(t, time.Second, 1.0, func() interface{} { - return prom_testutil.ToFloat64(c.compactionRunsFailed) + return prom_testutil.ToFloat64(c.CompactionRunsFailed) }) require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) @@ -387,62 +330,6 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket # HELP cortex_compactor_runs_failed_total Total number of compaction runs failed. cortex_compactor_runs_failed_total 1 - # HELP cortex_compactor_garbage_collected_blocks_total Total number of blocks marked for deletion by compactor. - # TYPE cortex_compactor_garbage_collected_blocks_total counter - cortex_compactor_garbage_collected_blocks_total 0 - - # HELP cortex_compactor_garbage_collection_duration_seconds Time it took to perform garbage collection iteration. - # TYPE cortex_compactor_garbage_collection_duration_seconds histogram - cortex_compactor_garbage_collection_duration_seconds_bucket{le="+Inf"} 0 - cortex_compactor_garbage_collection_duration_seconds_sum 0 - cortex_compactor_garbage_collection_duration_seconds_count 0 - - # HELP cortex_compactor_garbage_collection_failures_total Total number of failed garbage collection operations. - # TYPE cortex_compactor_garbage_collection_failures_total counter - cortex_compactor_garbage_collection_failures_total 0 - - # HELP cortex_compactor_garbage_collection_total Total number of garbage collection operations. - # TYPE cortex_compactor_garbage_collection_total counter - cortex_compactor_garbage_collection_total 0 - - # HELP cortex_compactor_meta_sync_consistency_delay_seconds Configured consistency delay in seconds. - # TYPE cortex_compactor_meta_sync_consistency_delay_seconds gauge - cortex_compactor_meta_sync_consistency_delay_seconds 0 - - # HELP cortex_compactor_meta_sync_duration_seconds Duration of the blocks metadata synchronization in seconds. - # TYPE cortex_compactor_meta_sync_duration_seconds histogram - cortex_compactor_meta_sync_duration_seconds_bucket{le="+Inf"} 0 - cortex_compactor_meta_sync_duration_seconds_sum 0 - cortex_compactor_meta_sync_duration_seconds_count 0 - - # HELP cortex_compactor_meta_sync_failures_total Total blocks metadata synchronization failures. - # TYPE cortex_compactor_meta_sync_failures_total counter - cortex_compactor_meta_sync_failures_total 0 - - # HELP cortex_compactor_meta_syncs_total Total blocks metadata synchronization attempts. - # TYPE cortex_compactor_meta_syncs_total counter - cortex_compactor_meta_syncs_total 0 - - # HELP cortex_compactor_group_compaction_runs_completed_total Total number of group completed compaction runs. This also includes compactor group runs that resulted with no compaction. - # TYPE cortex_compactor_group_compaction_runs_completed_total counter - cortex_compactor_group_compaction_runs_completed_total 0 - - # HELP cortex_compactor_group_compaction_runs_started_total Total number of group compaction attempts. - # TYPE cortex_compactor_group_compaction_runs_started_total counter - cortex_compactor_group_compaction_runs_started_total 0 - - # HELP cortex_compactor_group_compactions_failures_total Total number of failed group compactions. - # TYPE cortex_compactor_group_compactions_failures_total counter - cortex_compactor_group_compactions_failures_total 0 - - # HELP cortex_compactor_group_compactions_total Total number of group compaction attempts that resulted in a new block. - # TYPE cortex_compactor_group_compactions_total counter - cortex_compactor_group_compactions_total 0 - - # HELP cortex_compactor_group_vertical_compactions_total Total number of group compaction attempts that resulted in a new block based on overlapping blocks. - # TYPE cortex_compactor_group_vertical_compactions_total counter - cortex_compactor_group_vertical_compactions_total 0 - # TYPE cortex_compactor_block_cleanup_failures_total counter # HELP cortex_compactor_block_cleanup_failures_total Total number of blocks failed to be deleted. cortex_compactor_block_cleanup_failures_total 0 @@ -451,11 +338,6 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket # TYPE cortex_compactor_blocks_cleaned_total counter cortex_compactor_blocks_cleaned_total 0 - # HELP cortex_compactor_blocks_marked_for_deletion_total Total number of blocks marked for deletion in compactor. - # TYPE cortex_compactor_blocks_marked_for_deletion_total counter - cortex_compactor_blocks_marked_for_deletion_total{reason="compaction"} 0 - cortex_compactor_blocks_marked_for_deletion_total{reason="retention"} 0 - # TYPE cortex_compactor_block_cleanup_started_total counter # HELP cortex_compactor_block_cleanup_started_total Total number of blocks cleanup runs started. cortex_compactor_block_cleanup_started_total 1 @@ -464,6 +346,10 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket # TYPE cortex_compactor_blocks_marked_for_no_compaction_total counter cortex_compactor_blocks_marked_for_no_compaction_total 0 + # HELP cortex_compactor_meta_sync_consistency_delay_seconds Configured consistency delay in seconds. + # TYPE cortex_compactor_meta_sync_consistency_delay_seconds gauge + cortex_compactor_meta_sync_consistency_delay_seconds 0 + # TYPE cortex_compactor_block_cleanup_completed_total counter # HELP cortex_compactor_block_cleanup_completed_total Total number of blocks cleanup runs successfully completed. cortex_compactor_block_cleanup_completed_total 0 @@ -530,7 +416,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant( // Wait until all retry attempts have completed. cortex_testutil.Poll(t, time.Second, 1.0, func() interface{} { - return prom_testutil.ToFloat64(c.compactionRunsFailed) + return prom_testutil.ToFloat64(c.CompactionRunsFailed) }) require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) @@ -590,7 +476,7 @@ func TestCompactor_ShouldCompactAndRemoveUserFolder(t *testing.T) { // Wait until a run has completed. cortex_testutil.Poll(t, time.Second, 1.0, func() interface{} { - return prom_testutil.ToFloat64(c.compactionRunsCompleted) + return prom_testutil.ToFloat64(c.CompactionRunsCompleted) }) _, err := os.Stat(c.compactDirForUser("user-1")) @@ -652,7 +538,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { // Wait until a run has completed. cortex_testutil.Poll(t, time.Second, 1.0, func() interface{} { - return prom_testutil.ToFloat64(c.compactionRunsCompleted) + return prom_testutil.ToFloat64(c.CompactionRunsCompleted) }) require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) @@ -717,8 +603,8 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { # HELP cortex_compactor_blocks_marked_for_deletion_total Total number of blocks marked for deletion in compactor. # TYPE cortex_compactor_blocks_marked_for_deletion_total counter - cortex_compactor_blocks_marked_for_deletion_total{reason="compaction"} 0 - cortex_compactor_blocks_marked_for_deletion_total{reason="retention"} 0 + cortex_compactor_blocks_marked_for_deletion_total{reason="compaction",user="user-1"} 0 + cortex_compactor_blocks_marked_for_deletion_total{reason="compaction",user="user-2"} 0 # TYPE cortex_compactor_block_cleanup_started_total counter # HELP cortex_compactor_block_cleanup_started_total Total number of blocks cleanup runs started. @@ -788,7 +674,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) { // Wait until a run has completed. cortex_testutil.Poll(t, time.Second, 1.0, func() interface{} { - return prom_testutil.ToFloat64(c.compactionRunsCompleted) + return prom_testutil.ToFloat64(c.CompactionRunsCompleted) }) require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) @@ -846,8 +732,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) { # HELP cortex_compactor_blocks_marked_for_deletion_total Total number of blocks marked for deletion in compactor. # TYPE cortex_compactor_blocks_marked_for_deletion_total counter - cortex_compactor_blocks_marked_for_deletion_total{reason="compaction"} 0 - cortex_compactor_blocks_marked_for_deletion_total{reason="retention"} 0 + cortex_compactor_blocks_marked_for_deletion_total{reason="compaction",user="user-1"} 0 # TYPE cortex_compactor_block_cleanup_started_total counter # HELP cortex_compactor_block_cleanup_started_total Total number of blocks cleanup runs started. @@ -923,7 +808,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) { require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) cortex_testutil.Poll(t, time.Second, 1.0, func() interface{} { - return prom_testutil.ToFloat64(c.compactionRunsCompleted) + return prom_testutil.ToFloat64(c.CompactionRunsCompleted) }) require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) @@ -982,7 +867,7 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T) // Wait until a run has completed. cortex_testutil.Poll(t, time.Second, 1.0, func() interface{} { - return prom_testutil.ToFloat64(c.compactionRunsCompleted) + return prom_testutil.ToFloat64(c.CompactionRunsCompleted) }) require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) @@ -1035,11 +920,6 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T) # TYPE cortex_compactor_blocks_cleaned_total counter cortex_compactor_blocks_cleaned_total 1 - # HELP cortex_compactor_blocks_marked_for_deletion_total Total number of blocks marked for deletion in compactor. - # TYPE cortex_compactor_blocks_marked_for_deletion_total counter - cortex_compactor_blocks_marked_for_deletion_total{reason="compaction"} 0 - cortex_compactor_blocks_marked_for_deletion_total{reason="retention"} 0 - # TYPE cortex_compactor_block_cleanup_started_total counter # HELP cortex_compactor_block_cleanup_started_total Total number of blocks cleanup runs started. cortex_compactor_block_cleanup_started_total 1 @@ -1187,7 +1067,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni // Wait until a run has completed. cortex_testutil.Poll(t, 5*time.Second, 1.0, func() interface{} { - return prom_testutil.ToFloat64(c.compactionRunsCompleted) + return prom_testutil.ToFloat64(c.CompactionRunsCompleted) }) require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) @@ -1291,7 +1171,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM // Wait until a run has been completed on each compactor for _, c := range compactors { cortex_testutil.Poll(t, 10*time.Second, 1.0, func() interface{} { - return prom_testutil.ToFloat64(c.compactionRunsCompleted) + return prom_testutil.ToFloat64(c.CompactionRunsCompleted) }) } @@ -1431,7 +1311,7 @@ func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWit // Wait until a run has been completed on each compactor for _, c := range compactors { cortex_testutil.Poll(t, 60*time.Second, 2.0, func() interface{} { - return prom_testutil.ToFloat64(c.compactionRunsCompleted) + return prom_testutil.ToFloat64(c.CompactionRunsCompleted) }) } @@ -1704,7 +1584,7 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Instrument blocksCompactorFactory := func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) { return tsdbCompactor, - func(ctx context.Context, bkt objstore.InstrumentedBucket, _ log.Logger, _ Config, noCompactMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ prometheus.Counter, _ prometheus.Counter) compact.Planner { + func(ctx context.Context, bkt objstore.InstrumentedBucket, _ log.Logger, _ Config, noCompactMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ string, _ prometheus.Counter, _ prometheus.Counter, _ *compactorMetrics) compact.Planner { tsdbPlanner.noCompactMarkFilters = append(tsdbPlanner.noCompactMarkFilters, noCompactMarkFilter) return tsdbPlanner }, @@ -1917,7 +1797,7 @@ func TestCompactor_DeleteLocalSyncFiles(t *testing.T) { // Wait until a run has been completed on first compactor. This happens as soon as compactor starts. cortex_testutil.Poll(t, 10*time.Second, 1.0, func() interface{} { - return prom_testutil.ToFloat64(c1.compactionRunsCompleted) + return prom_testutil.ToFloat64(c1.CompactionRunsCompleted) }) require.NoError(t, os.Mkdir(c1.metaSyncDirForUser("new-user"), 0600)) @@ -1928,7 +1808,7 @@ func TestCompactor_DeleteLocalSyncFiles(t *testing.T) { // Now start second compactor, and wait until it runs compaction. require.NoError(t, services.StartAndAwaitRunning(context.Background(), c2)) cortex_testutil.Poll(t, 10*time.Second, 1.0, func() interface{} { - return prom_testutil.ToFloat64(c2.compactionRunsCompleted) + return prom_testutil.ToFloat64(c2.CompactionRunsCompleted) }) // Let's check how many users second compactor has. @@ -2012,7 +1892,7 @@ func TestCompactor_ShouldNotTreatInterruptionsAsErrors(t *testing.T) { require.NoError(t, services.StartAndAwaitRunning(ctx, c)) cortex_testutil.Poll(t, 1*time.Second, 1.0, func() interface{} { - return prom_testutil.ToFloat64(c.compactionRunsInterrupted) + return prom_testutil.ToFloat64(c.CompactionRunsInterrupted) }) require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) @@ -2084,7 +1964,7 @@ func TestCompactor_ShouldNotFailCompactionIfAccessDeniedErrDuringMetaSync(t *tes // Wait until a run has completed. cortex_testutil.Poll(t, 20*time.Second, 1.0, func() interface{} { - return prom_testutil.ToFloat64(c.compactionRunsCompleted) + return prom_testutil.ToFloat64(c.CompactionRunsCompleted) }) require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) @@ -2134,7 +2014,7 @@ func TestCompactor_ShouldNotFailCompactionIfAccessDeniedErrReturnedFromBucket(t // Wait until a run has completed. cortex_testutil.Poll(t, 20*time.Second, 1.0, func() interface{} { - return prom_testutil.ToFloat64(c.compactionRunsCompleted) + return prom_testutil.ToFloat64(c.CompactionRunsCompleted) }) require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go index 892ab05398..a041f55b6b 100644 --- a/pkg/compactor/shuffle_sharding_grouper.go +++ b/pkg/compactor/shuffle_sharding_grouper.go @@ -13,7 +13,6 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/model/labels" "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block/metadata" @@ -23,28 +22,21 @@ import ( ) type ShuffleShardingGrouper struct { - ctx context.Context - logger log.Logger - bkt objstore.InstrumentedBucket - acceptMalformedIndex bool - enableVerticalCompaction bool - reg prometheus.Registerer - blocksMarkedForDeletion prometheus.Counter - blocksMarkedForNoCompact prometheus.Counter - garbageCollectedBlocks prometheus.Counter - remainingPlannedCompactions prometheus.Gauge - hashFunc metadata.HashFunc - compactions *prometheus.CounterVec - compactionRunsStarted *prometheus.CounterVec - compactionRunsCompleted *prometheus.CounterVec - compactionFailures *prometheus.CounterVec - verticalCompactions *prometheus.CounterVec - compactorCfg Config - limits Limits - userID string - blockFilesConcurrency int - blocksFetchConcurrency int - compactionConcurrency int + ctx context.Context + logger log.Logger + bkt objstore.InstrumentedBucket + acceptMalformedIndex bool + enableVerticalCompaction bool + blocksMarkedForNoCompact prometheus.Counter + syncerMetrics *compact.SyncerMetrics + compactorMetrics *compactorMetrics + hashFunc metadata.HashFunc + compactorCfg Config + limits Limits + userID string + blockFilesConcurrency int + blocksFetchConcurrency int + compactionConcurrency int ring ring.ReadRing ringLifecyclerAddr string @@ -63,12 +55,10 @@ func NewShuffleShardingGrouper( bkt objstore.InstrumentedBucket, acceptMalformedIndex bool, enableVerticalCompaction bool, - reg prometheus.Registerer, - blocksMarkedForDeletion prometheus.Counter, blocksMarkedForNoCompact prometheus.Counter, - garbageCollectedBlocks prometheus.Counter, - remainingPlannedCompactions prometheus.Gauge, hashFunc metadata.HashFunc, + syncerMetrics *compact.SyncerMetrics, + compactorMetrics *compactorMetrics, compactorCfg Config, ring ring.ReadRing, ringLifecyclerAddr string, @@ -93,33 +83,10 @@ func NewShuffleShardingGrouper( bkt: bkt, acceptMalformedIndex: acceptMalformedIndex, enableVerticalCompaction: enableVerticalCompaction, - reg: reg, - blocksMarkedForDeletion: blocksMarkedForDeletion, blocksMarkedForNoCompact: blocksMarkedForNoCompact, - garbageCollectedBlocks: garbageCollectedBlocks, - remainingPlannedCompactions: remainingPlannedCompactions, hashFunc: hashFunc, - // Metrics are copied from Thanos DefaultGrouper constructor - compactions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_compact_group_compactions_total", - Help: "Total number of group compaction attempts that resulted in a new block.", - }, []string{"group"}), - compactionRunsStarted: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_compact_group_compaction_runs_started_total", - Help: "Total number of group compaction attempts.", - }, []string{"group"}), - compactionRunsCompleted: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_compact_group_compaction_runs_completed_total", - Help: "Total number of group completed compaction runs. This also includes compactor group runs that resulted with no compaction.", - }, []string{"group"}), - compactionFailures: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_compact_group_compactions_failures_total", - Help: "Total number of failed group compactions.", - }, []string{"group"}), - verticalCompactions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_compact_group_vertical_compactions_total", - Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.", - }, []string{"group"}), + syncerMetrics: syncerMetrics, + compactorMetrics: compactorMetrics, compactorCfg: compactorCfg, ring: ring, ringLifecyclerAddr: ringLifecyclerAddr, @@ -167,7 +134,9 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re } // Metrics for the remaining planned compactions var remainingCompactions = 0. - defer func() { g.remainingPlannedCompactions.Set(remainingCompactions) }() + defer func() { + g.compactorMetrics.remainingPlannedCompactions.WithLabelValues(g.userID).Set(remainingCompactions) + }() var groups []blocksGroup for _, mainBlocks := range mainGroups { @@ -242,7 +211,11 @@ mainLoop: // resolution and external labels. resolution := group.blocks[0].Thanos.Downsample.Resolution externalLabels := labels.FromMap(group.blocks[0].Thanos.Labels) - + timeRange := group.rangeEnd - group.rangeStart + metricLabelValues := []string{ + g.userID, + fmt.Sprintf("%d", timeRange), + } thanosGroup, err := compact.NewGroup( log.With(g.logger, "groupKey", groupKey, "rangeStart", group.rangeStartTime().String(), "rangeEnd", group.rangeEndTime().String(), "externalLabels", externalLabels, "downsampleResolution", resolution), g.bkt, @@ -251,13 +224,13 @@ mainLoop: resolution, g.acceptMalformedIndex, true, // Enable vertical compaction. - g.compactions.WithLabelValues(groupKey), - g.compactionRunsStarted.WithLabelValues(groupKey), - g.compactionRunsCompleted.WithLabelValues(groupKey), - g.compactionFailures.WithLabelValues(groupKey), - g.verticalCompactions.WithLabelValues(groupKey), - g.garbageCollectedBlocks, - g.blocksMarkedForDeletion, + g.compactorMetrics.compactions.WithLabelValues(metricLabelValues...), + g.compactorMetrics.compactionRunsStarted.WithLabelValues(metricLabelValues...), + g.compactorMetrics.compactionRunsCompleted.WithLabelValues(metricLabelValues...), + g.compactorMetrics.compactionFailures.WithLabelValues(metricLabelValues...), + g.compactorMetrics.verticalCompactions.WithLabelValues(metricLabelValues...), + g.syncerMetrics.GarbageCollectedBlocks, + g.syncerMetrics.BlocksMarkedForDeletion, g.blocksMarkedForNoCompact, g.hashFunc, g.blockFilesConcurrency, diff --git a/pkg/compactor/shuffle_sharding_grouper_test.go b/pkg/compactor/shuffle_sharding_grouper_test.go index fb935c9c9f..c7aaa4a656 100644 --- a/pkg/compactor/shuffle_sharding_grouper_test.go +++ b/pkg/compactor/shuffle_sharding_grouper_test.go @@ -140,9 +140,9 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { {block1hto2hExt1Ulid, block0hto1hExt1Ulid}, {block3hto4hExt1Ulid, block2hto3hExt1Ulid}, }, - metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. + metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy # TYPE cortex_compactor_remaining_planned_compactions gauge - cortex_compactor_remaining_planned_compactions 3 + cortex_compactor_remaining_planned_compactions{user="test-user"} 3 `, }, "test no compaction": { @@ -150,9 +150,9 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, blocks: map[ulid.ULID]*metadata.Meta{block0hto1hExt1Ulid: blocks[block0hto1hExt1Ulid], block0hto1hExt2Ulid: blocks[block0hto1hExt2Ulid], block0to1hExt3Ulid: blocks[block0to1hExt3Ulid]}, expected: [][]ulid.ULID{}, - metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. + metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy # TYPE cortex_compactor_remaining_planned_compactions gauge - cortex_compactor_remaining_planned_compactions 0 + cortex_compactor_remaining_planned_compactions{user="test-user"} 0 `, }, "test smallest range first": { @@ -164,9 +164,9 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { {block3hto4hExt1Ulid, block2hto3hExt1Ulid}, {block4hto6hExt2Ulid, block6hto8hExt2Ulid}, }, - metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. + metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy # TYPE cortex_compactor_remaining_planned_compactions gauge - cortex_compactor_remaining_planned_compactions 3 + cortex_compactor_remaining_planned_compactions{user="test-user"} 3 `, }, "test oldest min time first": { @@ -177,9 +177,9 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { {block1hto2hExt1Ulid, block0hto1hExt1Ulid, block1hto2hExt1UlidCopy}, {block3hto4hExt1Ulid, block2hto3hExt1Ulid}, }, - metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. + metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy # TYPE cortex_compactor_remaining_planned_compactions gauge - cortex_compactor_remaining_planned_compactions 2 + cortex_compactor_remaining_planned_compactions{user="test-user"} 2 `, }, "test overlapping blocks": { @@ -189,9 +189,9 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { expected: [][]ulid.ULID{ {block21hto40hExt1Ulid, block21hto40hExt1UlidCopy}, }, - metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. + metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy # TYPE cortex_compactor_remaining_planned_compactions gauge - cortex_compactor_remaining_planned_compactions 1 + cortex_compactor_remaining_planned_compactions{user="test-user"} 1 `, }, "test imperfect maxTime blocks": { @@ -201,9 +201,9 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { expected: [][]ulid.ULID{ {block0hto45mExt1Ulid, block0hto1h30mExt1Ulid}, }, - metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. + metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy # TYPE cortex_compactor_remaining_planned_compactions gauge - cortex_compactor_remaining_planned_compactions 1 + cortex_compactor_remaining_planned_compactions{user="test-user"} 1 `, }, "test prematurely created blocks": { @@ -211,9 +211,9 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { ranges: []time.Duration{2 * time.Hour}, blocks: map[ulid.ULID]*metadata.Meta{blocklast1hExt1UlidCopy: blocks[blocklast1hExt1UlidCopy], blocklast1hExt1Ulid: blocks[blocklast1hExt1Ulid]}, expected: [][]ulid.ULID{}, - metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. + metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy # TYPE cortex_compactor_remaining_planned_compactions gauge - cortex_compactor_remaining_planned_compactions 0 + cortex_compactor_remaining_planned_compactions{user="test-user"} 0 `, }, "test group with all blocks visited": { @@ -231,9 +231,9 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { {id: block1hto2hExt2Ulid, compactorID: otherCompactorID, isExpired: false}, {id: block0hto1hExt2Ulid, compactorID: otherCompactorID, isExpired: false}, }, - metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. + metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy # TYPE cortex_compactor_remaining_planned_compactions gauge - cortex_compactor_remaining_planned_compactions 1 + cortex_compactor_remaining_planned_compactions{user="test-user"} 1 `, }, "test group with one block visited": { @@ -250,9 +250,9 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { }{ {id: block1hto2hExt2Ulid, compactorID: otherCompactorID, isExpired: false}, }, - metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. + metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy # TYPE cortex_compactor_remaining_planned_compactions gauge - cortex_compactor_remaining_planned_compactions 1 + cortex_compactor_remaining_planned_compactions{user="test-user"} 1 `, }, "test group block visit marker file expired": { @@ -270,9 +270,9 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { {id: block1hto2hExt2Ulid, compactorID: otherCompactorID, isExpired: true}, {id: block0hto1hExt2Ulid, compactorID: otherCompactorID, isExpired: true}, }, - metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. + metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy # TYPE cortex_compactor_remaining_planned_compactions gauge - cortex_compactor_remaining_planned_compactions 1 + cortex_compactor_remaining_planned_compactions{user="test-user"} 1 `, }, "test group with one block visited by current compactor": { @@ -289,9 +289,9 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { }{ {id: block1hto2hExt2Ulid, compactorID: testCompactorID, isExpired: false}, }, - metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. + metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy # TYPE cortex_compactor_remaining_planned_compactions gauge - cortex_compactor_remaining_planned_compactions 1 + cortex_compactor_remaining_planned_compactions{user="test-user"} 1 `, }, "test basic grouping with concurrency 2": { @@ -302,9 +302,9 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { {block1hto2hExt2Ulid, block0hto1hExt2Ulid}, {block1hto2hExt1Ulid, block0hto1hExt1Ulid}, }, - metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. + metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy # TYPE cortex_compactor_remaining_planned_compactions gauge - cortex_compactor_remaining_planned_compactions 2 + cortex_compactor_remaining_planned_compactions{user="test-user"} 2 `, }, "test should skip block with no compact marker": { @@ -315,9 +315,9 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { {block1hto2hExt2Ulid, block0hto1hExt2Ulid}, {block1hto2hExt1Ulid, block0hto1hExt1Ulid}, }, - metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. + metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy # TYPE cortex_compactor_remaining_planned_compactions gauge - cortex_compactor_remaining_planned_compactions 2 + cortex_compactor_remaining_planned_compactions{user="test-user"} 2 `, noCompactBlocks: map[ulid.ULID]*metadata.NoCompactMark{block2hto3hExt1Ulid: {}}, }, @@ -347,10 +347,6 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { ring.On("ShuffleShard", mock.Anything, mock.Anything).Return(subring, nil) registerer := prometheus.NewPedanticRegistry() - remainingPlannedCompactions := promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ - Name: "cortex_compactor_remaining_planned_compactions", - Help: "Total number of plans that remain to be compacted.", - }) blockVisitMarkerReadFailed := promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Name: "cortex_compactor_block_visit_marker_read_failed", Help: "Number of block visit marker file failed to be read.", @@ -379,6 +375,8 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { bkt.MockUpload(mock.Anything, nil) bkt.MockGet(mock.Anything, "", nil) + metrics := newCompactorMetrics(registerer) + noCompactFilter := func() map[ulid.ULID]*metadata.NoCompactMark { return testData.noCompactBlocks } @@ -391,18 +389,16 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { objstore.WithNoopInstr(bkt), false, // Do not accept malformed indexes true, // Enable vertical compaction - registerer, - nil, - nil, nil, - remainingPlannedCompactions, metadata.NoneFunc, + metrics.getSyncerMetrics("test-user"), + metrics, *compactorCfg, ring, "test-addr", testCompactorID, overrides, - "", + "test-user", 10, 3, testData.concurrency, diff --git a/pkg/compactor/syncer_metrics.go b/pkg/compactor/syncer_metrics.go deleted file mode 100644 index c171779270..0000000000 --- a/pkg/compactor/syncer_metrics.go +++ /dev/null @@ -1,124 +0,0 @@ -package compactor - -import ( - "github.com/go-kit/log/level" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - - "github.com/cortexproject/cortex/pkg/util" - util_log "github.com/cortexproject/cortex/pkg/util/log" -) - -// Copied from Thanos, pkg/compact/compact.go. -// Here we aggregate metrics from all finished syncers. -type syncerMetrics struct { - metaSync prometheus.Counter - metaSyncFailures prometheus.Counter - metaSyncDuration *util.HistogramDataCollector // was prometheus.Histogram before - metaSyncConsistencyDelay prometheus.Gauge - garbageCollections prometheus.Counter - garbageCollectionFailures prometheus.Counter - garbageCollectionDuration *util.HistogramDataCollector // was prometheus.Histogram before - compactions prometheus.Counter - compactionRunsStarted prometheus.Counter - compactionRunsCompleted prometheus.Counter - compactionFailures prometheus.Counter - verticalCompactions prometheus.Counter -} - -// Copied (and modified with Cortex prefix) from Thanos, pkg/compact/compact.go -// We also ignore "group" label, since we only use a single group. -func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { - var m syncerMetrics - - m.metaSync = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_compactor_meta_syncs_total", - Help: "Total blocks metadata synchronization attempts.", - }) - m.metaSyncFailures = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_compactor_meta_sync_failures_total", - Help: "Total blocks metadata synchronization failures.", - }) - m.metaSyncDuration = util.NewHistogramDataCollector(prometheus.NewDesc( - "cortex_compactor_meta_sync_duration_seconds", - "Duration of the blocks metadata synchronization in seconds.", - nil, nil)) - m.metaSyncConsistencyDelay = promauto.With(reg).NewGauge(prometheus.GaugeOpts{ - Name: "cortex_compactor_meta_sync_consistency_delay_seconds", - Help: "Configured consistency delay in seconds.", - }) - - m.garbageCollections = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_compactor_garbage_collection_total", - Help: "Total number of garbage collection operations.", - }) - m.garbageCollectionFailures = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_compactor_garbage_collection_failures_total", - Help: "Total number of failed garbage collection operations.", - }) - m.garbageCollectionDuration = util.NewHistogramDataCollector(prometheus.NewDesc( - "cortex_compactor_garbage_collection_duration_seconds", - "Time it took to perform garbage collection iteration.", - nil, nil)) - - m.compactions = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_compactor_group_compactions_total", - Help: "Total number of group compaction attempts that resulted in a new block.", - }) - m.compactionRunsStarted = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_compactor_group_compaction_runs_started_total", - Help: "Total number of group compaction attempts.", - }) - m.compactionRunsCompleted = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_compactor_group_compaction_runs_completed_total", - Help: "Total number of group completed compaction runs. This also includes compactor group runs that resulted with no compaction.", - }) - m.compactionFailures = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_compactor_group_compactions_failures_total", - Help: "Total number of failed group compactions.", - }) - m.verticalCompactions = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_compactor_group_vertical_compactions_total", - Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.", - }) - - if reg != nil { - reg.MustRegister(m.metaSyncDuration, m.garbageCollectionDuration) - } - - return &m -} - -func (m *syncerMetrics) gatherThanosSyncerMetrics(reg *prometheus.Registry) { - if m == nil { - return - } - - mf, err := reg.Gather() - if err != nil { - level.Warn(util_log.Logger).Log("msg", "failed to gather metrics from syncer registry after compaction", "err", err) - return - } - - mfm, err := util.NewMetricFamilyMap(mf) - if err != nil { - level.Warn(util_log.Logger).Log("msg", "failed to gather metrics from syncer registry after compaction", "err", err) - return - } - - m.metaSync.Add(mfm.SumCounters("blocks_meta_syncs_total")) - m.metaSyncFailures.Add(mfm.SumCounters("blocks_meta_sync_failures_total")) - m.metaSyncDuration.Add(mfm.SumHistograms("blocks_meta_sync_duration_seconds")) - m.metaSyncConsistencyDelay.Set(mfm.MaxGauges("consistency_delay_seconds")) - - m.garbageCollections.Add(mfm.SumCounters("thanos_compact_garbage_collection_total")) - m.garbageCollectionFailures.Add(mfm.SumCounters("thanos_compact_garbage_collection_failures_total")) - m.garbageCollectionDuration.Add(mfm.SumHistograms("thanos_compact_garbage_collection_duration_seconds")) - - // These metrics have "group" label, but we sum them all together. - m.compactions.Add(mfm.SumCounters("thanos_compact_group_compactions_total")) - m.compactionRunsStarted.Add(mfm.SumCounters("thanos_compact_group_compaction_runs_started_total")) - m.compactionRunsCompleted.Add(mfm.SumCounters("thanos_compact_group_compaction_runs_completed_total")) - m.compactionFailures.Add(mfm.SumCounters("thanos_compact_group_compactions_failures_total")) - m.verticalCompactions.Add(mfm.SumCounters("thanos_compact_group_vertical_compactions_total")) -} diff --git a/pkg/compactor/syncer_metrics_test.go b/pkg/compactor/syncer_metrics_test.go deleted file mode 100644 index 7a21955ebf..0000000000 --- a/pkg/compactor/syncer_metrics_test.go +++ /dev/null @@ -1,232 +0,0 @@ -package compactor - -import ( - "bytes" - "testing" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/stretchr/testify/require" -) - -func TestSyncerMetrics(t *testing.T) { - reg := prometheus.NewPedanticRegistry() - - sm := newSyncerMetrics(reg) - sm.gatherThanosSyncerMetrics(generateTestData(12345)) - sm.gatherThanosSyncerMetrics(generateTestData(76543)) - sm.gatherThanosSyncerMetrics(generateTestData(22222)) - // total base = 111110 - - err := testutil.GatherAndCompare(reg, bytes.NewBufferString(` - # HELP cortex_compactor_meta_sync_consistency_delay_seconds Configured consistency delay in seconds. - # TYPE cortex_compactor_meta_sync_consistency_delay_seconds gauge - cortex_compactor_meta_sync_consistency_delay_seconds 300 - - # HELP cortex_compactor_meta_syncs_total Total blocks metadata synchronization attempts. - # TYPE cortex_compactor_meta_syncs_total counter - cortex_compactor_meta_syncs_total 111110 - - # HELP cortex_compactor_meta_sync_failures_total Total blocks metadata synchronization failures. - # TYPE cortex_compactor_meta_sync_failures_total counter - cortex_compactor_meta_sync_failures_total 222220 - - # HELP cortex_compactor_meta_sync_duration_seconds Duration of the blocks metadata synchronization in seconds. - # TYPE cortex_compactor_meta_sync_duration_seconds histogram - # Observed values: 3.7035, 22.9629, 6.6666 (seconds) - cortex_compactor_meta_sync_duration_seconds_bucket{le="0.01"} 0 - cortex_compactor_meta_sync_duration_seconds_bucket{le="0.1"} 0 - cortex_compactor_meta_sync_duration_seconds_bucket{le="0.3"} 0 - cortex_compactor_meta_sync_duration_seconds_bucket{le="0.6"} 0 - cortex_compactor_meta_sync_duration_seconds_bucket{le="1"} 0 - cortex_compactor_meta_sync_duration_seconds_bucket{le="3"} 0 - cortex_compactor_meta_sync_duration_seconds_bucket{le="6"} 1 - cortex_compactor_meta_sync_duration_seconds_bucket{le="9"} 2 - cortex_compactor_meta_sync_duration_seconds_bucket{le="20"} 2 - cortex_compactor_meta_sync_duration_seconds_bucket{le="30"} 3 - cortex_compactor_meta_sync_duration_seconds_bucket{le="60"} 3 - cortex_compactor_meta_sync_duration_seconds_bucket{le="90"} 3 - cortex_compactor_meta_sync_duration_seconds_bucket{le="120"} 3 - cortex_compactor_meta_sync_duration_seconds_bucket{le="240"} 3 - cortex_compactor_meta_sync_duration_seconds_bucket{le="360"} 3 - cortex_compactor_meta_sync_duration_seconds_bucket{le="720"} 3 - cortex_compactor_meta_sync_duration_seconds_bucket{le="+Inf"} 3 - # rounding error - cortex_compactor_meta_sync_duration_seconds_sum 33.333000000000006 - cortex_compactor_meta_sync_duration_seconds_count 3 - - # HELP cortex_compactor_garbage_collection_total Total number of garbage collection operations. - # TYPE cortex_compactor_garbage_collection_total counter - cortex_compactor_garbage_collection_total 555550 - - # HELP cortex_compactor_garbage_collection_failures_total Total number of failed garbage collection operations. - # TYPE cortex_compactor_garbage_collection_failures_total counter - cortex_compactor_garbage_collection_failures_total 666660 - - # HELP cortex_compactor_garbage_collection_duration_seconds Time it took to perform garbage collection iteration. - # TYPE cortex_compactor_garbage_collection_duration_seconds histogram - # Observed values: 8.6415, 53.5801, 15.5554 - cortex_compactor_garbage_collection_duration_seconds_bucket{le="0.01"} 0 - cortex_compactor_garbage_collection_duration_seconds_bucket{le="0.1"} 0 - cortex_compactor_garbage_collection_duration_seconds_bucket{le="0.3"} 0 - cortex_compactor_garbage_collection_duration_seconds_bucket{le="0.6"} 0 - cortex_compactor_garbage_collection_duration_seconds_bucket{le="1"} 0 - cortex_compactor_garbage_collection_duration_seconds_bucket{le="3"} 0 - cortex_compactor_garbage_collection_duration_seconds_bucket{le="6"} 0 - cortex_compactor_garbage_collection_duration_seconds_bucket{le="9"} 1 - cortex_compactor_garbage_collection_duration_seconds_bucket{le="20"} 2 - cortex_compactor_garbage_collection_duration_seconds_bucket{le="30"} 2 - cortex_compactor_garbage_collection_duration_seconds_bucket{le="60"} 3 - cortex_compactor_garbage_collection_duration_seconds_bucket{le="90"} 3 - cortex_compactor_garbage_collection_duration_seconds_bucket{le="120"} 3 - cortex_compactor_garbage_collection_duration_seconds_bucket{le="240"} 3 - cortex_compactor_garbage_collection_duration_seconds_bucket{le="360"} 3 - cortex_compactor_garbage_collection_duration_seconds_bucket{le="720"} 3 - cortex_compactor_garbage_collection_duration_seconds_bucket{le="+Inf"} 3 - cortex_compactor_garbage_collection_duration_seconds_sum 77.777 - cortex_compactor_garbage_collection_duration_seconds_count 3 - - # HELP cortex_compactor_group_compactions_total Total number of group compaction attempts that resulted in a new block. - # TYPE cortex_compactor_group_compactions_total counter - # Sum across all groups - cortex_compactor_group_compactions_total 2999970 - - # HELP cortex_compactor_group_compaction_runs_started_total Total number of group compaction attempts. - # TYPE cortex_compactor_group_compaction_runs_started_total counter - # Sum across all groups - cortex_compactor_group_compaction_runs_started_total 3999960 - - # HELP cortex_compactor_group_compaction_runs_completed_total Total number of group completed compaction runs. This also includes compactor group runs that resulted with no compaction. - # TYPE cortex_compactor_group_compaction_runs_completed_total counter - # Sum across all groups - cortex_compactor_group_compaction_runs_completed_total 4999950 - - # HELP cortex_compactor_group_compactions_failures_total Total number of failed group compactions. - # TYPE cortex_compactor_group_compactions_failures_total counter - cortex_compactor_group_compactions_failures_total 5999940 - - # HELP cortex_compactor_group_vertical_compactions_total Total number of group compaction attempts that resulted in a new block based on overlapping blocks. - # TYPE cortex_compactor_group_vertical_compactions_total counter - cortex_compactor_group_vertical_compactions_total 6999930 - `)) - require.NoError(t, err) -} - -func generateTestData(base float64) *prometheus.Registry { - r := prometheus.NewRegistry() - m := newTestSyncerMetrics(r) - m.metaSync.Add(1 * base) - m.metaSyncFailures.Add(2 * base) - m.metaSyncDuration.Observe(3 * base / 10000) - m.metaSyncConsistencyDelay.Set(300) - m.garbageCollections.Add(5 * base) - m.garbageCollectionFailures.Add(6 * base) - m.garbageCollectionDuration.Observe(7 * base / 10000) - m.compactions.WithLabelValues("aaa").Add(8 * base) - m.compactions.WithLabelValues("bbb").Add(9 * base) - m.compactions.WithLabelValues("ccc").Add(10 * base) - m.compactionRunsStarted.WithLabelValues("aaa").Add(11 * base) - m.compactionRunsStarted.WithLabelValues("bbb").Add(12 * base) - m.compactionRunsStarted.WithLabelValues("ccc").Add(13 * base) - m.compactionRunsCompleted.WithLabelValues("aaa").Add(14 * base) - m.compactionRunsCompleted.WithLabelValues("bbb").Add(15 * base) - m.compactionRunsCompleted.WithLabelValues("ccc").Add(16 * base) - m.compactionFailures.WithLabelValues("aaa").Add(17 * base) - m.compactionFailures.WithLabelValues("bbb").Add(18 * base) - m.compactionFailures.WithLabelValues("ccc").Add(19 * base) - m.verticalCompactions.WithLabelValues("aaa").Add(20 * base) - m.verticalCompactions.WithLabelValues("bbb").Add(21 * base) - m.verticalCompactions.WithLabelValues("ccc").Add(22 * base) - return r -} - -// directly copied from Thanos (and renamed syncerMetrics to testSyncerMetrics to avoid conflict) -type testSyncerMetrics struct { - metaSync prometheus.Counter - metaSyncFailures prometheus.Counter - metaSyncDuration prometheus.Histogram - metaSyncConsistencyDelay prometheus.Gauge - garbageCollections prometheus.Counter - garbageCollectionFailures prometheus.Counter - garbageCollectionDuration prometheus.Histogram - compactions *prometheus.CounterVec - compactionRunsStarted *prometheus.CounterVec - compactionRunsCompleted *prometheus.CounterVec - compactionFailures *prometheus.CounterVec - verticalCompactions *prometheus.CounterVec -} - -func newTestSyncerMetrics(reg prometheus.Registerer) *testSyncerMetrics { - var m testSyncerMetrics - - m.metaSync = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "blocks_meta_syncs_total", - Help: "Total blocks metadata synchronization attempts.", - }) - m.metaSyncFailures = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "blocks_meta_sync_failures_total", - Help: "Total blocks metadata synchronization failures.", - }) - m.metaSyncDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "blocks_meta_sync_duration_seconds", - Help: "Duration of the blocks metadata synchronization in seconds.", - Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}, - }) - m.metaSyncConsistencyDelay = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "consistency_delay_seconds", - Help: "Configured consistency delay in seconds.", - }) - - m.garbageCollections = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "thanos_compact_garbage_collection_total", - Help: "Total number of garbage collection operations.", - }) - m.garbageCollectionFailures = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "thanos_compact_garbage_collection_failures_total", - Help: "Total number of failed garbage collection operations.", - }) - m.garbageCollectionDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "thanos_compact_garbage_collection_duration_seconds", - Help: "Time it took to perform garbage collection iteration.", - Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}, - }) - - m.compactions = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_compact_group_compactions_total", - Help: "Total number of group compaction attempts that resulted in a new block.", - }, []string{"group"}) - m.compactionRunsStarted = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_compact_group_compaction_runs_started_total", - Help: "Total number of group compaction attempts.", - }, []string{"group"}) - m.compactionRunsCompleted = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_compact_group_compaction_runs_completed_total", - Help: "Total number of group completed compaction runs. This also includes compactor group runs that resulted with no compaction.", - }, []string{"group"}) - m.compactionFailures = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_compact_group_compactions_failures_total", - Help: "Total number of failed group compactions.", - }, []string{"group"}) - m.verticalCompactions = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_compact_group_vertical_compactions_total", - Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.", - }, []string{"group"}) - - if reg != nil { - reg.MustRegister( - m.metaSync, - m.metaSyncFailures, - m.metaSyncDuration, - m.metaSyncConsistencyDelay, - m.garbageCollections, - m.garbageCollectionFailures, - m.garbageCollectionDuration, - m.compactions, - m.compactionRunsStarted, - m.compactionRunsCompleted, - m.compactionFailures, - m.verticalCompactions, - ) - } - return &m -}