diff --git a/cmd/balance-worker/main.go b/cmd/balance-worker/main.go index c0568ef10..0039ccb97 100644 --- a/cmd/balance-worker/main.go +++ b/cmd/balance-worker/main.go @@ -19,7 +19,6 @@ import ( "github.com/ThreeDotsLabs/watermill" wmkafka "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka" "github.com/ThreeDotsLabs/watermill/message" - "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/go-slog/otelslog" @@ -39,7 +38,6 @@ import ( "github.com/openmeterio/openmeter/internal/entitlement/balanceworker" entitlementpgadapter "github.com/openmeterio/openmeter/internal/entitlement/postgresadapter" "github.com/openmeterio/openmeter/internal/event/publisher" - "github.com/openmeterio/openmeter/internal/ingest/kafkaingest" "github.com/openmeterio/openmeter/internal/meter" "github.com/openmeterio/openmeter/internal/registry" registrybuilder "github.com/openmeterio/openmeter/internal/registry/builder" @@ -49,8 +47,6 @@ import ( "github.com/openmeterio/openmeter/pkg/framework/entutils" "github.com/openmeterio/openmeter/pkg/framework/operation" "github.com/openmeterio/openmeter/pkg/gosundheit" - pkgkafka "github.com/openmeterio/openmeter/pkg/kafka" - kafkametrics "github.com/openmeterio/openmeter/pkg/kafka/metrics" "github.com/openmeterio/openmeter/pkg/models" "github.com/openmeterio/openmeter/pkg/slicesx" ) @@ -140,6 +136,7 @@ func main() { } }() otel.SetMeterProvider(otelMeterProvider) + metricMeter := otelMeterProvider.Meter(otelName) if conf.Telemetry.Metrics.Exporters.Prometheus.Enabled { @@ -233,18 +230,19 @@ func main() { } // Create publisher - kafkaPublisher, err := initKafkaProducer(ctx, conf, logger, metricMeter, &group) - if err != nil { - logger.Error("failed to initialize Kafka producer", slog.String("error", err.Error())) - os.Exit(1) - } - - publishers, err := initEventPublisher(ctx, logger, conf, kafkaPublisher) + publishers, err := initEventPublisher(logger, conf, metricMeter) if err != nil { logger.Error("failed to initialize event publisher", slog.String("error", err.Error())) os.Exit(1) } + defer func() { + // We are using sync publishing, so it's fine to close the publisher using defers. + if err := publishers.watermillPublisher.Close(); err != nil { + logger.Error("failed to close event publisher", slog.String("error", err.Error())) + } + }() + // Dependencies: entitlement entitlementConnectors := registrybuilder.GetEntitlementRegistry(registry.EntitlementOptions{ DatabaseClient: pgClients.client, @@ -361,24 +359,24 @@ type eventPublishers struct { eventPublisher publisher.Publisher } -func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, kafkaProducer *kafka.Producer) (*eventPublishers, error) { - eventDriver := watermillkafka.NewPublisher(kafkaProducer) - +func initEventPublisher(logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (*eventPublishers, error) { + provisionTopics := []watermillkafka.AutoProvisionTopic{} if conf.BalanceWorker.DLQ.AutoProvision.Enabled { - adminClient, err := kafka.NewAdminClientFromProducer(kafkaProducer) - if err != nil { - return nil, fmt.Errorf("failed to create Kafka admin client: %w", err) - } - - defer adminClient.Close() + provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{ + Topic: conf.BalanceWorker.DLQ.Topic, + NumPartitions: int32(conf.BalanceWorker.DLQ.AutoProvision.Partitions), + }) + } - if err := pkgkafka.ProvisionTopic(ctx, - adminClient, - logger, - conf.BalanceWorker.DLQ.Topic, - conf.BalanceWorker.DLQ.AutoProvision.Partitions); err != nil { - return nil, fmt.Errorf("failed to auto provision topic: %w", err) - } + eventDriver, err := watermillkafka.NewPublisher(watermillkafka.PublisherOptions{ + KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration, + ProvisionTopics: provisionTopics, + ClientID: otelName, + Logger: logger, + MetricMeter: metricMeter, + }) + if err != nil { + return nil, fmt.Errorf("failed to create event driver: %w", err) } eventPublisher, err := publisher.NewPublisher(publisher.PublisherOptions{ @@ -396,30 +394,6 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co }, nil } -func initKafkaProducer(ctx context.Context, config config.Configuration, logger *slog.Logger, metricMeter metric.Meter, group *run.Group) (*kafka.Producer, error) { - // Initialize Kafka Admin Client - kafkaConfig := config.Ingest.Kafka.CreateKafkaConfig() - - // Initialize Kafka Producer - producer, err := kafka.NewProducer(&kafkaConfig) - if err != nil { - return nil, fmt.Errorf("init kafka ingest: %w", err) - } - - // Initialize Kafka Client Statistics reporter - kafkaMetrics, err := kafkametrics.New(metricMeter) - if err != nil { - return nil, fmt.Errorf("failed to create Kafka client metrics: %w", err) - } - - group.Add(kafkaingest.KafkaProducerGroup(ctx, producer, logger, kafkaMetrics)) - - go pkgkafka.ConsumeLogChannel(producer, logger.WithGroup("kafka").WithGroup("producer")) - - slog.Debug("connected to Kafka") - return producer, nil -} - type pgClients struct { driver *sql.Driver client *db.Client diff --git a/cmd/notification-service/main.go b/cmd/notification-service/main.go index a2ebf4396..d0b322f77 100644 --- a/cmd/notification-service/main.go +++ b/cmd/notification-service/main.go @@ -19,7 +19,6 @@ import ( "github.com/ThreeDotsLabs/watermill" wmkafka "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka" "github.com/ThreeDotsLabs/watermill/message" - "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/go-slog/otelslog" @@ -37,7 +36,6 @@ import ( "github.com/openmeterio/openmeter/config" "github.com/openmeterio/openmeter/internal/ent/db" "github.com/openmeterio/openmeter/internal/event/publisher" - "github.com/openmeterio/openmeter/internal/ingest/kafkaingest" "github.com/openmeterio/openmeter/internal/meter" "github.com/openmeterio/openmeter/internal/notification/consumer" "github.com/openmeterio/openmeter/internal/registry" @@ -48,8 +46,6 @@ import ( "github.com/openmeterio/openmeter/pkg/framework/entutils" "github.com/openmeterio/openmeter/pkg/framework/operation" "github.com/openmeterio/openmeter/pkg/gosundheit" - pkgkafka "github.com/openmeterio/openmeter/pkg/kafka" - kafkametrics "github.com/openmeterio/openmeter/pkg/kafka/metrics" "github.com/openmeterio/openmeter/pkg/models" "github.com/openmeterio/openmeter/pkg/slicesx" ) @@ -139,6 +135,7 @@ func main() { } }() otel.SetMeterProvider(otelMeterProvider) + metricMeter := otelMeterProvider.Meter(otelName) if conf.Telemetry.Metrics.Exporters.Prometheus.Enabled { @@ -232,18 +229,19 @@ func main() { } // Create publisher - kafkaPublisher, err := initKafkaProducer(ctx, conf, logger, metricMeter, &group) - if err != nil { - logger.Error("failed to initialize Kafka producer", slog.String("error", err.Error())) - os.Exit(1) - } - - publishers, err := initEventPublisher(ctx, logger, conf, kafkaPublisher) + publishers, err := initEventPublisher(logger, conf, metricMeter) if err != nil { logger.Error("failed to initialize event publisher", slog.String("error", err.Error())) os.Exit(1) } + defer func() { + // We are using sync producer, so it is fine to close this as a last step + if err := publishers.watermillPublisher.Close(); err != nil { + logger.Error("failed to close kafka producer", slog.String("error", err.Error())) + } + }() + // Dependencies: entitlement entitlementConnectors := registrybuilder.GetEntitlementRegistry(registry.EntitlementOptions{ DatabaseClient: pgClients.client, @@ -356,24 +354,24 @@ type eventPublishers struct { eventPublisher publisher.Publisher } -func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, kafkaProducer *kafka.Producer) (*eventPublishers, error) { - eventDriver := watermillkafka.NewPublisher(kafkaProducer) - +func initEventPublisher(logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (*eventPublishers, error) { + provisionTopics := []watermillkafka.AutoProvisionTopic{} if conf.NotificationService.Consumer.DLQ.AutoProvision.Enabled { - adminClient, err := kafka.NewAdminClientFromProducer(kafkaProducer) - if err != nil { - return nil, fmt.Errorf("failed to create Kafka admin client: %w", err) - } - - defer adminClient.Close() + provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{ + Topic: conf.NotificationService.Consumer.DLQ.Topic, + NumPartitions: int32(conf.NotificationService.Consumer.DLQ.AutoProvision.Partitions), + }) + } - if err := pkgkafka.ProvisionTopic(ctx, - adminClient, - logger, - conf.NotificationService.Consumer.DLQ.Topic, - conf.NotificationService.Consumer.DLQ.AutoProvision.Partitions); err != nil { - return nil, fmt.Errorf("failed to auto provision topic: %w", err) - } + eventDriver, err := watermillkafka.NewPublisher(watermillkafka.PublisherOptions{ + KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration, + ProvisionTopics: provisionTopics, + ClientID: otelName, + Logger: logger, + MetricMeter: metricMeter, + }) + if err != nil { + return nil, fmt.Errorf("failed to create event driver: %w", err) } eventPublisher, err := publisher.NewPublisher(publisher.PublisherOptions{ @@ -391,30 +389,6 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co }, nil } -func initKafkaProducer(ctx context.Context, config config.Configuration, logger *slog.Logger, metricMeter metric.Meter, group *run.Group) (*kafka.Producer, error) { - // Initialize Kafka Admin Client - kafkaConfig := config.Ingest.Kafka.CreateKafkaConfig() - - // Initialize Kafka Producer - producer, err := kafka.NewProducer(&kafkaConfig) - if err != nil { - return nil, fmt.Errorf("init kafka ingest: %w", err) - } - - // Initialize Kafka Client Statistics reporter - kafkaMetrics, err := kafkametrics.New(metricMeter) - if err != nil { - return nil, fmt.Errorf("failed to create Kafka client metrics: %w", err) - } - - group.Add(kafkaingest.KafkaProducerGroup(ctx, producer, logger, kafkaMetrics)) - - go pkgkafka.ConsumeLogChannel(producer, logger.WithGroup("kafka").WithGroup("producer")) - - slog.Debug("connected to Kafka") - return producer, nil -} - type pgClients struct { driver *sql.Driver client *db.Client diff --git a/cmd/server/main.go b/cmd/server/main.go index e92308029..cbce1201e 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -16,6 +16,7 @@ import ( health "github.com/AppsFlyer/go-sundheit" healthhttp "github.com/AppsFlyer/go-sundheit/http" "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ThreeDotsLabs/watermill/message" "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" @@ -212,12 +213,19 @@ func main() { os.Exit(1) } - eventPublisher, err := initEventPublisher(ctx, logger, conf, kafkaProducer) + eventPublishers, err := initEventPublisher(logger, conf, metricMeter) if err != nil { logger.Error("failed to initialize event publisher", "error", err) os.Exit(1) } + defer func() { + logger.Info("closing event publisher") + if err = eventPublishers.driver.Close(); err != nil { + logger.Error("failed to close event publisher", "error", err) + } + }() + // Initialize Kafka Ingest ingestCollector, kafkaIngestNamespaceHandler, err := initKafkaIngest( kafkaProducer, @@ -314,7 +322,7 @@ func main() { StreamingConnector: streamingConnector, MeterRepository: meterRepository, Logger: logger, - Publisher: eventPublisher.ForTopic(conf.Events.SystemEvents.Topic), + Publisher: eventPublishers.eventPublisher.ForTopic(conf.Events.SystemEvents.Topic), }) } @@ -421,34 +429,54 @@ func main() { } } -func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, kafkaProducer *kafka.Producer) (publisher.Publisher, error) { +type publishers struct { + eventPublisher publisher.Publisher + driver message.Publisher +} + +func initEventPublisher(logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (*publishers, error) { if !conf.Events.Enabled { - return publisher.NewPublisher(publisher.PublisherOptions{ + publisher, err := publisher.NewPublisher(publisher.PublisherOptions{ Publisher: &noop.Publisher{}, }) + if err != nil { + return nil, fmt.Errorf("failed to create event driver: %w", err) + } + + return &publishers{ + eventPublisher: publisher, + driver: &noop.Publisher{}, + }, nil + } + + provisionTopics := []watermillkafka.AutoProvisionTopic{} + if conf.Events.SystemEvents.AutoProvision.Enabled { + provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{ + Topic: conf.Events.SystemEvents.Topic, + NumPartitions: int32(conf.Events.SystemEvents.AutoProvision.Partitions), + }) + } + + eventDriver, err := watermillkafka.NewPublisher(watermillkafka.PublisherOptions{ + KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration, + ProvisionTopics: provisionTopics, + ClientID: otelName, + Logger: logger, + MetricMeter: metricMeter, + }) + if err != nil { + return nil, fmt.Errorf("failed to create event driver: %w", err) } - eventDriver := watermillkafka.NewPublisher(kafkaProducer) eventPublisher, err := publisher.NewPublisher(publisher.PublisherOptions{ Publisher: eventDriver, Transform: watermillkafka.AddPartitionKeyFromSubject, }) - // Auto provision topics if needed - if conf.Events.SystemEvents.AutoProvision.Enabled { - adminClient, err := kafka.NewAdminClientFromProducer(kafkaProducer) - if err != nil { - return nil, fmt.Errorf("failed to create Kafka admin client: %w", err) - } - - defer adminClient.Close() - - if err := pkgkafka.ProvisionTopic(ctx, adminClient, logger, conf.Events.SystemEvents.Topic, conf.Events.SystemEvents.AutoProvision.Partitions); err != nil { - return nil, fmt.Errorf("failed to auto provision topic: %w", err) - } - } - - return eventPublisher, err + return &publishers{ + eventPublisher: eventPublisher, + driver: eventDriver, + }, err } func initKafkaProducer(ctx context.Context, config config.Configuration, logger *slog.Logger, metricMeter metric.Meter, group *run.Group) (*kafka.Producer, error) { diff --git a/cmd/sink-worker/main.go b/cmd/sink-worker/main.go index 0e4a9d19f..2362f9549 100644 --- a/cmd/sink-worker/main.go +++ b/cmd/sink-worker/main.go @@ -31,7 +31,6 @@ import ( "github.com/openmeterio/openmeter/config" "github.com/openmeterio/openmeter/internal/dedupe" "github.com/openmeterio/openmeter/internal/event/publisher" - "github.com/openmeterio/openmeter/internal/ingest/kafkaingest" "github.com/openmeterio/openmeter/internal/meter" "github.com/openmeterio/openmeter/internal/sink" "github.com/openmeterio/openmeter/internal/sink/flushhandler" @@ -39,7 +38,6 @@ import ( watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka" "github.com/openmeterio/openmeter/pkg/gosundheit" pkgkafka "github.com/openmeterio/openmeter/pkg/kafka" - kafkametrics "github.com/openmeterio/openmeter/pkg/kafka/metrics" "github.com/openmeterio/openmeter/pkg/models" "github.com/openmeterio/openmeter/pkg/slicesx" ) @@ -163,13 +161,7 @@ func main() { var group run.Group // initialize system event producer - kafkaProducer, err := initKafkaProducer(ctx, conf, logger, metricMeter, &group) - if err != nil { - logger.Error("failed to initialize kafka producer", "error", err) - os.Exit(1) - } - - ingestEventFlushHandler, err := initIngestEventPublisher(ctx, logger, conf, kafkaProducer, metricMeter) + ingestEventFlushHandler, err := initIngestEventPublisher(logger, conf, metricMeter) if err != nil { logger.Error("failed to initialize event publisher", "error", err) os.Exit(1) @@ -228,56 +220,34 @@ func main() { } } -func initKafkaProducer(ctx context.Context, config config.Configuration, logger *slog.Logger, metricMeter metric.Meter, group *run.Group) (*kafka.Producer, error) { - // Initialize Kafka Admin Client - kafkaConfig := config.Ingest.Kafka.CreateKafkaConfig() - - // Initialize Kafka Producer - producer, err := kafka.NewProducer(&kafkaConfig) - if err != nil { - return nil, fmt.Errorf("init kafka ingest: %w", err) - } - - // Initialize Kafka Client Statistics reporter - kafkaMetrics, err := kafkametrics.New(metricMeter) - if err != nil { - return nil, fmt.Errorf("failed to create Kafka client metrics: %w", err) +func initIngestEventPublisher(logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (flushhandler.FlushEventHandler, error) { + if !conf.Events.Enabled { + return nil, nil } - group.Add(kafkaingest.KafkaProducerGroup(ctx, producer, logger, kafkaMetrics)) - - go pkgkafka.ConsumeLogChannel(producer, logger.WithGroup("kafka").WithGroup("producer")) + eventDriver, err := watermillkafka.NewPublisher(watermillkafka.PublisherOptions{ + KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration, + ClientID: otelName, + Logger: logger, - slog.Debug("connected to Kafka") - return producer, nil -} - -func initIngestEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, kafkaProducer *kafka.Producer, metricMeter metric.Meter) (flushhandler.FlushEventHandler, error) { - if !conf.Events.Enabled { - return nil, nil + ProvisionTopics: []watermillkafka.AutoProvisionTopic{ + { + Topic: conf.Events.IngestEvents.Topic, + NumPartitions: int32(conf.Events.IngestEvents.AutoProvision.Partitions), + }, + }, + MetricMeter: metricMeter, + }) + if err != nil { + return nil, err } - eventDriver := watermillkafka.NewPublisher(kafkaProducer) eventPublisher, err := publisher.NewPublisher(publisher.PublisherOptions{ Publisher: eventDriver, Transform: watermillkafka.AddPartitionKeyFromSubject, }) if err != nil { - return nil, fmt.Errorf("failed to create event publisher: %w", err) - } - - // Auto provision topics if needed - if conf.Events.IngestEvents.AutoProvision.Enabled { - adminClient, err := kafka.NewAdminClientFromProducer(kafkaProducer) - if err != nil { - return nil, fmt.Errorf("failed to create Kafka admin client: %w", err) - } - - defer adminClient.Close() - - if err := pkgkafka.ProvisionTopic(ctx, adminClient, logger, conf.Events.IngestEvents.Topic, conf.Events.IngestEvents.AutoProvision.Partitions); err != nil { - return nil, fmt.Errorf("failed to auto provision topic: %w", err) - } + return nil, err } targetTopic := eventPublisher.ForTopic(conf.Events.IngestEvents.Topic) @@ -286,7 +256,9 @@ func initIngestEventPublisher(ctx context.Context, logger *slog.Logger, conf con // We should only close the producer once the ingest events are fully processed flushHandlerMux.OnDrainComplete(func() { logger.Info("shutting down kafka producer") - kafkaProducer.Close() + if err := eventDriver.Close(); err != nil { + logger.Error("failed to close kafka producer", slog.String("error", err.Error())) + } }) ingestNotificationHandler, err := ingestnotification.NewHandler(logger, metricMeter, targetTopic, ingestnotification.HandlerConfig{ diff --git a/internal/watermill/driver/kafka/marshaler.go b/internal/watermill/driver/kafka/marshaler.go new file mode 100644 index 000000000..9a2303013 --- /dev/null +++ b/internal/watermill/driver/kafka/marshaler.go @@ -0,0 +1,38 @@ +package kafka + +import ( + "github.com/IBM/sarama" + "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/cloudevents/sdk-go/v2/event" +) + +const ( + PartitionKeyMetadataKey = "x-kafka-partition-key" +) + +type marshalerWithPartitionKey struct { + kafka.DefaultMarshaler +} + +func (m marshalerWithPartitionKey) Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error) { + kafkaMsg, err := m.DefaultMarshaler.Marshal(topic, msg) + if err != nil { + return nil, err + } + + partitionKey := msg.Metadata.Get(PartitionKeyMetadataKey) + if partitionKey != "" { + kafkaMsg.Key = sarama.ByteEncoder(partitionKey) + } + + return kafkaMsg, nil +} + +// AddPartitionKeyFromSubject adds partition key to the message based on the CloudEvent subject. +func AddPartitionKeyFromSubject(watermillIn *message.Message, cloudEvent event.Event) (*message.Message, error) { + if cloudEvent.Subject() != "" { + watermillIn.Metadata[PartitionKeyMetadataKey] = cloudEvent.Subject() + } + return watermillIn, nil +} diff --git a/internal/watermill/driver/kafka/metrics/README.md b/internal/watermill/driver/kafka/metrics/README.md new file mode 100644 index 000000000..1007a5a77 --- /dev/null +++ b/internal/watermill/driver/kafka/metrics/README.md @@ -0,0 +1,9 @@ +# go-metrics to OpenTelemetry adapter + +Sarama (any most probably other projects) rely on the [http://github.com/rcrowley/go-metrics](go-metrics) package, numerous connectors exist +for the package, however there seem to be no opentelemetry one. + +Given the package only supports periodic scraping, it's better to wrap the metric types of go-metrics so that we can send the raw events to OpenTelemetry. The existing event interface of go-metrics is quite limited compared to OpenTelemetry esp considering the usage in Sarama lib, so right now: + +- Context is context.Background() for OpenTelemetry calls +- Errors for metric registration is only logged diff --git a/internal/watermill/driver/kafka/metrics/adapter.go b/internal/watermill/driver/kafka/metrics/adapter.go new file mode 100644 index 000000000..b335752a6 --- /dev/null +++ b/internal/watermill/driver/kafka/metrics/adapter.go @@ -0,0 +1,202 @@ +package metrics + +import ( + "context" + "errors" + "fmt" + "log/slog" + "reflect" + + "github.com/rcrowley/go-metrics" + otelmetric "go.opentelemetry.io/otel/metric" +) + +type ( + TransformMetricsNameToOtel func(string) string + ErrorHandler func(error) +) + +func LoggingErrorHandler(dest *slog.Logger) ErrorHandler { + return func(err error) { + dest.Error("error registering meter", "err", err) + } +} + +func MetricAddNamePrefix(prefix string) TransformMetricsNameToOtel { + return func(name string) string { + return prefix + name + } +} + +type NewRegistryOptions struct { + MetricMeter otelmetric.Meter + NameTransformFn TransformMetricsNameToOtel + ErrorHandler ErrorHandler +} + +func NewRegistry(opts NewRegistryOptions) (metrics.Registry, error) { + if opts.MetricMeter == nil { + return nil, errors.New("metric meter is required") + } + + if opts.NameTransformFn == nil { + opts.NameTransformFn = func(name string) string { + return name + } + } + + if opts.ErrorHandler == nil { + opts.ErrorHandler = func(err error) { + // no-op + } + } + + return ®istry{ + Registry: metrics.NewRegistry(), + meticMeter: opts.MetricMeter, + nameTransformFn: opts.NameTransformFn, + errorHandler: opts.ErrorHandler, + }, nil +} + +type registry struct { + metrics.Registry + meticMeter otelmetric.Meter + nameTransformFn TransformMetricsNameToOtel + errorHandler ErrorHandler +} + +func (r *registry) GetOrRegister(name string, def interface{}) interface{} { + existingMeter := r.Registry.Get(name) + if existingMeter != nil { + return existingMeter + } + + wrappedMeter, err := r.getWrappedMeter(name, def) + if err != nil { + r.errorHandler(err) + return def + } + + if err := r.Register(name, wrappedMeter); err != nil { + r.errorHandler(err) + } + + return wrappedMeter +} + +func (r *registry) Register(name string, def interface{}) error { + wrappedMeter, err := r.getWrappedMeter(name, def) + if err != nil { + return err + } + + return r.Registry.Register(name, wrappedMeter) +} + +func (r *registry) getWrappedMeter(name string, def interface{}) (interface{}, error) { + // def might be a function that returns the actual metric, not an interface{}, so we need to have reflect here, to instantiate + // the actual metric in such cases + if v := reflect.ValueOf(def); v.Kind() == reflect.Func { + def = v.Call(nil)[0].Interface() + } + + switch meterDef := def.(type) { + case metrics.Meter: + otelMeter, err := r.meticMeter.Int64Counter(r.nameTransformFn(name)) + if err != nil { + return def, err + } + + return &wrappedMeter{Meter: meterDef, otelMeter: otelMeter}, nil + case metrics.Counter: + otelMeter, err := r.meticMeter.Int64UpDownCounter(r.nameTransformFn(name)) + if err != nil { + return def, err + } + + return &wrappedCounter{Counter: meterDef, otelMeter: otelMeter}, nil + case metrics.GaugeFloat64: + otelMeter, err := r.meticMeter.Float64Gauge(r.nameTransformFn(name)) + if err != nil { + return def, err + } + + return &wrappedGaugeFloat64{GaugeFloat64: meterDef, otelMeter: otelMeter}, nil + case metrics.Gauge: + otelMeter, err := r.meticMeter.Int64Gauge(r.nameTransformFn(name)) + if err != nil { + return def, err + } + + return &wrappedGauge{Gauge: meterDef, otelMeter: otelMeter}, nil + case metrics.Histogram: + otelMeter, err := r.meticMeter.Int64Histogram(r.nameTransformFn(name)) + if err != nil { + r.errorHandler(err) + break + } + + return &wrappedHistogram{Histogram: meterDef, otelMeter: otelMeter}, nil + default: + // this is just a safety net, as we should have handled all the cases above (based on the lib) + r.errorHandler(fmt.Errorf("unsupported metric type (name=%s): %v", name, def)) + } + + return def, nil +} + +type wrappedMeter struct { + metrics.Meter + otelMeter otelmetric.Int64Counter +} + +func (m *wrappedMeter) Mark(n int64) { + m.otelMeter.Add(context.Background(), n) + m.Meter.Mark(n) +} + +type wrappedCounter struct { + metrics.Counter + otelMeter otelmetric.Int64UpDownCounter +} + +func (m *wrappedCounter) Inc(n int64) { + m.otelMeter.Add(context.Background(), n) + m.Counter.Inc(n) +} + +func (m *wrappedCounter) Dec(n int64) { + m.otelMeter.Add(context.Background(), -n) + m.Counter.Dec(n) +} + +type wrappedGaugeFloat64 struct { + metrics.GaugeFloat64 + otelMeter otelmetric.Float64Gauge +} + +func (m *wrappedGaugeFloat64) Update(newVal float64) { + m.otelMeter.Record(context.Background(), newVal) + m.GaugeFloat64.Update(newVal) +} + +type wrappedGauge struct { + metrics.Gauge + otelMeter otelmetric.Int64Gauge +} + +func (m *wrappedGauge) Update(newVal int64) { + m.otelMeter.Record(context.Background(), newVal) + m.Gauge.Update(newVal) +} + +type wrappedHistogram struct { + metrics.Histogram + otelMeter otelmetric.Int64Histogram +} + +func (m *wrappedHistogram) Update(newVal int64) { + m.otelMeter.Record(context.Background(), newVal) + m.Histogram.Update(newVal) +} diff --git a/internal/watermill/driver/kafka/publisher.go b/internal/watermill/driver/kafka/publisher.go index 60ce6a123..60fab5b60 100644 --- a/internal/watermill/driver/kafka/publisher.go +++ b/internal/watermill/driver/kafka/publisher.go @@ -1,63 +1,103 @@ package kafka import ( - "github.com/ThreeDotsLabs/watermill/message" - "github.com/cloudevents/sdk-go/v2/event" - "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "crypto/tls" + "errors" + "fmt" + "log/slog" + + "github.com/IBM/sarama" + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka" + otelmetric "go.opentelemetry.io/otel/metric" + + "github.com/openmeterio/openmeter/config" + "github.com/openmeterio/openmeter/internal/watermill/driver/kafka/metrics" ) const ( - PartitionKeyMetadataKey = "x-kafka-partition-key" + defaultMeterPrefix = "sarama.publisher." ) -type Publisher struct { - producer *kafka.Producer +type PublisherOptions struct { + KafkaConfig config.KafkaConfiguration + ProvisionTopics []AutoProvisionTopic + ClientID string + Logger *slog.Logger + MetricMeter otelmetric.Meter + MeterPrefix string } -var _ message.Publisher = (*Publisher)(nil) +func (o *PublisherOptions) Validate() error { + if err := o.KafkaConfig.Validate(); err != nil { + return fmt.Errorf("invalid kafka config: %w", err) + } -func NewPublisher(producer *kafka.Producer) *Publisher { - return &Publisher{producer: producer} -} + if o.ClientID == "" { + return errors.New("client ID is required") + } -func (p *Publisher) Publish(topic string, messages ...*message.Message) error { - for _, message := range messages { - kafkaMessage := &kafka.Message{ - TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, - Value: []byte(message.Payload), - Headers: make([]kafka.Header, 0, len(message.Metadata)), - } - - for k, v := range message.Metadata { - if k == PartitionKeyMetadataKey { - continue - } - kafkaMessage.Headers = append(kafkaMessage.Headers, kafka.Header{ - Key: k, - Value: []byte(v), - }) - } - - if partitionKey, ok := message.Metadata[PartitionKeyMetadataKey]; ok { - kafkaMessage.Key = []byte(partitionKey) - } - - if err := p.producer.Produce(kafkaMessage, nil); err != nil { - return err - } + if o.Logger == nil { + return errors.New("logger is required") } + if o.MetricMeter == nil { + return errors.New("metric meter is required") + } return nil } -func (p *Publisher) Close() error { - p.producer.Close() - return nil -} +func NewPublisher(in PublisherOptions) (*kafka.Publisher, error) { + if err := in.Validate(); err != nil { + return nil, err + } -func AddPartitionKeyFromSubject(watermillIn *message.Message, cloudEvent event.Event) (*message.Message, error) { - if cloudEvent.Subject() != "" { - watermillIn.Metadata[PartitionKeyMetadataKey] = cloudEvent.Subject() + if in.MeterPrefix == "" { + in.MeterPrefix = defaultMeterPrefix } - return watermillIn, nil + + wmConfig := kafka.PublisherConfig{ + Brokers: []string{in.KafkaConfig.Broker}, + OverwriteSaramaConfig: sarama.NewConfig(), + Marshaler: marshalerWithPartitionKey{}, + OTELEnabled: true, // This relies on the global trace provider + } + + wmConfig.OverwriteSaramaConfig.Metadata.RefreshFrequency = in.KafkaConfig.TopicMetadataRefreshInterval.Duration() + wmConfig.OverwriteSaramaConfig.ClientID = "openmeter/balance-worker" + + switch in.KafkaConfig.SecurityProtocol { + case "SASL_SSL": + wmConfig.OverwriteSaramaConfig.Net.SASL.Enable = true + wmConfig.OverwriteSaramaConfig.Net.SASL.User = in.KafkaConfig.SaslUsername + wmConfig.OverwriteSaramaConfig.Net.SASL.Password = in.KafkaConfig.SaslPassword + wmConfig.OverwriteSaramaConfig.Net.SASL.Mechanism = sarama.SASLMechanism(in.KafkaConfig.SecurityProtocol) + wmConfig.OverwriteSaramaConfig.Net.TLS.Enable = true + wmConfig.OverwriteSaramaConfig.Net.TLS.Config = &tls.Config{} + default: + } + + // Producer specific settings + wmConfig.OverwriteSaramaConfig.Producer.Return.Successes = true + + meterRegistry, err := metrics.NewRegistry(metrics.NewRegistryOptions{ + MetricMeter: in.MetricMeter, + NameTransformFn: metrics.MetricAddNamePrefix(in.MeterPrefix), + ErrorHandler: metrics.LoggingErrorHandler(in.Logger), + }) + if err != nil { + return nil, err + } + + wmConfig.OverwriteSaramaConfig.MetricRegistry = meterRegistry + + if err := wmConfig.Validate(); err != nil { + return nil, err + } + + if err := provisionTopics(in.KafkaConfig.Broker, wmConfig.OverwriteSaramaConfig, in.ProvisionTopics); err != nil { + return nil, err + } + + return kafka.NewPublisher(wmConfig, watermill.NewSlogLogger(in.Logger)) } diff --git a/internal/watermill/driver/kafka/topic_provision.go b/internal/watermill/driver/kafka/topic_provision.go new file mode 100644 index 000000000..333915aa4 --- /dev/null +++ b/internal/watermill/driver/kafka/topic_provision.go @@ -0,0 +1,36 @@ +package kafka + +import ( + "github.com/IBM/sarama" + + "github.com/openmeterio/openmeter/pkg/errorsx" +) + +type AutoProvisionTopic struct { + Topic string + NumPartitions int32 +} + +func provisionTopics(broker string, config *sarama.Config, topics []AutoProvisionTopic) error { + admin, err := sarama.NewClusterAdmin([]string{broker}, config) + if err != nil { + return err + } + defer admin.Close() + + for _, topic := range topics { + err := admin.CreateTopic(topic.Topic, &sarama.TopicDetail{ + NumPartitions: topic.NumPartitions, + ReplicationFactor: -1, // use default + }, false) + if err != nil { + if topicError, ok := errorsx.ErrorAs[*sarama.TopicError](err); ok && topicError.Err == sarama.ErrTopicAlreadyExists { + continue + } + + return err + } + } + + return nil +} diff --git a/openmeter/watermill/driver/kafka/driver.go b/openmeter/watermill/driver/kafka/driver.go index 1c6a1b62d..ae9e1214c 100644 --- a/openmeter/watermill/driver/kafka/driver.go +++ b/openmeter/watermill/driver/kafka/driver.go @@ -1,9 +1,9 @@ package kafka import ( + "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka" "github.com/ThreeDotsLabs/watermill/message" "github.com/cloudevents/sdk-go/v2/event" - "github.com/confluentinc/confluent-kafka-go/v2/kafka" watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka" ) @@ -13,11 +13,11 @@ const ( ) type ( - Publisher = watermillkafka.Publisher + PublisherOptions = watermillkafka.PublisherOptions ) -func NewPublisher(producer *kafka.Producer) *Publisher { - return watermillkafka.NewPublisher(producer) +func NewPublisher(in PublisherOptions) (*kafka.Publisher, error) { + return watermillkafka.NewPublisher(in) } func AddPartitionKeyFromSubject(watermillIn *message.Message, cloudEvent event.Event) (*message.Message, error) {