Skip to content

Commit

Permalink
Adjust kafka v2 sink to support negative GZIP compression level
Browse files Browse the repository at this point in the history
Fixes: #13649
Release note (bug fix): fixed a bug introduced in kafka v2 sink.
Previously, the Kafka v2 sink configuration did not accept negative
values for the compression level setting, despite -1 and -2 having
special meanings within the system.

Action required: Please update the corresponding documentation
to reflect the correct compression level with range [-2,9]
  • Loading branch information
yaothao committed Dec 18, 2024
1 parent d085f07 commit 45d36eb
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 3 deletions.
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ go_library(
"@com_github_gogo_protobuf//types",
"@com_github_google_btree//:btree",
"@com_github_ibm_sarama//:sarama",
"@com_github_klauspost_compress//gzip",
"@com_github_klauspost_compress//zstd",
"@com_github_klauspost_pgzip//:pgzip",
"@com_github_lib_pq//:pq",
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/sink_kafka_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package changefeedccl

import (
"compress/gzip"
"context"
"crypto/tls"
"crypto/x509"
Expand All @@ -28,7 +29,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/klauspost/compress/gzip"
"github.com/klauspost/compress/zstd"
"github.com/rcrowley/go-metrics"
"github.com/twmb/franz-go/pkg/kadm"
Expand Down Expand Up @@ -569,7 +569,7 @@ func validateCompressionLevel(compressionType compressionCodec, level int) error
case sarama.CompressionNone:
return nil
case sarama.CompressionGZIP:
if level < gzip.NoCompression || level > gzip.BestCompression {
if level < gzip.HuffmanOnly || level > gzip.BestCompression {
return errors.Errorf(`invalid gzip compression level: %d`, level)
}
case sarama.CompressionSnappy:
Expand Down
18 changes: 18 additions & 0 deletions pkg/ccl/changefeedccl/sink_kafka_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,18 @@ func TestKafkaSinkClientV2_CompressionOpts(t *testing.T) {
level: "9",
expected: kgo.GzipCompression().WithLevel(9),
},
{
name: "gzip level -1",
codec: "GZIP",
level: "-1",
expected: kgo.GzipCompression().WithLevel(-1),
},
{
name: "gzip level -2",
codec: "GZIP",
level: "-2",
expected: kgo.GzipCompression().WithLevel(-2),
},
{
name: "snappy no level",
codec: "SNAPPY",
Expand Down Expand Up @@ -481,6 +493,12 @@ func TestKafkaSinkClientV2_CompressionOpts(t *testing.T) {
level: "100",
shouldErr: true,
},
{
name: "invalid gzip level '-3'",
codec: "GZIP",
level: "-3",
shouldErr: true,
},
{
name: "invalid snappy level",
codec: "SNAPPY",
Expand Down

0 comments on commit 45d36eb

Please sign in to comment.