Skip to content

Commit

Permalink
Do not set a fixed Kafka version in sarama config
Browse files Browse the repository at this point in the history
  • Loading branch information
epapbak committed Jan 17, 2024
1 parent cb75f7d commit 35b5c4f
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 8 deletions.
1 change: 0 additions & 1 deletion kafka/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ type BrokerConfiguration struct {
// SaramaConfigFromBrokerConfig returns a Config struct from broker.Configuration parameters
func SaramaConfigFromBrokerConfig(cfg *BrokerConfiguration) (*sarama.Config, error) {
saramaConfig := sarama.NewConfig()
saramaConfig.Version = sarama.V0_10_2_0

if cfg.Timeout > 0 {
saramaConfig.Net.DialTimeout = cfg.Timeout
Expand Down
9 changes: 2 additions & 7 deletions kafka/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,14 @@ import (

func TestSaramaConfigFromBrokerConfig(t *testing.T) {
cfg := kafka.BrokerConfiguration{}
saramaConfig, err := kafka.SaramaConfigFromBrokerConfig(&cfg)
_, err := kafka.SaramaConfigFromBrokerConfig(&cfg)
helpers.FailOnError(t, err)
assert.Equal(t, sarama.V0_10_2_0, saramaConfig.Version)

cfg = kafka.BrokerConfiguration{
Timeout: time.Second,
}
saramaConfig, err = kafka.SaramaConfigFromBrokerConfig(&cfg)
saramaConfig, err := kafka.SaramaConfigFromBrokerConfig(&cfg)
helpers.FailOnError(t, err)
assert.Equal(t, sarama.V0_10_2_0, saramaConfig.Version)
assert.Equal(t, time.Second, saramaConfig.Net.DialTimeout)
assert.Equal(t, time.Second, saramaConfig.Net.ReadTimeout)
assert.Equal(t, time.Second, saramaConfig.Net.WriteTimeout)
Expand All @@ -48,7 +46,6 @@ func TestSaramaConfigFromBrokerConfig(t *testing.T) {

saramaConfig, err = kafka.SaramaConfigFromBrokerConfig(&cfg)
helpers.FailOnError(t, err)
assert.Equal(t, sarama.V0_10_2_0, saramaConfig.Version)
assert.True(t, saramaConfig.Net.TLS.Enable)

cfg = kafka.BrokerConfiguration{
Expand All @@ -60,7 +57,6 @@ func TestSaramaConfigFromBrokerConfig(t *testing.T) {
}
saramaConfig, err = kafka.SaramaConfigFromBrokerConfig(&cfg)
helpers.FailOnError(t, err)
assert.Equal(t, sarama.V0_10_2_0, saramaConfig.Version)
assert.True(t, saramaConfig.Net.TLS.Enable)
assert.True(t, saramaConfig.Net.SASL.Enable)
assert.Equal(t, sarama.SASLMechanism("PLAIN"), saramaConfig.Net.SASL.Mechanism)
Expand All @@ -71,7 +67,6 @@ func TestSaramaConfigFromBrokerConfig(t *testing.T) {
cfg.SaslMechanism = "SCRAM-SHA-512"
saramaConfig, err = kafka.SaramaConfigFromBrokerConfig(&cfg)
helpers.FailOnError(t, err)
assert.Equal(t, sarama.V0_10_2_0, saramaConfig.Version)
assert.True(t, saramaConfig.Net.TLS.Enable)
assert.True(t, saramaConfig.Net.SASL.Enable)
assert.Equal(t, sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512), saramaConfig.Net.SASL.Mechanism)
Expand Down

0 comments on commit 35b5c4f

Please sign in to comment.