diff --git a/clowder/clowder_test.go b/clowder/clowder_test.go index d82f0aa..0e8de25 100644 --- a/clowder/clowder_test.go +++ b/clowder/clowder_test.go @@ -111,6 +111,30 @@ func TestUseBrokerConfigNoKafkaBrokers(t *testing.T) { assert.Contains(t, output, clowder.NoBrokerCfg) } +func TestUseBrokerConfigMultipleKafkaBrokers(t *testing.T) { + addr1 := "test_broker" + addr2 := "test_broker_backup" + port := 12345 + brokerCfg := kafka.BrokerConfiguration{} + loadedConfig := api.AppConfig{ + Kafka: &api.KafkaConfig{ + Brokers: []api.BrokerConfig{ + { + Hostname: addr1, + Port: &port, + }, + { + Hostname: addr2, + Port: nil, + }, + }, + }, + } + + clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) + assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr1, port), addr2}, brokerCfg.Addresses) +} + func TestUseBrokerConfigNoAuthNoPort(t *testing.T) { addr := "test_broker" brokerCfg := kafka.BrokerConfiguration{} @@ -126,7 +150,7 @@ func TestUseBrokerConfigNoAuthNoPort(t *testing.T) { } clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) - assert.Equal(t, addr, brokerCfg.Address) + assert.Equal(t, []string{addr}, brokerCfg.Addresses) } func TestUseBrokerConfigNoAuth(t *testing.T) { @@ -145,7 +169,7 @@ func TestUseBrokerConfigNoAuth(t *testing.T) { } clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) - assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg.Address) + assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port)}, brokerCfg.Addresses) } func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) { @@ -169,7 +193,7 @@ func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) { clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) }) - assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg.Address) + assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port)}, brokerCfg.Addresses) assert.Contains(t, output, clowder.NoSaslCfg) } @@ -205,7 +229,7 @@ func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) { clowder.UseBrokerConfig(&brokerCfg, &loadedConfig) }) - assert.Equal(t, fmt.Sprintf("%s:%d", addr, port), brokerCfg.Address) + assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port)}, brokerCfg.Addresses) assert.Contains(t, output, "kafka is configured to use authentication") assert.Equal(t, saslUsr, brokerCfg.SaslUsername) assert.Equal(t, saslPwd, brokerCfg.SaslPassword) diff --git a/clowder/kafka.go b/clowder/kafka.go index 8a2bfe6..eb82f8b 100644 --- a/clowder/kafka.go +++ b/clowder/kafka.go @@ -30,17 +30,17 @@ const ( // UseBrokerConfig tries to replace parts of the BrokerConfiguration with the values // loaded by Clowder func UseBrokerConfig(brokerCfg *kafka.BrokerConfiguration, loadedConfig *api.AppConfig) { - // make sure broker(s) are configured in Clowder if loadedConfig.Kafka != nil && len(loadedConfig.Kafka.Brokers) > 0 { - broker := loadedConfig.Kafka.Brokers[0] - // port can be empty in api, so taking it into account - if broker.Port != nil { - brokerCfg.Address = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port) - } else { - brokerCfg.Address = broker.Hostname + brokerCfg.Addresses = make([]string, len(loadedConfig.Kafka.Brokers)) + for i, broker := range loadedConfig.Kafka.Brokers { + if broker.Port != nil { + brokerCfg.Addresses[i] = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port) + } else { + brokerCfg.Addresses[i] = broker.Hostname + } } - // SSL config + broker := loadedConfig.Kafka.Brokers[0] if broker.Authtype != nil { fmt.Println("kafka is configured to use authentication") if broker.Sasl != nil { diff --git a/kafka/configuration.go b/kafka/configuration.go index 70c9a32..6189829 100644 --- a/kafka/configuration.go +++ b/kafka/configuration.go @@ -31,7 +31,7 @@ import ( // BrokerConfiguration represents configuration of Kafka broker type BrokerConfiguration struct { - Address string `mapstructure:"address" toml:"address"` + Addresses []string `mapstructure:"addresses" toml:"addresses"` SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"` CertPath string `mapstructure:"cert_path" toml:"cert_path"` SaslMechanism string `mapstructure:"sasl_mechanism" toml:"sasl_mechanism"`