Skip to content

Commit

Permalink
support for multiple broker addresses when using clowder config
Browse files Browse the repository at this point in the history
  • Loading branch information
epapbak committed Jan 4, 2024
1 parent e37a6f7 commit c991576
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 13 deletions.
32 changes: 28 additions & 4 deletions clowder/clowder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,30 @@ func TestUseBrokerConfigNoKafkaBrokers(t *testing.T) {
assert.Contains(t, output, clowder.NoBrokerCfg)
}

func TestUseBrokerConfigMultipleKafkaBrokers(t *testing.T) {
addr1 := "test_broker"
addr2 := "test_broker_backup"
port := 12345
brokerCfg := kafka.BrokerConfiguration{}
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{
Brokers: []api.BrokerConfig{
{
Hostname: addr1,
Port: &port,
},
{
Hostname: addr2,
Port: nil,
},
},
},
}

clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr1, port), addr2}, brokerCfg.Addresses)
}

func TestUseBrokerConfigNoAuthNoPort(t *testing.T) {
addr := "test_broker"
brokerCfg := kafka.BrokerConfiguration{}
Expand All @@ -126,7 +150,7 @@ func TestUseBrokerConfigNoAuthNoPort(t *testing.T) {
}

clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
assert.Equal(t, addr, brokerCfg.Address)
assert.Equal(t, []string{addr}, brokerCfg.Addresses)
}

func TestUseBrokerConfigNoAuth(t *testing.T) {
Expand All @@ -145,7 +169,7 @@ func TestUseBrokerConfigNoAuth(t *testing.T) {
}

clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg.Address)
assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port)}, brokerCfg.Addresses)
}

func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) {
Expand All @@ -169,7 +193,7 @@ func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) {
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
})

assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg.Address)
assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port)}, brokerCfg.Addresses)
assert.Contains(t, output, clowder.NoSaslCfg)
}

Expand Down Expand Up @@ -205,7 +229,7 @@ func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) {
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
})

assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg.Address)
assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port)}, brokerCfg.Addresses)
assert.Contains(t, output, "kafka is configured to use authentication")
assert.Equal(t, saslUsr, brokerCfg.SaslUsername)
assert.Equal(t, saslPwd, brokerCfg.SaslPassword)
Expand Down
16 changes: 8 additions & 8 deletions clowder/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@ const (
// UseBrokerConfig tries to replace parts of the BrokerConfiguration with the values
// loaded by Clowder
func UseBrokerConfig(brokerCfg *kafka.BrokerConfiguration, loadedConfig *api.AppConfig) {
// make sure broker(s) are configured in Clowder
if loadedConfig.Kafka != nil && len(loadedConfig.Kafka.Brokers) > 0 {
broker := loadedConfig.Kafka.Brokers[0]
// port can be empty in api, so taking it into account
if broker.Port != nil {
brokerCfg.Address = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port)
} else {
brokerCfg.Address = broker.Hostname
brokerCfg.Addresses = make([]string, len(loadedConfig.Kafka.Brokers))
for i, broker := range loadedConfig.Kafka.Brokers {
if broker.Port != nil {
brokerCfg.Addresses[i] = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port)
} else {
brokerCfg.Addresses[i] = broker.Hostname
}
}

// SSL config
broker := loadedConfig.Kafka.Brokers[0]
if broker.Authtype != nil {
fmt.Println("kafka is configured to use authentication")
if broker.Sasl != nil {
Expand Down
2 changes: 1 addition & 1 deletion kafka/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

// BrokerConfiguration represents configuration of Kafka broker
type BrokerConfiguration struct {
Address string `mapstructure:"address" toml:"address"`
Addresses []string `mapstructure:"addresses" toml:"addresses"`
SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"`
CertPath string `mapstructure:"cert_path" toml:"cert_path"`
SaslMechanism string `mapstructure:"sasl_mechanism" toml:"sasl_mechanism"`
Expand Down

0 comments on commit c991576

Please sign in to comment.