Skip to content

Commit

Permalink
refactor: subscribers
Browse files Browse the repository at this point in the history
  • Loading branch information
turip committed Aug 9, 2024
1 parent 8c47b80 commit d666ba1
Show file tree
Hide file tree
Showing 23 changed files with 591 additions and 541 deletions.
86 changes: 26 additions & 60 deletions cmd/balance-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"crypto/tls"
"errors"
"fmt"
"log/slog"
Expand All @@ -15,9 +14,6 @@ import (
health "github.com/AppsFlyer/go-sundheit"
healthhttp "github.com/AppsFlyer/go-sundheit/http"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/IBM/sarama"
"github.com/ThreeDotsLabs/watermill"
wmkafka "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
Expand All @@ -44,6 +40,7 @@ import (
"github.com/openmeterio/openmeter/internal/watermill/driver/kafka"
watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka"
"github.com/openmeterio/openmeter/internal/watermill/eventbus"
"github.com/openmeterio/openmeter/internal/watermill/router"
"github.com/openmeterio/openmeter/pkg/contextx"
"github.com/openmeterio/openmeter/pkg/framework/entutils"
"github.com/openmeterio/openmeter/pkg/framework/operation"
Expand Down Expand Up @@ -224,14 +221,19 @@ func main() {
logger.Info("Postgres clients initialized")

// Create subscriber
wmSubscriber, err := initKafkaSubscriber(conf, logger)
wmBrokerConfig := wmBrokerConfiguration(conf, logger, metricMeter)

wmSubscriber, err := watermillkafka.NewSubscriber(watermillkafka.SubscriberOptions{
Broker: wmBrokerConfig,
ConsumerGroupName: conf.BalanceWorker.ConsumerGroupName,
})
if err != nil {
logger.Error("failed to initialize Kafka subscriber", slog.String("error", err.Error()))
os.Exit(1)
}

// Create publisher
eventPublisherDriver, err := initEventPublisherDriver(ctx, logger, conf, metricMeter)
eventPublisherDriver, err := initEventPublisherDriver(ctx, wmBrokerConfig, conf)
if err != nil {
logger.Error("failed to initialize event publisher", slog.String("error", err.Error()))
os.Exit(1)
Expand Down Expand Up @@ -268,27 +270,23 @@ func main() {
workerOptions := balanceworker.WorkerOptions{
SystemEventsTopic: conf.Events.SystemEvents.Topic,
IngestEventsTopic: conf.Events.IngestEvents.Topic,
Subscriber: wmSubscriber,

TargetTopic: conf.Events.SystemEvents.Topic,
Publisher: eventPublisherDriver,
Marshaler: eventPublisher.Marshaler(),
Router: router.Options{
Subscriber: wmSubscriber,
Publisher: eventPublisherDriver,
Logger: logger,

DLQ: conf.BalanceWorker.DLQ,
},

EventBus: eventPublisher,

Entitlement: entitlementConnectors,
Repo: entitlementpgadapter.NewPostgresEntitlementRepo(pgClients.client),

Logger: logger,
}

if conf.BalanceWorker.DLQ.Enabled {
workerOptions.DLQ = &balanceworker.WorkerDLQOptions{
Topic: conf.BalanceWorker.DLQ.Topic,
Throttle: conf.BalanceWorker.DLQ.Throttle.Enabled,
ThrottleDuration: conf.BalanceWorker.DLQ.Throttle.Duration,
ThrottleCount: conf.BalanceWorker.DLQ.Throttle.Count,
}
}

worker, err := balanceworker.New(workerOptions)
if err != nil {
logger.Error("failed to initialize worker", slog.String("error", err.Error()))
Expand Down Expand Up @@ -327,45 +325,17 @@ func main() {
}
}

func initKafkaSubscriber(conf config.Configuration, logger *slog.Logger) (message.Subscriber, error) {
wmConfig := wmkafka.SubscriberConfig{
Brokers: []string{conf.Ingest.Kafka.Broker},
OverwriteSaramaConfig: sarama.NewConfig(),
ConsumerGroup: conf.BalanceWorker.ConsumerGroupName,
ReconnectRetrySleep: 100 * time.Millisecond,
Unmarshaler: wmkafka.DefaultMarshaler{},
}

wmConfig.OverwriteSaramaConfig.Metadata.RefreshFrequency = conf.Ingest.Kafka.TopicMetadataRefreshInterval.Duration()
wmConfig.OverwriteSaramaConfig.ClientID = "openmeter/balance-worker"

switch conf.Ingest.Kafka.SecurityProtocol {
case "SASL_SSL":
wmConfig.OverwriteSaramaConfig.Net.SASL.Enable = true
wmConfig.OverwriteSaramaConfig.Net.SASL.User = conf.Ingest.Kafka.SaslUsername
wmConfig.OverwriteSaramaConfig.Net.SASL.Password = conf.Ingest.Kafka.SaslPassword
wmConfig.OverwriteSaramaConfig.Net.SASL.Mechanism = sarama.SASLMechanism(conf.Ingest.Kafka.SecurityProtocol)
wmConfig.OverwriteSaramaConfig.Net.TLS.Enable = true
wmConfig.OverwriteSaramaConfig.Net.TLS.Config = &tls.Config{}
default:
}

if err := wmConfig.Validate(); err != nil {
logger.Error("failed to validate Kafka subscriber configuration", slog.String("error", err.Error()))
return nil, err
}

// Initialize Kafka subscriber
subscriber, err := wmkafka.NewSubscriber(wmConfig, watermill.NewSlogLogger(logger))
if err != nil {
logger.Error("failed to initialize Kafka subscriber", slog.String("error", err.Error()))
return nil, err
func wmBrokerConfiguration(conf config.Configuration, logger *slog.Logger, metricMeter metric.Meter) watermillkafka.BrokerOptions {
return watermillkafka.BrokerOptions{
KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration,
ClientID: otelName,
Logger: logger,
MetricMeter: metricMeter,
DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug,
}

return subscriber, nil
}

func initEventPublisherDriver(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (message.Publisher, error) {
func initEventPublisherDriver(ctx context.Context, broker watermillkafka.BrokerOptions, conf config.Configuration) (message.Publisher, error) {
provisionTopics := []watermillkafka.AutoProvisionTopic{}
if conf.BalanceWorker.DLQ.AutoProvision.Enabled {
provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{
Expand All @@ -375,12 +345,8 @@ func initEventPublisherDriver(ctx context.Context, logger *slog.Logger, conf con
}

return watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{
KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration,
Broker: broker,
ProvisionTopics: provisionTopics,
ClientID: otelName,
Logger: logger,
MetricMeter: metricMeter,
DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug,
})
}

Expand Down
75 changes: 19 additions & 56 deletions cmd/notification-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"crypto/tls"
"errors"
"fmt"
"log/slog"
Expand All @@ -15,9 +14,6 @@ import (
health "github.com/AppsFlyer/go-sundheit"
healthhttp "github.com/AppsFlyer/go-sundheit/http"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/IBM/sarama"
"github.com/ThreeDotsLabs/watermill"
wmkafka "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
Expand All @@ -43,6 +39,7 @@ import (
"github.com/openmeterio/openmeter/internal/watermill/driver/kafka"
watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka"
"github.com/openmeterio/openmeter/internal/watermill/eventbus"
"github.com/openmeterio/openmeter/internal/watermill/router"
"github.com/openmeterio/openmeter/pkg/contextx"
"github.com/openmeterio/openmeter/pkg/framework/entutils"
"github.com/openmeterio/openmeter/pkg/framework/operation"
Expand Down Expand Up @@ -223,7 +220,10 @@ func main() {
logger.Info("Postgres clients initialized")

// Create subscriber
wmSubscriber, err := initKafkaSubscriber(conf, logger)
wmSubscriber, err := watermillkafka.NewSubscriber(watermillkafka.SubscriberOptions{
Broker: wmBrokerConfiguration(conf, logger, metricMeter),
ConsumerGroupName: conf.NotificationService.Consumer.ConsumerGroupName,
})
if err != nil {
logger.Error("failed to initialize Kafka subscriber", slog.String("error", err.Error()))
os.Exit(1)
Expand Down Expand Up @@ -266,25 +266,20 @@ func main() {
// Initialize consumer
consumerOptions := consumer.Options{
SystemEventsTopic: conf.Events.SystemEvents.Topic,
Subscriber: wmSubscriber,
Router: router.Options{
Subscriber: wmSubscriber,
Publisher: eventPublisherDriver,
Logger: logger,

Publisher: eventPublisherDriver,
DLQ: conf.NotificationService.Consumer.DLQ,
},
Marshaler: eventPublisher.Marshaler(),

Entitlement: entitlementConnectors,

Logger: logger,
}

if conf.NotificationService.Consumer.DLQ.Enabled {
consumerOptions.DLQ = &consumer.DLQOptions{
Topic: conf.NotificationService.Consumer.DLQ.Topic,
Throttle: conf.NotificationService.Consumer.DLQ.Throttle.Enabled,
ThrottleDuration: conf.NotificationService.Consumer.DLQ.Throttle.Duration,
ThrottleCount: conf.NotificationService.Consumer.DLQ.Throttle.Count,
}
}

notifictionConsumer, err := consumer.New(consumerOptions)
if err != nil {
logger.Error("failed to initialize worker", slog.String("error", err.Error()))
Expand Down Expand Up @@ -323,42 +318,14 @@ func main() {
}
}

func initKafkaSubscriber(conf config.Configuration, logger *slog.Logger) (message.Subscriber, error) {
wmConfig := wmkafka.SubscriberConfig{
Brokers: []string{conf.Ingest.Kafka.Broker},
OverwriteSaramaConfig: sarama.NewConfig(),
ConsumerGroup: conf.NotificationService.Consumer.ConsumerGroupName,
ReconnectRetrySleep: 100 * time.Millisecond,
Unmarshaler: wmkafka.DefaultMarshaler{},
func wmBrokerConfiguration(conf config.Configuration, logger *slog.Logger, metricMeter metric.Meter) kafka.BrokerOptions {
return kafka.BrokerOptions{
KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration,
ClientID: otelName,
Logger: logger,
MetricMeter: metricMeter,
DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug,
}

wmConfig.OverwriteSaramaConfig.Metadata.RefreshFrequency = conf.Ingest.Kafka.TopicMetadataRefreshInterval.Duration()
wmConfig.OverwriteSaramaConfig.ClientID = "openmeter/notification-service"

switch conf.Ingest.Kafka.SecurityProtocol {
case "SASL_SSL":
wmConfig.OverwriteSaramaConfig.Net.SASL.Enable = true
wmConfig.OverwriteSaramaConfig.Net.SASL.User = conf.Ingest.Kafka.SaslUsername
wmConfig.OverwriteSaramaConfig.Net.SASL.Password = conf.Ingest.Kafka.SaslPassword
wmConfig.OverwriteSaramaConfig.Net.SASL.Mechanism = sarama.SASLMechanism(conf.Ingest.Kafka.SecurityProtocol)
wmConfig.OverwriteSaramaConfig.Net.TLS.Enable = true
wmConfig.OverwriteSaramaConfig.Net.TLS.Config = &tls.Config{}
default:
}

if err := wmConfig.Validate(); err != nil {
logger.Error("failed to validate Kafka subscriber configuration", slog.String("error", err.Error()))
return nil, err
}

// Initialize Kafka subscriber
subscriber, err := wmkafka.NewSubscriber(wmConfig, watermill.NewSlogLogger(logger))
if err != nil {
logger.Error("failed to initialize Kafka subscriber", slog.String("error", err.Error()))
return nil, err
}

return subscriber, nil
}

func initEventPublisherDriver(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (message.Publisher, error) {
Expand All @@ -371,12 +338,8 @@ func initEventPublisherDriver(ctx context.Context, logger *slog.Logger, conf con
}

return watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{
KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration,
Broker: wmBrokerConfiguration(conf, logger, metricMeter),
ProvisionTopics: provisionTopics,
ClientID: otelName,
Logger: logger,
MetricMeter: metricMeter,
DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug,
})
}

Expand Down
12 changes: 7 additions & 5 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,12 +455,14 @@ func initEventPublisherDriver(ctx context.Context, logger *slog.Logger, conf con
}

return watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{
KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration,
Broker: watermillkafka.BrokerOptions{
KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration,
ClientID: otelName,
Logger: logger,
MetricMeter: metricMeter,
DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug,
},
ProvisionTopics: provisionTopics,
ClientID: otelName,
Logger: logger,
MetricMeter: metricMeter,
DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug,
})
}

Expand Down
12 changes: 7 additions & 5 deletions cmd/sink-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,18 +226,20 @@ func initIngestEventPublisher(ctx context.Context, logger *slog.Logger, conf con
}

eventDriver, err := watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{
KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration,
ClientID: otelName,
Logger: logger,
DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug,
Broker: watermillkafka.BrokerOptions{
KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration,
ClientID: otelName,
Logger: logger,
DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug,
MetricMeter: metricMeter,
},

ProvisionTopics: []watermillkafka.AutoProvisionTopic{
{
Topic: conf.Events.IngestEvents.Topic,
NumPartitions: int32(conf.Events.IngestEvents.AutoProvision.Partitions),
},
},
MetricMeter: metricMeter,
})
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit d666ba1

Please sign in to comment.