refactor: rm global Kafka configuration
Global Kafka configuration would not work, as certain services/components
require slightly different parameters to be set for the Kafka client
initialized for them.

One solution would be to allow services/components to override certain
parameters set in the global Kafka configuration; however, that would
introduce extra complexity around generating service-specific Kafka
configuration from multiple sources.

The alternative is to have a dedicated Kafka configuration for each
service/component. This makes the configuration more verbose, but less
complex and error-prone.
chrisgacsal committed Sep 9, 2024
1 parent ef914df commit d87c9b7
Showing 7 changed files with 122 additions and 105 deletions.
89 changes: 45 additions & 44 deletions config.example.yaml
@@ -97,47 +97,48 @@ svix:
apiKey: secret
serverURL: http://localhost:8071

kafka:
brokers: 127.0.0.1:9092,127.0.0.2:9092
securityProtocol: SASL_SSL
saslMechanisms: PLAIN
saslUsername: user
saslPassword: pass
# To enable stats reporting, set this value to >= 5s.
# Setting this value to 0 explicitly disables reporting.
statsInterval: 5s
# Set IP address family used for communicating with Kafka cluster
brokerAddressFamily: v4
# Use this configuration parameter to define how frequently the local metadata cache needs to be updated.
# It cannot be lower than 10 seconds.
topicMetadataRefreshInterval: 1m
# Use this config parameter to enable TCP keep-alive in order to prevent the Kafka broker from closing idle network connections.
socketKeepAliveEnabled: true
# Set list of debug contexts to enable for librdkafka
# See: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts
debugContexts:
- broker
- topic
# Consumer/Producer identifier
clientID: kafka-client-1
# Consumer group identifier
consumerGroupID: consumer-group
# Static membership identifier in consumer group
consumerGroupInstanceID: consumer-group-1
# Consumer group session and failure detection timeout.
# The consumer sends periodic heartbeats (heartbeatInterval) to indicate its liveness to the broker.
# If no heartbeats are received by the broker for a group member within the session timeout,
# the broker will remove the consumer from the group and trigger a rebalance.
sessionTimeout: 5m
# Consumer group session keepalive heartbeat interval
heartbeatInterval: 3s
# Automatically and periodically commit offsets in the background
enableAutoCommit: true
# Automatically store offset of last message provided to application.
# The offset store is an in-memory store of the next offset to (auto-)commit for each partition.
enableAutoOffsetStore: false
# AutoOffsetReset defines the action to take when there is no initial offset in offset store or the desired offset is out of range:
# * "smallest","earliest","beginning": automatically reset the offset to the smallest offset
# * "largest","latest","end": automatically reset the offset to the largest offset
# * "error": trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages and checking 'message->err'.
autoOffsetReset: "error"
#sink:
# kafka:
# brokers: 127.0.0.1:9092,127.0.0.2:9092
# securityProtocol: SASL_SSL
# saslMechanisms: PLAIN
# saslUsername: user
# saslPassword: pass
# # To enable stats reporting, set this value to >= 5s.
# # Setting this value to 0 explicitly disables reporting.
# statsInterval: 5s
# # Set IP address family used for communicating with Kafka cluster
# brokerAddressFamily: v4
# # Use this configuration parameter to define how frequently the local metadata cache needs to be updated.
# # It cannot be lower than 10 seconds.
# topicMetadataRefreshInterval: 1m
# # Use this config parameter to enable TCP keep-alive in order to prevent the Kafka broker from closing idle network connections.
# socketKeepAliveEnabled: true
# # Set list of debug contexts to enable for librdkafka
# # See: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts
# debugContexts:
# - broker
# - topic
# # Consumer/Producer identifier
# clientID: kafka-client-1
# # Consumer group identifier
# consumerGroupID: consumer-group
# # Static membership identifier in consumer group
# consumerGroupInstanceID: consumer-group-1
# # Consumer group session and failure detection timeout.
# # The consumer sends periodic heartbeats (heartbeatInterval) to indicate its liveness to the broker.
# # If no heartbeats are received by the broker for a group member within the session timeout,
# # the broker will remove the consumer from the group and trigger a rebalance.
# sessionTimeout: 5m
# # Consumer group session keepalive heartbeat interval
# heartbeatInterval: 3s
# # Automatically and periodically commit offsets in the background
# enableAutoCommit: true
# # Automatically store offset of last message provided to application.
# # The offset store is an in-memory store of the next offset to (auto-)commit for each partition.
# enableAutoOffsetStore: false
# # AutoOffsetReset defines the action to take when there is no initial offset in offset store or the desired offset is out of range:
# # * "smallest","earliest","beginning": automatically reset the offset to the smallest offset
# # * "largest","latest","end": automatically reset the offset to the largest offset
# # * "error": trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages and checking 'message->err'.
# autoOffsetReset: "error"
6 changes: 0 additions & 6 deletions config/config.go
@@ -33,7 +33,6 @@ type Configuration struct {
BalanceWorker BalanceWorkerConfiguration
Notification NotificationConfiguration
Svix SvixConfig
Kafka KafkaConfig
}

// Validate validates the configuration.
@@ -106,10 +105,6 @@ func (c Configuration) Validate() error {
}
}

if err := c.Kafka.Validate(); err != nil {
return fmt.Errorf("kafka: %w", err)
}

return nil
}

@@ -145,5 +140,4 @@ func SetViperDefaults(v *viper.Viper, flags *pflag.FlagSet) {
ConfigureEvents(v)
ConfigureBalanceWorker(v)
ConfigureNotification(v)
ConfigureKafkaConfiguration(v)
}
57 changes: 28 additions & 29 deletions config/config_test.go
@@ -144,6 +144,34 @@ func TestComplete(t *testing.T) {
IngestNotifications: IngestNotificationsConfiguration{
MaxEventsInBatch: 500,
},
Kafka: KafkaConfig{
CommonConfigParams: pkgkafka.CommonConfigParams{
Brokers: "127.0.0.1:9092",
SecurityProtocol: "SASL_SSL",
SaslMechanisms: "PLAIN",
SaslUsername: "user",
SaslPassword: "pass",
ClientID: "kafka-client-1",
StatsInterval: pkgkafka.TimeDurationMilliSeconds(5 * time.Second),
BrokerAddressFamily: pkgkafka.BrokerAddressFamilyAny,
TopicMetadataRefreshInterval: pkgkafka.TimeDurationMilliSeconds(time.Minute),
SocketKeepAliveEnabled: true,
DebugContexts: pkgkafka.DebugContexts{
"broker",
"topic",
"consumer",
},
},
ConsumerConfigParams: pkgkafka.ConsumerConfigParams{
ConsumerGroupID: "consumer-group",
ConsumerGroupInstanceID: "consumer-group-1",
SessionTimeout: pkgkafka.TimeDurationMilliSeconds(5 * time.Minute),
HeartbeatInterval: pkgkafka.TimeDurationMilliSeconds(3 * time.Second),
EnableAutoCommit: true,
EnableAutoOffsetStore: false,
AutoOffsetReset: "error",
},
},
},
Dedupe: DedupeConfiguration{
Enabled: true,
@@ -250,35 +278,6 @@
ServerURL: "http://127.0.0.1:8071",
Debug: true,
},
Kafka: KafkaConfig{
CommonConfigParams: pkgkafka.CommonConfigParams{
Brokers: "127.0.0.1:9092",
SecurityProtocol: "SASL_SSL",
SaslMechanisms: "PLAIN",
SaslUsername: "user",
SaslPassword: "pass",
ClientID: "kafka-client-1",
StatsInterval: pkgkafka.TimeDurationMilliSeconds(5 * time.Second),
BrokerAddressFamily: pkgkafka.BrokerAddressFamilyAny,
TopicMetadataRefreshInterval: pkgkafka.TimeDurationMilliSeconds(time.Minute),
SocketKeepAliveEnabled: true,
DebugContexts: pkgkafka.DebugContexts{
"broker",
"topic",
"consumer",
},
},
ConsumerConfigParams: pkgkafka.ConsumerConfigParams{
ConsumerGroupID: "consumer-group",
ConsumerGroupInstanceID: "consumer-group-1",
SessionTimeout: pkgkafka.TimeDurationMilliSeconds(5 * time.Minute),
HeartbeatInterval: pkgkafka.TimeDurationMilliSeconds(3 * time.Second),
EnableAutoCommit: true,
EnableAutoOffsetStore: false,
AutoOffsetReset: "error",
},
ProducerConfigParams: pkgkafka.ProducerConfigParams{},
},
}

assert.Equal(t, expected, actual)
13 changes: 13 additions & 0 deletions config/helpers.go
@@ -0,0 +1,13 @@
package config

import "strings"

// AddPrefix returns string with "<prefix>." prepended to key.
// It returns key unmodified if prefix is empty or if key already has the prefix.
func AddPrefix(prefix, key string) string {
if prefix == "" || strings.HasPrefix(key, prefix+".") {
return key
}

return prefix + "." + key
}
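For illustration, a hypothetical table-driven test (not part of this
commit) exercising the three cases the doc comment describes:

```go
package config

import "testing"

// TestAddPrefix is an illustrative sketch, not part of this commit.
func TestAddPrefix(t *testing.T) {
	tests := []struct{ prefix, key, want string }{
		{"sink", "kafka.brokers", "sink.kafka.brokers"},      // prefix prepended
		{"", "kafka.brokers", "kafka.brokers"},               // empty prefix: key unchanged
		{"sink", "sink.kafka.brokers", "sink.kafka.brokers"}, // already prefixed: key unchanged
	}
	for _, tt := range tests {
		if got := AddPrefix(tt.prefix, tt.key); got != tt.want {
			t.Errorf("AddPrefix(%q, %q) = %q, want %q", tt.prefix, tt.key, got, tt.want)
		}
	}
}
```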
4 changes: 2 additions & 2 deletions config/kafka.go
@@ -45,6 +45,6 @@ func (c KafkaConfig) Validate() error {
}

// ConfigureKafkaConfiguration sets Kafka defaults under the given key prefix in the Viper instance.
func ConfigureKafkaConfiguration(v *viper.Viper) {
v.SetDefault("kafka.brokers", "127.0.0.1:29092")
func ConfigureKafkaConfiguration(v *viper.Viper, prefix string) {
v.SetDefault(AddPrefix(prefix, "kafka.brokers"), "127.0.0.1:29092")
}
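A hypothetical usage sketch (only the "sink" call appears in this commit,
in config/sink.go below; the "notification" prefix is illustrative):
because defaults are registered under a per-component key prefix, two
components can carry different Kafka defaults without colliding.

```go
package config

import "github.com/spf13/viper"

// exampleKafkaDefaults is an illustrative sketch, not part of this commit.
func exampleKafkaDefaults() {
	v := viper.New()

	ConfigureKafkaConfiguration(v, "sink") // registers "sink.kafka.brokers"

	// A hypothetical second component could register its own defaults
	// under a different prefix without touching the sink's:
	ConfigureKafkaConfiguration(v, "notification")

	_ = v.GetString("sink.kafka.brokers") // "127.0.0.1:29092"
}
```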
13 changes: 12 additions & 1 deletion config/sink.go
@@ -9,12 +9,15 @@ import (
)

type SinkConfiguration struct {
// FIXME(chrisgacsal): remove as it is deprecated by moving Kafka-specific configuration to dedicated config params.
GroupId string
Dedupe DedupeConfiguration
MinCommitCount int
MaxCommitWait time.Duration
NamespaceRefetch time.Duration
IngestNotifications IngestNotificationsConfiguration
// Kafka client/Consumer configuration
Kafka KafkaConfig
}

func (c SinkConfiguration) Validate() error {
@@ -34,6 +37,10 @@ func (c SinkConfiguration) Validate() error {
return fmt.Errorf("ingest notifications: %w", err)
}

if err := c.Kafka.Validate(); err != nil {
return fmt.Errorf("kafka: %w", err)
}

return nil
}

@@ -53,7 +60,7 @@ func (c IngestNotificationsConfiguration) Validate() error {
return nil
}

// Configure configures some defaults in the Viper instance.
// ConfigureSink sets up Sink-specific configuration defaults for the provided *viper.Viper instance.
func ConfigureSink(v *viper.Viper) {
// Sink Dedupe
v.SetDefault("sink.dedupe.enabled", false)
@@ -74,9 +81,13 @@ func ConfigureSink(v *viper.Viper) {
v.SetDefault("sink.dedupe.config.tls.insecureSkipVerify", false)

// Sink
// FIXME(chrisgacsal): remove as it is deprecated by moving Kafka-specific configuration to dedicated config params.
v.SetDefault("sink.groupId", "openmeter-sink-worker")
v.SetDefault("sink.minCommitCount", 500)
v.SetDefault("sink.maxCommitWait", "5s")
v.SetDefault("sink.namespaceRefetch", "15s")
v.SetDefault("sink.ingestNotifications.maxEventsInBatch", 500)

// Sink Kafka configuration
ConfigureKafkaConfiguration(v, "sink")
}
45 changes: 22 additions & 23 deletions config/testdata/complete.yaml
@@ -68,6 +68,28 @@ sink:
password: pass
tls:
enabled: true
kafka:
brokers: 127.0.0.1:9092
securityProtocol: SASL_SSL
saslMechanisms: PLAIN
saslUsername: user
saslPassword: pass
statsInterval: 5s
brokerAddressFamily: any
socketKeepAliveEnabled: true
topicMetadataRefreshInterval: 1m
debugContexts:
- broker
- topic
- consumer
clientID: kafka-client-1
consumerGroupID: consumer-group
consumerGroupInstanceID: consumer-group-1
sessionTimeout: 5m
heartbeatInterval: 3s
enableAutoCommit: true
enableAutoOffsetStore: false
autoOffsetReset: "error"

dedupe:
enabled: true
@@ -97,26 +119,3 @@ svix:
apiKey: test-svix-token
serverURL: http://127.0.0.1:8071
debug: true

kafka:
brokers: 127.0.0.1:9092
securityProtocol: SASL_SSL
saslMechanisms: PLAIN
saslUsername: user
saslPassword: pass
statsInterval: 5s
brokerAddressFamily: any
socketKeepAliveEnabled: true
topicMetadataRefreshInterval: 1m
debugContexts:
- broker
- topic
- consumer
clientID: kafka-client-1
consumerGroupID: consumer-group
consumerGroupInstanceID: consumer-group-1
sessionTimeout: 5m
heartbeatInterval: 3s
enableAutoCommit: true
enableAutoOffsetStore: false
autoOffsetReset: "error"
