Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement partition compaction grouper #6172

Merged
merged 12 commits into from
Dec 31, 2024
18 changes: 9 additions & 9 deletions docs/blocks-storage/compactor.md
Original file line number Diff line number Diff line change
@@ -286,18 +286,18 @@ compactor:
[wait_active_instance_timeout: <duration> | default = 10m]

# The compaction strategy to use. Supported values are: default, partitioning.
# CLI flag: -compactor.compaction-mode
[compaction_mode: <string> | default = "default"]
# CLI flag: -compactor.compaction-strategy
[compaction_strategy: <string> | default = "default"]

# How long block visit marker file should be considered as expired and able to
# be picked up by compactor again.
# CLI flag: -compactor.block-visit-marker-timeout
[block_visit_marker_timeout: <duration> | default = 5m]
# How long compaction visit marker file should be considered as expired and
# able to be picked up by compactor again.
# CLI flag: -compactor.compaction-visit-marker-timeout
[compaction_visit_marker_timeout: <duration> | default = 1m30s]

# How frequently block visit marker file should be updated duration
# How frequently compaction visit marker file should be updated duration
# compaction.
# CLI flag: -compactor.block-visit-marker-file-update-interval
[block_visit_marker_file_update_interval: <duration> | default = 1m]
# CLI flag: -compactor.compaction-visit-marker-file-update-interval
[compaction_visit_marker_file_update_interval: <duration> | default = 1m]

# How long cleaner visit marker file should be considered as expired and able
# to be picked up by cleaner again. The value should be smaller than
27 changes: 18 additions & 9 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
@@ -2217,17 +2217,18 @@ sharding_ring:
[wait_active_instance_timeout: <duration> | default = 10m]

# The compaction strategy to use. Supported values are: default, partitioning.
# CLI flag: -compactor.compaction-mode
[compaction_mode: <string> | default = "default"]
# CLI flag: -compactor.compaction-strategy
[compaction_strategy: <string> | default = "default"]

# How long block visit marker file should be considered as expired and able to
# be picked up by compactor again.
# CLI flag: -compactor.block-visit-marker-timeout
[block_visit_marker_timeout: <duration> | default = 5m]
# How long compaction visit marker file should be considered as expired and able
# to be picked up by compactor again.
# CLI flag: -compactor.compaction-visit-marker-timeout
[compaction_visit_marker_timeout: <duration> | default = 1m30s]
alexqyle marked this conversation as resolved.
Show resolved Hide resolved

# How frequently block visit marker file should be updated duration compaction.
# CLI flag: -compactor.block-visit-marker-file-update-interval
[block_visit_marker_file_update_interval: <duration> | default = 1m]
# How frequently compaction visit marker file should be updated duration
# compaction.
# CLI flag: -compactor.compaction-visit-marker-file-update-interval
[compaction_visit_marker_file_update_interval: <duration> | default = 1m]

# How long cleaner visit marker file should be considered as expired and able to
# be picked up by cleaner again. The value should be smaller than
@@ -3385,6 +3386,14 @@ query_rejection:
# CLI flag: -compactor.tenant-shard-size
[compactor_tenant_shard_size: <int> | default = 0]

# Index size limit in bytes for each compaction partition. 0 means no limit
# CLI flag: -compactor.partition-index-size-limit-in-bytes
[compactor_partition_index_size_limit_in_bytes: <int> | default = 0]

# Time series count limit for each compaction partition. 0 means no limit
# CLI flag: -compactor.partition-series-count-limit
[compactor_partition_series_count_limit: <int> | default = 0]

# S3 server-side encryption type. Required to enable server-side encryption
# overrides for a specific tenant. If not set, the default S3 client settings
# are used.
73 changes: 54 additions & 19 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
@@ -60,7 +60,7 @@ var (
errInvalidCompactionStrategy = errors.New("invalid compaction strategy")
errInvalidCompactionStrategyPartitioning = errors.New("compaction strategy partitioning can only be enabled when shuffle sharding is enabled")

DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, _ prometheus.Counter, _ prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string, _ *compact.GatherNoCompactionMarkFilter) compact.Grouper {
DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, _ prometheus.Counter, _ prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string, _ *compact.GatherNoCompactionMarkFilter, _ int) compact.Grouper {
return compact.NewDefaultGrouperWithMetrics(
logger,
bkt,
@@ -79,9 +79,31 @@ var (
cfg.BlocksFetchConcurrency)
}

ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Grouper {
ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, blocksMarkedForNoCompaction prometheus.Counter, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, syncerMetrics *compact.SyncerMetrics, compactorMetrics *compactorMetrics, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ingestionReplicationFactor int) compact.Grouper {
if cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
return NewPartitionCompactionGrouper(ctx, logger, bkt)
return NewPartitionCompactionGrouper(
ctx,
logger,
bkt,
cfg.AcceptMalformedIndex,
true, // Enable vertical compaction
blocksMarkedForNoCompaction,
syncerMetrics,
compactorMetrics,
metadata.NoneFunc,
cfg,
ring,
ringLifecycle.Addr,
ringLifecycle.ID,
limits,
userID,
cfg.BlockFilesConcurrency,
cfg.BlocksFetchConcurrency,
cfg.CompactionConcurrency,
true,
cfg.CompactionVisitMarkerTimeout,
noCompactionMarkFilter.NoCompactMarkedBlocks,
ingestionReplicationFactor)
} else {
return NewShuffleShardingGrouper(
ctx,
@@ -102,7 +124,7 @@ var (
cfg.BlockFilesConcurrency,
cfg.BlocksFetchConcurrency,
cfg.CompactionConcurrency,
cfg.BlockVisitMarkerTimeout,
cfg.CompactionVisitMarkerTimeout,
blockVisitMarkerReadFailed,
blockVisitMarkerWriteFailed,
noCompactionMarkFilter.NoCompactMarkedBlocks)
@@ -133,7 +155,7 @@ var (
if cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
return NewPartitionCompactionPlanner(ctx, bkt, logger)
} else {
return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.BlockVisitMarkerTimeout, cfg.BlockVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed)
return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.CompactionVisitMarkerTimeout, cfg.CompactionVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed)
}
}
return compactor, plannerFactory, nil
@@ -156,6 +178,7 @@ type BlocksGrouperFactory func(
limit Limits,
userID string,
noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter,
ingestionReplicationFactor int,
) compact.Grouper

// BlocksCompactorFactory builds and returns the compactor and planner to use to compact a tenant's blocks.
@@ -182,6 +205,8 @@ type PlannerFactory func(
// Limits defines limits used by the Compactor.
type Limits interface {
CompactorTenantShardSize(userID string) int
CompactorPartitionIndexSizeLimitInBytes(userID string) int64
CompactorPartitionSeriesCountLimit(userID string) int64
}

// Config holds the Compactor config.
@@ -213,8 +238,8 @@ type Config struct {
ShardingStrategy string `yaml:"sharding_strategy"`
ShardingRing RingConfig `yaml:"sharding_ring"`

// Compaction mode.
CompactionStrategy string `yaml:"compaction_mode"`
// Compaction strategy.
CompactionStrategy string `yaml:"compaction_strategy"`
alexqyle marked this conversation as resolved.
Show resolved Hide resolved

// No need to add options to customize the retry backoff,
// given the defaults should be fine, but allow to override
@@ -226,9 +251,9 @@ type Config struct {
BlocksGrouperFactory BlocksGrouperFactory `yaml:"-"`
BlocksCompactorFactory BlocksCompactorFactory `yaml:"-"`

// Block visit marker file config
BlockVisitMarkerTimeout time.Duration `yaml:"block_visit_marker_timeout"`
BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"`
// Compaction visit marker file config
CompactionVisitMarkerTimeout time.Duration `yaml:"compaction_visit_marker_timeout"`
CompactionVisitMarkerFileUpdateInterval time.Duration `yaml:"compaction_visit_marker_file_update_interval"`

// Cleaner visit marker file config
CleanerVisitMarkerTimeout time.Duration `yaml:"cleaner_visit_marker_timeout"`
@@ -258,7 +283,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.CleanupConcurrency, "compactor.cleanup-concurrency", 20, "Max number of tenants for which blocks cleanup and maintenance should run concurrently.")
f.BoolVar(&cfg.ShardingEnabled, "compactor.sharding-enabled", false, "Shard tenants across multiple compactor instances. Sharding is required if you run multiple compactor instances, in order to coordinate compactions and avoid race conditions leading to the same tenant blocks simultaneously compacted by different instances.")
f.StringVar(&cfg.ShardingStrategy, "compactor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
f.StringVar(&cfg.CompactionStrategy, "compactor.compaction-mode", util.CompactionStrategyDefault, fmt.Sprintf("The compaction strategy to use. Supported values are: %s.", strings.Join(supportedCompactionStrategies, ", ")))
f.StringVar(&cfg.CompactionStrategy, "compactor.compaction-strategy", util.CompactionStrategyDefault, fmt.Sprintf("The compaction strategy to use. Supported values are: %s.", strings.Join(supportedCompactionStrategies, ", ")))
f.DurationVar(&cfg.DeletionDelay, "compactor.deletion-delay", 12*time.Hour, "Time before a block marked for deletion is deleted from bucket. "+
"If not 0, blocks will be marked for deletion and compactor component will permanently delete blocks marked for deletion from the bucket. "+
"If 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures.")
@@ -271,8 +296,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.Var(&cfg.EnabledTenants, "compactor.enabled-tenants", "Comma separated list of tenants that can be compacted. If specified, only these tenants will be compacted by compactor, otherwise all tenants can be compacted. Subject to sharding.")
f.Var(&cfg.DisabledTenants, "compactor.disabled-tenants", "Comma separated list of tenants that cannot be compacted by this compactor. If specified, and compactor would normally pick given tenant for compaction (via -compactor.enabled-tenants or sharding), it will be ignored instead.")

f.DurationVar(&cfg.BlockVisitMarkerTimeout, "compactor.block-visit-marker-timeout", 5*time.Minute, "How long block visit marker file should be considered as expired and able to be picked up by compactor again.")
f.DurationVar(&cfg.BlockVisitMarkerFileUpdateInterval, "compactor.block-visit-marker-file-update-interval", 1*time.Minute, "How frequently block visit marker file should be updated duration compaction.")
f.DurationVar(&cfg.CompactionVisitMarkerTimeout, "compactor.compaction-visit-marker-timeout", 90*time.Second, "How long compaction visit marker file should be considered as expired and able to be picked up by compactor again.")
f.DurationVar(&cfg.CompactionVisitMarkerFileUpdateInterval, "compactor.compaction-visit-marker-file-update-interval", 1*time.Minute, "How frequently compaction visit marker file should be updated duration compaction.")

f.DurationVar(&cfg.CleanerVisitMarkerTimeout, "compactor.cleaner-visit-marker-timeout", 10*time.Minute, "How long cleaner visit marker file should be considered as expired and able to be picked up by cleaner again. The value should be smaller than -compactor.cleanup-interval")
f.DurationVar(&cfg.CleanerVisitMarkerFileUpdateInterval, "compactor.cleaner-visit-marker-file-update-interval", 5*time.Minute, "How frequently cleaner visit marker file should be updated when cleaning user.")
@@ -305,7 +330,7 @@ func (cfg *Config) Validate(limits validation.Limits) error {
}
}

// Make sure a valid compaction mode is being used
// Make sure a valid compaction strategy is being used
if !util.StringsContain(supportedCompactionStrategies, cfg.CompactionStrategy) {
return errInvalidCompactionStrategy
}
@@ -379,10 +404,13 @@ type Compactor struct {

// Thanos compactor metrics per user
compactorMetrics *compactorMetrics

// Replication factor of ingester ring
ingestionReplicationFactor int
}

// NewCompactor makes a new Compactor.
func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Compactor, error) {
func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides, ingestionReplicationFactor int) (*Compactor, error) {
bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) {
return bucket.NewClient(ctx, storageCfg.Bucket, "compactor", logger, registerer)
}
@@ -405,7 +433,11 @@ func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfi
}
}

cortexCompactor, err := newCompactor(compactorCfg, storageCfg, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, limits)
if ingestionReplicationFactor <= 0 {
ingestionReplicationFactor = 1
}

cortexCompactor, err := newCompactor(compactorCfg, storageCfg, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, limits, ingestionReplicationFactor)
if err != nil {
return nil, errors.Wrap(err, "failed to create Cortex blocks compactor")
}
@@ -422,6 +454,7 @@ func newCompactor(
blocksGrouperFactory BlocksGrouperFactory,
blocksCompactorFactory BlocksCompactorFactory,
limits *validation.Overrides,
ingestionReplicationFactor int,
) (*Compactor, error) {
var compactorMetrics *compactorMetrics
if compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle {
@@ -496,8 +529,9 @@ func newCompactor(
Name: "cortex_compactor_block_visit_marker_write_failed",
Help: "Number of block visit marker file failed to be written.",
}),
limits: limits,
compactorMetrics: compactorMetrics,
limits: limits,
compactorMetrics: compactorMetrics,
ingestionReplicationFactor: ingestionReplicationFactor,
}

if len(compactorCfg.EnabledTenants) > 0 {
@@ -761,6 +795,7 @@ func (c *Compactor) compactUsers(ctx context.Context) {
continue
} else if markedForDeletion {
c.CompactionRunSkippedTenants.Inc()
c.compactorMetrics.deleteMetricsForDeletedTenant(userID)
level.Debug(c.logger).Log("msg", "skipping user because it is marked for deletion", "user", userID)
continue
}
@@ -928,7 +963,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
compactor, err := compact.NewBucketCompactor(
ulogger,
syncer,
c.blocksGrouperFactory(currentCtx, c.compactorCfg, bucket, ulogger, c.BlocksMarkedForNoCompaction, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, syncerMetrics, c.compactorMetrics, c.ring, c.ringLifecycler, c.limits, userID, noCompactMarkerFilter),
c.blocksGrouperFactory(currentCtx, c.compactorCfg, bucket, ulogger, c.BlocksMarkedForNoCompaction, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, syncerMetrics, c.compactorMetrics, c.ring, c.ringLifecycler, c.limits, userID, noCompactMarkerFilter, c.ingestionReplicationFactor),
c.blocksPlannerFactory(currentCtx, bucket, ulogger, c.compactorCfg, noCompactMarkerFilter, c.ringLifecycler, userID, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.compactorMetrics),
c.blocksCompactor,
c.compactDirForUser(userID),
30 changes: 30 additions & 0 deletions pkg/compactor/compactor_metrics.go
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@ type compactorMetrics struct {
verticalCompactions *prometheus.CounterVec
remainingPlannedCompactions *prometheus.GaugeVec
compactionErrorsCount *prometheus.CounterVec
partitionCount *prometheus.GaugeVec
}

const (
@@ -169,6 +170,10 @@ func newCompactorMetricsWithLabels(reg prometheus.Registerer, commonLabels []str
Name: "cortex_compactor_compaction_error_total",
Help: "Total number of errors from compactions.",
}, append(commonLabels, compactionErrorTypesLabelName))
m.partitionCount = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_compact_group_partition_count",
alexqyle marked this conversation as resolved.
Show resolved Hide resolved
Help: "Number of partitions.",
alexqyle marked this conversation as resolved.
Show resolved Hide resolved
}, compactionLabels)

return &m
}
@@ -207,3 +212,28 @@ func (m *compactorMetrics) getCommonLabelValues(userID string) []string {
}
return labelValues
}

func (m *compactorMetrics) initMetricWithCompactionLabelValues(labelValue ...string) {
if len(m.compactionLabels) != len(commonLabels)+len(compactionLabels) {
return
}

m.compactions.WithLabelValues(labelValue...)
m.compactionPlanned.WithLabelValues(labelValue...)
m.compactionRunsStarted.WithLabelValues(labelValue...)
m.compactionRunsCompleted.WithLabelValues(labelValue...)
m.compactionFailures.WithLabelValues(labelValue...)
m.verticalCompactions.WithLabelValues(labelValue...)
m.partitionCount.WithLabelValues(labelValue...)
}

func (m *compactorMetrics) deleteMetricsForDeletedTenant(userID string) {
m.syncerBlocksMarkedForDeletion.DeleteLabelValues(userID)
m.compactions.DeleteLabelValues(userID)
m.compactionPlanned.DeleteLabelValues(userID)
m.compactionRunsStarted.DeleteLabelValues(userID)
m.compactionRunsCompleted.DeleteLabelValues(userID)
m.compactionFailures.DeleteLabelValues(userID)
m.verticalCompactions.DeleteLabelValues(userID)
m.partitionCount.DeleteLabelValues(userID)
}
8 changes: 8 additions & 0 deletions pkg/compactor/compactor_metrics_test.go
Original file line number Diff line number Diff line change
@@ -130,6 +130,11 @@ func TestSyncerMetrics(t *testing.T) {
cortex_compactor_compaction_error_total{type="unauthorized",user="aaa"} 477730
cortex_compactor_compaction_error_total{type="unauthorized",user="bbb"} 488840
cortex_compactor_compaction_error_total{type="unauthorized",user="ccc"} 499950
# HELP cortex_compact_group_partition_count Number of partitions.
# TYPE cortex_compact_group_partition_count gauge
cortex_compact_group_partition_count{user="aaa"} 511060
cortex_compact_group_partition_count{user="bbb"} 522170
cortex_compact_group_partition_count{user="ccc"} 533280
`))
require.NoError(t, err)

@@ -183,4 +188,7 @@ func generateTestData(cm *compactorMetrics, base float64) {
cm.compactionErrorsCount.WithLabelValues("aaa", unauthorizedError).Add(43 * base)
cm.compactionErrorsCount.WithLabelValues("bbb", unauthorizedError).Add(44 * base)
cm.compactionErrorsCount.WithLabelValues("ccc", unauthorizedError).Add(45 * base)
cm.partitionCount.WithLabelValues("aaa").Add(46 * base)
cm.partitionCount.WithLabelValues("bbb").Add(47 * base)
cm.partitionCount.WithLabelValues("ccc").Add(48 * base)
}
Loading