Skip to content

Commit

Permalink
support for multiple broker addresses when using clowder config
Browse files Browse the repository at this point in the history
  • Loading branch information
epapbak committed Jan 4, 2024
1 parent e37a6f7 commit a17285a
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 13 deletions.
8 changes: 4 additions & 4 deletions clowder/clowder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestUseBrokerConfigNoAuthNoPort(t *testing.T) {
}

clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
assert.Equal(t, addr, brokerCfg.Address)
assert.Equal(t, []string{addr}, brokerCfg.Addresses)
}

func TestUseBrokerConfigNoAuth(t *testing.T) {
Expand All @@ -145,7 +145,7 @@ func TestUseBrokerConfigNoAuth(t *testing.T) {
}

clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg.Address)
assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port)}, brokerCfg.Addresses)
}

func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) {
Expand All @@ -169,7 +169,7 @@ func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) {
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
})

assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg.Address)
assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port)}, brokerCfg.Addresses)
assert.Contains(t, output, clowder.NoSaslCfg)
}

Expand Down Expand Up @@ -205,7 +205,7 @@ func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) {
clowder.UseBrokerConfig(&brokerCfg, &loadedConfig)
})

assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg.Address)
assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port)}, brokerCfg.Addresses)
assert.Contains(t, output, "kafka is configured to use authentication")
assert.Equal(t, saslUsr, brokerCfg.SaslUsername)
assert.Equal(t, saslPwd, brokerCfg.SaslPassword)
Expand Down
16 changes: 8 additions & 8 deletions clowder/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@ const (
// UseBrokerConfig tries to replace parts of the BrokerConfiguration with the values
// loaded by Clowder
func UseBrokerConfig(brokerCfg *kafka.BrokerConfiguration, loadedConfig *api.AppConfig) {
// make sure broker(s) are configured in Clowder
if loadedConfig.Kafka != nil && len(loadedConfig.Kafka.Brokers) > 0 {
broker := loadedConfig.Kafka.Brokers[0]
// port can be empty in api, so taking it into account
if broker.Port != nil {
brokerCfg.Address = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port)
} else {
brokerCfg.Address = broker.Hostname
brokerCfg.Addresses = make([]string, len(loadedConfig.Kafka.Brokers))
for i, broker := range loadedConfig.Kafka.Brokers {
if broker.Port != nil {
brokerCfg.Addresses[i] = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port)
} else {
brokerCfg.Addresses[i] = broker.Hostname
}
}

// SSL config
broker := loadedConfig.Kafka.Brokers[0]
if broker.Authtype != nil {
fmt.Println("kafka is configured to use authentication")
if broker.Sasl != nil {
Expand Down
2 changes: 1 addition & 1 deletion kafka/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

// BrokerConfiguration represents configuration of Kafka broker
type BrokerConfiguration struct {
Address string `mapstructure:"address" toml:"address"`
Addresses []string `mapstructure:"addresses" toml:"addresses"`
SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"`
CertPath string `mapstructure:"cert_path" toml:"cert_path"`
SaslMechanism string `mapstructure:"sasl_mechanism" toml:"sasl_mechanism"`
Expand Down

0 comments on commit a17285a

Please sign in to comment.