Skip to content

Commit

Permalink
Merge pull request #684 from RedHatInsights/support_multiple_brokers
Browse files Browse the repository at this point in the history
[RHCLOUD-29528] Support multiple kafka brokers
  • Loading branch information
petracihalova authored Jan 26, 2024
2 parents 3cf6b06 + bc06da3 commit 188f918
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 25 deletions.
12 changes: 6 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var parsedConfig *SourcesApiConfig
type SourcesApiConfig struct {
AppName string
Hostname string
KafkaBrokerConfig clowder.BrokerConfig
KafkaBrokerConfig []clowder.BrokerConfig
KafkaTopics map[string]string
KafkaGroupID string
MetricsPort int
Expand Down Expand Up @@ -129,7 +129,7 @@ func Get() *SourcesApiConfig {
}

// Grab the first broker
options.SetDefault("KafkaBrokerConfig", cfg.Kafka.Brokers[0])
options.SetDefault("KafkaBrokerConfig", cfg.Kafka.Brokers)
// [/Kafka]

options.SetDefault("LogGroup", cfg.Logging.Cloudwatch.LogGroup)
Expand Down Expand Up @@ -182,10 +182,10 @@ func Get() *SourcesApiConfig {
log.Fatalf(`the provided "QUEUE_PORT", "%s", is not a valid integer: %s`, kafkaPort, err)
}

brokerConfig := clowder.BrokerConfig{
brokerConfig := []clowder.BrokerConfig{{
Hostname: os.Getenv("QUEUE_HOST"),
Port: &port,
}
}}

options.SetDefault("KafkaBrokerConfig", brokerConfig)
}
Expand Down Expand Up @@ -282,8 +282,8 @@ func Get() *SourcesApiConfig {
options.SetDefault("psks", strings.Split(os.Getenv("SOURCES_PSKS"), ","))

// Grab the Kafka Sasl Settings.
var brokerConfig clowder.BrokerConfig
bcRaw, ok := options.Get("KafkaBrokerConfig").(clowder.BrokerConfig)
var brokerConfig []clowder.BrokerConfig
bcRaw, ok := options.Get("KafkaBrokerConfig").([]clowder.BrokerConfig)
if ok {
brokerConfig = bcRaw
}
Expand Down
2 changes: 1 addition & 1 deletion internal/events/event_stream_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (esp *EventStreamSender) RaiseEvent(eventType string, payload []byte, heade
logging.Log.Debugf("publishing message %v to topic %q...", eventType, eventStreamTopic)

kf, err := kafka.GetWriter(&kafka.Options{
BrokerConfig: &config.KafkaBrokerConfig,
BrokerConfig: config.KafkaBrokerConfig,
Topic: eventStreamTopic,
Logger: logging.Log,
})
Expand Down
38 changes: 26 additions & 12 deletions kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,23 @@ func GetReader(conf *Options) (*Reader, error) {
return nil, errors.New("could not create Kafka reader: the provided configuration is empty")
}

if conf.BrokerConfig.Port == nil || *conf.BrokerConfig.Port == 0 {
return nil, errors.New("could not create a Kafka reader: the provided port is empty")
for _, c := range conf.BrokerConfig {
if c.Port == nil || *c.Port == 0 {
return nil, errors.New("could not create a Kafka reader: a provided port is empty")
}
}

if conf.Topic == "" {
return nil, errors.New("could not create a Kafka reader: a topic is required")
}

brokers := make([]string, len(conf.BrokerConfig))
for i, c := range conf.BrokerConfig {
brokers[i] = fmt.Sprintf("%s:%d", c.Hostname, *c.Port)
}

readerConfig := kafka.ReaderConfig{
Brokers: []string{fmt.Sprintf("%s:%d", conf.BrokerConfig.Hostname, *conf.BrokerConfig.Port)},
Brokers: brokers,
Topic: conf.Topic,
}

Expand All @@ -84,8 +91,8 @@ func GetReader(conf *Options) (*Reader, error) {

// When using managed Kafka, Clowder will add some Sasl authentication details so that the services can connect to
// it. The following code block sets up "kafka-go" to work with these settings.
if conf.BrokerConfig.Authtype != nil {
dialer, err := CreateDialer(conf.BrokerConfig)
if conf.BrokerConfig[0].Authtype != nil {
dialer, err := CreateDialer(&conf.BrokerConfig[0])
if err != nil {
return nil, fmt.Errorf(`unable to create the dialer for the Kafka reader: %w`, err)
}
Expand All @@ -98,20 +105,27 @@ func GetReader(conf *Options) (*Reader, error) {

// GetWriter returns a Kafka writer configured with the specified settings.
func GetWriter(conf *Options) (*Writer, error) {
if conf.BrokerConfig == nil {
if conf.BrokerConfig == nil || len(conf.BrokerConfig) == 0 {
return nil, errors.New("could not create Kafka writer: the provided configuration is empty")
}

if conf.BrokerConfig.Port == nil || *conf.BrokerConfig.Port == 0 {
return nil, errors.New("could not create a Kafka writer: the provided port is empty")
for _, c := range conf.BrokerConfig {
if c.Port == nil || *c.Port == 0 {
return nil, errors.New("could not create a Kafka reader: a provided port is empty")
}
}

if conf.Topic == "" {
return nil, errors.New("could not create a Kafka writer: a topic is required")
}

brokers := make([]string, len(conf.BrokerConfig))
for i, c := range conf.BrokerConfig {
brokers[i] = fmt.Sprintf("%s:%d", c.Hostname, *c.Port)
}

kafkaWriter := &kafka.Writer{
Addr: kafka.TCP(fmt.Sprintf("%s:%d", conf.BrokerConfig.Hostname, *conf.BrokerConfig.Port)),
Addr: kafka.TCP(brokers...),
Topic: conf.Topic,
}

Expand All @@ -120,10 +134,10 @@ func GetWriter(conf *Options) (*Writer, error) {
kafkaWriter.ErrorLogger = kafka.LoggerFunc(conf.Logger.Errorf)
}

if conf.BrokerConfig.Authtype != nil {
tls := CreateTLSConfig(conf.BrokerConfig.Cacert)
if conf.BrokerConfig[0].Authtype != nil {
tls := CreateTLSConfig(conf.BrokerConfig[0].Cacert)

mechanism, err := CreateSaslMechanism(conf.BrokerConfig.Sasl)
mechanism, err := CreateSaslMechanism(conf.BrokerConfig[0].Sasl)
if err != nil {
return nil, fmt.Errorf(`unable to create Kafka producer's Sasl mechanism: %w`, err)
}
Expand Down
2 changes: 1 addition & 1 deletion kafka/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
// Options is a struct for creating a reader/writer
type Options struct {
// REQUIRED FIELDS
BrokerConfig *clowder.BrokerConfig
BrokerConfig []clowder.BrokerConfig
Topic string

// only used for reader, optional.
Expand Down
2 changes: 1 addition & 1 deletion service/availability_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (acr availabilityCheckRequester) EndpointAvailabilityCheck(source *m.Source

// instantiate a producer for this source
writer, err := kafka.GetWriter(&kafka.Options{
BrokerConfig: &conf.KafkaBrokerConfig,
BrokerConfig: conf.KafkaBrokerConfig,
Topic: satelliteTopic,
Logger: acr.Logger(),
})
Expand Down
2 changes: 1 addition & 1 deletion service/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type notificationMessage struct {

func (producer *AvailabilityStatusNotifier) EmitAvailabilityStatusNotification(id *identity.Identity, emailNotificationInfo *m.EmailNotificationInfo, sourceIdentification string) error {
writer, err := kafka.GetWriter(&kafka.Options{
BrokerConfig: &conf.KafkaBrokerConfig,
BrokerConfig: conf.KafkaBrokerConfig,
Topic: notificationTopic,
Logger: l.Log,
})
Expand Down
2 changes: 1 addition & 1 deletion service/superkey.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func SendSuperKeyDeleteRequest(application *m.Application, headers []kafka.Heade

func produceSuperkeyRequest(m *kafka.Message) error {
writer, err := kafka.GetWriter(&kafka.Options{
BrokerConfig: &conf.KafkaBrokerConfig,
BrokerConfig: conf.KafkaBrokerConfig,
Topic: superkeyTopic,
Logger: l.Log,
})
Expand Down
4 changes: 2 additions & 2 deletions statuslistener/statuslistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (avs *AvailabilityStatusListener) subscribeToAvailabilityStatus(shutdown ch
}

kf, err := kafka.GetReader(&kafka.Options{
BrokerConfig: &config.KafkaBrokerConfig,
BrokerConfig: config.KafkaBrokerConfig,
GroupID: util.StringRef(groupID),
Topic: sourcesStatusTopic,
Logger: l.Log,
Expand Down Expand Up @@ -270,7 +270,7 @@ func (avs *AvailabilityStatusListener) healthCheckProducer() {
// send a message every <healthCheckInterval> seconds
for range time.NewTicker(healthCheckInterval * time.Second).C {
w, err := kafka.GetWriter(&kafka.Options{
BrokerConfig: &config.KafkaBrokerConfig,
BrokerConfig: config.KafkaBrokerConfig,
Topic: sourcesStatusTopic,
Logger: l.Log,
})
Expand Down

0 comments on commit 188f918

Please sign in to comment.