diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 7e24cfc038eb..2efd6886edbd 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -174,7 +174,6 @@ go_library( "@com_github_gogo_protobuf//types", "@com_github_google_btree//:btree", "@com_github_ibm_sarama//:sarama", - "@com_github_klauspost_compress//gzip", "@com_github_klauspost_compress//zstd", "@com_github_klauspost_pgzip//:pgzip", "@com_github_lib_pq//:pq", diff --git a/pkg/ccl/changefeedccl/sink_kafka_v2.go b/pkg/ccl/changefeedccl/sink_kafka_v2.go index 31ae991fccd1..2bf3053a5e22 100644 --- a/pkg/ccl/changefeedccl/sink_kafka_v2.go +++ b/pkg/ccl/changefeedccl/sink_kafka_v2.go @@ -6,6 +6,7 @@ package changefeedccl import ( + "compress/gzip" "context" "crypto/tls" "crypto/x509" @@ -28,7 +29,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" - "github.com/klauspost/compress/gzip" "github.com/klauspost/compress/zstd" "github.com/rcrowley/go-metrics" "github.com/twmb/franz-go/pkg/kadm" @@ -569,7 +569,7 @@ func validateCompressionLevel(compressionType compressionCodec, level int) error case sarama.CompressionNone: return nil case sarama.CompressionGZIP: - if level < gzip.NoCompression || level > gzip.BestCompression { + if level < gzip.HuffmanOnly || level > gzip.BestCompression { return errors.Errorf(`invalid gzip compression level: %d`, level) } case sarama.CompressionSnappy: diff --git a/pkg/ccl/changefeedccl/sink_kafka_v2_test.go b/pkg/ccl/changefeedccl/sink_kafka_v2_test.go index 4730dfe49e2d..d52460a8418e 100644 --- a/pkg/ccl/changefeedccl/sink_kafka_v2_test.go +++ b/pkg/ccl/changefeedccl/sink_kafka_v2_test.go @@ -448,6 +448,18 @@ func TestKafkaSinkClientV2_CompressionOpts(t *testing.T) { level: "9", expected: kgo.GzipCompression().WithLevel(9), }, + { + name: "gzip level -1", + codec: "GZIP", + level: "-1", + expected: kgo.GzipCompression().WithLevel(-1), + }, + { + name: "gzip level -2", + codec: "GZIP", + level: "-2", + expected: kgo.GzipCompression().WithLevel(-2), + }, { name: "snappy no level", codec: "SNAPPY", @@ -481,6 +493,12 @@ func TestKafkaSinkClientV2_CompressionOpts(t *testing.T) { level: "100", shouldErr: true, }, + { + name: "invalid gzip level '-3'", + codec: "GZIP", + level: "-3", + shouldErr: true, + }, { name: "invalid snappy level", codec: "SNAPPY",