Merge pull request #1291 from openmeterio/feat/use-sarama-lib-internal-message-publishing

feat: use sarama lib for internal message publishing
turip authored Aug 5, 2024
2 parents 66687eb + 46682f4 commit 6c66089
Showing 10 changed files with 492 additions and 219 deletions.
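
In each binary the change follows the same pattern: the initKafkaProducer helper built on confluent-kafka-go (plus the Kafka admin client used for topic auto-provisioning and the run.Group-managed producer lifecycle) is removed, and initEventPublisher now constructs a sarama-backed watermill publisher through the internal watermillkafka package, which provisions the DLQ and system-event topics itself. A consolidated sketch of the resulting wiring, using only names that appear in the diff below; the surrounding main() scaffolding is assumed:

```go
// Sketch only: option, field, and function names are taken from the diff;
// conf, logger, metricMeter, otelName and provisionTopics are assumed to be
// set up as in the files below.

// 1. Build the sarama-backed watermill driver. Topics listed in
//    ProvisionTopics (for example the DLQ topic) are created by the publisher
//    itself, so the separate Kafka admin client is no longer needed.
eventDriver, err := watermillkafka.NewPublisher(watermillkafka.PublisherOptions{
	KafkaConfig:     conf.Ingest.Kafka.KafkaConfiguration,
	ProvisionTopics: provisionTopics,
	ClientID:        otelName,
	Logger:          logger,
	MetricMeter:     metricMeter,
})
if err != nil {
	logger.Error("failed to create event driver", slog.String("error", err.Error()))
	os.Exit(1)
}

// 2. Publishing is synchronous, so a deferred Close on the driver replaces the
//    run.Group-managed producer shutdown used before this change.
defer func() {
	if err := eventDriver.Close(); err != nil {
		logger.Error("failed to close event publisher", slog.String("error", err.Error()))
	}
}()

// 3. Wrap the driver in the higher-level event publisher; the transform adds a
//    partition key derived from the event subject.
eventPublisher, err := publisher.NewPublisher(publisher.PublisherOptions{
	Publisher: eventDriver,
	Transform: watermillkafka.AddPartitionKeyFromSubject,
})
if err != nil {
	logger.Error("failed to create event publisher", slog.String("error", err.Error()))
	os.Exit(1)
}

// 4. Consumers receive a topic-scoped publisher.
_ = eventPublisher.ForTopic(conf.Events.SystemEvents.Topic)
```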
76 changes: 25 additions & 51 deletions cmd/balance-worker/main.go
@@ -19,7 +19,6 @@ import (
"github.com/ThreeDotsLabs/watermill"
wmkafka "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/go-slog/otelslog"
@@ -39,7 +38,6 @@ import (
"github.com/openmeterio/openmeter/internal/entitlement/balanceworker"
entitlementpgadapter "github.com/openmeterio/openmeter/internal/entitlement/postgresadapter"
"github.com/openmeterio/openmeter/internal/event/publisher"
"github.com/openmeterio/openmeter/internal/ingest/kafkaingest"
"github.com/openmeterio/openmeter/internal/meter"
"github.com/openmeterio/openmeter/internal/registry"
registrybuilder "github.com/openmeterio/openmeter/internal/registry/builder"
@@ -49,8 +47,6 @@ import (
"github.com/openmeterio/openmeter/pkg/framework/entutils"
"github.com/openmeterio/openmeter/pkg/framework/operation"
"github.com/openmeterio/openmeter/pkg/gosundheit"
pkgkafka "github.com/openmeterio/openmeter/pkg/kafka"
kafkametrics "github.com/openmeterio/openmeter/pkg/kafka/metrics"
"github.com/openmeterio/openmeter/pkg/models"
"github.com/openmeterio/openmeter/pkg/slicesx"
)
@@ -140,6 +136,7 @@ func main() {
}
}()
otel.SetMeterProvider(otelMeterProvider)

metricMeter := otelMeterProvider.Meter(otelName)

if conf.Telemetry.Metrics.Exporters.Prometheus.Enabled {
@@ -233,18 +230,19 @@ func main() {
}

// Create publisher
kafkaPublisher, err := initKafkaProducer(ctx, conf, logger, metricMeter, &group)
if err != nil {
logger.Error("failed to initialize Kafka producer", slog.String("error", err.Error()))
os.Exit(1)
}

publishers, err := initEventPublisher(ctx, logger, conf, kafkaPublisher)
publishers, err := initEventPublisher(logger, conf, metricMeter)
if err != nil {
logger.Error("failed to initialize event publisher", slog.String("error", err.Error()))
os.Exit(1)
}

defer func() {
// We are using sync publishing, so it's fine to close the publisher using defers.
if err := publishers.watermillPublisher.Close(); err != nil {
logger.Error("failed to close event publisher", slog.String("error", err.Error()))
}
}()

// Dependencies: entitlement
entitlementConnectors := registrybuilder.GetEntitlementRegistry(registry.EntitlementOptions{
DatabaseClient: pgClients.client,
@@ -361,24 +359,24 @@ type eventPublishers struct {
eventPublisher publisher.Publisher
}

func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, kafkaProducer *kafka.Producer) (*eventPublishers, error) {
eventDriver := watermillkafka.NewPublisher(kafkaProducer)

func initEventPublisher(logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (*eventPublishers, error) {
provisionTopics := []watermillkafka.AutoProvisionTopic{}
if conf.BalanceWorker.DLQ.AutoProvision.Enabled {
adminClient, err := kafka.NewAdminClientFromProducer(kafkaProducer)
if err != nil {
return nil, fmt.Errorf("failed to create Kafka admin client: %w", err)
}

defer adminClient.Close()
provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{
Topic: conf.BalanceWorker.DLQ.Topic,
NumPartitions: int32(conf.BalanceWorker.DLQ.AutoProvision.Partitions),
})
}

if err := pkgkafka.ProvisionTopic(ctx,
adminClient,
logger,
conf.BalanceWorker.DLQ.Topic,
conf.BalanceWorker.DLQ.AutoProvision.Partitions); err != nil {
return nil, fmt.Errorf("failed to auto provision topic: %w", err)
}
eventDriver, err := watermillkafka.NewPublisher(watermillkafka.PublisherOptions{
KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration,
ProvisionTopics: provisionTopics,
ClientID: otelName,
Logger: logger,
MetricMeter: metricMeter,
})
if err != nil {
return nil, fmt.Errorf("failed to create event driver: %w", err)
}

eventPublisher, err := publisher.NewPublisher(publisher.PublisherOptions{
@@ -396,30 +394,6 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co
}, nil
}

func initKafkaProducer(ctx context.Context, config config.Configuration, logger *slog.Logger, metricMeter metric.Meter, group *run.Group) (*kafka.Producer, error) {
// Initialize Kafka Admin Client
kafkaConfig := config.Ingest.Kafka.CreateKafkaConfig()

// Initialize Kafka Producer
producer, err := kafka.NewProducer(&kafkaConfig)
if err != nil {
return nil, fmt.Errorf("init kafka ingest: %w", err)
}

// Initialize Kafka Client Statistics reporter
kafkaMetrics, err := kafkametrics.New(metricMeter)
if err != nil {
return nil, fmt.Errorf("failed to create Kafka client metrics: %w", err)
}

group.Add(kafkaingest.KafkaProducerGroup(ctx, producer, logger, kafkaMetrics))

go pkgkafka.ConsumeLogChannel(producer, logger.WithGroup("kafka").WithGroup("producer"))

slog.Debug("connected to Kafka")
return producer, nil
}

type pgClients struct {
driver *sql.Driver
client *db.Client
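
The wmkafka import above points at watermill-kafka v3, which is built on the pure-Go sarama client, replacing the cgo-based confluent-kafka-go/librdkafka producer this file used before. For orientation, a minimal sketch of constructing a sarama-backed watermill publisher with the upstream library; the internal watermillkafka wrapper layers topic provisioning and OTel metrics on top of this. The API shown is the upstream package's and is assumed unchanged in v3; the broker address and topic are placeholders:

```go
package main

import (
	"log/slog"
	"os"

	"github.com/ThreeDotsLabs/watermill"
	wmkafka "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka"
	"github.com/ThreeDotsLabs/watermill/message"
)

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	// Construct a sarama-backed watermill publisher (upstream watermill-kafka API).
	pub, err := wmkafka.NewPublisher(wmkafka.PublisherConfig{
		Brokers:   []string{"127.0.0.1:9092"}, // placeholder broker
		Marshaler: wmkafka.DefaultMarshaler{},
	}, watermill.NewStdLogger(false, false))
	if err != nil {
		logger.Error("failed to create publisher", slog.String("error", err.Error()))
		os.Exit(1)
	}
	defer pub.Close()

	// Publish one message; watermill-kafka uses sarama's synchronous producer,
	// so Publish returns once the broker has acknowledged the message.
	msg := message.NewMessage(watermill.NewUUID(), []byte(`{"hello":"world"}`))
	if err := pub.Publish("example.events", msg); err != nil { // placeholder topic
		logger.Error("failed to publish", slog.String("error", err.Error()))
	}
}
```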
76 changes: 25 additions & 51 deletions cmd/notification-service/main.go
@@ -19,7 +19,6 @@ import (
"github.com/ThreeDotsLabs/watermill"
wmkafka "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/go-slog/otelslog"
@@ -37,7 +36,6 @@ import (
"github.com/openmeterio/openmeter/config"
"github.com/openmeterio/openmeter/internal/ent/db"
"github.com/openmeterio/openmeter/internal/event/publisher"
"github.com/openmeterio/openmeter/internal/ingest/kafkaingest"
"github.com/openmeterio/openmeter/internal/meter"
"github.com/openmeterio/openmeter/internal/notification/consumer"
"github.com/openmeterio/openmeter/internal/registry"
@@ -48,8 +46,6 @@ import (
"github.com/openmeterio/openmeter/pkg/framework/entutils"
"github.com/openmeterio/openmeter/pkg/framework/operation"
"github.com/openmeterio/openmeter/pkg/gosundheit"
pkgkafka "github.com/openmeterio/openmeter/pkg/kafka"
kafkametrics "github.com/openmeterio/openmeter/pkg/kafka/metrics"
"github.com/openmeterio/openmeter/pkg/models"
"github.com/openmeterio/openmeter/pkg/slicesx"
)
@@ -139,6 +135,7 @@ func main() {
}
}()
otel.SetMeterProvider(otelMeterProvider)

metricMeter := otelMeterProvider.Meter(otelName)

if conf.Telemetry.Metrics.Exporters.Prometheus.Enabled {
@@ -232,18 +229,19 @@ func main() {
}

// Create publisher
kafkaPublisher, err := initKafkaProducer(ctx, conf, logger, metricMeter, &group)
if err != nil {
logger.Error("failed to initialize Kafka producer", slog.String("error", err.Error()))
os.Exit(1)
}

publishers, err := initEventPublisher(ctx, logger, conf, kafkaPublisher)
publishers, err := initEventPublisher(logger, conf, metricMeter)
if err != nil {
logger.Error("failed to initialize event publisher", slog.String("error", err.Error()))
os.Exit(1)
}

defer func() {
// We are using sync producer, so it is fine to close this as a last step
if err := publishers.watermillPublisher.Close(); err != nil {
logger.Error("failed to close kafka producer", slog.String("error", err.Error()))
}
}()

// Dependencies: entitlement
entitlementConnectors := registrybuilder.GetEntitlementRegistry(registry.EntitlementOptions{
DatabaseClient: pgClients.client,
@@ -356,24 +354,24 @@ type eventPublishers struct {
eventPublisher publisher.Publisher
}

func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, kafkaProducer *kafka.Producer) (*eventPublishers, error) {
eventDriver := watermillkafka.NewPublisher(kafkaProducer)

func initEventPublisher(logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (*eventPublishers, error) {
provisionTopics := []watermillkafka.AutoProvisionTopic{}
if conf.NotificationService.Consumer.DLQ.AutoProvision.Enabled {
adminClient, err := kafka.NewAdminClientFromProducer(kafkaProducer)
if err != nil {
return nil, fmt.Errorf("failed to create Kafka admin client: %w", err)
}

defer adminClient.Close()
provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{
Topic: conf.NotificationService.Consumer.DLQ.Topic,
NumPartitions: int32(conf.NotificationService.Consumer.DLQ.AutoProvision.Partitions),
})
}

if err := pkgkafka.ProvisionTopic(ctx,
adminClient,
logger,
conf.NotificationService.Consumer.DLQ.Topic,
conf.NotificationService.Consumer.DLQ.AutoProvision.Partitions); err != nil {
return nil, fmt.Errorf("failed to auto provision topic: %w", err)
}
eventDriver, err := watermillkafka.NewPublisher(watermillkafka.PublisherOptions{
KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration,
ProvisionTopics: provisionTopics,
ClientID: otelName,
Logger: logger,
MetricMeter: metricMeter,
})
if err != nil {
return nil, fmt.Errorf("failed to create event driver: %w", err)
}

eventPublisher, err := publisher.NewPublisher(publisher.PublisherOptions{
@@ -391,30 +389,6 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co
}, nil
}

func initKafkaProducer(ctx context.Context, config config.Configuration, logger *slog.Logger, metricMeter metric.Meter, group *run.Group) (*kafka.Producer, error) {
// Initialize Kafka Admin Client
kafkaConfig := config.Ingest.Kafka.CreateKafkaConfig()

// Initialize Kafka Producer
producer, err := kafka.NewProducer(&kafkaConfig)
if err != nil {
return nil, fmt.Errorf("init kafka ingest: %w", err)
}

// Initialize Kafka Client Statistics reporter
kafkaMetrics, err := kafkametrics.New(metricMeter)
if err != nil {
return nil, fmt.Errorf("failed to create Kafka client metrics: %w", err)
}

group.Add(kafkaingest.KafkaProducerGroup(ctx, producer, logger, kafkaMetrics))

go pkgkafka.ConsumeLogChannel(producer, logger.WithGroup("kafka").WithGroup("producer"))

slog.Debug("connected to Kafka")
return producer, nil
}

type pgClients struct {
driver *sql.Driver
client *db.Client
68 changes: 48 additions & 20 deletions cmd/server/main.go
@@ -16,6 +16,7 @@ import (
health "github.com/AppsFlyer/go-sundheit"
healthhttp "github.com/AppsFlyer/go-sundheit/http"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
@@ -212,12 +213,19 @@ func main() {
os.Exit(1)
}

eventPublisher, err := initEventPublisher(ctx, logger, conf, kafkaProducer)
eventPublishers, err := initEventPublisher(logger, conf, metricMeter)
if err != nil {
logger.Error("failed to initialize event publisher", "error", err)
os.Exit(1)
}

defer func() {
logger.Info("closing event publisher")
if err = eventPublishers.driver.Close(); err != nil {
logger.Error("failed to close event publisher", "error", err)
}
}()

// Initialize Kafka Ingest
ingestCollector, kafkaIngestNamespaceHandler, err := initKafkaIngest(
kafkaProducer,
@@ -314,7 +322,7 @@ func main() {
StreamingConnector: streamingConnector,
MeterRepository: meterRepository,
Logger: logger,
Publisher: eventPublisher.ForTopic(conf.Events.SystemEvents.Topic),
Publisher: eventPublishers.eventPublisher.ForTopic(conf.Events.SystemEvents.Topic),
})
}

@@ -421,34 +429,54 @@ func main() {
}
}

func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, kafkaProducer *kafka.Producer) (publisher.Publisher, error) {
type publishers struct {
eventPublisher publisher.Publisher
driver message.Publisher
}

func initEventPublisher(logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (*publishers, error) {
if !conf.Events.Enabled {
return publisher.NewPublisher(publisher.PublisherOptions{
publisher, err := publisher.NewPublisher(publisher.PublisherOptions{
Publisher: &noop.Publisher{},
})
if err != nil {
return nil, fmt.Errorf("failed to create event driver: %w", err)
}

return &publishers{
eventPublisher: publisher,
driver: &noop.Publisher{},
}, nil
}

provisionTopics := []watermillkafka.AutoProvisionTopic{}
if conf.Events.SystemEvents.AutoProvision.Enabled {
provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{
Topic: conf.Events.SystemEvents.Topic,
NumPartitions: int32(conf.Events.SystemEvents.AutoProvision.Partitions),
})
}

eventDriver, err := watermillkafka.NewPublisher(watermillkafka.PublisherOptions{
KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration,
ProvisionTopics: provisionTopics,
ClientID: otelName,
Logger: logger,
MetricMeter: metricMeter,
})
if err != nil {
return nil, fmt.Errorf("failed to create event driver: %w", err)
}

eventDriver := watermillkafka.NewPublisher(kafkaProducer)
eventPublisher, err := publisher.NewPublisher(publisher.PublisherOptions{
Publisher: eventDriver,
Transform: watermillkafka.AddPartitionKeyFromSubject,
})

// Auto provision topics if needed
if conf.Events.SystemEvents.AutoProvision.Enabled {
adminClient, err := kafka.NewAdminClientFromProducer(kafkaProducer)
if err != nil {
return nil, fmt.Errorf("failed to create Kafka admin client: %w", err)
}

defer adminClient.Close()

if err := pkgkafka.ProvisionTopic(ctx, adminClient, logger, conf.Events.SystemEvents.Topic, conf.Events.SystemEvents.AutoProvision.Partitions); err != nil {
return nil, fmt.Errorf("failed to auto provision topic: %w", err)
}
}

return eventPublisher, err
return &publishers{
eventPublisher: eventPublisher,
driver: eventDriver,
}, err
}

func initKafkaProducer(ctx context.Context, config config.Configuration, logger *slog.Logger, metricMeter metric.Meter, group *run.Group) (*kafka.Producer, error) {
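
In cmd/server/main.go the confluent-kafka-go producer is kept for ingest (initKafkaProducer remains), but event publishing moves to the same sarama-backed driver, and when events are disabled both the driver and the event publisher fall back to the internal noop package. Watermill's message.Publisher interface is only Publish and Close, so a no-op driver is trivial; a sketch of what such a publisher presumably looks like (an assumption, not the repository's actual noop implementation):

```go
package noop

import "github.com/ThreeDotsLabs/watermill/message"

// Publisher satisfies watermill's message.Publisher but silently drops every
// message. Presumed shape of the internal noop package; the real
// implementation may differ.
type Publisher struct{}

func (Publisher) Publish(topic string, messages ...*message.Message) error { return nil }

func (Publisher) Close() error { return nil }
```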
(7 more changed files not shown)
