Skip to content

Commit

Permalink
Revert to simple BrokerConfiguration structure with slice of addresses
Browse files Browse the repository at this point in the history
  • Loading branch information
epapbak committed Jan 15, 2024
1 parent fec884b commit 237f801
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 87 deletions.
36 changes: 16 additions & 20 deletions clowder/clowder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ package clowder_test

import (
"fmt"
"testing"

"github.com/RedHatInsights/insights-operator-utils/clowder"
"github.com/RedHatInsights/insights-operator-utils/kafka"
"github.com/RedHatInsights/insights-operator-utils/postgres"
api "github.com/redhatinsights/app-common-go/pkg/api/v1"
"github.com/stretchr/testify/assert"
"github.com/tisnik/go-capture"
"testing"
)

func TestUseDBConfig(t *testing.T) {
Expand Down Expand Up @@ -54,7 +55,7 @@ func TestUseDBConfig(t *testing.T) {
func TestUseClowderTopicsTopicFound(t *testing.T) {
originalTopicName := "topic1"
clowderTopicName := "NewTopicName"
brokerCfg := kafka.SingleBrokerConfiguration{
brokerCfg := kafka.BrokerConfiguration{
Topic: originalTopicName,
}
kafkaTopics := map[string]api.TopicConfig{
Expand All @@ -73,7 +74,7 @@ func TestUseClowderTopicsTopicFound(t *testing.T) {
func TestUseClowderTopicsTopicFoundMultiBrokers(t *testing.T) {
originalTopicName := "topic1"
clowderTopicName := "NewTopicName"
brokerCfg := kafka.MultiBrokerConfiguration{
brokerCfg := kafka.BrokerConfiguration{
Topic: originalTopicName,
}
kafkaTopics := map[string]api.TopicConfig{
Expand All @@ -92,7 +93,7 @@ func TestUseClowderTopicsTopicFoundMultiBrokers(t *testing.T) {
func TestUseClowderTopicsTopicNotFound(t *testing.T) {
originalTopicName := "topic1"

brokerCfg := kafka.SingleBrokerConfiguration{
brokerCfg := kafka.BrokerConfiguration{
Topic: originalTopicName,
}
kafkaTopics := map[string]api.TopicConfig{
Expand All @@ -109,7 +110,7 @@ func TestUseClowderTopicsTopicNotFound(t *testing.T) {
}

func TestUseBrokerConfigNoKafkaConfig(t *testing.T) {
brokerCfg := kafka.MultiBrokerConfiguration{}
brokerCfg := kafka.BrokerConfiguration{}
loadedConfig := api.AppConfig{}

output, _ := capture.StandardOutput(func() {
Expand All @@ -119,7 +120,7 @@ func TestUseBrokerConfigNoKafkaConfig(t *testing.T) {
}

func TestUseBrokerConfigNoKafkaBrokers(t *testing.T) {
brokerCfg := kafka.MultiBrokerConfiguration{}
brokerCfg := kafka.BrokerConfiguration{}
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{},
}
Expand All @@ -134,7 +135,7 @@ func TestUseBrokerConfigMultipleKafkaBrokers(t *testing.T) {
addr1 := "test_broker"
addr2 := "test_broker_backup"
port := 12345
brokerCfg := kafka.MultiBrokerConfiguration{}
brokerCfg := kafka.BrokerConfiguration{}
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{
Brokers: []api.BrokerConfig{
Expand All @@ -156,7 +157,7 @@ func TestUseBrokerConfigMultipleKafkaBrokers(t *testing.T) {

func TestUseBrokerConfigNoAuthNoPort(t *testing.T) {
addr := "test_broker"
brokerCfg := kafka.MultiBrokerConfiguration{}
brokerCfg := kafka.BrokerConfiguration{}
loadedConfig := api.AppConfig{
Kafka: &api.KafkaConfig{
Brokers: []api.BrokerConfig{
Expand All @@ -173,7 +174,7 @@ func TestUseBrokerConfigNoAuthNoPort(t *testing.T) {
}

func TestUseBrokerConfigNoAuth(t *testing.T) {
brokerCfg := kafka.MultiBrokerConfiguration{}
brokerCfg := kafka.BrokerConfiguration{}
port := 12345
addr := "test_broker"
loadedConfig := api.AppConfig{
Expand All @@ -192,7 +193,7 @@ func TestUseBrokerConfigNoAuth(t *testing.T) {
}

func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) {
brokerCfg := kafka.MultiBrokerConfiguration{}
brokerCfg := kafka.BrokerConfiguration{}
port := 12345
addr := "test_broker"
authType := api.BrokerConfigAuthtypeSasl
Expand All @@ -217,7 +218,7 @@ func TestUseBrokerConfigAuthEnabledNoSasl(t *testing.T) {
}

func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) {
brokerCfg := kafka.MultiBrokerConfiguration{}
brokerCfg := kafka.BrokerConfiguration{}
port := 12345
addr := "test_broker"
addr2 := "test_broker_backup"
Expand Down Expand Up @@ -263,14 +264,9 @@ func TestUseBrokerConfigAuthEnabledWithSaslConfig(t *testing.T) {

assert.Equal(t, []string{fmt.Sprintf("%s:%d", addr, port), fmt.Sprintf("%s:%d", addr2, port)}, brokerCfg.Addresses)
assert.Contains(t, output, "kafka is configured to use authentication")
assert.Equal(t, 2, len(brokerCfg.SASLConfigs))
assert.Equal(t, saslUsr, brokerCfg.SASLConfigs[0].SaslUsername)
assert.Equal(t, saslPwd, brokerCfg.SASLConfigs[0].SaslPassword)
assert.Equal(t, saslMechanism, brokerCfg.SASLConfigs[0].SaslMechanism)
assert.Equal(t, protocol, brokerCfg.SASLConfigs[0].SecurityProtocol)
assert.Equal(t, saslUsr, brokerCfg.SaslUsername)
assert.Equal(t, saslPwd, brokerCfg.SaslPassword)
assert.Equal(t, saslMechanism, brokerCfg.SaslMechanism)
assert.Equal(t, protocol, brokerCfg.SecurityProtocol)

assert.Equal(t, saslUsr2, brokerCfg.SASLConfigs[1].SaslUsername)
assert.Equal(t, saslPwd, brokerCfg.SASLConfigs[1].SaslPassword)
assert.Equal(t, saslMechanism, brokerCfg.SASLConfigs[1].SaslMechanism)
assert.Equal(t, protocol, brokerCfg.SASLConfigs[1].SecurityProtocol)
}
57 changes: 24 additions & 33 deletions clowder/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package clowder

import (
"fmt"

"github.com/RedHatInsights/insights-operator-utils/kafka"
api "github.com/redhatinsights/app-common-go/pkg/api/v1"
)
Expand All @@ -29,33 +30,33 @@ const (

// UseBrokerConfig tries to replace parts of the BrokerConfiguration with the values
// loaded by Clowder
func UseBrokerConfig(brokerCfg *kafka.MultiBrokerConfiguration, loadedConfig *api.AppConfig) {
func UseBrokerConfig(brokerCfg *kafka.BrokerConfiguration, loadedConfig *api.AppConfig) {
if loadedConfig.Kafka != nil && len(loadedConfig.Kafka.Brokers) > 0 {
brokerCfg.Addresses = make([]string, len(loadedConfig.Kafka.Brokers))
brokerCfg.SASLConfigs = make([]kafka.SASLConfiguration, len(loadedConfig.Kafka.Brokers))
for i, broker := range loadedConfig.Kafka.Brokers {
if broker.Port != nil {
brokerCfg.Addresses[i] = fmt.Sprintf("%s:%d", broker.Hostname, *broker.Port)
} else {
brokerCfg.Addresses[i] = broker.Hostname
}
// SSL config
if broker.Authtype != nil {
fmt.Println("kafka is configured to use authentication")
if broker.Sasl != nil {
// we are trusting that these values are set and
// dereferencing the pointers without any check...
brokerCfg.SASLConfigs[i].SaslUsername = *broker.Sasl.Username
brokerCfg.SASLConfigs[i].SaslPassword = *broker.Sasl.Password
brokerCfg.SASLConfigs[i].SaslMechanism = *broker.Sasl.SaslMechanism
brokerCfg.SASLConfigs[i].SecurityProtocol = *broker.SecurityProtocol
}
// SSL config
clowderCfg := loadedConfig.Kafka.Brokers[0]
if clowderCfg.Authtype != nil {
fmt.Println("kafka is configured to use authentication")
if clowderCfg.Sasl != nil {
// we are trusting that these values are set and
// dereferencing the pointers without any check...
brokerCfg.SaslUsername = *clowderCfg.Sasl.Username
brokerCfg.SaslPassword = *clowderCfg.Sasl.Password
brokerCfg.SaslMechanism = *clowderCfg.Sasl.SaslMechanism
brokerCfg.SecurityProtocol = *clowderCfg.SecurityProtocol

if caPath, err := loadedConfig.KafkaCa(broker); err == nil {
brokerCfg.SASLConfigs[i].CertPath = caPath
}
} else {
fmt.Println(noSaslConfig)
if caPath, err := loadedConfig.KafkaCa(clowderCfg); err == nil {
brokerCfg.CertPath = caPath
}
} else {
fmt.Println(noSaslConfig)
}
}
} else {
Expand All @@ -65,21 +66,11 @@ func UseBrokerConfig(brokerCfg *kafka.MultiBrokerConfiguration, loadedConfig *ap

// UseClowderTopics tries to replace the configured topic with the corresponding
// topic loaded by Clowder
func UseClowderTopics(brokerCfg interface{}, kafkaTopics map[string]api.TopicConfig) {
switch cfg := brokerCfg.(type) {
case *kafka.SingleBrokerConfiguration:
if clowderTopic, ok := kafkaTopics[cfg.Topic]; ok {
cfg.Topic = clowderTopic.Name
} else {
fmt.Printf(noTopicMapping, cfg.Topic)
}
case *kafka.MultiBrokerConfiguration:
if clowderTopic, ok := kafkaTopics[cfg.Topic]; ok {
cfg.Topic = clowderTopic.Name
} else {
fmt.Printf(noTopicMapping, cfg.Topic)
}
default:
fmt.Printf("Unknown Broker configuration type")
func UseClowderTopics(brokerCfg *kafka.BrokerConfiguration, kafkaTopics map[string]api.TopicConfig) {
if clowderTopic, ok := kafkaTopics[brokerCfg.Topic]; ok {
brokerCfg.Topic = clowderTopic.Name
} else {
fmt.Printf(noTopicMapping, brokerCfg.Topic)
}

}
31 changes: 4 additions & 27 deletions kafka/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,9 @@ import (
"github.com/rs/zerolog/log"
)

// SASLConfiguration represents configuration of SASL authentication for
// a given Kafka broker
type SASLConfiguration struct {
SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"`
CertPath string `mapstructure:"cert_path" toml:"cert_path"`
SaslMechanism string `mapstructure:"sasl_mechanism" toml:"sasl_mechanism"`
SaslUsername string `mapstructure:"sasl_username" toml:"sasl_username"`
SaslPassword string `mapstructure:"sasl_password" toml:"sasl_password"`
}

// SingleBrokerConfiguration represents configuration of a single-instance Kafka broker
type SingleBrokerConfiguration struct {
Address string `mapstructure:"address" toml:"address"`
// BrokerConfiguration represents configuration of a single-instance Kafka broker
type BrokerConfiguration struct {
Addresses []string `mapstructure:"address" toml:"address"`
SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"`
CertPath string `mapstructure:"cert_path" toml:"cert_path"`
SaslMechanism string `mapstructure:"sasl_mechanism" toml:"sasl_mechanism"`
Expand All @@ -54,21 +44,8 @@ type SingleBrokerConfiguration struct {
Enabled bool `mapstructure:"enabled" toml:"enabled"`
}

// MultiBrokerConfiguration represents configuration of Kafka broker with
// multiple instances running on different hosts
type MultiBrokerConfiguration struct {
Addresses []string `mapstructure:"addresses" toml:"addresses"`
SecurityProtocol string `mapstructure:"security_protocol" toml:"security_protocol"`
SASLConfigs []SASLConfiguration `mapstructure:"sasl_configs" toml:"sasl_configs"`
Topic string `mapstructure:"topic" toml:"topic"`
Timeout time.Duration `mapstructure:"timeout" toml:"timeout"`
Group string `mapstructure:"group" toml:"group"`
ClientID string `mapstructure:"client_id" toml:"client_id"`
Enabled bool `mapstructure:"enabled" toml:"enabled"`
}

// SaramaConfigFromBrokerConfig returns a Config struct from broker.Configuration parameters
func SaramaConfigFromBrokerConfig(cfg *SingleBrokerConfiguration) (*sarama.Config, error) {
func SaramaConfigFromBrokerConfig(cfg *BrokerConfiguration) (*sarama.Config, error) {
saramaConfig := sarama.NewConfig()
saramaConfig.Version = sarama.V0_10_2_0

Expand Down
15 changes: 8 additions & 7 deletions kafka/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,23 @@
package kafka_test

import (
"github.com/RedHatInsights/insights-operator-utils/kafka"
"github.com/RedHatInsights/insights-operator-utils/tests/helpers"
"testing"
"time"

"github.com/RedHatInsights/insights-operator-utils/kafka"
"github.com/RedHatInsights/insights-operator-utils/tests/helpers"

"github.com/Shopify/sarama"
"github.com/stretchr/testify/assert"
)

func TestSaramaConfigFromBrokerConfig(t *testing.T) {
cfg := kafka.SingleBrokerConfiguration{}
cfg := kafka.BrokerConfiguration{}
saramaConfig, err := kafka.SaramaConfigFromBrokerConfig(&cfg)
helpers.FailOnError(t, err)
assert.Equal(t, sarama.V0_10_2_0, saramaConfig.Version)

cfg = kafka.SingleBrokerConfiguration{
cfg = kafka.BrokerConfiguration{
Timeout: time.Second,
}
saramaConfig, err = kafka.SaramaConfigFromBrokerConfig(&cfg)
Expand All @@ -41,7 +42,7 @@ func TestSaramaConfigFromBrokerConfig(t *testing.T) {
assert.Equal(t, time.Second, saramaConfig.Net.WriteTimeout)
assert.Equal(t, "sarama", saramaConfig.ClientID) // default value

cfg = kafka.SingleBrokerConfiguration{
cfg = kafka.BrokerConfiguration{
SecurityProtocol: "SSL",
}

Expand All @@ -50,7 +51,7 @@ func TestSaramaConfigFromBrokerConfig(t *testing.T) {
assert.Equal(t, sarama.V0_10_2_0, saramaConfig.Version)
assert.True(t, saramaConfig.Net.TLS.Enable)

cfg = kafka.SingleBrokerConfiguration{
cfg = kafka.BrokerConfiguration{
SecurityProtocol: "SASL_SSL",
SaslMechanism: "PLAIN",
SaslUsername: "username",
Expand Down Expand Up @@ -80,7 +81,7 @@ func TestSaramaConfigFromBrokerConfig(t *testing.T) {
}

func TestBadConfiguration(t *testing.T) {
cfg := kafka.SingleBrokerConfiguration{
cfg := kafka.BrokerConfiguration{
SecurityProtocol: "SSL",
CertPath: "missing_path",
}
Expand Down

0 comments on commit 237f801

Please sign in to comment.