diff --git a/config.example.yaml b/config.example.yaml index 15d5661d4..e24d7762f 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -97,47 +97,48 @@ svix: apiKey: secret serverURL: http://localhost:8071 -kafka: - brokers: 127.0.0.1:9092,127.0.0.2:9092 - securityProtocol: SASL_SSL - saslMechanisms: PLAIN - saslUsername: user - saslPassword: pass - # To enable stats reporting set this value to >=5s. - # Setting this value to 0 makes reporting explicitly disabled. - statsInterval: 5s - # Set IP address family used for communicating with Kafka cluster - brokerAddressFamily: v4 - # Use this configuration parameter to define how frequently the local metadata cache needs to be updated. - # It cannot be lower than 10 seconds. - topicMetadataRefreshInterval: 1m - # Use this config parameter to enable TCP keep-alive in order to prevent the Kafka broker to close idle network connection. - socketKeepAliveEnabled: true - # Set list of debug contexts to enable for librdkafka - # See: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts - debugContexts: - - broker - - topic - # Consumer/Producer identifier - clientID: kafka-client-1 - # Consumer group identifier - consumerGroupID: consumer-group - # Static membership identifier in consumer group - consumerGroupInstanceID: consumer-group-1 - # Consumer group session and failure detection timeout. - # The consumer sends periodic heartbeats (heartbeatInterval) to indicate its liveness to the broker. - # If no hearts are received by the broker for a group member within the session timeout, - # the broker will remove the consumer from the group and trigger a rebalance. - sessionTimeout: 5m - # Consumer group session keepalive heartbeat interval - heartbeatInterval: 3s - # Automatically and periodically commit offsets in the background - enableAutoCommit: true - # Automatically store offset of last message provided to application. - # The offset store is an in-memory store of the next offset to (auto-)commit for each partition. - enableAutoOffsetStore: false - # AutoOffsetReset defines the action to take when there is no initial offset in offset store or the desired offset is out of range: - # * "smallest","earliest","beginning": automatically reset the offset to the smallest offset - # * "largest","latest","end": automatically reset the offset to the largest offset - # * "error": trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages and checking 'message->err'. - autoOffsetReset: "error" +#sink: +# kafka: +# brokers: 127.0.0.1:9092,127.0.0.2:9092 +# securityProtocol: SASL_SSL +# saslMechanisms: PLAIN +# saslUsername: user +# saslPassword: pass +# # To enable stats reporting set this value to >=5s. +# # Setting this value to 0 makes reporting explicitly disabled. +# statsInterval: 5s +# # Set IP address family used for communicating with Kafka cluster +# brokerAddressFamily: v4 +# # Use this configuration parameter to define how frequently the local metadata cache needs to be updated. +# # It cannot be lower than 10 seconds. +# topicMetadataRefreshInterval: 1m +# # Use this config parameter to enable TCP keep-alive in order to prevent the Kafka broker to close idle network connection. +# socketKeepAliveEnabled: true +# # Set list of debug contexts to enable for librdkafka +# # See: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts +# debugContexts: +# - broker +# - topic +# # Consumer/Producer identifier +# clientID: kafka-client-1 +# # Consumer group identifier +# consumerGroupID: consumer-group +# # Static membership identifier in consumer group +# consumerGroupInstanceID: consumer-group-1 +# # Consumer group session and failure detection timeout. +# # The consumer sends periodic heartbeats (heartbeatInterval) to indicate its liveness to the broker. +# # If no hearts are received by the broker for a group member within the session timeout, +# # the broker will remove the consumer from the group and trigger a rebalance. +# sessionTimeout: 5m +# # Consumer group session keepalive heartbeat interval +# heartbeatInterval: 3s +# # Automatically and periodically commit offsets in the background +# enableAutoCommit: true +# # Automatically store offset of last message provided to application. +# # The offset store is an in-memory store of the next offset to (auto-)commit for each partition. +# enableAutoOffsetStore: false +# # AutoOffsetReset defines the action to take when there is no initial offset in offset store or the desired offset is out of range: +# # * "smallest","earliest","beginning": automatically reset the offset to the smallest offset +# # * "largest","latest","end": automatically reset the offset to the largest offset +# # * "error": trigger an error (ERR__AUTO_OFFSET_RESET) which is retrieved by consuming messages and checking 'message->err'. +# autoOffsetReset: "error" diff --git a/config/config.go b/config/config.go index abf7569d5..ac321bf9f 100644 --- a/config/config.go +++ b/config/config.go @@ -33,7 +33,6 @@ type Configuration struct { BalanceWorker BalanceWorkerConfiguration Notification NotificationConfiguration Svix SvixConfig - Kafka KafkaConfig } // Validate validates the configuration. @@ -106,10 +105,6 @@ func (c Configuration) Validate() error { } } - if err := c.Kafka.Validate(); err != nil { - return fmt.Errorf("kafka: %w", err) - } - return nil } @@ -145,5 +140,4 @@ func SetViperDefaults(v *viper.Viper, flags *pflag.FlagSet) { ConfigureEvents(v) ConfigureBalanceWorker(v) ConfigureNotification(v) - ConfigureKafkaConfiguration(v) } diff --git a/config/config_test.go b/config/config_test.go index 669edb864..f95a00af2 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -144,6 +144,34 @@ func TestComplete(t *testing.T) { IngestNotifications: IngestNotificationsConfiguration{ MaxEventsInBatch: 500, }, + Kafka: KafkaConfig{ + CommonConfigParams: pkgkafka.CommonConfigParams{ + Brokers: "127.0.0.1:9092", + SecurityProtocol: "SASL_SSL", + SaslMechanisms: "PLAIN", + SaslUsername: "user", + SaslPassword: "pass", + ClientID: "kafka-client-1", + StatsInterval: pkgkafka.TimeDurationMilliSeconds(5 * time.Second), + BrokerAddressFamily: pkgkafka.BrokerAddressFamilyAny, + TopicMetadataRefreshInterval: pkgkafka.TimeDurationMilliSeconds(time.Minute), + SocketKeepAliveEnabled: true, + DebugContexts: pkgkafka.DebugContexts{ + "broker", + "topic", + "consumer", + }, + }, + ConsumerConfigParams: pkgkafka.ConsumerConfigParams{ + ConsumerGroupID: "consumer-group", + ConsumerGroupInstanceID: "consumer-group-1", + SessionTimeout: pkgkafka.TimeDurationMilliSeconds(5 * time.Minute), + HeartbeatInterval: pkgkafka.TimeDurationMilliSeconds(3 * time.Second), + EnableAutoCommit: true, + EnableAutoOffsetStore: false, + AutoOffsetReset: "error", + }, + }, }, Dedupe: DedupeConfiguration{ Enabled: true, @@ -250,35 +278,6 @@ func TestComplete(t *testing.T) { ServerURL: "http://127.0.0.1:8071", Debug: true, }, - Kafka: KafkaConfig{ - CommonConfigParams: pkgkafka.CommonConfigParams{ - Brokers: "127.0.0.1:9092", - SecurityProtocol: "SASL_SSL", - SaslMechanisms: "PLAIN", - SaslUsername: "user", - SaslPassword: "pass", - ClientID: "kafka-client-1", - StatsInterval: pkgkafka.TimeDurationMilliSeconds(5 * time.Second), - BrokerAddressFamily: pkgkafka.BrokerAddressFamilyAny, - TopicMetadataRefreshInterval: pkgkafka.TimeDurationMilliSeconds(time.Minute), - SocketKeepAliveEnabled: true, - DebugContexts: pkgkafka.DebugContexts{ - "broker", - "topic", - "consumer", - }, - }, - ConsumerConfigParams: pkgkafka.ConsumerConfigParams{ - ConsumerGroupID: "consumer-group", - ConsumerGroupInstanceID: "consumer-group-1", - SessionTimeout: pkgkafka.TimeDurationMilliSeconds(5 * time.Minute), - HeartbeatInterval: pkgkafka.TimeDurationMilliSeconds(3 * time.Second), - EnableAutoCommit: true, - EnableAutoOffsetStore: false, - AutoOffsetReset: "error", - }, - ProducerConfigParams: pkgkafka.ProducerConfigParams{}, - }, } assert.Equal(t, expected, actual) diff --git a/config/helpers.go b/config/helpers.go new file mode 100644 index 000000000..5fce99a1e --- /dev/null +++ b/config/helpers.go @@ -0,0 +1,13 @@ +package config + +import "strings" + +// AddPrefix returns string with "." prepended to key. +// If returns key unmodified if prefix is empty or key already has the prefix added. +func AddPrefix(prefix, key string) string { + if prefix == "" || strings.HasPrefix(key, prefix+".") { + return key + } + + return prefix + "." + key +} diff --git a/config/kafka.go b/config/kafka.go index b73ad6f4d..0f9c846c0 100644 --- a/config/kafka.go +++ b/config/kafka.go @@ -45,6 +45,6 @@ func (c KafkaConfig) Validate() error { } // ConfigureKafkaConfiguration sets defaults in the Viper instance. -func ConfigureKafkaConfiguration(v *viper.Viper) { - v.SetDefault("kafka.brokers", "127.0.0.1:29092") +func ConfigureKafkaConfiguration(v *viper.Viper, prefix string) { + v.SetDefault(AddPrefix(prefix, "kafka.brokers"), "127.0.0.1:29092") } diff --git a/config/sink.go b/config/sink.go index aec90da21..b281e12a6 100644 --- a/config/sink.go +++ b/config/sink.go @@ -9,12 +9,15 @@ import ( ) type SinkConfiguration struct { + // FIXME(chrisgacsal): remove as it is deprecated by moving Kafka specific configuration to dedicated config params. GroupId string Dedupe DedupeConfiguration MinCommitCount int MaxCommitWait time.Duration NamespaceRefetch time.Duration IngestNotifications IngestNotificationsConfiguration + // Kafka client/Consumer configuration + Kafka KafkaConfig } func (c SinkConfiguration) Validate() error { @@ -34,6 +37,10 @@ func (c SinkConfiguration) Validate() error { return fmt.Errorf("ingest notifications: %w", err) } + if err := c.Kafka.Validate(); err != nil { + return fmt.Errorf("kafka: %w", err) + } + return nil } @@ -53,7 +60,7 @@ func (c IngestNotificationsConfiguration) Validate() error { return nil } -// Configure configures some defaults in the Viper instance. +// ConfigureSink setup Sink specific configuration defaults for provided *viper.Viper instance. func ConfigureSink(v *viper.Viper) { // Sink Dedupe v.SetDefault("sink.dedupe.enabled", false) @@ -74,9 +81,13 @@ func ConfigureSink(v *viper.Viper) { v.SetDefault("sink.dedupe.config.tls.insecureSkipVerify", false) // Sink + // FIXME(chrisgacsal): remove as it is deprecated by moving Kafka specific configuration to dedicated config params. v.SetDefault("sink.groupId", "openmeter-sink-worker") v.SetDefault("sink.minCommitCount", 500) v.SetDefault("sink.maxCommitWait", "5s") v.SetDefault("sink.namespaceRefetch", "15s") v.SetDefault("sink.ingestNotifications.maxEventsInBatch", 500) + + // Sink Kafka configuration + ConfigureKafkaConfiguration(v, "sink") } diff --git a/config/testdata/complete.yaml b/config/testdata/complete.yaml index 962529d8d..b43d09fd6 100644 --- a/config/testdata/complete.yaml +++ b/config/testdata/complete.yaml @@ -68,6 +68,28 @@ sink: password: pass tls: enabled: true + kafka: + brokers: 127.0.0.1:9092 + securityProtocol: SASL_SSL + saslMechanisms: PLAIN + saslUsername: user + saslPassword: pass + statsInterval: 5s + brokerAddressFamily: any + socketKeepAliveEnabled: true + topicMetadataRefreshInterval: 1m + debugContexts: + - broker + - topic + - consumer + clientID: kafka-client-1 + consumerGroupID: consumer-group + consumerGroupInstanceID: consumer-group-1 + sessionTimeout: 5m + heartbeatInterval: 3s + enableAutoCommit: true + enableAutoOffsetStore: false + autoOffsetReset: "error" dedupe: enabled: true @@ -97,26 +119,3 @@ svix: apiKey: test-svix-token serverURL: http://127.0.0.1:8071 debug: true - -kafka: - brokers: 127.0.0.1:9092 - securityProtocol: SASL_SSL - saslMechanisms: PLAIN - saslUsername: user - saslPassword: pass - statsInterval: 5s - brokerAddressFamily: any - socketKeepAliveEnabled: true - topicMetadataRefreshInterval: 1m - debugContexts: - - broker - - topic - - consumer - clientID: kafka-client-1 - consumerGroupID: consumer-group - consumerGroupInstanceID: consumer-group-1 - sessionTimeout: 5m - heartbeatInterval: 3s - enableAutoCommit: true - enableAutoOffsetStore: false - autoOffsetReset: "error"