From 3237249dac428daf2593ddb752d579da7b8803f7 Mon Sep 17 00:00:00 2001 From: yaothao Date: Tue, 17 Dec 2024 18:47:10 -0500 Subject: [PATCH] Adjust kafka v2 sink to support negative GZIP compression level Fixes: #136492 Release note (bug fix): fixed a bug introduced in kafka v2 sink. Previously, the Kafka v2 sink configuration did not accept negative values for the compression level setting, despite -1 and -2 having special meanings within the system. Action required: Please update the corresponding documentation to reflect the correct compression level with range [-2,9] --- pkg/ccl/changefeedccl/BUILD.bazel | 1 - pkg/ccl/changefeedccl/sink_kafka_v2.go | 4 ++-- pkg/ccl/changefeedccl/sink_kafka_v2_test.go | 18 ++++++++++++++++++ 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index f4cf62c100da..b379dd1f01f4 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -173,7 +173,6 @@ go_library( "@com_github_gogo_protobuf//types", "@com_github_google_btree//:btree", "@com_github_ibm_sarama//:sarama", - "@com_github_klauspost_compress//gzip", "@com_github_klauspost_compress//zstd", "@com_github_klauspost_pgzip//:pgzip", "@com_github_lib_pq//:pq", diff --git a/pkg/ccl/changefeedccl/sink_kafka_v2.go b/pkg/ccl/changefeedccl/sink_kafka_v2.go index 31ae991fccd1..2bf3053a5e22 100644 --- a/pkg/ccl/changefeedccl/sink_kafka_v2.go +++ b/pkg/ccl/changefeedccl/sink_kafka_v2.go @@ -6,6 +6,7 @@ package changefeedccl import ( + "compress/gzip" "context" "crypto/tls" "crypto/x509" @@ -28,7 +29,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" - "github.com/klauspost/compress/gzip" "github.com/klauspost/compress/zstd" "github.com/rcrowley/go-metrics" "github.com/twmb/franz-go/pkg/kadm" @@ -569,7 +569,7 @@ func validateCompressionLevel(compressionType compressionCodec, level int) error case sarama.CompressionNone: return nil case sarama.CompressionGZIP: - if level < gzip.NoCompression || level > gzip.BestCompression { + if level < gzip.HuffmanOnly || level > gzip.BestCompression { return errors.Errorf(`invalid gzip compression level: %d`, level) } case sarama.CompressionSnappy: diff --git a/pkg/ccl/changefeedccl/sink_kafka_v2_test.go b/pkg/ccl/changefeedccl/sink_kafka_v2_test.go index 4730dfe49e2d..d52460a8418e 100644 --- a/pkg/ccl/changefeedccl/sink_kafka_v2_test.go +++ b/pkg/ccl/changefeedccl/sink_kafka_v2_test.go @@ -448,6 +448,18 @@ func TestKafkaSinkClientV2_CompressionOpts(t *testing.T) { level: "9", expected: kgo.GzipCompression().WithLevel(9), }, + { + name: "gzip level -1", + codec: "GZIP", + level: "-1", + expected: kgo.GzipCompression().WithLevel(-1), + }, + { + name: "gzip level -2", + codec: "GZIP", + level: "-2", + expected: kgo.GzipCompression().WithLevel(-2), + }, { name: "snappy no level", codec: "SNAPPY", @@ -481,6 +493,12 @@ func TestKafkaSinkClientV2_CompressionOpts(t *testing.T) { level: "100", shouldErr: true, }, + { + name: "invalid gzip level '-3'", + codec: "GZIP", + level: "-3", + shouldErr: true, + }, { name: "invalid snappy level", codec: "SNAPPY",