Skip to content

Commit

Permalink
Merge pull request #1487 from openmeterio/refactor/kafka-config
Browse files Browse the repository at this point in the history
refactor: rm global Kafka configuration
  • Loading branch information
chrisgacsal authored Sep 9, 2024
2 parents ef914df + d87c9b7 commit d8bdd34
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 105 deletions.
89 changes: 45 additions & 44 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -97,47 +97,48 @@ svix:
apiKey: secret
serverURL: http://localhost:8071

kafka:
brokers: 127.0.0.1:9092,127.0.0.2:9092
securityProtocol: SASL_SSL
saslMechanisms: PLAIN
saslUsername: user
saslPassword: pass
# To enable stats reporting set this value to >=5s.
# Setting this value to 0 makes reporting explicitly disabled.
statsInterval: 5s
# Set IP address family used for communicating with Kafka cluster
brokerAddressFamily: v4
# Use this configuration parameter to define how frequently the local metadata cache needs to be updated.
# It cannot be lower than 10 seconds.
topicMetadataRefreshInterval: 1m
# Use this config parameter to enable TCP keep-alive in order to prevent the Kafka broker to close idle network connection.
socketKeepAliveEnabled: true
# Set list of debug contexts to enable for librdkafka
# See: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts
debugContexts:
- broker
- topic
# Consumer/Producer identifier
clientID: kafka-client-1
# Consumer group identifier
consumerGroupID: consumer-group
# Static membership identifier in consumer group
consumerGroupInstanceID: consumer-group-1
# Consumer group session and failure detection timeout.
# The consumer sends periodic heartbeats (heartbeatInterval) to indicate its liveness to the broker.
# If no hearts are received by the broker for a group member within the session timeout,
# the broker will remove the consumer from the group and trigger a rebalance.
sessionTimeout: 5m
# Consumer group session keepalive heartbeat interval
heartbeatInterval: 3s
# Automatically and periodically commit offsets in the background
enableAutoCommit: true
# Automatically store offset of last message provided to application.
# The offset store is an in-memory store of the next offset to (auto-)commit for each partition.
enableAutoOffsetStore: false
# AutoOffsetReset defines the action to take when there is no initial offset in offset store or the desired offset is out of range:
# * "smallest","earliest","beginning": automatically reset the offset to the smallest offset
# * "largest","latest","end": automatically reset the offset to the largest offset
# * "error": trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages and checking 'message->err'.
autoOffsetReset: "error"
#sink:
# kafka:
# brokers: 127.0.0.1:9092,127.0.0.2:9092
# securityProtocol: SASL_SSL
# saslMechanisms: PLAIN
# saslUsername: user
# saslPassword: pass
# # To enable stats reporting set this value to >=5s.
# # Setting this value to 0 makes reporting explicitly disabled.
# statsInterval: 5s
# # Set IP address family used for communicating with Kafka cluster
# brokerAddressFamily: v4
# # Use this configuration parameter to define how frequently the local metadata cache needs to be updated.
# # It cannot be lower than 10 seconds.
# topicMetadataRefreshInterval: 1m
# # Use this config parameter to enable TCP keep-alive in order to prevent the Kafka broker to close idle network connection.
# socketKeepAliveEnabled: true
# # Set list of debug contexts to enable for librdkafka
# # See: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts
# debugContexts:
# - broker
# - topic
# # Consumer/Producer identifier
# clientID: kafka-client-1
# # Consumer group identifier
# consumerGroupID: consumer-group
# # Static membership identifier in consumer group
# consumerGroupInstanceID: consumer-group-1
# # Consumer group session and failure detection timeout.
# # The consumer sends periodic heartbeats (heartbeatInterval) to indicate its liveness to the broker.
# # If no hearts are received by the broker for a group member within the session timeout,
# # the broker will remove the consumer from the group and trigger a rebalance.
# sessionTimeout: 5m
# # Consumer group session keepalive heartbeat interval
# heartbeatInterval: 3s
# # Automatically and periodically commit offsets in the background
# enableAutoCommit: true
# # Automatically store offset of last message provided to application.
# # The offset store is an in-memory store of the next offset to (auto-)commit for each partition.
# enableAutoOffsetStore: false
# # AutoOffsetReset defines the action to take when there is no initial offset in offset store or the desired offset is out of range:
# # * "smallest","earliest","beginning": automatically reset the offset to the smallest offset
# # * "largest","latest","end": automatically reset the offset to the largest offset
# # * "error": trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages and checking 'message->err'.
# autoOffsetReset: "error"
6 changes: 0 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ type Configuration struct {
BalanceWorker BalanceWorkerConfiguration
Notification NotificationConfiguration
Svix SvixConfig
Kafka KafkaConfig
}

// Validate validates the configuration.
Expand Down Expand Up @@ -106,10 +105,6 @@ func (c Configuration) Validate() error {
}
}

if err := c.Kafka.Validate(); err != nil {
return fmt.Errorf("kafka: %w", err)
}

return nil
}

Expand Down Expand Up @@ -145,5 +140,4 @@ func SetViperDefaults(v *viper.Viper, flags *pflag.FlagSet) {
ConfigureEvents(v)
ConfigureBalanceWorker(v)
ConfigureNotification(v)
ConfigureKafkaConfiguration(v)
}
57 changes: 28 additions & 29 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,34 @@ func TestComplete(t *testing.T) {
IngestNotifications: IngestNotificationsConfiguration{
MaxEventsInBatch: 500,
},
Kafka: KafkaConfig{
CommonConfigParams: pkgkafka.CommonConfigParams{
Brokers: "127.0.0.1:9092",
SecurityProtocol: "SASL_SSL",
SaslMechanisms: "PLAIN",
SaslUsername: "user",
SaslPassword: "pass",
ClientID: "kafka-client-1",
StatsInterval: pkgkafka.TimeDurationMilliSeconds(5 * time.Second),
BrokerAddressFamily: pkgkafka.BrokerAddressFamilyAny,
TopicMetadataRefreshInterval: pkgkafka.TimeDurationMilliSeconds(time.Minute),
SocketKeepAliveEnabled: true,
DebugContexts: pkgkafka.DebugContexts{
"broker",
"topic",
"consumer",
},
},
ConsumerConfigParams: pkgkafka.ConsumerConfigParams{
ConsumerGroupID: "consumer-group",
ConsumerGroupInstanceID: "consumer-group-1",
SessionTimeout: pkgkafka.TimeDurationMilliSeconds(5 * time.Minute),
HeartbeatInterval: pkgkafka.TimeDurationMilliSeconds(3 * time.Second),
EnableAutoCommit: true,
EnableAutoOffsetStore: false,
AutoOffsetReset: "error",
},
},
},
Dedupe: DedupeConfiguration{
Enabled: true,
Expand Down Expand Up @@ -250,35 +278,6 @@ func TestComplete(t *testing.T) {
ServerURL: "http://127.0.0.1:8071",
Debug: true,
},
Kafka: KafkaConfig{
CommonConfigParams: pkgkafka.CommonConfigParams{
Brokers: "127.0.0.1:9092",
SecurityProtocol: "SASL_SSL",
SaslMechanisms: "PLAIN",
SaslUsername: "user",
SaslPassword: "pass",
ClientID: "kafka-client-1",
StatsInterval: pkgkafka.TimeDurationMilliSeconds(5 * time.Second),
BrokerAddressFamily: pkgkafka.BrokerAddressFamilyAny,
TopicMetadataRefreshInterval: pkgkafka.TimeDurationMilliSeconds(time.Minute),
SocketKeepAliveEnabled: true,
DebugContexts: pkgkafka.DebugContexts{
"broker",
"topic",
"consumer",
},
},
ConsumerConfigParams: pkgkafka.ConsumerConfigParams{
ConsumerGroupID: "consumer-group",
ConsumerGroupInstanceID: "consumer-group-1",
SessionTimeout: pkgkafka.TimeDurationMilliSeconds(5 * time.Minute),
HeartbeatInterval: pkgkafka.TimeDurationMilliSeconds(3 * time.Second),
EnableAutoCommit: true,
EnableAutoOffsetStore: false,
AutoOffsetReset: "error",
},
ProducerConfigParams: pkgkafka.ProducerConfigParams{},
},
}

assert.Equal(t, expected, actual)
Expand Down
13 changes: 13 additions & 0 deletions config/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package config

import "strings"

// AddPrefix returns string with "<prefix>." prepended to key.
// If returns key unmodified if prefix is empty or key already has the prefix added.
func AddPrefix(prefix, key string) string {
if prefix == "" || strings.HasPrefix(key, prefix+".") {
return key
}

return prefix + "." + key
}
4 changes: 2 additions & 2 deletions config/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ func (c KafkaConfig) Validate() error {
}

// ConfigureKafkaConfiguration sets defaults in the Viper instance.
func ConfigureKafkaConfiguration(v *viper.Viper) {
v.SetDefault("kafka.brokers", "127.0.0.1:29092")
func ConfigureKafkaConfiguration(v *viper.Viper, prefix string) {
v.SetDefault(AddPrefix(prefix, "kafka.brokers"), "127.0.0.1:29092")
}
13 changes: 12 additions & 1 deletion config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ import (
)

type SinkConfiguration struct {
// FIXME(chrisgacsal): remove as it is deprecated by moving Kafka specific configuration to dedicated config params.
GroupId string
Dedupe DedupeConfiguration
MinCommitCount int
MaxCommitWait time.Duration
NamespaceRefetch time.Duration
IngestNotifications IngestNotificationsConfiguration
// Kafka client/Consumer configuration
Kafka KafkaConfig
}

func (c SinkConfiguration) Validate() error {
Expand All @@ -34,6 +37,10 @@ func (c SinkConfiguration) Validate() error {
return fmt.Errorf("ingest notifications: %w", err)
}

if err := c.Kafka.Validate(); err != nil {
return fmt.Errorf("kafka: %w", err)
}

return nil
}

Expand All @@ -53,7 +60,7 @@ func (c IngestNotificationsConfiguration) Validate() error {
return nil
}

// Configure configures some defaults in the Viper instance.
// ConfigureSink setup Sink specific configuration defaults for provided *viper.Viper instance.
func ConfigureSink(v *viper.Viper) {
// Sink Dedupe
v.SetDefault("sink.dedupe.enabled", false)
Expand All @@ -74,9 +81,13 @@ func ConfigureSink(v *viper.Viper) {
v.SetDefault("sink.dedupe.config.tls.insecureSkipVerify", false)

// Sink
// FIXME(chrisgacsal): remove as it is deprecated by moving Kafka specific configuration to dedicated config params.
v.SetDefault("sink.groupId", "openmeter-sink-worker")
v.SetDefault("sink.minCommitCount", 500)
v.SetDefault("sink.maxCommitWait", "5s")
v.SetDefault("sink.namespaceRefetch", "15s")
v.SetDefault("sink.ingestNotifications.maxEventsInBatch", 500)

// Sink Kafka configuration
ConfigureKafkaConfiguration(v, "sink")
}
45 changes: 22 additions & 23 deletions config/testdata/complete.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,28 @@ sink:
password: pass
tls:
enabled: true
kafka:
brokers: 127.0.0.1:9092
securityProtocol: SASL_SSL
saslMechanisms: PLAIN
saslUsername: user
saslPassword: pass
statsInterval: 5s
brokerAddressFamily: any
socketKeepAliveEnabled: true
topicMetadataRefreshInterval: 1m
debugContexts:
- broker
- topic
- consumer
clientID: kafka-client-1
consumerGroupID: consumer-group
consumerGroupInstanceID: consumer-group-1
sessionTimeout: 5m
heartbeatInterval: 3s
enableAutoCommit: true
enableAutoOffsetStore: false
autoOffsetReset: "error"

dedupe:
enabled: true
Expand Down Expand Up @@ -97,26 +119,3 @@ svix:
apiKey: test-svix-token
serverURL: http://127.0.0.1:8071
debug: true

kafka:
brokers: 127.0.0.1:9092
securityProtocol: SASL_SSL
saslMechanisms: PLAIN
saslUsername: user
saslPassword: pass
statsInterval: 5s
brokerAddressFamily: any
socketKeepAliveEnabled: true
topicMetadataRefreshInterval: 1m
debugContexts:
- broker
- topic
- consumer
clientID: kafka-client-1
consumerGroupID: consumer-group
consumerGroupInstanceID: consumer-group-1
sessionTimeout: 5m
heartbeatInterval: 3s
enableAutoCommit: true
enableAutoOffsetStore: false
autoOffsetReset: "error"

0 comments on commit d8bdd34

Please sign in to comment.