From d87c9b74d6720a48740004da1244ef86545a8d95 Mon Sep 17 00:00:00 2001 From: Krisztian Gacsal Date: Mon, 9 Sep 2024 13:24:43 +0200 Subject: [PATCH] refactor: rm global Kafka configuration Global Kafka configuration would not work as certain services/components require slightly different parameters to be set for the Kafka client initialized for them. One solution would be to allow services/components to override certain parameters set in the global Kafka configuration, however it would introduce extra complexity around generating service specific Kafka configuration from multiple sources. Another alternative is to have dedicated Kafka configuration for all the services/components. It makes the configuration more verbose, but less complex/error prone. --- config.example.yaml | 89 ++++++++++++++++++----------------- config/config.go | 6 --- config/config_test.go | 57 +++++++++++----------- config/helpers.go | 13 +++++ config/kafka.go | 4 +- config/sink.go | 13 ++++- config/testdata/complete.yaml | 45 +++++++++--------- 7 files changed, 122 insertions(+), 105 deletions(-) create mode 100644 config/helpers.go diff --git a/config.example.yaml b/config.example.yaml index 15d5661d4..e24d7762f 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -97,47 +97,48 @@ svix: apiKey: secret serverURL: http://localhost:8071 -kafka: - brokers: 127.0.0.1:9092,127.0.0.2:9092 - securityProtocol: SASL_SSL - saslMechanisms: PLAIN - saslUsername: user - saslPassword: pass - # To enable stats reporting set this value to >=5s. - # Setting this value to 0 makes reporting explicitly disabled. - statsInterval: 5s - # Set IP address family used for communicating with Kafka cluster - brokerAddressFamily: v4 - # Use this configuration parameter to define how frequently the local metadata cache needs to be updated. - # It cannot be lower than 10 seconds. 
- topicMetadataRefreshInterval: 1m - # Use this config parameter to enable TCP keep-alive in order to prevent the Kafka broker to close idle network connection. - socketKeepAliveEnabled: true - # Set list of debug contexts to enable for librdkafka - # See: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts - debugContexts: - - broker - - topic - # Consumer/Producer identifier - clientID: kafka-client-1 - # Consumer group identifier - consumerGroupID: consumer-group - # Static membership identifier in consumer group - consumerGroupInstanceID: consumer-group-1 - # Consumer group session and failure detection timeout. - # The consumer sends periodic heartbeats (heartbeatInterval) to indicate its liveness to the broker. - # If no hearts are received by the broker for a group member within the session timeout, - # the broker will remove the consumer from the group and trigger a rebalance. - sessionTimeout: 5m - # Consumer group session keepalive heartbeat interval - heartbeatInterval: 3s - # Automatically and periodically commit offsets in the background - enableAutoCommit: true - # Automatically store offset of last message provided to application. - # The offset store is an in-memory store of the next offset to (auto-)commit for each partition. - enableAutoOffsetStore: false - # AutoOffsetReset defines the action to take when there is no initial offset in offset store or the desired offset is out of range: - # * "smallest","earliest","beginning": automatically reset the offset to the smallest offset - # * "largest","latest","end": automatically reset the offset to the largest offset - # * "error": trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages and checking 'message->err'. 
- autoOffsetReset: "error" +#sink: +# kafka: +# brokers: 127.0.0.1:9092,127.0.0.2:9092 +# securityProtocol: SASL_SSL +# saslMechanisms: PLAIN +# saslUsername: user +# saslPassword: pass +# # To enable stats reporting set this value to >=5s. +# # Setting this value to 0 makes reporting explicitly disabled. +# statsInterval: 5s +# # Set IP address family used for communicating with Kafka cluster +# brokerAddressFamily: v4 +# # Use this configuration parameter to define how frequently the local metadata cache needs to be updated. +# # It cannot be lower than 10 seconds. +# topicMetadataRefreshInterval: 1m +# # Use this config parameter to enable TCP keep-alive in order to prevent the Kafka broker to close idle network connection. +# socketKeepAliveEnabled: true +# # Set list of debug contexts to enable for librdkafka +# # See: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts +# debugContexts: +# - broker +# - topic +# # Consumer/Producer identifier +# clientID: kafka-client-1 +# # Consumer group identifier +# consumerGroupID: consumer-group +# # Static membership identifier in consumer group +# consumerGroupInstanceID: consumer-group-1 +# # Consumer group session and failure detection timeout. +# # The consumer sends periodic heartbeats (heartbeatInterval) to indicate its liveness to the broker. +# # If no hearts are received by the broker for a group member within the session timeout, +# # the broker will remove the consumer from the group and trigger a rebalance. +# sessionTimeout: 5m +# # Consumer group session keepalive heartbeat interval +# heartbeatInterval: 3s +# # Automatically and periodically commit offsets in the background +# enableAutoCommit: true +# # Automatically store offset of last message provided to application. +# # The offset store is an in-memory store of the next offset to (auto-)commit for each partition. 
+# enableAutoOffsetStore: false +# # AutoOffsetReset defines the action to take when there is no initial offset in offset store or the desired offset is out of range: +# # * "smallest","earliest","beginning": automatically reset the offset to the smallest offset +# # * "largest","latest","end": automatically reset the offset to the largest offset +# # * "error": trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages and checking 'message->err'. +# autoOffsetReset: "error" diff --git a/config/config.go b/config/config.go index abf7569d5..ac321bf9f 100644 --- a/config/config.go +++ b/config/config.go @@ -33,7 +33,6 @@ type Configuration struct { BalanceWorker BalanceWorkerConfiguration Notification NotificationConfiguration Svix SvixConfig - Kafka KafkaConfig } // Validate validates the configuration. @@ -106,10 +105,6 @@ func (c Configuration) Validate() error { } } - if err := c.Kafka.Validate(); err != nil { - return fmt.Errorf("kafka: %w", err) - } - return nil } @@ -145,5 +140,4 @@ func SetViperDefaults(v *viper.Viper, flags *pflag.FlagSet) { ConfigureEvents(v) ConfigureBalanceWorker(v) ConfigureNotification(v) - ConfigureKafkaConfiguration(v) } diff --git a/config/config_test.go b/config/config_test.go index 669edb864..f95a00af2 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -144,6 +144,34 @@ func TestComplete(t *testing.T) { IngestNotifications: IngestNotificationsConfiguration{ MaxEventsInBatch: 500, }, + Kafka: KafkaConfig{ + CommonConfigParams: pkgkafka.CommonConfigParams{ + Brokers: "127.0.0.1:9092", + SecurityProtocol: "SASL_SSL", + SaslMechanisms: "PLAIN", + SaslUsername: "user", + SaslPassword: "pass", + ClientID: "kafka-client-1", + StatsInterval: pkgkafka.TimeDurationMilliSeconds(5 * time.Second), + BrokerAddressFamily: pkgkafka.BrokerAddressFamilyAny, + TopicMetadataRefreshInterval: pkgkafka.TimeDurationMilliSeconds(time.Minute), + SocketKeepAliveEnabled: true, + DebugContexts: pkgkafka.DebugContexts{ + 
"broker", + "topic", + "consumer", + }, + }, + ConsumerConfigParams: pkgkafka.ConsumerConfigParams{ + ConsumerGroupID: "consumer-group", + ConsumerGroupInstanceID: "consumer-group-1", + SessionTimeout: pkgkafka.TimeDurationMilliSeconds(5 * time.Minute), + HeartbeatInterval: pkgkafka.TimeDurationMilliSeconds(3 * time.Second), + EnableAutoCommit: true, + EnableAutoOffsetStore: false, + AutoOffsetReset: "error", + }, + }, }, Dedupe: DedupeConfiguration{ Enabled: true, @@ -250,35 +278,6 @@ func TestComplete(t *testing.T) { ServerURL: "http://127.0.0.1:8071", Debug: true, }, - Kafka: KafkaConfig{ - CommonConfigParams: pkgkafka.CommonConfigParams{ - Brokers: "127.0.0.1:9092", - SecurityProtocol: "SASL_SSL", - SaslMechanisms: "PLAIN", - SaslUsername: "user", - SaslPassword: "pass", - ClientID: "kafka-client-1", - StatsInterval: pkgkafka.TimeDurationMilliSeconds(5 * time.Second), - BrokerAddressFamily: pkgkafka.BrokerAddressFamilyAny, - TopicMetadataRefreshInterval: pkgkafka.TimeDurationMilliSeconds(time.Minute), - SocketKeepAliveEnabled: true, - DebugContexts: pkgkafka.DebugContexts{ - "broker", - "topic", - "consumer", - }, - }, - ConsumerConfigParams: pkgkafka.ConsumerConfigParams{ - ConsumerGroupID: "consumer-group", - ConsumerGroupInstanceID: "consumer-group-1", - SessionTimeout: pkgkafka.TimeDurationMilliSeconds(5 * time.Minute), - HeartbeatInterval: pkgkafka.TimeDurationMilliSeconds(3 * time.Second), - EnableAutoCommit: true, - EnableAutoOffsetStore: false, - AutoOffsetReset: "error", - }, - ProducerConfigParams: pkgkafka.ProducerConfigParams{}, - }, } assert.Equal(t, expected, actual) diff --git a/config/helpers.go b/config/helpers.go new file mode 100644 index 000000000..5fce99a1e --- /dev/null +++ b/config/helpers.go @@ -0,0 +1,13 @@ +package config + +import "strings" + +// AddPrefix returns key with prefix and "." prepended. +// It returns key unmodified if prefix is empty or key already has the prefix added. 
+func AddPrefix(prefix, key string) string { + if prefix == "" || strings.HasPrefix(key, prefix+".") { + return key + } + + return prefix + "." + key +} diff --git a/config/kafka.go b/config/kafka.go index b73ad6f4d..0f9c846c0 100644 --- a/config/kafka.go +++ b/config/kafka.go @@ -45,6 +45,6 @@ func (c KafkaConfig) Validate() error { } // ConfigureKafkaConfiguration sets defaults in the Viper instance. -func ConfigureKafkaConfiguration(v *viper.Viper) { - v.SetDefault("kafka.brokers", "127.0.0.1:29092") +func ConfigureKafkaConfiguration(v *viper.Viper, prefix string) { + v.SetDefault(AddPrefix(prefix, "kafka.brokers"), "127.0.0.1:29092") } diff --git a/config/sink.go b/config/sink.go index aec90da21..b281e12a6 100644 --- a/config/sink.go +++ b/config/sink.go @@ -9,12 +9,15 @@ import ( ) type SinkConfiguration struct { + // FIXME(chrisgacsal): remove as it is deprecated by moving Kafka specific configuration to dedicated config params. GroupId string Dedupe DedupeConfiguration MinCommitCount int MaxCommitWait time.Duration NamespaceRefetch time.Duration IngestNotifications IngestNotificationsConfiguration + // Kafka client/Consumer configuration + Kafka KafkaConfig } func (c SinkConfiguration) Validate() error { @@ -34,6 +37,10 @@ func (c SinkConfiguration) Validate() error { return fmt.Errorf("ingest notifications: %w", err) } + if err := c.Kafka.Validate(); err != nil { + return fmt.Errorf("kafka: %w", err) + } + return nil } @@ -53,7 +60,7 @@ func (c IngestNotificationsConfiguration) Validate() error { return nil } -// Configure configures some defaults in the Viper instance. +// ConfigureSink sets up Sink specific configuration defaults for the provided *viper.Viper instance. 
func ConfigureSink(v *viper.Viper) { // Sink Dedupe v.SetDefault("sink.dedupe.enabled", false) @@ -74,9 +81,13 @@ func ConfigureSink(v *viper.Viper) { v.SetDefault("sink.dedupe.config.tls.insecureSkipVerify", false) // Sink + // FIXME(chrisgacsal): remove as it is deprecated by moving Kafka specific configuration to dedicated config params. v.SetDefault("sink.groupId", "openmeter-sink-worker") v.SetDefault("sink.minCommitCount", 500) v.SetDefault("sink.maxCommitWait", "5s") v.SetDefault("sink.namespaceRefetch", "15s") v.SetDefault("sink.ingestNotifications.maxEventsInBatch", 500) + + // Sink Kafka configuration + ConfigureKafkaConfiguration(v, "sink") } diff --git a/config/testdata/complete.yaml b/config/testdata/complete.yaml index 962529d8d..b43d09fd6 100644 --- a/config/testdata/complete.yaml +++ b/config/testdata/complete.yaml @@ -68,6 +68,28 @@ sink: password: pass tls: enabled: true + kafka: + brokers: 127.0.0.1:9092 + securityProtocol: SASL_SSL + saslMechanisms: PLAIN + saslUsername: user + saslPassword: pass + statsInterval: 5s + brokerAddressFamily: any + socketKeepAliveEnabled: true + topicMetadataRefreshInterval: 1m + debugContexts: + - broker + - topic + - consumer + clientID: kafka-client-1 + consumerGroupID: consumer-group + consumerGroupInstanceID: consumer-group-1 + sessionTimeout: 5m + heartbeatInterval: 3s + enableAutoCommit: true + enableAutoOffsetStore: false + autoOffsetReset: "error" dedupe: enabled: true @@ -97,26 +119,3 @@ svix: apiKey: test-svix-token serverURL: http://127.0.0.1:8071 debug: true - -kafka: - brokers: 127.0.0.1:9092 - securityProtocol: SASL_SSL - saslMechanisms: PLAIN - saslUsername: user - saslPassword: pass - statsInterval: 5s - brokerAddressFamily: any - socketKeepAliveEnabled: true - topicMetadataRefreshInterval: 1m - debugContexts: - - broker - - topic - - consumer - clientID: kafka-client-1 - consumerGroupID: consumer-group - consumerGroupInstanceID: consumer-group-1 - sessionTimeout: 5m - heartbeatInterval: 3s - 
enableAutoCommit: true - enableAutoOffsetStore: false - autoOffsetReset: "error"