diff --git a/cmd/balance-worker/main.go b/cmd/balance-worker/main.go index 8f2d3e649..e58b47947 100644 --- a/cmd/balance-worker/main.go +++ b/cmd/balance-worker/main.go @@ -37,12 +37,13 @@ import ( "github.com/openmeterio/openmeter/internal/ent/db" entitlementpgadapter "github.com/openmeterio/openmeter/internal/entitlement/adapter" "github.com/openmeterio/openmeter/internal/entitlement/balanceworker" - "github.com/openmeterio/openmeter/internal/event/publisher" "github.com/openmeterio/openmeter/internal/meter" "github.com/openmeterio/openmeter/internal/registry" registrybuilder "github.com/openmeterio/openmeter/internal/registry/builder" "github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector" + "github.com/openmeterio/openmeter/internal/watermill/driver/kafka" watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka" + "github.com/openmeterio/openmeter/internal/watermill/eventbus" "github.com/openmeterio/openmeter/pkg/contextx" "github.com/openmeterio/openmeter/pkg/framework/entutils" "github.com/openmeterio/openmeter/pkg/framework/operation" @@ -230,7 +231,7 @@ func main() { } // Create publisher - publishers, err := initEventPublisher(ctx, logger, conf, metricMeter) + eventPublisherDriver, err := initEventPublisherDriver(ctx, logger, conf, metricMeter) if err != nil { logger.Error("failed to initialize event publisher", slog.String("error", err.Error())) os.Exit(1) @@ -238,18 +239,29 @@ func main() { defer func() { // We are using sync publishing, so it's fine to close the publisher using defers. - if err := publishers.watermillPublisher.Close(); err != nil { + if err := eventPublisherDriver.Close(); err != nil { logger.Error("failed to close event publisher", slog.String("error", err.Error())) } }() + eventPublisher, err := eventbus.New(eventbus.Options{ + Publisher: eventPublisherDriver, + Config: conf.Events, + Logger: logger, + MarshalerTransformFunc: kafka.AddPartitionKeyFromSubject, + }) + if err != nil { + logger.Error("failed to initialize event publisher", slog.String("error", err.Error())) + os.Exit(1) + } + // Dependencies: entitlement entitlementConnectors := registrybuilder.GetEntitlementRegistry(registry.EntitlementOptions{ DatabaseClient: pgClients.client, StreamingConnector: clickhouseStreamingConnector, MeterRepository: meterRepository, Logger: logger, - Publisher: publishers.eventPublisher.ForTopic(conf.Events.SystemEvents.Topic), + Publisher: eventPublisher, }) // Initialize worker @@ -259,8 +271,8 @@ func main() { Subscriber: wmSubscriber, TargetTopic: conf.Events.SystemEvents.Topic, - Publisher: publishers.watermillPublisher, - Marshaler: publishers.marshaler, + Publisher: eventPublisherDriver, + Marshaler: eventPublisher.Marshaler(), Entitlement: entitlementConnectors, Repo: entitlementpgadapter.NewPostgresEntitlementRepo(pgClients.client), @@ -353,13 +365,7 @@ func initKafkaSubscriber(conf config.Configuration, logger *slog.Logger) (messag return subscriber, nil } -type eventPublishers struct { - watermillPublisher message.Publisher - marshaler publisher.CloudEventMarshaler - eventPublisher publisher.Publisher -} - -func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (*eventPublishers, error) { +func initEventPublisherDriver(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (message.Publisher, error) { provisionTopics := []watermillkafka.AutoProvisionTopic{} if conf.BalanceWorker.DLQ.AutoProvision.Enabled { provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{ @@ -368,7 +374,7 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co }) } - eventDriver, err := watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{ + return watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{ KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration, ProvisionTopics: provisionTopics, ClientID: otelName, @@ -376,23 +382,6 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co MetricMeter: metricMeter, DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug, }) - if err != nil { - return nil, fmt.Errorf("failed to create event driver: %w", err) - } - - eventPublisher, err := publisher.NewPublisher(publisher.PublisherOptions{ - Publisher: eventDriver, - Transform: watermillkafka.AddPartitionKeyFromSubject, - }) - if err != nil { - return nil, fmt.Errorf("failed to create event publisher: %w", err) - } - - return &eventPublishers{ - watermillPublisher: eventDriver, - marshaler: publisher.NewCloudEventMarshaler(watermillkafka.AddPartitionKeyFromSubject), - eventPublisher: eventPublisher, - }, nil } type pgClients struct { diff --git a/cmd/notification-service/main.go b/cmd/notification-service/main.go index 7d900ceea..b8835a6e1 100644 --- a/cmd/notification-service/main.go +++ b/cmd/notification-service/main.go @@ -35,13 +35,14 @@ import ( "github.com/openmeterio/openmeter/config" "github.com/openmeterio/openmeter/internal/ent/db" - "github.com/openmeterio/openmeter/internal/event/publisher" "github.com/openmeterio/openmeter/internal/meter" "github.com/openmeterio/openmeter/internal/notification/consumer" "github.com/openmeterio/openmeter/internal/registry" registrybuilder "github.com/openmeterio/openmeter/internal/registry/builder" "github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector" + "github.com/openmeterio/openmeter/internal/watermill/driver/kafka" watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka" + "github.com/openmeterio/openmeter/internal/watermill/eventbus" "github.com/openmeterio/openmeter/pkg/contextx" "github.com/openmeterio/openmeter/pkg/framework/entutils" "github.com/openmeterio/openmeter/pkg/framework/operation" @@ -229,7 +230,7 @@ func main() { } // Create publisher - publishers, err := initEventPublisher(ctx, logger, conf, metricMeter) + eventPublisherDriver, err := initEventPublisherDriver(ctx, logger, conf, metricMeter) if err != nil { logger.Error("failed to initialize event publisher", slog.String("error", err.Error())) os.Exit(1) @@ -237,18 +238,29 @@ func main() { defer func() { // We are using sync producer, so it is fine to close this as a last step - if err := publishers.watermillPublisher.Close(); err != nil { + if err := eventPublisherDriver.Close(); err != nil { logger.Error("failed to close kafka producer", slog.String("error", err.Error())) } }() + eventPublisher, err := eventbus.New(eventbus.Options{ + Publisher: eventPublisherDriver, + Config: conf.Events, + Logger: logger, + MarshalerTransformFunc: kafka.AddPartitionKeyFromSubject, + }) + if err != nil { + logger.Error("failed to initialize event publisher", slog.String("error", err.Error())) + os.Exit(1) + } + // Dependencies: entitlement entitlementConnectors := registrybuilder.GetEntitlementRegistry(registry.EntitlementOptions{ DatabaseClient: pgClients.client, StreamingConnector: clickhouseStreamingConnector, MeterRepository: meterRepository, Logger: logger, - Publisher: publishers.eventPublisher.ForTopic(conf.Events.SystemEvents.Topic), + Publisher: eventPublisher, }) // Initialize consumer @@ -256,7 +268,8 @@ func main() { SystemEventsTopic: conf.Events.SystemEvents.Topic, Subscriber: wmSubscriber, - Publisher: publishers.watermillPublisher, + Publisher: eventPublisherDriver, + Marshaler: eventPublisher.Marshaler(), Entitlement: entitlementConnectors, @@ -348,13 +361,7 @@ func initKafkaSubscriber(conf config.Configuration, logger *slog.Logger) (messag return subscriber, nil } -type eventPublishers struct { - watermillPublisher message.Publisher - marshaler publisher.CloudEventMarshaler - eventPublisher publisher.Publisher -} - -func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (*eventPublishers, error) { +func initEventPublisherDriver(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (message.Publisher, error) { provisionTopics := []watermillkafka.AutoProvisionTopic{} if conf.NotificationService.Consumer.DLQ.AutoProvision.Enabled { provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{ @@ -363,7 +370,7 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co }) } - eventDriver, err := watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{ + return watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{ KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration, ProvisionTopics: provisionTopics, ClientID: otelName, @@ -371,23 +378,6 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co MetricMeter: metricMeter, DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug, }) - if err != nil { - return nil, fmt.Errorf("failed to create event driver: %w", err) - } - - eventPublisher, err := publisher.NewPublisher(publisher.PublisherOptions{ - Publisher: eventDriver, - Transform: watermillkafka.AddPartitionKeyFromSubject, - }) - if err != nil { - return nil, fmt.Errorf("failed to create event publisher: %w", err) - } - - return &eventPublishers{ - watermillPublisher: eventDriver, - marshaler: publisher.NewCloudEventMarshaler(watermillkafka.AddPartitionKeyFromSubject), - eventPublisher: eventPublisher, - }, nil } type pgClients struct { diff --git a/cmd/server/main.go b/cmd/server/main.go index 2d9740a40..734c93041 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -37,7 +37,6 @@ import ( "github.com/openmeterio/openmeter/config" "github.com/openmeterio/openmeter/internal/debug" "github.com/openmeterio/openmeter/internal/ent/db" - "github.com/openmeterio/openmeter/internal/event/publisher" "github.com/openmeterio/openmeter/internal/ingest" "github.com/openmeterio/openmeter/internal/ingest/ingestdriver" "github.com/openmeterio/openmeter/internal/ingest/kafkaingest" @@ -54,6 +53,7 @@ import ( "github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector" watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka" "github.com/openmeterio/openmeter/internal/watermill/driver/noop" + "github.com/openmeterio/openmeter/internal/watermill/eventbus" "github.com/openmeterio/openmeter/pkg/contextx" "github.com/openmeterio/openmeter/pkg/errorsx" "github.com/openmeterio/openmeter/pkg/framework/entutils" @@ -213,7 +213,7 @@ func main() { os.Exit(1) } - eventPublishers, err := initEventPublisher(ctx, logger, conf, metricMeter) + eventPublisherDriver, err := initEventPublisherDriver(ctx, logger, conf, metricMeter) if err != nil { logger.Error("failed to initialize event publisher", "error", err) os.Exit(1) @@ -221,11 +221,22 @@ func main() { defer func() { logger.Info("closing event publisher") - if err = eventPublishers.driver.Close(); err != nil { + if err = eventPublisherDriver.Close(); err != nil { logger.Error("failed to close event publisher", "error", err) } }() + eventPublisher, err := eventbus.New(eventbus.Options{ + Publisher: eventPublisherDriver, + Config: conf.Events, + Logger: logger, + MarshalerTransformFunc: watermillkafka.AddPartitionKeyFromSubject, + }) + if err != nil { + logger.Error("failed to initialize event bus", "error", err) + os.Exit(1) + } + // Initialize Kafka Ingest ingestCollector, kafkaIngestNamespaceHandler, err := initKafkaIngest( kafkaProducer, @@ -322,7 +333,7 @@ func main() { StreamingConnector: streamingConnector, MeterRepository: meterRepository, Logger: logger, - Publisher: eventPublishers.eventPublisher.ForTopic(conf.Events.SystemEvents.Topic), + Publisher: eventPublisher, }) } @@ -430,24 +441,9 @@ func main() { } } -type publishers struct { - eventPublisher publisher.Publisher - driver message.Publisher -} - -func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (*publishers, error) { +func initEventPublisherDriver(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (message.Publisher, error) { if !conf.Events.Enabled { - publisher, err := publisher.NewPublisher(publisher.PublisherOptions{ - Publisher: &noop.Publisher{}, - }) - if err != nil { - return nil, fmt.Errorf("failed to create event driver: %w", err) - } - - return &publishers{ - eventPublisher: publisher, - driver: &noop.Publisher{}, - }, nil + return &noop.Publisher{}, nil } provisionTopics := []watermillkafka.AutoProvisionTopic{} @@ -458,7 +454,7 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co }) } - eventDriver, err := watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{ + return watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{ KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration, ProvisionTopics: provisionTopics, ClientID: otelName, @@ -466,19 +462,6 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co MetricMeter: metricMeter, DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug, }) - if err != nil { - return nil, fmt.Errorf("failed to create event driver: %w", err) - } - - eventPublisher, err := publisher.NewPublisher(publisher.PublisherOptions{ - Publisher: eventDriver, - Transform: watermillkafka.AddPartitionKeyFromSubject, - }) - - return &publishers{ - eventPublisher: eventPublisher, - driver: eventDriver, - }, err } func initKafkaProducer(ctx context.Context, config config.Configuration, logger *slog.Logger, metricMeter metric.Meter, group *run.Group) (*kafka.Producer, error) { diff --git a/cmd/sink-worker/main.go b/cmd/sink-worker/main.go index 7740c15cd..012999f66 100644 --- a/cmd/sink-worker/main.go +++ b/cmd/sink-worker/main.go @@ -30,12 +30,12 @@ import ( "github.com/openmeterio/openmeter/config" "github.com/openmeterio/openmeter/internal/dedupe" - "github.com/openmeterio/openmeter/internal/event/publisher" "github.com/openmeterio/openmeter/internal/meter" "github.com/openmeterio/openmeter/internal/sink" "github.com/openmeterio/openmeter/internal/sink/flushhandler" "github.com/openmeterio/openmeter/internal/sink/flushhandler/ingestnotification" watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka" + "github.com/openmeterio/openmeter/internal/watermill/eventbus" "github.com/openmeterio/openmeter/pkg/gosundheit" pkgkafka "github.com/openmeterio/openmeter/pkg/kafka" "github.com/openmeterio/openmeter/pkg/models" @@ -243,16 +243,16 @@ func initIngestEventPublisher(ctx context.Context, logger *slog.Logger, conf con return nil, err } - eventPublisher, err := publisher.NewPublisher(publisher.PublisherOptions{ - Publisher: eventDriver, - Transform: watermillkafka.AddPartitionKeyFromSubject, + eventPublisher, err := eventbus.New(eventbus.Options{ + Publisher: eventDriver, + Config: conf.Events, + Logger: logger, + MarshalerTransformFunc: watermillkafka.AddPartitionKeyFromSubject, }) if err != nil { return nil, err } - targetTopic := eventPublisher.ForTopic(conf.Events.IngestEvents.Topic) - flushHandlerMux := flushhandler.NewFlushEventHandlers() // We should only close the producer once the ingest events are fully processed flushHandlerMux.OnDrainComplete(func() { @@ -262,7 +262,7 @@ func initIngestEventPublisher(ctx context.Context, logger *slog.Logger, conf con } }) - ingestNotificationHandler, err := ingestnotification.NewHandler(logger, metricMeter, targetTopic, ingestnotification.HandlerConfig{ + ingestNotificationHandler, err := ingestnotification.NewHandler(logger, metricMeter, eventPublisher, ingestnotification.HandlerConfig{ MaxEventsInBatch: conf.Sink.IngestNotifications.MaxEventsInBatch, }) if err != nil { diff --git a/internal/credit/connector.go b/internal/credit/connector.go index 1204d4e0a..c1cee162d 100644 --- a/internal/credit/connector.go +++ b/internal/credit/connector.go @@ -6,8 +6,8 @@ import ( "github.com/openmeterio/openmeter/internal/credit/balance" "github.com/openmeterio/openmeter/internal/credit/grant" - "github.com/openmeterio/openmeter/internal/event/publisher" "github.com/openmeterio/openmeter/internal/streaming" + "github.com/openmeterio/openmeter/internal/watermill/eventbus" ) type CreditConnector interface { @@ -20,7 +20,7 @@ type connector struct { grantRepo grant.Repo balanceSnapshotRepo balance.SnapshotRepo // external dependencies - publisher publisher.TopicPublisher + publisher eventbus.Publisher ownerConnector grant.OwnerConnector streamingConnector streaming.Connector logger *slog.Logger @@ -36,7 +36,7 @@ func NewCreditConnector( streamingConnector streaming.Connector, logger *slog.Logger, granularity time.Duration, - publisher publisher.TopicPublisher, + publisher eventbus.Publisher, ) CreditConnector { return &connector{ grantRepo: grantRepo, diff --git a/internal/credit/grant.go b/internal/credit/grant.go index 5aad85afc..b0ebc5987 100644 --- a/internal/credit/grant.go +++ b/internal/credit/grant.go @@ -7,7 +7,6 @@ import ( "github.com/openmeterio/openmeter/internal/credit/grant" eventmodels "github.com/openmeterio/openmeter/internal/event/models" - "github.com/openmeterio/openmeter/internal/event/spec" "github.com/openmeterio/openmeter/pkg/clock" "github.com/openmeterio/openmeter/pkg/framework/entutils" "github.com/openmeterio/openmeter/pkg/models" @@ -87,22 +86,13 @@ func (m *connector) CreateGrant(ctx context.Context, owner grant.NamespacedOwner return nil, err } - event, err := spec.NewCloudEvent( - spec.EventSpec{ - Source: spec.ComposeResourcePath(owner.Namespace, spec.EntityEntitlement, string(owner.ID), spec.EntityGrant, g.ID), - Subject: spec.ComposeResourcePath(owner.Namespace, spec.EntitySubjectKey, subjectKey), - }, - grant.CreatedEvent{ - Grant: *g, - Namespace: eventmodels.NamespaceID{ID: owner.Namespace}, - Subject: eventmodels.SubjectKeyAndID{Key: subjectKey}, - }, - ) - if err != nil { - return nil, err + event := grant.CreatedEvent{ + Grant: *g, + Namespace: eventmodels.NamespaceID{ID: owner.Namespace}, + Subject: eventmodels.SubjectKeyAndID{Key: subjectKey}, } - if err := m.publisher.Publish(event); err != nil { + if err := m.publisher.Publish(ctx, event); err != nil { return nil, err } @@ -154,22 +144,11 @@ func (m *connector) VoidGrant(ctx context.Context, grantID models.NamespacedID) return nil, err } - event, err := spec.NewCloudEvent( - spec.EventSpec{ - Source: spec.ComposeResourcePath(grantID.Namespace, spec.EntityEntitlement, string(owner.ID), spec.EntityGrant, grantID.ID), - Subject: spec.ComposeResourcePath(grantID.Namespace, spec.EntitySubjectKey, subjectKey), - }, - grant.VoidedEvent{ - Grant: g, - Namespace: eventmodels.NamespaceID{ID: owner.Namespace}, - Subject: eventmodels.SubjectKeyAndID{Key: subjectKey}, - }, - ) - if err != nil { - return nil, err - } - - return nil, m.publisher.Publish(event) + return nil, m.publisher.Publish(ctx, grant.VoidedEvent{ + Grant: g, + Namespace: eventmodels.NamespaceID{ID: owner.Namespace}, + Subject: eventmodels.SubjectKeyAndID{Key: subjectKey}, + }) }) return err } diff --git a/internal/credit/grant/events.go b/internal/credit/grant/events.go index b682d970d..390a5aa0f 100644 --- a/internal/credit/grant/events.go +++ b/internal/credit/grant/events.go @@ -3,17 +3,13 @@ package grant import ( "errors" + "github.com/openmeterio/openmeter/internal/event/metadata" "github.com/openmeterio/openmeter/internal/event/models" - "github.com/openmeterio/openmeter/internal/event/spec" + "github.com/openmeterio/openmeter/openmeter/watermill/marshaler" ) const ( - EventSubsystem spec.EventSubsystem = "credit" -) - -const ( - grantCreatedEventName spec.EventName = "grant.created" - grantVoidedEventName spec.EventName = "grant.voided" + EventSubsystem metadata.EventSubsystem = "credit" ) type grantEvent struct { @@ -45,16 +41,31 @@ func (g grantEvent) Validate() error { return nil } +func (e grantEvent) EventMetadata() metadata.EventMetadata { + return metadata.EventMetadata{ + Source: metadata.ComposeResourcePath(e.Namespace.ID, metadata.EntityEntitlement, string(e.OwnerID), metadata.EntityGrant, e.ID), + Subject: metadata.ComposeResourcePath(e.Namespace.ID, metadata.EntitySubjectKey, e.Subject.Key), + } +} + type CreatedEvent grantEvent -var grantCreatedEventSpec = spec.EventTypeSpec{ - Subsystem: EventSubsystem, - Name: grantCreatedEventName, - Version: "v1", +var ( + _ marshaler.Event = CreatedEvent{} + + grantCreatedEventName = metadata.GetEventName(metadata.EventType{ + Subsystem: EventSubsystem, + Name: "grant.created", + Version: "v1", + }) +) + +func (e CreatedEvent) EventName() string { + return grantCreatedEventName } -func (e CreatedEvent) Spec() *spec.EventTypeSpec { - return &grantCreatedEventSpec +func (e CreatedEvent) EventMetadata() metadata.EventMetadata { + return grantEvent(e).EventMetadata() } func (e CreatedEvent) Validate() error { @@ -63,14 +74,22 @@ func (e CreatedEvent) Validate() error { type VoidedEvent grantEvent -var grantVoidedEventSpec = spec.EventTypeSpec{ - Subsystem: EventSubsystem, - Name: grantVoidedEventName, - Version: "v1", +var ( + _ marshaler.Event = VoidedEvent{} + + grantVoidedEventName = metadata.GetEventName(metadata.EventType{ + Subsystem: EventSubsystem, + Name: "grant.voided", + Version: "v1", + }) +) + +func (e VoidedEvent) EventName() string { + return grantVoidedEventName } -func (e VoidedEvent) Spec() *spec.EventTypeSpec { - return &grantVoidedEventSpec +func (e VoidedEvent) EventMetadata() metadata.EventMetadata { + return grantEvent(e).EventMetadata() } func (e VoidedEvent) Validate() error { diff --git a/internal/entitlement/balanceworker/entitlementhandler.go b/internal/entitlement/balanceworker/entitlementhandler.go index 17e53339a..b7e2b1218 100644 --- a/internal/entitlement/balanceworker/entitlementhandler.go +++ b/internal/entitlement/balanceworker/entitlementhandler.go @@ -10,9 +10,10 @@ import ( "github.com/openmeterio/openmeter/internal/entitlement" entitlementdriver "github.com/openmeterio/openmeter/internal/entitlement/driver" "github.com/openmeterio/openmeter/internal/entitlement/snapshot" + "github.com/openmeterio/openmeter/internal/event/metadata" "github.com/openmeterio/openmeter/internal/event/models" - "github.com/openmeterio/openmeter/internal/event/spec" "github.com/openmeterio/openmeter/internal/productcatalog" + "github.com/openmeterio/openmeter/openmeter/watermill/marshaler" "github.com/openmeterio/openmeter/pkg/convert" ) @@ -34,11 +35,8 @@ func (w *Worker) handleEntitlementDeleteEvent(ctx context.Context, delEvent enti calculationTime := time.Now() - event, err := spec.NewCloudEvent( - spec.EventSpec{ - Source: spec.ComposeResourcePath(namespace, spec.EntityEntitlement, delEvent.ID), - Subject: spec.ComposeResourcePath(namespace, spec.EntitySubjectKey, delEvent.SubjectKey), - }, + event := marshaler.WithSource( + metadata.ComposeResourcePath(namespace, metadata.EntityEntitlement, delEvent.ID), snapshot.SnapshotEvent{ Entitlement: delEvent.Entitlement, Namespace: models.NamespaceID{ @@ -56,11 +54,8 @@ func (w *Worker) handleEntitlementDeleteEvent(ctx context.Context, delEvent enti CurrentUsagePeriod: delEvent.CurrentUsagePeriod, }, ) - if err != nil { - return nil, fmt.Errorf("failed to create cloud event: %w", err) - } - wmMessage, err := w.opts.Marshaler.MarshalEvent(event) + wmMessage, err := w.opts.Marshaler.Marshal(event) if err != nil { return nil, fmt.Errorf("failed to marshal cloud event: %w", err) } @@ -123,11 +118,8 @@ func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementID Namespac } } - event, err := spec.NewCloudEvent( - spec.EventSpec{ - Source: source, - Subject: spec.ComposeResourcePath(entitlementID.Namespace, spec.EntitySubjectKey, entitlement.SubjectKey), - }, + event := marshaler.WithSource( + source, snapshot.SnapshotEvent{ Entitlement: *entitlement, Namespace: models.NamespaceID{ @@ -146,11 +138,8 @@ func (w *Worker) createSnapshotEvent(ctx context.Context, entitlementID Namespac CurrentUsagePeriod: entitlement.CurrentUsagePeriod, }, ) - if err != nil { - return nil, fmt.Errorf("failed to create cloud event: %w", err) - } - wmMessage, err := w.opts.Marshaler.MarshalEvent(event) + wmMessage, err := w.opts.Marshaler.Marshal(event) if err != nil { return nil, fmt.Errorf("failed to marshal cloud event: %w", err) } diff --git a/internal/entitlement/balanceworker/ingesthandler.go b/internal/entitlement/balanceworker/ingesthandler.go index 153306f69..547c32731 100644 --- a/internal/entitlement/balanceworker/ingesthandler.go +++ b/internal/entitlement/balanceworker/ingesthandler.go @@ -7,14 +7,14 @@ import ( "github.com/hashicorp/go-multierror" "github.com/openmeterio/openmeter/internal/entitlement" - "github.com/openmeterio/openmeter/internal/event/spec" + "github.com/openmeterio/openmeter/internal/event/metadata" "github.com/openmeterio/openmeter/internal/productcatalog" - "github.com/openmeterio/openmeter/internal/sink/flushhandler/ingestnotification" + ingestevents "github.com/openmeterio/openmeter/internal/sink/flushhandler/ingestnotification/events" "github.com/openmeterio/openmeter/pkg/slicesx" ) -func (w *Worker) handleBatchedIngestEvent(ctx context.Context, event ingestnotification.EventBatchedIngest) ([]*message.Message, error) { - filters := slicesx.Map(event.Events, func(e ingestnotification.IngestEventData) IngestEventQueryFilter { +func (w *Worker) handleBatchedIngestEvent(ctx context.Context, event ingestevents.EventBatchedIngest) ([]*message.Message, error) { + filters := slicesx.Map(event.Events, func(e ingestevents.IngestEventData) IngestEventQueryFilter { return IngestEventQueryFilter{ Namespace: e.Namespace.ID, SubjectKey: e.SubjectKey, @@ -33,7 +33,7 @@ func (w *Worker) handleBatchedIngestEvent(ctx context.Context, event ingestnotif messages, err := w.handleEntitlementUpdateEvent( ctx, NamespacedID{Namespace: entitlement.Namespace, ID: entitlement.EntitlementID}, - spec.ComposeResourcePath(entitlement.Namespace, spec.EntityEvent), + metadata.ComposeResourcePath(entitlement.Namespace, metadata.EntityEvent), ) if err != nil { // TODO: add error information too diff --git a/internal/entitlement/balanceworker/worker.go b/internal/entitlement/balanceworker/worker.go index 359d6a714..05a35fe97 100644 --- a/internal/entitlement/balanceworker/worker.go +++ b/internal/entitlement/balanceworker/worker.go @@ -15,10 +15,10 @@ import ( "github.com/openmeterio/openmeter/internal/credit/grant" "github.com/openmeterio/openmeter/internal/entitlement" meteredentitlement "github.com/openmeterio/openmeter/internal/entitlement/metered" - "github.com/openmeterio/openmeter/internal/event/publisher" - "github.com/openmeterio/openmeter/internal/event/spec" + "github.com/openmeterio/openmeter/internal/event/metadata" "github.com/openmeterio/openmeter/internal/registry" - "github.com/openmeterio/openmeter/internal/sink/flushhandler/ingestnotification" + ingestevents "github.com/openmeterio/openmeter/internal/sink/flushhandler/ingestnotification/events" + "github.com/openmeterio/openmeter/openmeter/watermill/marshaler" pkgmodels "github.com/openmeterio/openmeter/pkg/models" ) @@ -44,7 +44,7 @@ type WorkerOptions struct { TargetTopic string DLQ *WorkerDLQOptions Publisher message.Publisher - Marshaler publisher.CloudEventMarshaler + Marshaler marshaler.Marshaler Entitlement *registry.Entitlement Repo BalanceWorkerRepository @@ -169,7 +169,7 @@ func (w *Worker) Close() error { func (w *Worker) handleEvent(msg *message.Message) ([]*message.Message, error) { w.opts.Logger.Debug("received system event", w.messageToLogFields(msg)...) - ceType, found := msg.Metadata[publisher.CloudEventsHeaderType] + ceType, found := msg.Metadata[marshaler.CloudEventsHeaderType] if !found { w.opts.Logger.Warn("missing CloudEvents type, ignoring message") return nil, nil @@ -177,70 +177,80 @@ func (w *Worker) handleEvent(msg *message.Message) ([]*message.Message, error) { switch ceType { // Entitlement events - case entitlement.EntitlementCreatedEvent{}.Spec().Type(): - event, err := spec.ParseCloudEventFromBytes[entitlement.EntitlementCreatedEvent](msg.Payload) - if err != nil { + case entitlement.EntitlementCreatedEvent{}.EventName(): + event := entitlement.EntitlementCreatedEvent{} + + if err := w.opts.Marshaler.Unmarshal(msg, &event); err != nil { w.opts.Logger.Error("failed to parse entitlement created event", w.messageToLogFields(msg)...) return nil, err } + return w.handleEntitlementUpdateEvent( msg.Context(), - NamespacedID{Namespace: event.Payload.Namespace.ID, ID: event.Payload.ID}, - spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, event.Payload.ID), + NamespacedID{Namespace: event.Namespace.ID, ID: event.ID}, + metadata.ComposeResourcePath(event.Namespace.ID, metadata.EntityEntitlement, event.ID), ) - case entitlement.EntitlementDeletedEvent{}.Spec().Type(): - event, err := spec.ParseCloudEventFromBytes[entitlement.EntitlementDeletedEvent](msg.Payload) - if err != nil { + + case entitlement.EntitlementDeletedEvent{}.EventName(): + event := entitlement.EntitlementDeletedEvent{} + + if err := w.opts.Marshaler.Unmarshal(msg, &event); err != nil { w.opts.Logger.Error("failed to parse entitlement deleted event", w.messageToLogFields(msg)...) return nil, err } - return w.handleEntitlementDeleteEvent(msg.Context(), event.Payload) + return w.handleEntitlementDeleteEvent(msg.Context(), event) // Grant events - case grant.CreatedEvent{}.Spec().Type(): - event, err := spec.ParseCloudEventFromBytes[grant.CreatedEvent](msg.Payload) - if err != nil { + case grant.CreatedEvent{}.EventName(): + event := grant.CreatedEvent{} + + if err := w.opts.Marshaler.Unmarshal(msg, &event); err != nil { return nil, fmt.Errorf("failed to parse grant created event: %w", err) } return w.handleEntitlementUpdateEvent( msg.Context(), - NamespacedID{Namespace: event.Payload.Namespace.ID, ID: string(event.Payload.OwnerID)}, - spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, string(event.Payload.OwnerID), spec.EntityGrant, event.Payload.ID), + NamespacedID{Namespace: event.Namespace.ID, ID: string(event.OwnerID)}, + metadata.ComposeResourcePath(event.Namespace.ID, metadata.EntityEntitlement, string(event.OwnerID), metadata.EntityGrant, event.ID), ) - case grant.VoidedEvent{}.Spec().Type(): - event, err := spec.ParseCloudEventFromBytes[grant.VoidedEvent](msg.Payload) - if err != nil { + + case grant.VoidedEvent{}.EventName(): + event := grant.VoidedEvent{} + + if err := w.opts.Marshaler.Unmarshal(msg, &event); err != nil { return nil, fmt.Errorf("failed to parse grant voided event: %w", err) } return w.handleEntitlementUpdateEvent( msg.Context(), - NamespacedID{Namespace: event.Payload.Namespace.ID, ID: string(event.Payload.OwnerID)}, - spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, string(event.Payload.OwnerID), spec.EntityGrant, event.Payload.ID), + NamespacedID{Namespace: event.Namespace.ID, ID: string(event.OwnerID)}, + metadata.ComposeResourcePath(event.Namespace.ID, metadata.EntityEntitlement, string(event.OwnerID), metadata.EntityGrant, event.ID), ) // Metered entitlement events - case meteredentitlement.EntitlementResetEvent{}.Spec().Type(): - event, err := spec.ParseCloudEventFromBytes[meteredentitlement.EntitlementResetEvent](msg.Payload) - if err != nil { + case meteredentitlement.EntitlementResetEvent{}.EventName(): + event := meteredentitlement.EntitlementResetEvent{} + + if err := w.opts.Marshaler.Unmarshal(msg, &event); err != nil { return nil, fmt.Errorf("failed to parse reset entitlement event: %w", err) } return w.handleEntitlementUpdateEvent( msg.Context(), - NamespacedID{Namespace: event.Payload.Namespace.ID, ID: event.Payload.EntitlementID}, - spec.ComposeResourcePath(event.Payload.Namespace.ID, spec.EntityEntitlement, event.Payload.EntitlementID), + NamespacedID{Namespace: event.Namespace.ID, ID: event.EntitlementID}, + metadata.ComposeResourcePath(event.Namespace.ID, metadata.EntityEntitlement, event.EntitlementID), ) + // Ingest events - case ingestnotification.EventBatchedIngest{}.Spec().Type(): - event, err := spec.ParseCloudEventFromBytes[ingestnotification.EventBatchedIngest](msg.Payload) - if err != nil { + case ingestevents.EventBatchedIngest{}.EventName(): + event := ingestevents.EventBatchedIngest{} + + if err := w.opts.Marshaler.Unmarshal(msg, &event); err != nil { return nil, fmt.Errorf("failed to parse ingest event: %w", err) } - return w.handleBatchedIngestEvent(msg.Context(), event.Payload) + return w.handleBatchedIngestEvent(msg.Context(), event) } return nil, nil } diff --git a/internal/entitlement/connector.go b/internal/entitlement/connector.go index 22cd99355..313b61301 100644 --- a/internal/entitlement/connector.go +++ b/internal/entitlement/connector.go @@ -5,9 +5,8 @@ import ( "fmt" "time" + "github.com/openmeterio/openmeter/internal/event" eventmodels "github.com/openmeterio/openmeter/internal/event/models" - "github.com/openmeterio/openmeter/internal/event/publisher" - "github.com/openmeterio/openmeter/internal/event/spec" "github.com/openmeterio/openmeter/internal/meter" "github.com/openmeterio/openmeter/internal/productcatalog" "github.com/openmeterio/openmeter/pkg/framework/entutils" @@ -74,7 +73,7 @@ type entitlementConnector struct { featureConnector productcatalog.FeatureConnector meterRepo meter.Repository - publisher publisher.TopicPublisher + publisher event.Publisher } func NewEntitlementConnector( @@ -84,7 +83,7 @@ func NewEntitlementConnector( meteredEntitlementConnector SubTypeConnector, staticEntitlementConnector SubTypeConnector, booleanEntitlementConnector SubTypeConnector, - publisher publisher.TopicPublisher, + publisher event.Publisher, ) Connector { return &entitlementConnector{ meteredEntitlementConnector: meteredEntitlementConnector, @@ -149,26 +148,16 @@ func (c *entitlementConnector) CreateEntitlement(ctx context.Context, input Crea return nil, err } - event, err := spec.NewCloudEvent( - spec.EventSpec{ - Source: spec.ComposeResourcePath(input.Namespace, spec.EntityEntitlement, ent.ID), - Subject: spec.ComposeResourcePath(input.Namespace, spec.EntitySubjectKey, ent.SubjectKey), + err = c.publisher.Publish(ctx, EntitlementCreatedEvent{ + Entitlement: *ent, + Namespace: eventmodels.NamespaceID{ + ID: input.Namespace, }, - EntitlementCreatedEvent{ - Entitlement: *ent, - Namespace: eventmodels.NamespaceID{ - ID: input.Namespace, - }, - }, - ) + }) if err != nil { return nil, err } - if err := c.publisher.Publish(event); err != nil { - return nil, err - } - return ent, nil }) @@ -193,26 +182,16 @@ func (c *entitlementConnector) DeleteEntitlement(ctx context.Context, namespace return nil, err } - event, err := spec.NewCloudEvent( - spec.EventSpec{ - Source: spec.ComposeResourcePath(namespace, spec.EntityEntitlement, ent.ID), - Subject: spec.ComposeResourcePath(namespace, spec.EntitySubjectKey, ent.SubjectKey), + err = c.publisher.Publish(ctx, EntitlementDeletedEvent{ + Entitlement: *ent, + Namespace: eventmodels.NamespaceID{ + ID: namespace, }, - EntitlementDeletedEvent{ - Entitlement: *ent, - Namespace: eventmodels.NamespaceID{ - ID: namespace, - }, - }, - ) + }) if err != nil { return nil, err } - if err := c.publisher.Publish(event); err != nil { - return nil, err - } - return ent, nil }) diff --git a/internal/entitlement/events.go b/internal/entitlement/events.go index 6969cfdef..0b2593ae6 100644 --- a/internal/entitlement/events.go +++ b/internal/entitlement/events.go @@ -3,17 +3,13 @@ package entitlement import ( "errors" + "github.com/openmeterio/openmeter/internal/event/metadata" "github.com/openmeterio/openmeter/internal/event/models" - "github.com/openmeterio/openmeter/internal/event/spec" + "github.com/openmeterio/openmeter/openmeter/watermill/marshaler" ) const ( - EventSubsystem spec.EventSubsystem = "entitlement" -) - -const ( - entitlementCreatedEventName spec.EventName = "entitlement.created" - entitlementDeletedEventName spec.EventName = "entitlement.deleted" + EventSubsystem metadata.EventSubsystem = "entitlement" ) type entitlementEvent struct { @@ -39,32 +35,54 @@ func (e entitlementEvent) Validate() error { type EntitlementCreatedEvent entitlementEvent -var entitlementCreatedEventSpec = spec.EventTypeSpec{ - Subsystem: EventSubsystem, - Name: entitlementCreatedEventName, - Version: "v1", -} +var ( + _ marshaler.Event = EntitlementCreatedEvent{} -func (e EntitlementCreatedEvent) Spec() *spec.EventTypeSpec { - return &entitlementCreatedEventSpec -} + entitlementCreatedEventName = metadata.GetEventName(metadata.EventType{ + Subsystem: EventSubsystem, + Name: "entitlement.created", + Version: "v1", + }) +) func (e EntitlementCreatedEvent) Validate() error { return entitlementEvent(e).Validate() } -type EntitlementDeletedEvent entitlementEvent - -var entitlementDeletedEventSpec = spec.EventTypeSpec{ - Subsystem: EventSubsystem, - Name: entitlementDeletedEventName, - Version: "v1", +func (e EntitlementCreatedEvent) EventName() string { + return entitlementCreatedEventName } -func (e EntitlementDeletedEvent) Spec() *spec.EventTypeSpec { - return &entitlementDeletedEventSpec +func (e EntitlementCreatedEvent) EventMetadata() metadata.EventMetadata { + return metadata.EventMetadata{ + Source: metadata.ComposeResourcePath(e.Namespace.ID, metadata.EntityEntitlement, e.ID), + Subject: metadata.ComposeResourcePath(e.Namespace.ID, metadata.EntitySubjectKey, e.SubjectKey), + } } +type EntitlementDeletedEvent entitlementEvent + +var ( + _ marshaler.Event = EntitlementDeletedEvent{} + + entitlementDeletedEventName = metadata.GetEventName(metadata.EventType{ + Subsystem: EventSubsystem, + Name: "entitlement.deleted", + Version: "v1", + }) +) + func (e EntitlementDeletedEvent) Validate() error { return entitlementEvent(e).Validate() } + +func (e EntitlementDeletedEvent) EventName() string { + return entitlementDeletedEventName +} + +func (e EntitlementDeletedEvent) EventMetadata() metadata.EventMetadata { + return metadata.EventMetadata{ + Source: metadata.ComposeResourcePath(e.Namespace.ID, metadata.EntityEntitlement, e.ID), + Subject: metadata.ComposeResourcePath(e.Namespace.ID, metadata.EntitySubjectKey, e.SubjectKey), + } +} diff --git a/internal/entitlement/metered/connector.go b/internal/entitlement/metered/connector.go index 8ede7517c..22b223e40 100644 --- a/internal/entitlement/metered/connector.go +++ b/internal/entitlement/metered/connector.go @@ -11,7 +11,7 @@ import ( "github.com/openmeterio/openmeter/internal/entitlement" "github.com/openmeterio/openmeter/internal/productcatalog" "github.com/openmeterio/openmeter/internal/streaming" - "github.com/openmeterio/openmeter/openmeter/event/publisher" + "github.com/openmeterio/openmeter/internal/watermill/eventbus" "github.com/openmeterio/openmeter/pkg/clock" "github.com/openmeterio/openmeter/pkg/convert" "github.com/openmeterio/openmeter/pkg/defaultx" @@ -63,7 +63,7 @@ type connector struct { entitlementRepo entitlement.EntitlementRepo granularity time.Duration - publisher publisher.TopicPublisher + publisher eventbus.Publisher } func NewMeteredEntitlementConnector( @@ -73,7 +73,7 @@ func NewMeteredEntitlementConnector( grantConnector credit.GrantConnector, grantRepo grant.Repo, entitlementRepo entitlement.EntitlementRepo, - publisher publisher.TopicPublisher, + publisher eventbus.Publisher, ) Connector { return &connector{ streamingConnector: streamingConnector, diff --git a/internal/entitlement/metered/events.go b/internal/entitlement/metered/events.go index 14916dcfe..042c9dee7 100644 --- a/internal/entitlement/metered/events.go +++ b/internal/entitlement/metered/events.go @@ -4,16 +4,13 @@ import ( "errors" "time" + "github.com/openmeterio/openmeter/internal/event/metadata" "github.com/openmeterio/openmeter/internal/event/models" - "github.com/openmeterio/openmeter/internal/event/spec" + "github.com/openmeterio/openmeter/openmeter/watermill/marshaler" ) const ( - EventSubsystem spec.EventSubsystem = "meteredEntitlement" -) - -const ( - resetEntitlementEventName spec.EventName = "entitlement.reset" + EventSubsystem metadata.EventSubsystem = "meteredEntitlement" ) type EntitlementResetEvent struct { @@ -24,14 +21,25 @@ type EntitlementResetEvent struct { RetainAnchor bool `json:"retainAnchor"` } -var resetEntitlementEventSpec = spec.EventTypeSpec{ - Subsystem: EventSubsystem, - Name: resetEntitlementEventName, - Version: "v1", +var ( + _ marshaler.Event = EntitlementResetEvent{} + + resetEntitlementEventName = metadata.GetEventName(metadata.EventType{ + Subsystem: EventSubsystem, + Name: "entitlement.reset", + Version: "v1", + }) +) + +func (e EntitlementResetEvent) EventName() string { + return resetEntitlementEventName } -func (e EntitlementResetEvent) Spec() *spec.EventTypeSpec { - return &resetEntitlementEventSpec +func (e EntitlementResetEvent) EventMetadata() metadata.EventMetadata { + return metadata.EventMetadata{ + Source: metadata.ComposeResourcePath(e.Namespace.ID, metadata.EntityEntitlement, e.EntitlementID), + Subject: metadata.ComposeResourcePath(e.Namespace.ID, metadata.EntitySubjectKey, e.Subject.Key), + } } func (e EntitlementResetEvent) Validate() error { diff --git a/internal/entitlement/metered/reset.go b/internal/entitlement/metered/reset.go index 35d689511..3f793be4c 100644 --- a/internal/entitlement/metered/reset.go +++ b/internal/entitlement/metered/reset.go @@ -10,7 +10,6 @@ import ( "github.com/openmeterio/openmeter/internal/credit/grant" "github.com/openmeterio/openmeter/internal/entitlement" eventmodels "github.com/openmeterio/openmeter/internal/event/models" - "github.com/openmeterio/openmeter/internal/event/spec" "github.com/openmeterio/openmeter/pkg/framework/entutils" "github.com/openmeterio/openmeter/pkg/models" ) @@ -45,28 +44,19 @@ func (e *connector) ResetEntitlementUsage(ctx context.Context, entitlementID mod return nil, err } - event, err := spec.NewCloudEvent( - spec.EventSpec{ - Source: spec.ComposeResourcePath(entitlementID.Namespace, spec.EntityEntitlement, entitlementID.ID), - Subject: spec.ComposeResourcePath(entitlementID.Namespace, spec.EntitySubjectKey, ent.SubjectKey), + event := EntitlementResetEvent{ + EntitlementID: entitlementID.ID, + Namespace: eventmodels.NamespaceID{ + ID: entitlementID.Namespace, }, - EntitlementResetEvent{ - EntitlementID: entitlementID.ID, - Namespace: eventmodels.NamespaceID{ - ID: entitlementID.Namespace, - }, - Subject: eventmodels.SubjectKeyAndID{ - Key: ent.SubjectKey, - }, - ResetAt: params.At, - RetainAnchor: params.RetainAnchor, + Subject: eventmodels.SubjectKeyAndID{ + Key: ent.SubjectKey, }, - ) - if err != nil { - return nil, err + ResetAt: params.At, + RetainAnchor: params.RetainAnchor, } - if err := e.publisher.Publish(event); err != nil { + if err := e.publisher.Publish(ctx, event); err != nil { return nil, err } diff --git a/internal/entitlement/metered/utils_test.go b/internal/entitlement/metered/utils_test.go index c9994ba97..ae50e7ac4 100644 --- a/internal/entitlement/metered/utils_test.go +++ b/internal/entitlement/metered/utils_test.go @@ -16,12 +16,12 @@ import ( "github.com/openmeterio/openmeter/internal/entitlement" entitlement_postgresadapter "github.com/openmeterio/openmeter/internal/entitlement/adapter" meteredentitlement "github.com/openmeterio/openmeter/internal/entitlement/metered" - "github.com/openmeterio/openmeter/internal/event/publisher" "github.com/openmeterio/openmeter/internal/productcatalog" productcatalog_postgresadapter "github.com/openmeterio/openmeter/internal/productcatalog/adapter" streaming_testutils "github.com/openmeterio/openmeter/internal/streaming/testutils" "github.com/openmeterio/openmeter/internal/testutils" "github.com/openmeterio/openmeter/openmeter/meter" + "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" "github.com/openmeterio/openmeter/pkg/models" ) @@ -75,6 +75,8 @@ func setupConnector(t *testing.T) (meteredentitlement.Connector, *dependencies) t.Fatalf("failed to migrate database %s", err) } + mockPublisher := eventbus.NewMock(t) + // build adapters owner := meteredentitlement.NewEntitlementGrantOwnerAdapter( featureRepo, @@ -91,7 +93,7 @@ func setupConnector(t *testing.T) (meteredentitlement.Connector, *dependencies) streamingConnector, testLogger, time.Minute, - publisher.NewMockTopicPublisher(t), + mockPublisher, ) connector := meteredentitlement.NewMeteredEntitlementConnector( @@ -101,7 +103,7 @@ func setupConnector(t *testing.T) (meteredentitlement.Connector, *dependencies) creditConnector, grantRepo, entitlementRepo, - publisher.NewMockTopicPublisher(t), + mockPublisher, ) return connector, &dependencies{ diff --git a/internal/entitlement/snapshot/event.go b/internal/entitlement/snapshot/event.go index 95f50bc3c..784701b29 100644 --- a/internal/entitlement/snapshot/event.go +++ b/internal/entitlement/snapshot/event.go @@ -5,16 +5,13 @@ import ( "time" "github.com/openmeterio/openmeter/internal/entitlement" + "github.com/openmeterio/openmeter/internal/event/metadata" "github.com/openmeterio/openmeter/internal/event/models" - "github.com/openmeterio/openmeter/internal/event/spec" "github.com/openmeterio/openmeter/internal/productcatalog" + "github.com/openmeterio/openmeter/openmeter/watermill/marshaler" "github.com/openmeterio/openmeter/pkg/recurrence" ) -const ( - snapshotEventName spec.EventName = "entitlement.snapshot" -) - type BalanceOperationType string const ( @@ -55,14 +52,24 @@ type SnapshotEvent struct { CurrentUsagePeriod *recurrence.Period `json:"currentUsagePeriod,omitempty"` } -var SnapshotEventSpec = spec.EventTypeSpec{ - Subsystem: entitlement.EventSubsystem, - Name: snapshotEventName, - Version: "v1", +var ( + _ marshaler.Event = SnapshotEvent{} + + snapshotEventName = metadata.GetEventName(metadata.EventType{ + Subsystem: entitlement.EventSubsystem, + Name: "entitlement.snapshot", + Version: "v1", + }) +) + +func (e SnapshotEvent) EventName() string { + return snapshotEventName } -func (e SnapshotEvent) Spec() *spec.EventTypeSpec { - return &SnapshotEventSpec +func (e SnapshotEvent) EventMetadata() metadata.EventMetadata { + return metadata.EventMetadata{ + Subject: metadata.ComposeResourcePath(e.Namespace.ID, metadata.EntitySubjectKey, e.Subject.Key), + } } func (e SnapshotEvent) Validate() error { diff --git a/internal/event/spec/event_type.go b/internal/event/metadata/event_type.go similarity index 64% rename from internal/event/spec/event_type.go rename to internal/event/metadata/event_type.go index 5a7b0b061..6af3297cc 100644 --- a/internal/event/spec/event_type.go +++ b/internal/event/metadata/event_type.go @@ -1,4 +1,4 @@ -package spec +package metadata import ( "fmt" @@ -6,14 +6,12 @@ import ( ) type ( - EventSubsystem string - EventName string - EventVersion string - EventSubjectKind string - EventSpecVersion string + EventSubsystem string + EventName string + EventVersion string ) -type EventTypeSpec struct { +type EventType struct { // Subsystem defines which connector/component is responsible for the event (e.g. ingest, entitlements, etc) Subsystem EventSubsystem @@ -22,21 +20,21 @@ type EventTypeSpec struct { // Version is the version of the event (e.g. v1, v2, etc) Version EventVersion +} + +func (s *EventType) EventName() string { + return fmt.Sprintf("io.openmeter.%s.%s.%s", s.Subsystem, s.Version, s.Name) +} - // cloudEventType is the actual cloud event type, so that we don't have the calculate it - // for each message - cloudEventType string +func (s *EventType) VersionSubsystem() string { + return fmt.Sprintf("io.openmeter.%s.%s", s.Subsystem, s.Version) } -func (s *EventTypeSpec) Type() string { - if s.cloudEventType != "" { - return s.cloudEventType - } - s.cloudEventType = fmt.Sprintf("io.openmeter.%s.%s.%s", s.Subsystem, s.Version, s.Name) - return s.cloudEventType +func GetEventName(spec EventType) string { + return spec.EventName() } -type EventSpec struct { +type EventMetadata struct { // ID of the event ID string diff --git a/internal/event/spec/resourcepath.go b/internal/event/metadata/resourcepath.go similarity index 96% rename from internal/event/spec/resourcepath.go rename to internal/event/metadata/resourcepath.go index a8486afc2..8bb0947ad 100644 --- a/internal/event/spec/resourcepath.go +++ b/internal/event/metadata/resourcepath.go @@ -1,4 +1,4 @@ -package spec +package metadata import ( "fmt" diff --git a/internal/event/publisher.go b/internal/event/publisher.go new file mode 100644 index 000000000..efc1a0970 --- /dev/null +++ b/internal/event/publisher.go @@ -0,0 +1,11 @@ +package event + +import ( + "context" + + "github.com/openmeterio/openmeter/openmeter/watermill/marshaler" +) + +type Publisher interface { + Publish(ctx context.Context, event marshaler.Event) error +} diff --git a/internal/event/publisher/publisher.go b/internal/event/publisher/publisher.go deleted file mode 100644 index dc4754c57..000000000 --- a/internal/event/publisher/publisher.go +++ /dev/null @@ -1,124 +0,0 @@ -package publisher - -import ( - "errors" - "testing" - "time" - - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/cloudevents/sdk-go/v2/event" - "github.com/stretchr/testify/assert" - - "github.com/openmeterio/openmeter/internal/watermill/driver/noop" -) - -const ( - CloudEventsHeaderType = "ce_type" - CloudEventsHeaderTime = "ce_time" - CloudEventsHeaderSource = "ce_source" - CloudEventsHeaderSubject = "ce_subject" -) - -type Publisher interface { - ForTopic(topic string) TopicPublisher -} - -type PublisherOptions struct { - // Publisher is the underlying watermill publisher object - Publisher message.Publisher - - // Transform is a function that can be used to transform the message before it is published, mainly used - // for driver specific tweaks. If more are required, we should add a chain function. - Transform TransformFunc -} - -type publisher struct { - publisher message.Publisher - marshaler CloudEventMarshaler -} - -func NewPublisher(opts PublisherOptions) (Publisher, error) { - if opts.Publisher == nil { - return nil, errors.New("publisher is required") - } - - return &publisher{ - publisher: opts.Publisher, - marshaler: NewCloudEventMarshaler(opts.Transform), - }, nil -} - -func NewMockTopicPublisher(t *testing.T) TopicPublisher { - pub, err := NewPublisher(PublisherOptions{ - Publisher: noop.Publisher{}, - }) - - assert.NoError(t, err) - return pub.ForTopic("test") -} - -func (p *publisher) ForTopic(topic string) TopicPublisher { - return &topicPublisher{ - publisher: p.publisher, - topic: topic, - marshaler: p.marshaler, - } -} - -type TopicPublisher interface { - Publish(event event.Event) error -} - -type topicPublisher struct { - publisher message.Publisher - topic string - marshaler CloudEventMarshaler -} - -func (p *topicPublisher) Publish(event event.Event) error { - msg, err := p.marshaler.MarshalEvent(event) - if err != nil { - return err - } - - return p.publisher.Publish(p.topic, msg) -} - -type CloudEventMarshaler interface { - MarshalEvent(event.Event) (*message.Message, error) -} - -type cloudEventMarshaler struct { - transform TransformFunc -} - -func NewCloudEventMarshaler(transform TransformFunc) CloudEventMarshaler { - return &cloudEventMarshaler{ - transform: transform, - } -} - -func (m *cloudEventMarshaler) MarshalEvent(event event.Event) (*message.Message, error) { - payload, err := event.MarshalJSON() - if err != nil { - return nil, err - } - - msg := message.NewMessage(watermill.NewUUID(), payload) - msg.Metadata.Set(CloudEventsHeaderType, event.Type()) - msg.Metadata.Set(CloudEventsHeaderTime, event.Time().In(time.UTC).Format(time.RFC3339)) - msg.Metadata.Set(CloudEventsHeaderSource, event.Source()) - if event.Subject() != "" { - msg.Metadata.Set(CloudEventsHeaderSubject, event.Subject()) - } - - if m.transform != nil { - msg, err = m.transform(msg, event) - if err != nil { - return nil, err - } - } - - return msg, nil -} diff --git a/internal/event/publisher/transformer.go b/internal/event/publisher/transformer.go deleted file mode 100644 index a2a14e6e6..000000000 --- a/internal/event/publisher/transformer.go +++ /dev/null @@ -1,8 +0,0 @@ -package publisher - -import ( - "github.com/ThreeDotsLabs/watermill/message" - "github.com/cloudevents/sdk-go/v2/event" -) - -type TransformFunc func(watermillIn *message.Message, cloudEvent event.Event) (*message.Message, error) diff --git a/internal/event/spec/parser.go b/internal/event/spec/parser.go deleted file mode 100644 index 10fa2f2cc..000000000 --- a/internal/event/spec/parser.go +++ /dev/null @@ -1,106 +0,0 @@ -package spec - -import ( - "errors" - "fmt" - "time" - - "github.com/cloudevents/sdk-go/v2/event" - "github.com/oklog/ulid/v2" -) - -type CloudEventsPayload interface { - Spec() *EventTypeSpec - Validate() error -} - -// NewCloudEvent creates a new CloudEvent with the given event spec and payload -// example usage: -// -// ev, err := CreateCloudEvent(EventSpec{ -// ID: "123", -// Source: "test", -// }, IngestEvent{...}) -func NewCloudEvent(eventSpec EventSpec, payload CloudEventsPayload) (event.Event, error) { - // Mandatory cloud events fields - if eventSpec.Source == "" { - return event.Event{}, errors.New("source is required") - } - - meta := payload.Spec() - ev := newCloudEventFromSpec(meta, eventSpec) - - if err := payload.Validate(); err != nil { - return event.Event{}, err - } - - if err := ev.SetData("application/json", payload); err != nil { - return event.Event{}, err - } - return ev, nil -} - -// newCloudEventFromSpec generates a new cloudevents without data being set based on the event spec -func newCloudEventFromSpec(meta *EventTypeSpec, spec EventSpec) event.Event { - ev := event.New() - ev.SetType(meta.Type()) - ev.SetSpecVersion(event.CloudEventsVersionV1) - - if spec.Time.IsZero() { - ev.SetTime(time.Now()) - } else { - ev.SetTime(spec.Time) - } - - if spec.ID == "" { - ev.SetID(ulid.Make().String()) - } else { - ev.SetID(spec.ID) - } - - ev.SetSource(spec.Source) - - ev.SetSubject(spec.Subject) - return ev -} - -// ParseCloudEvent unmarshals and validates a single CloudEvent into the given payload -// example usage: -// ingest, err := ParseCloudEvent[schema.IngestEvent](ev) -func ParseCloudEvent[PayloadType CloudEventsPayload](ev event.Event) (PayloadType, error) { - var payload PayloadType - - expectedType := payload.Spec().Type() - if expectedType != ev.Type() { - return payload, fmt.Errorf("cannot parse cloud event type %s as %s (expected by target payload)", ev.Type(), expectedType) - } - - if err := ev.DataAs(&payload); err != nil { - return payload, err - } - - if err := payload.Validate(); err != nil { - return payload, err - } - - return payload, nil -} - -type ParsedCloudEvent[PayloadType CloudEventsPayload] struct { - Event event.Event - Payload PayloadType -} - -func ParseCloudEventFromBytes[PayloadType CloudEventsPayload](data []byte) (*ParsedCloudEvent[PayloadType], error) { - cloudEvent := event.Event{} - if err := cloudEvent.UnmarshalJSON(data); err != nil { - return nil, fmt.Errorf("failed to unmarshal CloudEvent: %w", err) - } - - eventBody, err := ParseCloudEvent[PayloadType](cloudEvent) - if err != nil { - return nil, fmt.Errorf("failed to parse payload: %w", err) - } - - return &ParsedCloudEvent[PayloadType]{Event: cloudEvent, Payload: eventBody}, nil -} diff --git a/internal/event/spec/parser_test.go b/internal/event/spec/parser_test.go deleted file mode 100644 index 31f47df55..000000000 --- a/internal/event/spec/parser_test.go +++ /dev/null @@ -1,80 +0,0 @@ -package spec_test - -import ( - "errors" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/openmeterio/openmeter/internal/event/spec" -) - -type event struct { - Namespace string -} - -func (e event) Spec() *spec.EventTypeSpec { - return &spec.EventTypeSpec{ - Subsystem: "subsys", - Name: "test", - Version: "v1", - } -} - -var errNamespaceIsRequired = errors.New("namespace is required") - -func (e event) Validate() error { - if e.Namespace == "" { - return errNamespaceIsRequired - } - return nil -} - -func TestParserSanity(t *testing.T) { - cloudEvent, err := spec.NewCloudEvent( - spec.EventSpec{ - ID: "test", - Source: "somesource", - - Subject: spec.ComposeResourcePath("default", "subject", "ID"), - }, - event{ - Namespace: "test", - }) - - assert.NoError(t, err) - assert.Equal(t, "io.openmeter.subsys.v1.test", cloudEvent.Type()) - assert.Equal(t, "//openmeter.io/namespace/default/subject/ID", cloudEvent.Subject()) - assert.Equal(t, "somesource", cloudEvent.Source()) - - // parsing - parsedEvent, err := spec.ParseCloudEvent[event](cloudEvent) - assert.NoError(t, err) - assert.Equal(t, "test", parsedEvent.Namespace) - - // validation support - _, err = spec.NewCloudEvent( - spec.EventSpec{ - ID: "test", - Source: "somesource", - - Subject: spec.ComposeResourcePath("default", "subject", "ID"), - }, - event{}, - ) - - assert.Error(t, err) - assert.Equal(t, errNamespaceIsRequired, err) - - // ID autogeneration - cloudEvent, err = spec.NewCloudEvent( - spec.EventSpec{ - Source: "somesource", - }, - event{ - Namespace: "test", - }) - - assert.NoError(t, err) - assert.NotEmpty(t, cloudEvent.ID()) -} diff --git a/internal/notification/consumer/consumer.go b/internal/notification/consumer/consumer.go index 65d19f37d..5d6d299ce 100644 --- a/internal/notification/consumer/consumer.go +++ b/internal/notification/consumer/consumer.go @@ -11,10 +11,9 @@ import ( "github.com/ThreeDotsLabs/watermill/message/router/middleware" "github.com/openmeterio/openmeter/internal/entitlement/snapshot" - "github.com/openmeterio/openmeter/internal/event/publisher" - "github.com/openmeterio/openmeter/internal/event/spec" "github.com/openmeterio/openmeter/internal/registry" "github.com/openmeterio/openmeter/internal/watermill/nopublisher" + "github.com/openmeterio/openmeter/openmeter/watermill/marshaler" ) type Options struct { @@ -22,6 +21,7 @@ type Options struct { Subscriber message.Subscriber Publisher message.Publisher + Marshaler marshaler.Marshaler DLQ *DLQOptions @@ -113,21 +113,22 @@ func (w *Consumer) Close() error { func (w *Consumer) handleSystemEvent(msg *message.Message) error { w.opts.Logger.Debug("received system event", w.messageToLogFields(msg)...) - ceType, found := msg.Metadata[publisher.CloudEventsHeaderType] + ceType, found := msg.Metadata[marshaler.CloudEventsHeaderType] if !found { w.opts.Logger.Warn("missing CloudEvents type, ignoring message") return nil } switch ceType { - case snapshot.SnapshotEvent{}.Spec().Type(): - event, err := spec.ParseCloudEventFromBytes[snapshot.SnapshotEvent](msg.Payload) - if err != nil { + case snapshot.SnapshotEvent{}.EventName(): + event := snapshot.SnapshotEvent{} + + if err := w.opts.Marshaler.Unmarshal(msg, &event); err != nil { w.opts.Logger.Error("failed to parse entitlement created event", w.messageToLogFields(msg)...) return err } - return w.handleSnapshotEvent(msg.Context(), event.Payload) + return w.handleSnapshotEvent(msg.Context(), event) } return nil } diff --git a/internal/registry/entitlement.go b/internal/registry/entitlement.go index 7841722ff..800b98141 100644 --- a/internal/registry/entitlement.go +++ b/internal/registry/entitlement.go @@ -5,10 +5,10 @@ import ( "github.com/openmeterio/openmeter/internal/ent/db" "github.com/openmeterio/openmeter/internal/meter" + "github.com/openmeterio/openmeter/internal/watermill/eventbus" "github.com/openmeterio/openmeter/openmeter/credit" "github.com/openmeterio/openmeter/openmeter/entitlement" meteredentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/metered" - "github.com/openmeterio/openmeter/openmeter/event/publisher" "github.com/openmeterio/openmeter/openmeter/productcatalog" "github.com/openmeterio/openmeter/openmeter/streaming" ) @@ -30,5 +30,5 @@ type EntitlementOptions struct { StreamingConnector streaming.Connector Logger *slog.Logger MeterRepository meter.Repository - Publisher publisher.TopicPublisher + Publisher eventbus.Publisher } diff --git a/internal/sink/flushhandler/ingestnotification/events.go b/internal/sink/flushhandler/ingestnotification/events/events.go similarity index 60% rename from internal/sink/flushhandler/ingestnotification/events.go rename to internal/sink/flushhandler/ingestnotification/events/events.go index bcffbc77e..cfd512dcf 100644 --- a/internal/sink/flushhandler/ingestnotification/events.go +++ b/internal/sink/flushhandler/ingestnotification/events/events.go @@ -1,32 +1,35 @@ -package ingestnotification +package events import ( "errors" + "github.com/openmeterio/openmeter/internal/event/metadata" "github.com/openmeterio/openmeter/internal/event/models" - "github.com/openmeterio/openmeter/internal/event/spec" + "github.com/openmeterio/openmeter/openmeter/watermill/marshaler" ) const ( - EventSubsystem spec.EventSubsystem = "ingest" -) - -const ( - ingestedEventName spec.EventName = "events.ingested" + EventSubsystem metadata.EventSubsystem = "ingest" ) type EventBatchedIngest struct { Events []IngestEventData `json:"events"` } -var batchIngestEventSpec = spec.EventTypeSpec{ - Subsystem: EventSubsystem, - Name: ingestedEventName, - Version: "v1", -} +var ( + _ marshaler.Event = EventBatchedIngest{} + + batchIngestEventType = metadata.EventType{ + Subsystem: EventSubsystem, + Name: "events.ingested", + Version: "v1", + } + batchIngestEventName = metadata.GetEventName(batchIngestEventType) + EventVersionSubsystem = batchIngestEventType.VersionSubsystem() +) -func (b EventBatchedIngest) Spec() *spec.EventTypeSpec { - return &batchIngestEventSpec +func (b EventBatchedIngest) EventName() string { + return batchIngestEventName } func (b EventBatchedIngest) Validate() error { @@ -45,6 +48,12 @@ func (b EventBatchedIngest) Validate() error { return finalErr } +func (b EventBatchedIngest) EventMetadata() metadata.EventMetadata { + return metadata.EventMetadata{ + Source: metadata.ComposeResourcePathRaw(string(EventSubsystem)), + } +} + type IngestEventData struct { Namespace models.NamespaceID `json:"namespace"` SubjectKey string `json:"subjectKey"` diff --git a/internal/sink/flushhandler/ingestnotification/handler.go b/internal/sink/flushhandler/ingestnotification/handler.go index c5b102be2..7e2af7626 100644 --- a/internal/sink/flushhandler/ingestnotification/handler.go +++ b/internal/sink/flushhandler/ingestnotification/handler.go @@ -8,16 +8,16 @@ import ( "go.opentelemetry.io/otel/metric" eventmodels "github.com/openmeterio/openmeter/internal/event/models" - "github.com/openmeterio/openmeter/internal/event/spec" "github.com/openmeterio/openmeter/internal/sink/flushhandler" + ingestevents "github.com/openmeterio/openmeter/internal/sink/flushhandler/ingestnotification/events" sinkmodels "github.com/openmeterio/openmeter/internal/sink/models" - "github.com/openmeterio/openmeter/openmeter/event/publisher" + "github.com/openmeterio/openmeter/internal/watermill/eventbus" "github.com/openmeterio/openmeter/pkg/models" "github.com/openmeterio/openmeter/pkg/slicesx" ) type handler struct { - publisher publisher.TopicPublisher + publisher eventbus.Publisher logger *slog.Logger config HandlerConfig } @@ -26,7 +26,7 @@ type HandlerConfig struct { MaxEventsInBatch int } -func NewHandler(logger *slog.Logger, metricMeter metric.Meter, publisher publisher.TopicPublisher, config HandlerConfig) (flushhandler.FlushEventHandler, error) { +func NewHandler(logger *slog.Logger, metricMeter metric.Meter, publisher eventbus.Publisher, config HandlerConfig) (flushhandler.FlushEventHandler, error) { handler := &handler{ publisher: publisher, logger: logger, @@ -59,8 +59,8 @@ func (h *handler) OnFlushSuccess(ctx context.Context, events []sinkmodels.SinkMe } // Map the filtered events to the ingest event - iEvents := slicesx.Map(filtered, func(message sinkmodels.SinkMessage) IngestEventData { - return IngestEventData{ + iEvents := slicesx.Map(filtered, func(message sinkmodels.SinkMessage) ingestevents.IngestEventData { + return ingestevents.IngestEventData{ Namespace: eventmodels.NamespaceID{ID: message.Namespace}, SubjectKey: message.Serialized.Subject, MeterSlugs: h.getMeterSlugsFromMeters(message.Meters), @@ -70,18 +70,7 @@ func (h *handler) OnFlushSuccess(ctx context.Context, events []sinkmodels.SinkMe // We need to chunk the events to not exceed message size limits chunkedEvents := slicesx.Chunk(iEvents, h.config.MaxEventsInBatch) for _, chunk := range chunkedEvents { - event, err := spec.NewCloudEvent(spec.EventSpec{ - Source: spec.ComposeResourcePathRaw(string(EventBatchedIngest{}.Spec().Subsystem)), - }, EventBatchedIngest{ - Events: chunk, - }) - if err != nil { - finalErr = errors.Join(finalErr, err) - h.logger.Error("failed to create change notification", "error", err) - continue - } - - if err := h.publisher.Publish(event); err != nil { + if err := h.publisher.Publish(ctx, ingestevents.EventBatchedIngest{Events: chunk}); err != nil { finalErr = errors.Join(finalErr, err) h.logger.Error("failed to publish change notification", "error", err) } diff --git a/internal/watermill/driver/kafka/marshaler.go b/internal/watermill/driver/kafka/marshaler.go index 9a2303013..ab089b810 100644 --- a/internal/watermill/driver/kafka/marshaler.go +++ b/internal/watermill/driver/kafka/marshaler.go @@ -5,6 +5,8 @@ import ( "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka" "github.com/ThreeDotsLabs/watermill/message" "github.com/cloudevents/sdk-go/v2/event" + + "github.com/openmeterio/openmeter/pkg/slicesx" ) const ( @@ -24,6 +26,9 @@ func (m marshalerWithPartitionKey) Marshal(topic string, msg *message.Message) ( partitionKey := msg.Metadata.Get(PartitionKeyMetadataKey) if partitionKey != "" { kafkaMsg.Key = sarama.ByteEncoder(partitionKey) + kafkaMsg.Headers = slicesx.Filter(kafkaMsg.Headers, func(header sarama.RecordHeader) bool { + return string(header.Key) != PartitionKeyMetadataKey + }) } return kafkaMsg, nil diff --git a/internal/watermill/driver/kafka/publisher.go b/internal/watermill/driver/kafka/publisher.go index 9a0d8f2d7..40a652c44 100644 --- a/internal/watermill/driver/kafka/publisher.go +++ b/internal/watermill/driver/kafka/publisher.go @@ -62,11 +62,14 @@ func NewPublisher(ctx context.Context, in PublisherOptions) (*kafka.Publisher, e Brokers: []string{in.KafkaConfig.Broker}, OverwriteSaramaConfig: sarama.NewConfig(), Marshaler: marshalerWithPartitionKey{}, - OTELEnabled: true, // This relies on the global trace provider + Tracer: kafka.NewOTELSaramaTracer(), // This relies on the global trace provider } wmConfig.OverwriteSaramaConfig.Metadata.RefreshFrequency = in.KafkaConfig.TopicMetadataRefreshInterval.Duration() - wmConfig.OverwriteSaramaConfig.ClientID = "openmeter/balance-worker" + if in.ClientID == "" { + return nil, errors.New("client ID is required") + } + wmConfig.OverwriteSaramaConfig.ClientID = fmt.Sprintf("%s-publisher", in.ClientID) // These are globals, so we cannot append the publisher/subscriber name to them sarama.Logger = &SaramaLoggerAdaptor{ diff --git a/internal/watermill/eventbus/eventbus.go b/internal/watermill/eventbus/eventbus.go new file mode 100644 index 000000000..6696e542e --- /dev/null +++ b/internal/watermill/eventbus/eventbus.go @@ -0,0 +1,90 @@ +package eventbus + +import ( + "context" + "log/slog" + "strings" + "testing" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/components/cqrs" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/stretchr/testify/assert" + + "github.com/openmeterio/openmeter/config" + ingestevents "github.com/openmeterio/openmeter/internal/sink/flushhandler/ingestnotification/events" + "github.com/openmeterio/openmeter/internal/watermill/driver/noop" + "github.com/openmeterio/openmeter/openmeter/watermill/marshaler" +) + +type Options struct { + Publisher message.Publisher + Config config.EventsConfiguration + Logger *slog.Logger + MarshalerTransformFunc marshaler.TransformFunc +} + +type Publisher interface { + Publish(ctx context.Context, event marshaler.Event) error + + Marshaler() marshaler.Marshaler +} + +type publisher struct { + eventBus *cqrs.EventBus + marshaler marshaler.Marshaler +} + +func (p publisher) Publish(ctx context.Context, event marshaler.Event) error { + return p.eventBus.Publish(ctx, event) +} + +func (p publisher) Marshaler() marshaler.Marshaler { + return p.marshaler +} + +func New(opts Options) (Publisher, error) { + marshaler := marshaler.New(opts.MarshalerTransformFunc) + + ingestVersionSubsystemPrefix := ingestevents.EventVersionSubsystem + "." + + eventBus, err := cqrs.NewEventBusWithConfig(opts.Publisher, cqrs.EventBusConfig{ + GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) { + switch { + case strings.HasPrefix(params.EventName, ingestVersionSubsystemPrefix): + return opts.Config.IngestEvents.Topic, nil + default: + return opts.Config.SystemEvents.Topic, nil + } + }, + + Marshaler: marshaler, + Logger: watermill.NewSlogLogger(opts.Logger), + }) + if err != nil { + return nil, err + } + + return publisher{ + eventBus: eventBus, + marshaler: marshaler, + }, nil +} + +func NewMock(t *testing.T) Publisher { + eventBus, err := New(Options{ + Publisher: &noop.Publisher{}, + Config: config.EventsConfiguration{ + SystemEvents: config.EventSubsystemConfiguration{ + Topic: "test", + }, + IngestEvents: config.EventSubsystemConfiguration{ + Topic: "test", + }, + }, + Logger: slog.Default(), + }) + + assert.NoError(t, err) + return eventBus +} diff --git a/openmeter/credit/adapters.go b/openmeter/credit/adapters.go index 9e8a528ea..69f645f68 100644 --- a/openmeter/credit/adapters.go +++ b/openmeter/credit/adapters.go @@ -5,7 +5,7 @@ import ( "time" "github.com/openmeterio/openmeter/internal/credit" - "github.com/openmeterio/openmeter/internal/event/publisher" + "github.com/openmeterio/openmeter/internal/watermill/eventbus" "github.com/openmeterio/openmeter/openmeter/streaming" ) @@ -16,7 +16,7 @@ func NewCreditConnector( streamingConnector streaming.Connector, logger *slog.Logger, granularity time.Duration, - publisher publisher.TopicPublisher, + publisher eventbus.Publisher, ) CreditConnector { return credit.NewCreditConnector( grantRepo, diff --git a/openmeter/entitlement/adapters.go b/openmeter/entitlement/adapters.go index a07334d9c..61528d84a 100644 --- a/openmeter/entitlement/adapters.go +++ b/openmeter/entitlement/adapters.go @@ -2,7 +2,7 @@ package entitlement import ( "github.com/openmeterio/openmeter/internal/entitlement" - "github.com/openmeterio/openmeter/openmeter/event/publisher" + "github.com/openmeterio/openmeter/internal/watermill/eventbus" "github.com/openmeterio/openmeter/openmeter/meter" "github.com/openmeterio/openmeter/openmeter/productcatalog" ) @@ -14,7 +14,7 @@ func NewEntitlementConnector( metered SubTypeConnector, static SubTypeConnector, boolean SubTypeConnector, - publisher publisher.TopicPublisher, + eventBus eventbus.Publisher, ) EntitlementConnector { - return entitlement.NewEntitlementConnector(edb, fc, meterRepo, metered, static, boolean, publisher) + return entitlement.NewEntitlementConnector(edb, fc, meterRepo, metered, static, boolean, eventBus) } diff --git a/openmeter/entitlement/metered/adapters.go b/openmeter/entitlement/metered/adapters.go index 7be86cca3..d9e3353ac 100644 --- a/openmeter/entitlement/metered/adapters.go +++ b/openmeter/entitlement/metered/adapters.go @@ -4,9 +4,9 @@ import ( "log/slog" meteredentitlement "github.com/openmeterio/openmeter/internal/entitlement/metered" + "github.com/openmeterio/openmeter/internal/watermill/eventbus" "github.com/openmeterio/openmeter/openmeter/credit" "github.com/openmeterio/openmeter/openmeter/entitlement" - "github.com/openmeterio/openmeter/openmeter/event/publisher" "github.com/openmeterio/openmeter/openmeter/meter" "github.com/openmeterio/openmeter/openmeter/productcatalog" "github.com/openmeterio/openmeter/openmeter/streaming" @@ -19,7 +19,7 @@ func NewMeteredEntitlementConnector( grantConnector credit.GrantConnector, grantRepo credit.GrantRepo, entitlementRepo entitlement.EntitlementRepo, - publisher publisher.TopicPublisher, + publisher eventbus.Publisher, ) Connector { return meteredentitlement.NewMeteredEntitlementConnector( streamingConnector, diff --git a/openmeter/event/event.go b/openmeter/event/event.go new file mode 100644 index 000000000..0df0b1ace --- /dev/null +++ b/openmeter/event/event.go @@ -0,0 +1,5 @@ +package event + +import "github.com/openmeterio/openmeter/internal/event" + +type Publisher = event.Publisher diff --git a/openmeter/event/publisher/publisher.go b/openmeter/event/publisher/publisher.go deleted file mode 100644 index c5aec9318..000000000 --- a/openmeter/event/publisher/publisher.go +++ /dev/null @@ -1,22 +0,0 @@ -package publisher - -import "github.com/openmeterio/openmeter/internal/event/publisher" - -type ( - Publisher = publisher.Publisher - PublisherOptions = publisher.PublisherOptions - TopicPublisher = publisher.TopicPublisher - CloudEventMarshaler = publisher.CloudEventMarshaler -) - -type ( - TransformFunc = publisher.TransformFunc -) - -func NewPublisher(options PublisherOptions) (Publisher, error) { - return publisher.NewPublisher(options) -} - -func NewCloudEventMarshaler(transform TransformFunc) CloudEventMarshaler { - return publisher.NewCloudEventMarshaler(transform) -} diff --git a/openmeter/sink/flushhandler/ingestnotification/events/events.go b/openmeter/sink/flushhandler/ingestnotification/events/events.go new file mode 100644 index 000000000..c83e35523 --- /dev/null +++ b/openmeter/sink/flushhandler/ingestnotification/events/events.go @@ -0,0 +1,14 @@ +package events + +import "github.com/openmeterio/openmeter/internal/sink/flushhandler/ingestnotification/events" + +const ( + EventSubsystem = events.EventSubsystem +) + +var EventVersionSubsystem = events.EventVersionSubsystem + +type ( + IngestEventData = events.IngestEventData + EventBatchedIngest = events.EventBatchedIngest +) diff --git a/openmeter/sink/flushhandler/ingestnotification/ingestnotification.go b/openmeter/sink/flushhandler/ingestnotification/ingestnotification.go index 9300474c7..51da240d2 100644 --- a/openmeter/sink/flushhandler/ingestnotification/ingestnotification.go +++ b/openmeter/sink/flushhandler/ingestnotification/ingestnotification.go @@ -6,22 +6,16 @@ import ( "go.opentelemetry.io/otel/metric" "github.com/openmeterio/openmeter/internal/sink/flushhandler/ingestnotification" - "github.com/openmeterio/openmeter/openmeter/event/publisher" "github.com/openmeterio/openmeter/openmeter/sink/flushhandler" + "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" ) // Event types -const ( - EventSubsystem = ingestnotification.EventSubsystem -) - type ( - IngestEventData = ingestnotification.IngestEventData - EventBatchedIngest = ingestnotification.EventBatchedIngest - HandlerConfig = ingestnotification.HandlerConfig + HandlerConfig = ingestnotification.HandlerConfig ) // Ingest notification handler -func NewHandler(logger *slog.Logger, metricMeter metric.Meter, publisher publisher.TopicPublisher, config ingestnotification.HandlerConfig) (flushhandler.FlushEventHandler, error) { +func NewHandler(logger *slog.Logger, metricMeter metric.Meter, publisher eventbus.Publisher, config ingestnotification.HandlerConfig) (flushhandler.FlushEventHandler, error) { return ingestnotification.NewHandler(logger, metricMeter, publisher, config) } diff --git a/openmeter/watermill/eventbus/eventbus.go b/openmeter/watermill/eventbus/eventbus.go new file mode 100644 index 000000000..fbb12aa3d --- /dev/null +++ b/openmeter/watermill/eventbus/eventbus.go @@ -0,0 +1,20 @@ +package eventbus + +import ( + "testing" + + "github.com/openmeterio/openmeter/internal/watermill/eventbus" +) + +type ( + Publisher = eventbus.Publisher + Options = eventbus.Options +) + +func New(options Options) (Publisher, error) { + return eventbus.New(options) +} + +func NewMock(t *testing.T) Publisher { + return eventbus.NewMock(t) +} diff --git a/openmeter/watermill/marshaler/marshaler.go b/openmeter/watermill/marshaler/marshaler.go new file mode 100644 index 000000000..6209b875a --- /dev/null +++ b/openmeter/watermill/marshaler/marshaler.go @@ -0,0 +1,154 @@ +package marshaler + +import ( + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/ThreeDotsLabs/watermill/components/cqrs" + "github.com/ThreeDotsLabs/watermill/message" + cloudevents "github.com/cloudevents/sdk-go/v2/event" + "github.com/oklog/ulid/v2" + + "github.com/openmeterio/openmeter/internal/event/metadata" +) + +const ( + CloudEventsHeaderType = "ce_type" + CloudEventsHeaderTime = "ce_time" + CloudEventsHeaderSource = "ce_source" + CloudEventsHeaderSubject = "ce_subject" +) + +const ( + UnknownEventName = "io.openmeter.unknown" +) + +type TransformFunc func(watermillIn *message.Message, cloudEvent cloudevents.Event) (*message.Message, error) + +type Marshaler = cqrs.CommandEventMarshaler + +type Event interface { + EventName() string + EventMetadata() metadata.EventMetadata + Validate() error +} + +type marshaler struct { + transform TransformFunc +} + +func New(transform TransformFunc) Marshaler { + return &marshaler{ + transform: transform, + } +} + +func (m *marshaler) Marshal(v interface{}) (*message.Message, error) { + ev, ok := v.(Event) + if !ok { + return nil, errors.New("invalid event type") + } + + // cloud events object + ce, err := NewCloudEvent(ev) + if err != nil { + return nil, err + } + + ceBytes, err := ce.MarshalJSON() + if err != nil { + return nil, err + } + + // watermill message + msg := message.NewMessage(ce.ID(), ceBytes) + + msg.Metadata.Set(CloudEventsHeaderType, ce.Type()) + msg.Metadata.Set(CloudEventsHeaderTime, ce.Time().In(time.UTC).Format(time.RFC3339)) + msg.Metadata.Set(CloudEventsHeaderSource, ce.Source()) + if ce.Subject() != "" { + msg.Metadata.Set(CloudEventsHeaderSubject, ce.Subject()) + } + + if m.transform != nil { + msg, err = m.transform(msg, ce) + if err != nil { + return nil, err + } + } + + return msg, nil +} + +func NewCloudEvent(ev Event) (cloudevents.Event, error) { + metadata := ev.EventMetadata() + // Mandatory cloud events fields + if metadata.Source == "" { + return cloudevents.Event{}, errors.New("source is required") + } + + cloudEvent := cloudevents.New() + cloudEvent.SetType(ev.EventName()) + cloudEvent.SetSpecVersion("1.0") + + if metadata.Time.IsZero() { + cloudEvent.SetTime(time.Now()) + } else { + cloudEvent.SetTime(metadata.Time) + } + + if metadata.ID == "" { + cloudEvent.SetID(ulid.Make().String()) + } else { + cloudEvent.SetID(metadata.ID) + } + + cloudEvent.SetSource(metadata.Source) + + cloudEvent.SetSubject(metadata.Subject) + + if err := ev.Validate(); err != nil { + return cloudevents.Event{}, err + } + + if err := cloudEvent.SetData("application/json", ev); err != nil { + return cloudevents.Event{}, err + } + return cloudEvent, nil +} + +func (m *marshaler) Unmarshal(msg *message.Message, v interface{}) error { + cloudEvent := cloudevents.Event{} + if err := cloudEvent.UnmarshalJSON(msg.Payload); err != nil { + return fmt.Errorf("failed to unmarshal CloudEvent: %w", err) + } + + err := json.Unmarshal(cloudEvent.Data(), v) + if err != nil { + return err + } + + ev, ok := v.(Event) + if !ok { + return errors.New("invalid event type") + } + + return ev.Validate() +} + +func (m *marshaler) Name(v interface{}) string { + ev, ok := v.(Event) + if !ok { + // This event name is passed to most of the cqrs functions, but given that we cannot + // return an error here, we are generating a name that's unlikely to match any event. + return UnknownEventName + } + + return ev.EventName() +} + +func (m *marshaler) NameFromMessage(msg *message.Message) string { + return msg.Metadata.Get(CloudEventsHeaderType) +} diff --git a/openmeter/watermill/marshaler/source.go b/openmeter/watermill/marshaler/source.go new file mode 100644 index 000000000..160b647ea --- /dev/null +++ b/openmeter/watermill/marshaler/source.go @@ -0,0 +1,51 @@ +package marshaler + +import ( + "encoding/json" + "errors" + + "github.com/openmeterio/openmeter/internal/event/metadata" +) + +type eventWithSource struct { + Event `json:",inline"` + + source string `json:"-"` +} + +// WithSource can be used to add the CloudEvents source field to an event. +func WithSource(source string, ev Event) Event { + return &eventWithSource{ + source: source, + Event: ev, + } +} + +func (e *eventWithSource) EventMetadata() metadata.EventMetadata { + metadata := e.Event.EventMetadata() + metadata.Source = e.source + + return metadata +} + +func (e *eventWithSource) Validate() error { + if err := e.Event.Validate(); err != nil { + return err + } + + if e.source == "" { + return errors.New("source must be set") + } + + return nil +} + +func (e *eventWithSource) EventName() string { + return e.Event.EventName() +} + +// MarshalJSON marshals the event only, as JSON library embeds the Event name into the output, +// if the composed object is a pointer to an interface. (e.g. we would get "Event": {} in the payload) +func (e *eventWithSource) MarshalJSON() ([]byte, error) { + return json.Marshal(e.Event) +} diff --git a/openmeter/watermill/marshaler/source_test.go b/openmeter/watermill/marshaler/source_test.go new file mode 100644 index 000000000..c5cb04d82 --- /dev/null +++ b/openmeter/watermill/marshaler/source_test.go @@ -0,0 +1,47 @@ +package marshaler + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/openmeterio/openmeter/internal/event/metadata" +) + +type event struct { + Value string `json:"value"` +} + +func (e *event) EventMetadata() metadata.EventMetadata { + return metadata.EventMetadata{} +} + +func (e *event) Validate() error { + return nil +} + +func (e *event) EventName() string { + return "event" +} + +func TestWithSubject(t *testing.T) { + marshaler := New(nil) + + ev := &event{ + Value: "value", + } + + evWithSource := WithSource("source", ev) + msg, err := marshaler.Marshal(evWithSource) + + // Check if the source is set in the metadata + assert.NoError(t, err) + assert.Equal(t, "source", msg.Metadata.Get(CloudEventsHeaderSource)) + + // Check if the event can be unmarshaled + unmarshaledEvent := &event{} + err = marshaler.Unmarshal(msg, unmarshaledEvent) + assert.NoError(t, err) + + assert.Equal(t, ev, unmarshaledEvent) +} diff --git a/test/entitlement/regression/framework_test.go b/test/entitlement/regression/framework_test.go index 69b8817d3..3680cca7d 100644 --- a/test/entitlement/regression/framework_test.go +++ b/test/entitlement/regression/framework_test.go @@ -16,12 +16,12 @@ import ( booleanentitlement "github.com/openmeterio/openmeter/internal/entitlement/boolean" meteredentitlement "github.com/openmeterio/openmeter/internal/entitlement/metered" staticentitlement "github.com/openmeterio/openmeter/internal/entitlement/static" - "github.com/openmeterio/openmeter/internal/event/publisher" "github.com/openmeterio/openmeter/internal/meter" "github.com/openmeterio/openmeter/internal/productcatalog" productcatalogrepo "github.com/openmeterio/openmeter/internal/productcatalog/adapter" streamingtestutils "github.com/openmeterio/openmeter/internal/streaming/testutils" "github.com/openmeterio/openmeter/internal/testutils" + "github.com/openmeterio/openmeter/internal/watermill/eventbus" "github.com/openmeterio/openmeter/pkg/models" ) @@ -89,6 +89,8 @@ func setupDependencies(t *testing.T) Dependencies { entitlementRepo := entitlementrepo.NewPostgresEntitlementRepo(dbClient) usageResetRepo := entitlementrepo.NewPostgresUsageResetRepo(dbClient) + mockPublisher := eventbus.NewMock(t) + owner := meteredentitlement.NewEntitlementGrantOwnerAdapter( featureRepo, entitlementRepo, @@ -104,7 +106,7 @@ func setupDependencies(t *testing.T) Dependencies { streaming, log, time.Minute, - publisher.NewMockTopicPublisher(t), + mockPublisher, ) meteredEntitlementConnector := meteredentitlement.NewMeteredEntitlementConnector( @@ -114,7 +116,7 @@ func setupDependencies(t *testing.T) Dependencies { creditConnector, grantRepo, entitlementRepo, - publisher.NewMockTopicPublisher(t), + mockPublisher, ) staticEntitlementConnector := staticentitlement.NewStaticEntitlementConnector() @@ -127,7 +129,7 @@ func setupDependencies(t *testing.T) Dependencies { meteredEntitlementConnector, staticEntitlementConnector, booleanEntitlementConnector, - publisher.NewMockTopicPublisher(t), + mockPublisher, ) return Dependencies{