From a54da24f66a7d8d0a7ab62407eec0e1418bd39cf Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Thu, 26 Sep 2024 04:31:30 +0900 Subject: [PATCH] Add tsdb wal compression type zstd (#6232) --- CHANGELOG.md | 1 + docs/blocks-storage/querier.md | 8 +++++- docs/blocks-storage/store-gateway.md | 8 +++++- docs/configuration/config-file-reference.md | 8 +++++- pkg/ingester/ingester.go | 7 +++--- pkg/storage/tsdb/config.go | 28 ++++++++++++++------- pkg/storage/tsdb/config_test.go | 24 ++++++++++++++++++ 7 files changed, 69 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c692798ac..fd2b0a3038 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * [CHANGE] Enable Compactor and Alertmanager in target all. #6204 * [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151 * [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129 +* [ENHANCEMENT] Ingester: Add `blocks-storage.tsdb.wal-compression-type` to support zstd wal compression type. #6232 * [ENHANCEMENT] Query Frontend: Add info field to query response. #6207 * [ENHANCEMENT] Query Frontend: Add peakSample in query stats response. #6188 * [ENHANCEMENT] Ruler: Add new ruler metric `cortex_ruler_rule_groups_in_store` that is the total rule groups per tenant in store, which can be used to compare with `cortex_prometheus_rule_group_rules` to count the number of rule groups that are not loaded by a ruler. #5869 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 7fdfaf89a4..00605f60e4 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -1423,10 +1423,16 @@ blocks_storage: # CLI flag: -blocks-storage.tsdb.stripe-size [stripe_size: | default = 16384] - # True to enable TSDB WAL compression. + # Deprecated (use blocks-storage.tsdb.wal-compression-type instead): True to + # enable TSDB WAL compression. # CLI flag: -blocks-storage.tsdb.wal-compression-enabled [wal_compression_enabled: | default = false] + # TSDB WAL type. Supported values are: 'snappy', 'zstd' and '' (disable + # compression) + # CLI flag: -blocks-storage.tsdb.wal-compression-type + [wal_compression_type: | default = ""] + # TSDB WAL segments files max size (bytes). # CLI flag: -blocks-storage.tsdb.wal-segment-size-bytes [wal_segment_size_bytes: | default = 134217728] diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 6c31046b1f..3303ec9cb9 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -1538,10 +1538,16 @@ blocks_storage: # CLI flag: -blocks-storage.tsdb.stripe-size [stripe_size: | default = 16384] - # True to enable TSDB WAL compression. + # Deprecated (use blocks-storage.tsdb.wal-compression-type instead): True to + # enable TSDB WAL compression. # CLI flag: -blocks-storage.tsdb.wal-compression-enabled [wal_compression_enabled: | default = false] + # TSDB WAL type. Supported values are: 'snappy', 'zstd' and '' (disable + # compression) + # CLI flag: -blocks-storage.tsdb.wal-compression-type + [wal_compression_type: | default = ""] + # TSDB WAL segments files max size (bytes). # CLI flag: -blocks-storage.tsdb.wal-segment-size-bytes [wal_segment_size_bytes: | default = 134217728] diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 58c3fdec63..7b34c88bbb 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -1968,10 +1968,16 @@ tsdb: # CLI flag: -blocks-storage.tsdb.stripe-size [stripe_size: | default = 16384] - # True to enable TSDB WAL compression. + # Deprecated (use blocks-storage.tsdb.wal-compression-type instead): True to + # enable TSDB WAL compression. # CLI flag: -blocks-storage.tsdb.wal-compression-enabled [wal_compression_enabled: | default = false] + # TSDB WAL type. Supported values are: 'snappy', 'zstd' and '' (disable + # compression) + # CLI flag: -blocks-storage.tsdb.wal-compression-type + [wal_compression_type: | default = ""] + # TSDB WAL segments files max size (bytes). # CLI flag: -blocks-storage.tsdb.wal-segment-size-bytes [wal_segment_size_bytes: | default = 134217728] diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index e9a436ec16..7093138802 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -2172,11 +2172,12 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { enableExemplars = true } oooTimeWindow := i.limits.OutOfOrderTimeWindow(userID) + walCompressType := wlog.CompressionNone - // TODO(yeya24): expose zstd compression for WAL. - if i.cfg.BlocksStorageConfig.TSDB.WALCompressionEnabled { - walCompressType = wlog.CompressionSnappy + if i.cfg.BlocksStorageConfig.TSDB.WALCompressionType != "" { + walCompressType = wlog.CompressionType(i.cfg.BlocksStorageConfig.TSDB.WALCompressionType) } + // Create a new user database db, err := tsdb.Open(udir, userLogger, tsdbPromReg, &tsdb.Options{ RetentionDuration: i.cfg.BlocksStorageConfig.TSDB.Retention.Milliseconds(), diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index bd3099dba9..5a33115182 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -42,14 +42,15 @@ const ( // Validation errors var ( - errInvalidShipConcurrency = errors.New("invalid TSDB ship concurrency") - errInvalidOpeningConcurrency = errors.New("invalid TSDB opening concurrency") - errInvalidCompactionInterval = errors.New("invalid TSDB compaction interval") - errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency") - errInvalidWALSegmentSizeBytes = errors.New("invalid TSDB WAL segment size bytes") - errInvalidStripeSize = errors.New("invalid TSDB stripe size") - errInvalidOutOfOrderCapMax = errors.New("invalid TSDB OOO chunks capacity (in samples)") - errEmptyBlockranges = errors.New("empty block ranges for TSDB") + errInvalidShipConcurrency = errors.New("invalid TSDB ship concurrency") + errInvalidOpeningConcurrency = errors.New("invalid TSDB opening concurrency") + errInvalidCompactionInterval = errors.New("invalid TSDB compaction interval") + errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency") + errInvalidWALSegmentSizeBytes = errors.New("invalid TSDB WAL segment size bytes") + errInvalidStripeSize = errors.New("invalid TSDB stripe size") + errInvalidOutOfOrderCapMax = errors.New("invalid TSDB OOO chunks capacity (in samples)") + errEmptyBlockranges = errors.New("empty block ranges for TSDB") + errUnSupportedWALCompressionType = errors.New("unsupported WAL compression type, valid types are (zstd, snappy and '')") ErrInvalidBucketIndexBlockDiscoveryStrategy = errors.New("bucket index block discovery strategy can only be enabled when bucket index is enabled") ErrBlockDiscoveryStrategy = errors.New("invalid block discovery strategy") @@ -137,6 +138,7 @@ type TSDBConfig struct { HeadChunksWriteBufferSize int `yaml:"head_chunks_write_buffer_size_bytes"` StripeSize int `yaml:"stripe_size"` WALCompressionEnabled bool `yaml:"wal_compression_enabled"` + WALCompressionType string `yaml:"wal_compression_type"` WALSegmentSizeBytes int `yaml:"wal_segment_size_bytes"` FlushBlocksOnShutdown bool `yaml:"flush_blocks_on_shutdown"` CloseIdleTSDBTimeout time.Duration `yaml:"close_idle_tsdb_timeout"` @@ -183,7 +185,8 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.HeadCompactionIdleTimeout, "blocks-storage.tsdb.head-compaction-idle-timeout", 1*time.Hour, "If TSDB head is idle for this duration, it is compacted. Note that up to 25% jitter is added to the value to avoid ingesters compacting concurrently. 0 means disabled.") f.IntVar(&cfg.HeadChunksWriteBufferSize, "blocks-storage.tsdb.head-chunks-write-buffer-size-bytes", chunks.DefaultWriteBufferSize, "The write buffer size used by the head chunks mapper. Lower values reduce memory utilisation on clusters with a large number of tenants at the cost of increased disk I/O operations.") f.IntVar(&cfg.StripeSize, "blocks-storage.tsdb.stripe-size", 16384, "The number of shards of series to use in TSDB (must be a power of 2). Reducing this will decrease memory footprint, but can negatively impact performance.") - f.BoolVar(&cfg.WALCompressionEnabled, "blocks-storage.tsdb.wal-compression-enabled", false, "True to enable TSDB WAL compression.") + f.BoolVar(&cfg.WALCompressionEnabled, "blocks-storage.tsdb.wal-compression-enabled", false, "Deprecated (use blocks-storage.tsdb.wal-compression-type instead): True to enable TSDB WAL compression.") + f.StringVar(&cfg.WALCompressionType, "blocks-storage.tsdb.wal-compression-type", "", "TSDB WAL type. Supported values are: 'snappy', 'zstd' and '' (disable compression)") f.IntVar(&cfg.WALSegmentSizeBytes, "blocks-storage.tsdb.wal-segment-size-bytes", wlog.DefaultSegmentSize, "TSDB WAL segments files max size (bytes).") f.BoolVar(&cfg.FlushBlocksOnShutdown, "blocks-storage.tsdb.flush-blocks-on-shutdown", false, "True to flush blocks to storage on shutdown. If false, incomplete blocks will be reused after restart.") f.DurationVar(&cfg.CloseIdleTSDBTimeout, "blocks-storage.tsdb.close-idle-tsdb-timeout", 0, "If TSDB has not received any data for this duration, and all blocks from TSDB have been shipped, TSDB is closed and deleted from local disk. If set to positive value, this value should be equal or higher than -querier.query-ingesters-within flag to make sure that TSDB is not closed prematurely, which could cause partial query results. 0 or negative value disables closing of idle TSDB.") @@ -232,6 +235,13 @@ func (cfg *TSDBConfig) Validate() error { return errInvalidOutOfOrderCapMax } + switch cfg.WALCompressionType { + case "snappy", "zstd", "": + // valid + default: + return errUnSupportedWALCompressionType + } + return nil } diff --git a/pkg/storage/tsdb/config_test.go b/pkg/storage/tsdb/config_test.go index d8c19849d2..35f8284d49 100644 --- a/pkg/storage/tsdb/config_test.go +++ b/pkg/storage/tsdb/config_test.go @@ -121,6 +121,30 @@ func TestConfig_Validate(t *testing.T) { }, expectedErr: errInvalidOutOfOrderCapMax, }, + "should pass on valid wal compression type (snappy)": { + setup: func(cfg *BlocksStorageConfig) { + cfg.TSDB.WALCompressionType = "snappy" + }, + expectedErr: nil, + }, + "should pass on valid wal compression type (zstd)": { + setup: func(cfg *BlocksStorageConfig) { + cfg.TSDB.WALCompressionType = "zstd" + }, + expectedErr: nil, + }, + "should pass on valid wal compression type ('')": { + setup: func(cfg *BlocksStorageConfig) { + cfg.TSDB.WALCompressionType = "" + }, + expectedErr: nil, + }, + "should fail on invalid wal compression type": { + setup: func(cfg *BlocksStorageConfig) { + cfg.TSDB.WALCompressionType = "dummy" + }, + expectedErr: errUnSupportedWALCompressionType, + }, } for testName, testData := range tests {