diff --git a/Makefile b/Makefile index 10074c4d4..c1c60fed6 100644 --- a/Makefile +++ b/Makefile @@ -60,6 +60,11 @@ build-balance-worker: ## Build balance-worker binary $(call print-target) go build -o build/balance-worker ./cmd/balance-worker +.PHONY: build-billing-worker +build-billing-worker: ## Build billing-worker binary + $(call print-target) + go build -o build/billing-worker ./cmd/billing-worker + .PHONY: build-notification-service build-notification-service: ## Build notification-service binary $(call print-target) @@ -86,6 +91,12 @@ balance-worker: ## Run balance-worker $(call print-target) air -c ./cmd/balance-worker/.air.toml +.PHONY: billing-worker +billing-worker: ## Run billing-worker + @ if [ config.yaml -ot config.example.yaml ]; then diff -u config.yaml config.example.yaml || (echo "!!! The configuration example changed. Please update your config.yaml file accordingly (or at least touch it). !!!" && false); fi + $(call print-target) + air -c ./cmd/billing-worker/.air.toml + .PHONY: notification-service notification-service: ## Run notification-service @ if [ config.yaml -ot config.example.yaml ]; then diff -u config.yaml config.example.yaml || (echo "!!! The configuration example changed. Please update your config.yaml file accordingly (or at least touch it). !!!" && false); fi diff --git a/app/common/app.go b/app/common/app.go index 6cc70d880..5a02ebf10 100644 --- a/app/common/app.go +++ b/app/common/app.go @@ -1,44 +1,101 @@ package common import ( - "errors" + "context" "fmt" "log/slog" - "net/http" - "github.com/oklog/run" + "github.com/google/wire" "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/app" + appadapter "github.com/openmeterio/openmeter/openmeter/app/adapter" + appsandbox "github.com/openmeterio/openmeter/openmeter/app/sandbox" + appservice "github.com/openmeterio/openmeter/openmeter/app/service" + appstripe "github.com/openmeterio/openmeter/openmeter/app/stripe" + appstripeadapter "github.com/openmeterio/openmeter/openmeter/app/stripe/adapter" + appstripeservice "github.com/openmeterio/openmeter/openmeter/app/stripe/service" + "github.com/openmeterio/openmeter/openmeter/customer" + entdb "github.com/openmeterio/openmeter/openmeter/ent/db" + "github.com/openmeterio/openmeter/openmeter/namespace" + "github.com/openmeterio/openmeter/openmeter/secret" ) -// Metadata provides information about the service to components that need it (eg. telemetry). -type Metadata struct { - ServiceName string - Version string - Environment string - OpenTelemetryName string -} +var App = wire.NewSet( + wire.FieldsOf(new(config.Configuration), "Apps"), + + NewAppService, + NewAppStripeService, + NewAppSandboxProvisioner, +) + +type AppSandboxProvisioner func() error -func NewMetadata(conf config.Configuration, version string, serviceName string) Metadata { - return Metadata{ - ServiceName: fmt.Sprintf("openmeter-%s", serviceName), - Version: version, - Environment: conf.Environment, - OpenTelemetryName: fmt.Sprintf("openmeter.io/%s", serviceName), +func NewAppService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration) (app.Service, error) { + // TODO: remove this check after enabled by default + if !appsConfig.Enabled || db == nil { + return nil, nil } + + appAdapter, err := appadapter.New(appadapter.Config{ + Client: db, + BaseURL: appsConfig.BaseURL, + }) + if err != nil { + return nil, fmt.Errorf("failed to create app adapter: %w", err) + } + + return appservice.New(appservice.Config{ + Adapter: appAdapter, + }) } -// Runner is a helper struct that runs a group of services. -type Runner struct { - Group run.Group - Logger *slog.Logger +func NewAppStripeService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration, appService app.Service, customerService customer.Service, secretService secret.Service) (appstripe.Service, error) { + // TODO: remove this check after enabled by default + if !appsConfig.Enabled || db == nil { + return nil, nil + } + + appStripeAdapter, err := appstripeadapter.New(appstripeadapter.Config{ + Client: db, + AppService: appService, + CustomerService: customerService, + SecretService: secretService, + }) + if err != nil { + return nil, fmt.Errorf("failed to create appstripe adapter: %w", err) + } + + return appstripeservice.New(appstripeservice.Config{ + Adapter: appStripeAdapter, + AppService: appService, + SecretService: secretService, + }) } -func (r Runner) Run() { - err := r.Group.Run() - if e := (run.SignalError{}); errors.As(err, &e) { - r.Logger.Info("received signal: shutting down", slog.String("signal", e.Signal.String())) - } else if !errors.Is(err, http.ErrServerClosed) { - r.Logger.Error("application stopped due to error", slog.String("error", err.Error())) +func NewAppSandboxProvisioner(ctx context.Context, logger *slog.Logger, appsConfig config.AppsConfiguration, appService app.Service, namespaceManager *namespace.Manager) (AppSandboxProvisioner, error) { + if !appsConfig.Enabled { + return nil, nil + } + + _, err := appsandbox.NewFactory(appsandbox.Config{ + AppService: appService, + }) + if err != nil { + return nil, fmt.Errorf("failed to initialize app sandbox factory: %w", err) } + + return func() error { + app, err := appsandbox.AutoProvision(ctx, appsandbox.AutoProvisionInput{ + Namespace: namespaceManager.GetDefaultNamespace(), + AppService: appService, + }) + if err != nil { + return fmt.Errorf("failed to auto-provision sandbox app: %w", err) + } + + logger.Info("sandbox app auto-provisioned", "app_id", app.GetID().ID) + + return nil + }, nil } diff --git a/app/common/billing.go b/app/common/billing.go new file mode 100644 index 000000000..0a75c0ea3 --- /dev/null +++ b/app/common/billing.go @@ -0,0 +1,58 @@ +package common + +import ( + "fmt" + "log/slog" + + "github.com/google/wire" + + "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/app" + appstripe "github.com/openmeterio/openmeter/openmeter/app/stripe" + "github.com/openmeterio/openmeter/openmeter/billing" + billingadapter "github.com/openmeterio/openmeter/openmeter/billing/adapter" + billingservice "github.com/openmeterio/openmeter/openmeter/billing/service" + "github.com/openmeterio/openmeter/openmeter/customer" + entdb "github.com/openmeterio/openmeter/openmeter/ent/db" + "github.com/openmeterio/openmeter/openmeter/meter" + "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" + "github.com/openmeterio/openmeter/openmeter/streaming" +) + +var Billing = wire.NewSet( + BillingService, +) + +func BillingService( + logger *slog.Logger, + db *entdb.Client, + appService app.Service, + appStripeService appstripe.Service, + billingConfig config.BillingConfiguration, + customerService customer.Service, + featureConnector feature.FeatureConnector, + meterRepo meter.Repository, + streamingConnector streaming.Connector, +) (billing.Service, error) { + if !billingConfig.Enabled { + return nil, nil + } + + adapter, err := billingadapter.New(billingadapter.Config{ + Client: db, + Logger: logger, + }) + if err != nil { + return nil, fmt.Errorf("creating billing adapter: %w", err) + } + + return billingservice.New(billingservice.Config{ + Adapter: adapter, + AppService: appService, + CustomerService: customerService, + FeatureService: featureConnector, + Logger: logger, + MeterRepo: meterRepo, + StreamingConnector: streamingConnector, + }) +} diff --git a/app/common/clickhouse.go b/app/common/clickhouse.go index 83197ce5a..026aaad39 100644 --- a/app/common/clickhouse.go +++ b/app/common/clickhouse.go @@ -4,13 +4,12 @@ import ( "fmt" "github.com/ClickHouse/clickhouse-go/v2" - "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "github.com/openmeterio/openmeter/app/config" ) // TODO: add closer function? -func NewClickHouse(conf config.ClickHouseAggregationConfiguration) (driver.Conn, error) { +func NewClickHouse(conf config.ClickHouseAggregationConfiguration) (clickhouse.Conn, error) { conn, err := clickhouse.Open(conf.GetClientOptions()) if err != nil { return nil, fmt.Errorf("failed to initialize clickhouse client: %w", err) diff --git a/app/common/customer.go b/app/common/customer.go new file mode 100644 index 000000000..ae67366ce --- /dev/null +++ b/app/common/customer.go @@ -0,0 +1,36 @@ +package common + +import ( + "fmt" + "log/slog" + + "github.com/google/wire" + + "github.com/openmeterio/openmeter/openmeter/customer" + customeradapter "github.com/openmeterio/openmeter/openmeter/customer/adapter" + customerservice "github.com/openmeterio/openmeter/openmeter/customer/service" + entdb "github.com/openmeterio/openmeter/openmeter/ent/db" +) + +var Customer = wire.NewSet( + NewCustomerService, +) + +func NewCustomerService(logger *slog.Logger, db *entdb.Client) (customer.Service, error) { + // TODO: remove this check after enabled by default + if db == nil { + return nil, nil + } + + customerAdapter, err := customeradapter.New(customeradapter.Config{ + Client: db, + Logger: logger.WithGroup("customer.postgres"), + }) + if err != nil { + return nil, fmt.Errorf("failed to create customer adapter: %w", err) + } + + return customerservice.New(customerservice.Config{ + Adapter: customerAdapter, + }) +} diff --git a/app/common/entitlement.go b/app/common/entitlement.go new file mode 100644 index 000000000..fc6c0c110 --- /dev/null +++ b/app/common/entitlement.go @@ -0,0 +1,43 @@ +package common + +import ( + "log/slog" + + "github.com/google/wire" + + "github.com/openmeterio/openmeter/app/config" + entdb "github.com/openmeterio/openmeter/openmeter/ent/db" + "github.com/openmeterio/openmeter/openmeter/meter" + "github.com/openmeterio/openmeter/openmeter/registry" + registrybuilder "github.com/openmeterio/openmeter/openmeter/registry/builder" + "github.com/openmeterio/openmeter/openmeter/streaming" + "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" +) + +var Entitlement = wire.NewSet( + wire.FieldsOf(new(config.Configuration), "Entitlements"), + + NewEntitlementRegistry, +) + +func NewEntitlementRegistry( + logger *slog.Logger, + db *entdb.Client, + entitlementConfig config.EntitlementsConfiguration, + streamingConnector streaming.Connector, + meterRepository meter.Repository, + eventPublisher eventbus.Publisher, +) *registry.Entitlement { + // TODO: remove this check after enabled by default + if db == nil { + return nil + } + + return registrybuilder.GetEntitlementRegistry(registrybuilder.EntitlementOptions{ + DatabaseClient: db, + StreamingConnector: streamingConnector, + MeterRepository: meterRepository, + Logger: logger, + Publisher: eventPublisher, + }) +} diff --git a/app/common/kafka.go b/app/common/kafka.go index c6a842f3f..227258f48 100644 --- a/app/common/kafka.go +++ b/app/common/kafka.go @@ -8,6 +8,10 @@ import ( "go.opentelemetry.io/otel/metric" "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest" + "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/topicresolver" + "github.com/openmeterio/openmeter/openmeter/namespace" + "github.com/openmeterio/openmeter/openmeter/streaming" pkgkafka "github.com/openmeterio/openmeter/pkg/kafka" kafkametrics "github.com/openmeterio/openmeter/pkg/kafka/metrics" ) @@ -83,3 +87,35 @@ func NewKafkaTopicProvisioner(conf pkgkafka.TopicProvisionerConfig) (pkgkafka.To return topicProvisioner, nil } + +func NewNamespaceHandlers( + kafkaHandler *kafkaingest.NamespaceHandler, + clickHouseHandler streaming.Connector, +) []namespace.Handler { + return []namespace.Handler{ + kafkaHandler, + clickHouseHandler, + } +} + +func NewKafkaNamespaceHandler( + topicResolver topicresolver.Resolver, + topicProvisioner pkgkafka.TopicProvisioner, + conf config.KafkaIngestConfiguration, +) (*kafkaingest.NamespaceHandler, error) { + return &kafkaingest.NamespaceHandler{ + TopicResolver: topicResolver, + TopicProvisioner: topicProvisioner, + Partitions: conf.Partitions, + DeletionEnabled: conf.NamespaceDeletionEnabled, + }, nil +} + +func NewNamespacedTopicResolver(config config.Configuration) (*topicresolver.NamespacedTopicResolver, error) { + topicResolver, err := topicresolver.NewNamespacedTopicResolver(config.Ingest.Kafka.EventsTopicTemplate) + if err != nil { + return nil, fmt.Errorf("failed to create topic name resolver: %w", err) + } + + return topicResolver, nil +} diff --git a/app/common/metadata.go b/app/common/metadata.go new file mode 100644 index 000000000..fd25429a3 --- /dev/null +++ b/app/common/metadata.go @@ -0,0 +1,24 @@ +package common + +import ( + "fmt" + + "github.com/openmeterio/openmeter/app/config" +) + +// Metadata provides information about the service to components that need it (eg. telemetry). +type Metadata struct { + ServiceName string + Version string + Environment string + OpenTelemetryName string +} + +func NewMetadata(conf config.Configuration, version string, serviceName string) Metadata { + return Metadata{ + ServiceName: fmt.Sprintf("openmeter-%s", serviceName), + Version: version, + Environment: conf.Environment, + OpenTelemetryName: fmt.Sprintf("openmeter.io/%s", serviceName), + } +} diff --git a/app/common/meter.go b/app/common/meter.go new file mode 100644 index 000000000..5f3dfc762 --- /dev/null +++ b/app/common/meter.go @@ -0,0 +1,13 @@ +package common + +import ( + "github.com/samber/lo" + + "github.com/openmeterio/openmeter/openmeter/meter" + "github.com/openmeterio/openmeter/pkg/models" + "github.com/openmeterio/openmeter/pkg/slicesx" +) + +func NewMeterRepository(meters []*models.Meter) *meter.InMemoryRepository { + return meter.NewInMemoryRepository(slicesx.Map(meters, lo.FromPtr[models.Meter])) +} diff --git a/app/common/namespace.go b/app/common/namespace.go new file mode 100644 index 000000000..c9648729f --- /dev/null +++ b/app/common/namespace.go @@ -0,0 +1,24 @@ +package common + +import ( + "fmt" + + "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/namespace" +) + +func NewNamespaceManager( + handlers []namespace.Handler, + conf config.NamespaceConfiguration, +) (*namespace.Manager, error) { + manager, err := namespace.NewManager(namespace.ManagerConfig{ + Handlers: handlers, + DefaultNamespace: conf.Default, + DisableManagement: conf.DisableManagement, + }) + if err != nil { + return nil, fmt.Errorf("create namespace manager: %v", err) + } + + return manager, nil +} diff --git a/app/common/notification.go b/app/common/notification.go new file mode 100644 index 000000000..453f987fa --- /dev/null +++ b/app/common/notification.go @@ -0,0 +1,71 @@ +package common + +import ( + "fmt" + "log/slog" + + "github.com/google/wire" + + "github.com/openmeterio/openmeter/app/config" + entdb "github.com/openmeterio/openmeter/openmeter/ent/db" + "github.com/openmeterio/openmeter/openmeter/notification" + notificationrepository "github.com/openmeterio/openmeter/openmeter/notification/repository" + notificationservice "github.com/openmeterio/openmeter/openmeter/notification/service" + notificationwebhook "github.com/openmeterio/openmeter/openmeter/notification/webhook" + "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" +) + +var Notification = wire.NewSet( + wire.FieldsOf(new(config.Configuration), "Notification"), + + NewNotificationService, +) + +func NewNotificationService( + logger *slog.Logger, + db *entdb.Client, + notificationConfig config.NotificationConfiguration, + svixConfig config.SvixConfig, + featureConnector feature.FeatureConnector, +) (notification.Service, error) { + // TODO: remove this check after enabled by default + if db == nil { + return nil, nil + } + + if !notificationConfig.Enabled { + return nil, nil + } + + var notificationRepo notification.Repository + notificationRepo, err := notificationrepository.New(notificationrepository.Config{ + Client: db, + Logger: logger.WithGroup("notification.postgres"), + }) + if err != nil { + return nil, fmt.Errorf("failed to initialize notification repository: %w", err) + } + + var notificationWebhook notificationwebhook.Handler + notificationWebhook, err = notificationwebhook.New(notificationwebhook.Config{ + SvixConfig: svixConfig, + RegistrationTimeout: notificationConfig.Webhook.EventTypeRegistrationTimeout, + SkipRegistrationOnError: notificationConfig.Webhook.SkipEventTypeRegistrationOnError, + Logger: logger.WithGroup("notification.webhook"), + }) + if err != nil { + return nil, fmt.Errorf("failed to initialize notification webhook handler: %w", err) + } + + notificationService, err := notificationservice.New(notificationservice.Config{ + Repository: notificationRepo, + Webhook: notificationWebhook, + FeatureConnector: featureConnector, + Logger: logger.With(slog.String("subsystem", "notification")), + }) + if err != nil { + return nil, fmt.Errorf("failed to initialize notification service: %w", err) + } + + return notificationService, nil +} diff --git a/app/common/openmeter.go b/app/common/openmeter.go deleted file mode 100644 index bf60d27ad..000000000 --- a/app/common/openmeter.go +++ /dev/null @@ -1,253 +0,0 @@ -package common - -import ( - "context" - "fmt" - "log/slog" - - "github.com/ClickHouse/clickhouse-go/v2" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/confluentinc/confluent-kafka-go/v2/kafka" - "github.com/samber/lo" - "go.opentelemetry.io/otel/metric" - - "github.com/openmeterio/openmeter/app/config" - "github.com/openmeterio/openmeter/openmeter/ingest" - "github.com/openmeterio/openmeter/openmeter/ingest/ingestadapter" - "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest" - "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/serializer" - "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/topicresolver" - "github.com/openmeterio/openmeter/openmeter/meter" - "github.com/openmeterio/openmeter/openmeter/namespace" - "github.com/openmeterio/openmeter/openmeter/sink/flushhandler" - "github.com/openmeterio/openmeter/openmeter/sink/flushhandler/ingestnotification" - "github.com/openmeterio/openmeter/openmeter/streaming" - "github.com/openmeterio/openmeter/openmeter/streaming/clickhouse/materialized_view" - "github.com/openmeterio/openmeter/openmeter/streaming/clickhouse/raw_events" - watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka" - "github.com/openmeterio/openmeter/openmeter/watermill/driver/noop" - "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" - pkgkafka "github.com/openmeterio/openmeter/pkg/kafka" - "github.com/openmeterio/openmeter/pkg/models" - "github.com/openmeterio/openmeter/pkg/slicesx" -) - -func NewMeterRepository(meters []*models.Meter) *meter.InMemoryRepository { - return meter.NewInMemoryRepository(slicesx.Map(meters, lo.FromPtr[models.Meter])) -} - -func NewStreamingConnector( - ctx context.Context, - conf config.AggregationConfiguration, - clickHouse clickhouse.Conn, - meterRepository meter.Repository, - logger *slog.Logger, -) (streaming.Connector, error) { - var ( - connector streaming.Connector - err error - ) - - switch conf.Engine { - case config.AggregationEngineClickHouseRaw: - connector, err = raw_events.NewConnector(ctx, raw_events.ConnectorConfig{ - ClickHouse: clickHouse, - Database: conf.ClickHouse.Database, - EventsTableName: conf.EventsTableName, - Logger: logger, - AsyncInsert: conf.AsyncInsert, - AsyncInsertWait: conf.AsyncInsertWait, - InsertQuerySettings: conf.InsertQuerySettings, - }) - if err != nil { - return nil, fmt.Errorf("init clickhouse raw engine: %w", err) - } - - case config.AggregationEngineClickHouseMV: - connector, err = materialized_view.NewConnector(ctx, materialized_view.ConnectorConfig{ - ClickHouse: clickHouse, - Database: conf.ClickHouse.Database, - EventsTableName: conf.EventsTableName, - Logger: logger, - AsyncInsert: conf.AsyncInsert, - AsyncInsertWait: conf.AsyncInsertWait, - InsertQuerySettings: conf.InsertQuerySettings, - - Meters: meterRepository, - PopulateMeter: conf.PopulateMeter, - CreateOrReplaceMeter: conf.CreateOrReplaceMeter, - QueryRawEvents: conf.QueryRawEvents, - }) - if err != nil { - return nil, fmt.Errorf("init clickhouse mv engine: %w", err) - } - default: - return nil, fmt.Errorf("invalid aggregation engine: %s", conf.Engine) - } - - return connector, nil -} - -func NewNamespacedTopicResolver(config config.Configuration) (*topicresolver.NamespacedTopicResolver, error) { - topicResolver, err := topicresolver.NewNamespacedTopicResolver(config.Ingest.Kafka.EventsTopicTemplate) - if err != nil { - return nil, fmt.Errorf("failed to create topic name resolver: %w", err) - } - - return topicResolver, nil -} - -func NewKafkaIngestCollector( - config config.KafkaIngestConfiguration, - producer *kafka.Producer, - topicResolver topicresolver.Resolver, - topicProvisioner pkgkafka.TopicProvisioner, -) (*kafkaingest.Collector, error) { - collector, err := kafkaingest.NewCollector( - producer, - serializer.NewJSONSerializer(), - topicResolver, - topicProvisioner, - config.Partitions, - ) - if err != nil { - return nil, fmt.Errorf("failed to initialize kafka ingest: %w", err) - } - - return collector, nil -} - -func NewIngestCollector( - conf config.Configuration, - kafkaCollector *kafkaingest.Collector, - logger *slog.Logger, - meter metric.Meter, -) (ingest.Collector, func(), error) { - collector, err := ingestadapter.WithMetrics(kafkaCollector, meter) - if err != nil { - return nil, nil, fmt.Errorf("init kafka ingest: %w", err) - } - - if conf.Dedupe.Enabled { - deduplicator, err := conf.Dedupe.NewDeduplicator() - if err != nil { - return nil, nil, fmt.Errorf("failed to initialize deduplicator: %w", err) - } - - return ingest.DeduplicatingCollector{ - Collector: collector, - Deduplicator: deduplicator, - }, func() { - collector.Close() - - logger.Info("closing deduplicator") - - err := deduplicator.Close() - if err != nil { - logger.Error("failed to close deduplicator", "error", err) - } - }, nil - } - - // Note: closing function is called by dedupe as well - return collector, func() { collector.Close() }, nil -} - -func NewKafkaNamespaceHandler( - topicResolver topicresolver.Resolver, - topicProvisioner pkgkafka.TopicProvisioner, - conf config.KafkaIngestConfiguration, -) (*kafkaingest.NamespaceHandler, error) { - return &kafkaingest.NamespaceHandler{ - TopicResolver: topicResolver, - TopicProvisioner: topicProvisioner, - Partitions: conf.Partitions, - DeletionEnabled: conf.NamespaceDeletionEnabled, - }, nil -} - -func NewNamespaceHandlers( - kafkaHandler *kafkaingest.NamespaceHandler, - clickHouseHandler streaming.Connector, -) []namespace.Handler { - return []namespace.Handler{ - kafkaHandler, - clickHouseHandler, - } -} - -func NewNamespaceManager( - handlers []namespace.Handler, - conf config.NamespaceConfiguration, -) (*namespace.Manager, error) { - manager, err := namespace.NewManager(namespace.ManagerConfig{ - Handlers: handlers, - DefaultNamespace: conf.Default, - DisableManagement: conf.DisableManagement, - }) - if err != nil { - return nil, fmt.Errorf("create namespace manager: %v", err) - } - - return manager, nil -} - -// TODO: create a separate file or package for each application instead - -func NewServerPublisher( - ctx context.Context, - conf config.EventsConfiguration, - options watermillkafka.PublisherOptions, - logger *slog.Logger, -) (message.Publisher, func(), error) { - if !conf.Enabled { - return &noop.Publisher{}, func() {}, nil - } - - return NewPublisher(ctx, options, logger) -} - -// the sink-worker requires control over how the publisher is closed -func NewSinkWorkerPublisher( - ctx context.Context, - options watermillkafka.PublisherOptions, - logger *slog.Logger, -) (message.Publisher, func(), error) { - publisher, _, err := NewPublisher(ctx, options, logger) - - return publisher, func() {}, err -} - -func NewFlushHandler( - eventsConfig config.EventsConfiguration, - sinkConfig config.SinkConfiguration, - messagePublisher message.Publisher, - eventPublisher eventbus.Publisher, - logger *slog.Logger, - meter metric.Meter, -) (flushhandler.FlushEventHandler, error) { - if !eventsConfig.Enabled { - return nil, nil - } - - flushHandlerMux := flushhandler.NewFlushEventHandlers() - - // We should only close the producer once the ingest events are fully processed - flushHandlerMux.OnDrainComplete(func() { - logger.Info("shutting down kafka producer") - if err := messagePublisher.Close(); err != nil { - logger.Error("failed to close kafka producer", slog.String("error", err.Error())) - } - }) - - ingestNotificationHandler, err := ingestnotification.NewHandler(logger, meter, eventPublisher, ingestnotification.HandlerConfig{ - MaxEventsInBatch: sinkConfig.IngestNotifications.MaxEventsInBatch, - }) - if err != nil { - return nil, err - } - - flushHandlerMux.AddHandler(ingestNotificationHandler) - - return flushHandlerMux, nil -} diff --git a/app/common/openmeter_billingworker.go b/app/common/openmeter_billingworker.go new file mode 100644 index 000000000..68ce471db --- /dev/null +++ b/app/common/openmeter_billingworker.go @@ -0,0 +1,114 @@ +package common + +import ( + "context" + "fmt" + "log/slog" + "syscall" + + "github.com/ThreeDotsLabs/watermill/message" + "github.com/google/wire" + "github.com/oklog/run" + + "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/billing" + billingworker "github.com/openmeterio/openmeter/openmeter/billing/worker" + watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka" + "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" + "github.com/openmeterio/openmeter/openmeter/watermill/router" + pkgkafka "github.com/openmeterio/openmeter/pkg/kafka" +) + +var BillingWorker = wire.NewSet( + wire.FieldsOf(new(config.BillingWorkerConfiguration), "ConsumerConfiguration"), + wire.FieldsOf(new(config.BillingConfiguration), "Worker"), + + App, + Customer, + Secret, + + BillingWorkerProvisionTopics, + BillingWorkerSubscriber, + + NewFeatureConnector, + BillingService, + + NewBillingWorkerOptions, + NewBillingWorker, + BillingWorkerGroup, +) + +func BillingWorkerProvisionTopics(conf config.BillingConfiguration) []pkgkafka.TopicConfig { + var provisionTopics []pkgkafka.TopicConfig + + if conf.Worker.DLQ.AutoProvision.Enabled { + provisionTopics = append(provisionTopics, pkgkafka.TopicConfig{ + Name: conf.Worker.DLQ.Topic, + Partitions: conf.Worker.DLQ.AutoProvision.Partitions, + RetentionTime: pkgkafka.TimeDurationMilliSeconds(conf.Worker.DLQ.AutoProvision.Retention), + }) + } + + return provisionTopics +} + +// no closer function: the subscriber is closed by the router/worker +func BillingWorkerSubscriber(conf config.BillingConfiguration, brokerOptions watermillkafka.BrokerOptions) (message.Subscriber, error) { + subscriber, err := watermillkafka.NewSubscriber(watermillkafka.SubscriberOptions{ + Broker: brokerOptions, + ConsumerGroupName: conf.Worker.ConsumerGroupName, + }) + if err != nil { + return nil, fmt.Errorf("failed to initialize Kafka subscriber: %w", err) + } + + return subscriber, nil +} + +func NewBillingWorkerOptions( + eventConfig config.EventsConfiguration, + routerOptions router.Options, + eventBus eventbus.Publisher, + billingService billing.Service, + logger *slog.Logger, +) billingworker.WorkerOptions { + return billingworker.WorkerOptions{ + SystemEventsTopic: eventConfig.SystemEvents.Topic, + + Router: routerOptions, + EventBus: eventBus, + BillingService: billingService, + Logger: logger, + } +} + +func NewBillingWorker(workerOptions billingworker.WorkerOptions) (*billingworker.Worker, error) { + worker, err := billingworker.New(workerOptions) + if err != nil { + return nil, fmt.Errorf("failed to initialize worker: %w", err) + } + + return worker, nil +} + +func BillingWorkerGroup( + ctx context.Context, + worker *billingworker.Worker, + telemetryServer TelemetryServer, +) run.Group { + var group run.Group + + group.Add( + func() error { return telemetryServer.ListenAndServe() }, + func(err error) { _ = telemetryServer.Shutdown(ctx) }, + ) + + group.Add( + func() error { return worker.Run(ctx) }, + func(err error) { _ = worker.Close() }, + ) + + group.Add(run.SignalHandler(ctx, syscall.SIGINT, syscall.SIGTERM)) + + return group +} diff --git a/app/common/openmeter_notification.go b/app/common/openmeter_notification.go new file mode 100644 index 000000000..09026bf3c --- /dev/null +++ b/app/common/openmeter_notification.go @@ -0,0 +1,20 @@ +package common + +import ( + "github.com/openmeterio/openmeter/app/config" + pkgkafka "github.com/openmeterio/openmeter/pkg/kafka" +) + +func NotificationServiceProvisionTopics(conf config.NotificationConfiguration) []pkgkafka.TopicConfig { + var provisionTopics []pkgkafka.TopicConfig + + if conf.Consumer.DLQ.AutoProvision.Enabled { + provisionTopics = append(provisionTopics, pkgkafka.TopicConfig{ + Name: conf.Consumer.DLQ.Topic, + Partitions: conf.Consumer.DLQ.AutoProvision.Partitions, + RetentionTime: pkgkafka.TimeDurationMilliSeconds(conf.Consumer.DLQ.AutoProvision.Retention), + }) + } + + return provisionTopics +} diff --git a/app/common/openmeter_provisiontopics.go b/app/common/openmeter_provisiontopics.go deleted file mode 100644 index 746fb6960..000000000 --- a/app/common/openmeter_provisiontopics.go +++ /dev/null @@ -1,44 +0,0 @@ -package common - -import ( - "github.com/openmeterio/openmeter/app/config" - pkgkafka "github.com/openmeterio/openmeter/pkg/kafka" -) - -// TODO: create a separate file or package for each application instead - -func NotificationServiceProvisionTopics(conf config.NotificationConfiguration) []pkgkafka.TopicConfig { - var provisionTopics []pkgkafka.TopicConfig - - if conf.Consumer.DLQ.AutoProvision.Enabled { - provisionTopics = append(provisionTopics, pkgkafka.TopicConfig{ - Name: conf.Consumer.DLQ.Topic, - Partitions: conf.Consumer.DLQ.AutoProvision.Partitions, - RetentionTime: pkgkafka.TimeDurationMilliSeconds(conf.Consumer.DLQ.AutoProvision.Retention), - }) - } - - return provisionTopics -} - -func ServerProvisionTopics(conf config.EventsConfiguration) []pkgkafka.TopicConfig { - var provisionTopics []pkgkafka.TopicConfig - - if conf.SystemEvents.AutoProvision.Enabled { - provisionTopics = append(provisionTopics, pkgkafka.TopicConfig{ - Name: conf.SystemEvents.Topic, - Partitions: conf.SystemEvents.AutoProvision.Partitions, - }) - } - - return provisionTopics -} - -func SinkWorkerProvisionTopics(conf config.EventsConfiguration) []pkgkafka.TopicConfig { - return []pkgkafka.TopicConfig{ - { - Name: conf.IngestEvents.Topic, - Partitions: conf.IngestEvents.AutoProvision.Partitions, - }, - } -} diff --git a/app/common/openmeter_server.go b/app/common/openmeter_server.go new file mode 100644 index 000000000..5f1e3a31f --- /dev/null +++ b/app/common/openmeter_server.go @@ -0,0 +1,105 @@ +package common + +import ( + "context" + "fmt" + "log/slog" + + "github.com/ThreeDotsLabs/watermill/message" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "go.opentelemetry.io/otel/metric" + + "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/ingest" + "github.com/openmeterio/openmeter/openmeter/ingest/ingestadapter" + "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest" + "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/serializer" + "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/topicresolver" + watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka" + "github.com/openmeterio/openmeter/openmeter/watermill/driver/noop" + pkgkafka "github.com/openmeterio/openmeter/pkg/kafka" +) + +func NewKafkaIngestCollector( + config config.KafkaIngestConfiguration, + producer *kafka.Producer, + topicResolver topicresolver.Resolver, + topicProvisioner pkgkafka.TopicProvisioner, +) (*kafkaingest.Collector, error) { + collector, err := kafkaingest.NewCollector( + producer, + serializer.NewJSONSerializer(), + topicResolver, + topicProvisioner, + config.Partitions, + ) + if err != nil { + return nil, fmt.Errorf("failed to initialize kafka ingest: %w", err) + } + + return collector, nil +} + +func NewIngestCollector( + conf config.Configuration, + kafkaCollector *kafkaingest.Collector, + logger *slog.Logger, + meter metric.Meter, +) (ingest.Collector, func(), error) { + collector, err := ingestadapter.WithMetrics(kafkaCollector, meter) + if err != nil { + return nil, nil, fmt.Errorf("init kafka ingest: %w", err) + } + + if conf.Dedupe.Enabled { + deduplicator, err := conf.Dedupe.NewDeduplicator() + if err != nil { + return nil, nil, fmt.Errorf("failed to initialize deduplicator: %w", err) + } + + return ingest.DeduplicatingCollector{ + Collector: collector, + Deduplicator: deduplicator, + }, func() { + collector.Close() + + logger.Info("closing deduplicator") + + err := deduplicator.Close() + if err != nil { + logger.Error("failed to close deduplicator", "error", err) + } + }, nil + } + + // Note: closing function is called by dedupe as well + return collector, func() { collector.Close() }, nil +} + +// TODO: create a separate file or package for each application instead + +func NewServerPublisher( + ctx context.Context, + conf config.EventsConfiguration, + options watermillkafka.PublisherOptions, + logger *slog.Logger, +) (message.Publisher, func(), error) { + if !conf.Enabled { + return &noop.Publisher{}, func() {}, nil + } + + return NewPublisher(ctx, options, logger) +} + +func ServerProvisionTopics(conf config.EventsConfiguration) []pkgkafka.TopicConfig { + var provisionTopics []pkgkafka.TopicConfig + + if conf.SystemEvents.AutoProvision.Enabled { + provisionTopics = append(provisionTopics, pkgkafka.TopicConfig{ + Name: conf.SystemEvents.Topic, + Partitions: conf.SystemEvents.AutoProvision.Partitions, + }) + } + + return provisionTopics +} diff --git a/app/common/openmeter_sinkworker.go b/app/common/openmeter_sinkworker.go new file mode 100644 index 000000000..b09a5ed15 --- /dev/null +++ b/app/common/openmeter_sinkworker.go @@ -0,0 +1,70 @@ +package common + +import ( + "context" + "log/slog" + + "github.com/ThreeDotsLabs/watermill/message" + "go.opentelemetry.io/otel/metric" + + "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/sink/flushhandler" + "github.com/openmeterio/openmeter/openmeter/sink/flushhandler/ingestnotification" + watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka" + "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" + pkgkafka "github.com/openmeterio/openmeter/pkg/kafka" +) + +// the sink-worker requires control over how the publisher is closed +func NewSinkWorkerPublisher( + ctx context.Context, + options watermillkafka.PublisherOptions, + logger *slog.Logger, +) (message.Publisher, func(), error) { + publisher, _, err := NewPublisher(ctx, options, logger) + + return publisher, func() {}, err +} + +func NewFlushHandler( + eventsConfig config.EventsConfiguration, + sinkConfig config.SinkConfiguration, + messagePublisher message.Publisher, + eventPublisher eventbus.Publisher, + logger *slog.Logger, + meter metric.Meter, +) (flushhandler.FlushEventHandler, error) { + if !eventsConfig.Enabled { + return nil, nil + } + + flushHandlerMux := flushhandler.NewFlushEventHandlers() + + // We should only close the producer once the ingest events are fully processed + flushHandlerMux.OnDrainComplete(func() { + logger.Info("shutting down kafka producer") + if err := messagePublisher.Close(); err != nil { + logger.Error("failed to close kafka producer", slog.String("error", err.Error())) + } + }) + + ingestNotificationHandler, err := ingestnotification.NewHandler(logger, meter, eventPublisher, ingestnotification.HandlerConfig{ + MaxEventsInBatch: sinkConfig.IngestNotifications.MaxEventsInBatch, + }) + if err != nil { + return nil, err + } + + flushHandlerMux.AddHandler(ingestNotificationHandler) + + return flushHandlerMux, nil +} + +func SinkWorkerProvisionTopics(conf config.EventsConfiguration) []pkgkafka.TopicConfig { + return []pkgkafka.TopicConfig{ + { + Name: conf.IngestEvents.Topic, + Partitions: conf.IngestEvents.AutoProvision.Partitions, + }, + } +} diff --git a/app/common/productcatalog.go b/app/common/productcatalog.go new file mode 100644 index 000000000..e7466f6eb --- /dev/null +++ b/app/common/productcatalog.go @@ -0,0 +1,72 @@ +package common + +import ( + "fmt" + "log/slog" + + "github.com/google/wire" + + "github.com/openmeterio/openmeter/app/config" + entdb "github.com/openmeterio/openmeter/openmeter/ent/db" + "github.com/openmeterio/openmeter/openmeter/meter" + productcatalogpgadapter "github.com/openmeterio/openmeter/openmeter/productcatalog/adapter" + "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" + "github.com/openmeterio/openmeter/openmeter/productcatalog/plan" + planadapter "github.com/openmeterio/openmeter/openmeter/productcatalog/plan/adapter" + planservice "github.com/openmeterio/openmeter/openmeter/productcatalog/plan/service" +) + +var ProductCatalog = wire.NewSet( + wire.FieldsOf(new(config.Configuration), "ProductCatalog"), + + Feature, + Plan, +) + +var Feature = wire.NewSet( + NewFeatureConnector, +) + +var Plan = wire.NewSet( + NewPlanService, +) + +func NewFeatureConnector(logger *slog.Logger, db *entdb.Client, meterRepo meter.Repository) feature.FeatureConnector { + // TODO: remove this check after enabled by default + if db == nil { + return nil + } + + featureRepo := productcatalogpgadapter.NewPostgresFeatureRepo(db, logger) + return feature.NewFeatureConnector(featureRepo, meterRepo) +} + +func NewPlanService( + logger *slog.Logger, + db *entdb.Client, + productCatalogConf config.ProductCatalogConfiguration, + featureConnector feature.FeatureConnector, +) (plan.Service, error) { + // TODO: remove this check after enabled by default + if db == nil { + return nil, nil + } + + if !productCatalogConf.Enabled { + return nil, nil + } + + adapter, err := planadapter.New(planadapter.Config{ + Client: db, + Logger: logger.With("subsystem", "productcatalog.plan"), + }) + if err != nil { + return nil, fmt.Errorf("failed to initialize plan adapter: %w", err) + } + + return planservice.New(planservice.Config{ + Feature: featureConnector, + Adapter: adapter, + Logger: logger.With("subsystem", "productcatalog.plan"), + }) +} diff --git a/app/common/runner.go b/app/common/runner.go new file mode 100644 index 000000000..7735320bb --- /dev/null +++ b/app/common/runner.go @@ -0,0 +1,24 @@ +package common + +import ( + "errors" + "log/slog" + "net/http" + + "github.com/oklog/run" +) + +// Runner is a helper struct that runs a group of services. +type Runner struct { + Group run.Group + Logger *slog.Logger +} + +func (r Runner) Run() { + err := r.Group.Run() + if e := (run.SignalError{}); errors.As(err, &e) { + r.Logger.Info("received signal: shutting down", slog.String("signal", e.Signal.String())) + } else if !errors.Is(err, http.ErrServerClosed) { + r.Logger.Error("application stopped due to error", slog.String("error", err.Error())) + } +} diff --git a/app/common/secret.go b/app/common/secret.go new file mode 100644 index 000000000..dcc989056 --- /dev/null +++ b/app/common/secret.go @@ -0,0 +1,24 @@ +package common + +import ( + "log/slog" + + "github.com/google/wire" + + entdb "github.com/openmeterio/openmeter/openmeter/ent/db" + "github.com/openmeterio/openmeter/openmeter/secret" + secretadapter "github.com/openmeterio/openmeter/openmeter/secret/adapter" + secretservice "github.com/openmeterio/openmeter/openmeter/secret/service" +) + +var Secret = wire.NewSet( + NewSecretService, +) + +func NewSecretService(logger *slog.Logger, db *entdb.Client) (secret.Service, error) { + secretAdapter := secretadapter.New() + + return secretservice.New(secretservice.Config{ + Adapter: secretAdapter, + }) +} diff --git a/app/common/streaming.go b/app/common/streaming.go new file mode 100644 index 000000000..f839adb3a --- /dev/null +++ b/app/common/streaming.go @@ -0,0 +1,67 @@ +package common + +import ( + "context" + "fmt" + "log/slog" + + "github.com/ClickHouse/clickhouse-go/v2" + + "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/meter" + "github.com/openmeterio/openmeter/openmeter/streaming" + "github.com/openmeterio/openmeter/openmeter/streaming/clickhouse/materialized_view" + "github.com/openmeterio/openmeter/openmeter/streaming/clickhouse/raw_events" +) + +func NewStreamingConnector( + ctx context.Context, + conf config.AggregationConfiguration, + clickHouse clickhouse.Conn, + meterRepository meter.Repository, + logger *slog.Logger, +) (streaming.Connector, error) { + var ( + connector streaming.Connector + err error + ) + + switch conf.Engine { + case config.AggregationEngineClickHouseRaw: + connector, err = raw_events.NewConnector(ctx, raw_events.ConnectorConfig{ + ClickHouse: clickHouse, + Database: conf.ClickHouse.Database, + EventsTableName: conf.EventsTableName, + Logger: logger, + AsyncInsert: conf.AsyncInsert, + AsyncInsertWait: conf.AsyncInsertWait, + InsertQuerySettings: conf.InsertQuerySettings, + }) + if err != nil { + return nil, fmt.Errorf("init clickhouse raw engine: %w", err) + } + + case config.AggregationEngineClickHouseMV: + connector, err = materialized_view.NewConnector(ctx, materialized_view.ConnectorConfig{ + ClickHouse: clickHouse, + Database: conf.ClickHouse.Database, + EventsTableName: conf.EventsTableName, + Logger: logger, + AsyncInsert: conf.AsyncInsert, + AsyncInsertWait: conf.AsyncInsertWait, + InsertQuerySettings: conf.InsertQuerySettings, + + Meters: meterRepository, + PopulateMeter: conf.PopulateMeter, + CreateOrReplaceMeter: conf.CreateOrReplaceMeter, + QueryRawEvents: conf.QueryRawEvents, + }) + if err != nil { + return nil, fmt.Errorf("init clickhouse mv engine: %w", err) + } + default: + return nil, fmt.Errorf("invalid aggregation engine: %s", conf.Engine) + } + + return connector, nil +} diff --git a/app/common/subscription.go b/app/common/subscription.go new file mode 100644 index 000000000..a3dcb6e69 --- /dev/null +++ b/app/common/subscription.go @@ -0,0 +1,93 @@ +package common + +import ( + "log/slog" + + "github.com/google/wire" + + "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/customer" + entdb "github.com/openmeterio/openmeter/openmeter/ent/db" + "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" + "github.com/openmeterio/openmeter/openmeter/productcatalog/plan" + plansubscription "github.com/openmeterio/openmeter/openmeter/productcatalog/subscription" + "github.com/openmeterio/openmeter/openmeter/registry" + "github.com/openmeterio/openmeter/openmeter/subscription" + subscriptionentitlement "github.com/openmeterio/openmeter/openmeter/subscription/adapters/entitlement" + subscriptionrepo "github.com/openmeterio/openmeter/openmeter/subscription/repo" + subscriptionservice "github.com/openmeterio/openmeter/openmeter/subscription/service" +) + +var Subscription = wire.NewSet( + NewSubscriptionService, + NewPlanSubscriptionAdapter, +) + +// Combine Srvice and WorkflowService into one struct +// We do this to able to initialize the Service and WorkflowService together +// and share the same subscriptionRepo. +type SubscriptionServiceWithWorkflow struct { + Service subscription.Service + WorkflowService subscription.WorkflowService +} + +func NewSubscriptionService( + logger *slog.Logger, + db *entdb.Client, + productcatalogConfig config.ProductCatalogConfiguration, + entitlementConfig config.EntitlementsConfiguration, + featureConnector feature.FeatureConnector, + entitlementRegistry *registry.Entitlement, + customerService customer.Service, + planService plan.Service, +) (SubscriptionServiceWithWorkflow, error) { + // TODO: remove this check after enabled by default + if db == nil { + return SubscriptionServiceWithWorkflow{}, nil + } + + if !productcatalogConfig.Enabled || !entitlementConfig.Enabled { + return SubscriptionServiceWithWorkflow{}, nil + } + + subscriptionRepo := subscriptionrepo.NewSubscriptionRepo(db) + subscriptionPhaseRepo := subscriptionrepo.NewSubscriptionPhaseRepo(db) + subscriptionItemRepo := subscriptionrepo.NewSubscriptionItemRepo(db) + + subscriptionEntitlementAdapter := subscriptionentitlement.NewSubscriptionEntitlementAdapter( + entitlementRegistry.Entitlement, + subscriptionItemRepo, + subscriptionItemRepo, + ) + + subscriptionService := subscriptionservice.New(subscriptionservice.ServiceConfig{ + SubscriptionRepo: subscriptionRepo, + SubscriptionPhaseRepo: subscriptionPhaseRepo, + SubscriptionItemRepo: subscriptionItemRepo, + CustomerService: customerService, + EntitlementAdapter: subscriptionEntitlementAdapter, + TransactionManager: subscriptionRepo, + }) + + subscriptionWorkflowService := subscriptionservice.NewWorkflowService(subscriptionservice.WorkflowServiceConfig{ + Service: subscriptionService, + CustomerService: customerService, + TransactionManager: subscriptionRepo, + }) + + return SubscriptionServiceWithWorkflow{ + Service: subscriptionService, + WorkflowService: subscriptionWorkflowService, + }, nil +} + +func NewPlanSubscriptionAdapter( + logger *slog.Logger, + db *entdb.Client, + planService plan.Service, +) plansubscription.Adapter { + return plansubscription.NewPlanSubscriptionAdapter(plansubscription.PlanSubscriptionAdapterConfig{ + PlanService: planService, + Logger: logger.With("subsystem", "subscription.plan.adapter"), + }) +} diff --git a/app/common/svix.go b/app/common/svix.go new file mode 100644 index 000000000..6d2b7dd42 --- /dev/null +++ b/app/common/svix.go @@ -0,0 +1,11 @@ +package common + +import ( + "github.com/google/wire" + + "github.com/openmeterio/openmeter/app/config" +) + +var Svix = wire.NewSet( + wire.FieldsOf(new(config.Configuration), "Svix"), +) diff --git a/app/common/wire.go b/app/common/wire.go index 275322793..8d2ca92fc 100644 --- a/app/common/wire.go +++ b/app/common/wire.go @@ -39,7 +39,7 @@ var Config = wire.NewSet( wire.FieldsOf(new(config.Configuration), "Namespace"), wire.FieldsOf(new(config.Configuration), "Events"), wire.FieldsOf(new(config.Configuration), "BalanceWorker"), - wire.FieldsOf(new(config.Configuration), "Notification"), + wire.FieldsOf(new(config.Configuration), "Billing"), wire.FieldsOf(new(config.Configuration), "Sink"), ) diff --git a/app/config/apps.go b/app/config/apps.go index fd9db6f75..eb5581363 100644 --- a/app/config/apps.go +++ b/app/config/apps.go @@ -1,15 +1,32 @@ package config -import "github.com/spf13/viper" +import ( + "errors" + + "github.com/spf13/viper" +) type AppsConfiguration struct { Enabled bool + // BaseURL is the base URL for the Stripe webhook. + BaseURL string `yaml:"baseURL"` } func (c AppsConfiguration) Validate() error { - return nil + var errs []error + + if !c.Enabled { + return nil + } + + if c.BaseURL == "" { + errs = append(errs, errors.New("base URL is required")) + } + + return errors.Join(errs...) } func ConfigureApps(v *viper.Viper) { v.SetDefault("apps.enabled", false) + v.SetDefault("apps.baseURL", "https://example.com") } diff --git a/app/config/billing.go b/app/config/billing.go index 3ff38cb8e..2e3ebcd0d 100644 --- a/app/config/billing.go +++ b/app/config/billing.go @@ -4,6 +4,7 @@ import "github.com/spf13/viper" type BillingConfiguration struct { Enabled bool + Worker BillingWorkerConfiguration } func (c BillingConfiguration) Validate() error { @@ -12,4 +13,6 @@ func (c BillingConfiguration) Validate() error { func ConfigureBilling(v *viper.Viper) { v.SetDefault("billing.enabled", false) + + ConfigureBillingWorker(v) } diff --git a/app/config/billingworker.go b/app/config/billingworker.go new file mode 100644 index 000000000..5e74c8a98 --- /dev/null +++ b/app/config/billingworker.go @@ -0,0 +1,30 @@ +package config + +import ( + "errors" + + "github.com/spf13/viper" + + "github.com/openmeterio/openmeter/pkg/errorsx" +) + +type BillingWorkerConfiguration struct { + ConsumerConfiguration `mapstructure:",squash"` +} + +func (c BillingWorkerConfiguration) Validate() error { + var errs []error + + if err := c.ConsumerConfiguration.Validate(); err != nil { + errs = append(errs, errorsx.WithPrefix(err, "consumer")) + } + + return errors.Join(errs...) +} + +func ConfigureBillingWorker(v *viper.Viper) { + v.SetDefault("billing.worker.dlq.topic", "om_sys.billing_worker_dlq") + v.SetDefault("billing.worker.consumerGroupName", "om_billing_worker") + + ConfigureConsumer(v, "billing.worker") +} diff --git a/app/config/config.go b/app/config/config.go index 1861b10c3..ad47d41d4 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -35,7 +35,6 @@ type Configuration struct { ProductCatalog ProductCatalogConfiguration Billing BillingConfiguration Apps AppsConfiguration - StripeApp StripeAppConfig Svix SvixConfig } @@ -111,10 +110,6 @@ func (c Configuration) Validate() error { } } - if err := c.StripeApp.Validate(); err != nil { - errs = append(errs, errorsx.WithPrefix(err, "stripe app")) - } - if err := c.ProductCatalog.Validate(); err != nil { errs = append(errs, errorsx.WithPrefix(err, "product catalog")) } @@ -166,7 +161,6 @@ func SetViperDefaults(v *viper.Viper, flags *pflag.FlagSet) { ConfigureEvents(v) ConfigureBalanceWorker(v) ConfigureNotification(v) - ConfigureStripe(v) ConfigureBilling(v) ConfigureProductCatalog(v) ConfigureApps(v) diff --git a/app/config/config_test.go b/app/config/config_test.go index 4909c5a42..5cc0f91da 100644 --- a/app/config/config_test.go +++ b/app/config/config_test.go @@ -43,7 +43,10 @@ func TestComplete(t *testing.T) { Postgres: PostgresConfig{ AutoMigrate: AutoMigrateEnt, }, - Address: "127.0.0.1:8888", + Address: "127.0.0.1:8888", + Apps: AppsConfiguration{ + BaseURL: "https://example.com", + }, Environment: "local", Telemetry: TelemetryConfig{ Address: "127.0.0.1:10000", @@ -129,6 +132,29 @@ func TestComplete(t *testing.T) { AsyncInsert: false, AsyncInsertWait: false, }, + Billing: BillingConfiguration{ + Enabled: false, + Worker: BillingWorkerConfiguration{ + ConsumerConfiguration: ConsumerConfiguration{ + ProcessingTimeout: 30 * time.Second, + Retry: RetryConfiguration{ + InitialInterval: 10 * time.Millisecond, + MaxInterval: time.Second, + MaxElapsedTime: time.Minute, + }, + DLQ: DLQConfiguration{ + Enabled: true, + Topic: "om_sys.billing_worker_dlq", + AutoProvision: DLQAutoProvisionConfiguration{ + Enabled: true, + Partitions: 1, + Retention: 90 * 24 * time.Hour, + }, + }, + ConsumerGroupName: "om_billing_worker", + }, + }, + }, Sink: SinkConfiguration{ GroupId: "openmeter-sink-worker", MinCommitCount: 500, @@ -307,11 +333,6 @@ func TestComplete(t *testing.T) { ServerURL: "http://127.0.0.1:8071", Debug: true, }, - StripeApp: StripeAppConfig{ - IncomingWebhook: StripeAppIncomingWebhookConfig{ - BaseURL: "https://example.com", - }, - }, } assert.Equal(t, expected, actual) diff --git a/app/config/stripe.go b/app/config/stripe.go deleted file mode 100644 index cc12efca6..000000000 --- a/app/config/stripe.go +++ /dev/null @@ -1,45 +0,0 @@ -package config - -import ( - "errors" - "fmt" - - "github.com/spf13/viper" -) - -// StripeAppConfig is the configuration for Stripe. -type StripeAppConfig struct { - IncomingWebhook StripeAppIncomingWebhookConfig `yaml:"incomingWebhook"` -} - -// Validate validates the configuration. -func (c StripeAppConfig) Validate() error { - var errs []error - - if err := c.IncomingWebhook.Validate(); err != nil { - errs = append(errs, fmt.Errorf("incoming webhook: %w", err)) - } - - return errors.Join(errs...) -} - -// StripeAppIncomingWebhookConfig is the configuration for the Stripe webhook. -type StripeAppIncomingWebhookConfig struct { - // BaseURL is the base URL for the Stripe webhook. - BaseURL string `yaml:"baseURL"` -} - -func (c StripeAppIncomingWebhookConfig) Validate() error { - var errs []error - - if c.BaseURL == "" { - errs = append(errs, errors.New("base URL is required")) - } - - return errors.Join(errs...) -} - -// ConfigureStripe configures the default values for Stripe. -func ConfigureStripe(v *viper.Viper) { - v.SetDefault("stripeApp.incomingWebhook.baseURL", "https://example.com") -} diff --git a/cmd/balance-worker/main.go b/cmd/balance-worker/main.go index 6ac4dc6c9..d9787bf8c 100644 --- a/cmd/balance-worker/main.go +++ b/cmd/balance-worker/main.go @@ -61,7 +61,10 @@ func main() { if err != nil { slog.Error("failed to initialize application", "error", err) - cleanup() + // Call cleanup function is may not set yet + if cleanup != nil { + cleanup() + } os.Exit(1) } diff --git a/cmd/billing-worker/.air.toml b/cmd/billing-worker/.air.toml new file mode 100644 index 000000000..6f2097c5a --- /dev/null +++ b/cmd/billing-worker/.air.toml @@ -0,0 +1,44 @@ +root = "." +testdata_dir = "testdata" +tmp_dir = "tmp" + +[build] + args_bin = ["--config", "./config.yaml", "--telemetry-address", ":10002"] + bin = "./tmp/openmeter-billing-worker" + cmd = "go build -tags dynamic -o ./tmp/openmeter-billing-worker ./cmd/billing-worker" + delay = 0 + exclude_dir = ["assets", "ci", "deploy", "docs", "examples", "testdata", "quickstart", "tmp", "vendor", "api/client", "node_modules"] + exclude_file = [] + exclude_regex = ["_test.go"] + exclude_unchanged = false + follow_symlink = false + full_bin = "" + include_dir = [] + include_ext = ["go", "tpl", "tmpl", "html", "yml", "yaml", "sql", "json"] + include_file = [] + kill_delay = "0s" + log = "build-errors.log" + poll = false + poll_interval = 0 + rerun = false + rerun_delay = 500 + send_interrupt = false + stop_on_error = false + +[color] + app = "" + build = "yellow" + main = "magenta" + runner = "green" + watcher = "cyan" + +[log] + main_only = false + time = false + +[misc] + clean_on_exit = false + +[screen] + clear_on_rebuild = false + keep_scroll = true diff --git a/cmd/billing-worker/main.go b/cmd/billing-worker/main.go new file mode 100644 index 000000000..554768e4c --- /dev/null +++ b/cmd/billing-worker/main.go @@ -0,0 +1,105 @@ +package main + +import ( + "context" + "errors" + "fmt" + "log/slog" + "os" + + "github.com/spf13/pflag" + "github.com/spf13/viper" + + "github.com/openmeterio/openmeter/app/config" +) + +func main() { + v, flags := viper.NewWithOptions(viper.WithDecodeHook(config.DecodeHook())), pflag.NewFlagSet("OpenMeter", pflag.ExitOnError) + ctx := context.Background() + + config.SetViperDefaults(v, flags) + + flags.String("config", "", "Configuration file") + flags.Bool("version", false, "Show version information") + flags.Bool("validate", false, "Validate configuration and exit") + + _ = flags.Parse(os.Args[1:]) + + if v, _ := flags.GetBool("version"); v { + fmt.Printf("%s version %s (%s) built on %s\n", "Open Meter", version, revision, revisionDate) + + os.Exit(0) + } + + if c, _ := flags.GetString("config"); c != "" { + v.SetConfigFile(c) + } + + err := v.ReadInConfig() + if err != nil && !errors.As(err, &viper.ConfigFileNotFoundError{}) { + panic(err) + } + + var conf config.Configuration + err = v.Unmarshal(&conf) + if err != nil { + panic(err) + } + + err = conf.Validate() + if err != nil { + println("configuration error:") + println(err.Error()) + os.Exit(1) + } + + if v, _ := flags.GetBool("validate"); v { + os.Exit(0) + } + + app, cleanup, err := initializeApplication(ctx, conf) + if err != nil { + slog.Error("failed to initialize application", "error", err) + + // Call cleanup function is may not set yet + if cleanup != nil { + cleanup() + } + + os.Exit(1) + } + defer cleanup() + + app.SetGlobals() + + logger := app.Logger + + // Validate service prerequisites + + if !conf.Billing.Enabled { + logger.Error("billing are disabled, exiting") + os.Exit(1) + } + + if !conf.Events.Enabled { + logger.Error("events are disabled, exiting") + os.Exit(1) + } + + // Migrate database + if err := app.Migrate(ctx); err != nil { + logger.Error("failed to initialize database", "error", err) + os.Exit(1) + } + + // Provision sandbox app + if conf.Apps.Enabled { + err = app.AppSandboxProvisioner() + if err != nil { + logger.Error("failed to provision sandbox app", "error", err) + os.Exit(1) + } + } + + app.Run() +} diff --git a/cmd/billing-worker/version.go b/cmd/billing-worker/version.go new file mode 100644 index 000000000..24582ea13 --- /dev/null +++ b/cmd/billing-worker/version.go @@ -0,0 +1,34 @@ +package main + +import "runtime/debug" + +// Provisioned by ldflags. +var version string + +//nolint:gochecknoglobals +var ( + revision string + revisionDate string +) + +//nolint:gochecknoinits,goconst +func init() { + if version == "" { + version = "unknown" + } + + buildInfo, _ := debug.ReadBuildInfo() + + revision = "unknown" + revisionDate = "unknown" + + for _, setting := range buildInfo.Settings { + if setting.Key == "vcs.revision" { + revision = setting.Value + } + + if setting.Key == "vcs.time" { + revisionDate = setting.Value + } + } +} diff --git a/cmd/billing-worker/wire.go b/cmd/billing-worker/wire.go new file mode 100644 index 000000000..8aa799461 --- /dev/null +++ b/cmd/billing-worker/wire.go @@ -0,0 +1,50 @@ +//go:build wireinject +// +build wireinject + +package main + +import ( + "context" + "log/slog" + + "github.com/google/wire" + + "github.com/openmeterio/openmeter/app/common" + "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/app" + appstripe "github.com/openmeterio/openmeter/openmeter/app/stripe" +) + +type Application struct { + common.GlobalInitializer + common.Migrator + common.Runner + + App app.Service + AppStripe appstripe.Service + AppSandboxProvisioner common.AppSandboxProvisioner + Logger *slog.Logger +} + +func initializeApplication(ctx context.Context, conf config.Configuration) (Application, func(), error) { + wire.Build( + metadata, + common.Config, + common.Framework, + common.Telemetry, + common.NewDefaultTextMapPropagator, + common.Database, + common.ClickHouse, + common.KafkaTopic, + common.Watermill, + common.WatermillRouter, + common.OpenMeter, + common.BillingWorker, + wire.Struct(new(Application), "*"), + ) + return Application{}, nil, nil +} + +func metadata(conf config.Configuration) common.Metadata { + return common.NewMetadata(conf, version, "billing-worker") +} diff --git a/cmd/billing-worker/wire_gen.go b/cmd/billing-worker/wire_gen.go new file mode 100644 index 000000000..c172f5454 --- /dev/null +++ b/cmd/billing-worker/wire_gen.go @@ -0,0 +1,314 @@ +// Code generated by Wire. DO NOT EDIT. + +//go:generate go run -mod=mod github.com/google/wire/cmd/wire +//go:build !wireinject +// +build !wireinject + +package main + +import ( + "context" + "github.com/openmeterio/openmeter/app/common" + "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/app" + "github.com/openmeterio/openmeter/openmeter/app/stripe" + "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka" + "github.com/openmeterio/openmeter/openmeter/watermill/router" + "log/slog" +) + +// Injectors from wire.go: + +func initializeApplication(ctx context.Context, conf config.Configuration) (Application, func(), error) { + telemetryConfig := conf.Telemetry + logTelemetryConfig := telemetryConfig.Log + commonMetadata := metadata(conf) + resource := common.NewTelemetryResource(commonMetadata) + loggerProvider, cleanup, err := common.NewLoggerProvider(ctx, logTelemetryConfig, resource) + if err != nil { + return Application{}, nil, err + } + logger := common.NewLogger(logTelemetryConfig, resource, loggerProvider, commonMetadata) + metricsTelemetryConfig := telemetryConfig.Metrics + meterProvider, cleanup2, err := common.NewMeterProvider(ctx, metricsTelemetryConfig, resource, logger) + if err != nil { + cleanup() + return Application{}, nil, err + } + traceTelemetryConfig := telemetryConfig.Trace + tracerProvider, cleanup3, err := common.NewTracerProvider(ctx, traceTelemetryConfig, resource, logger) + if err != nil { + cleanup2() + cleanup() + return Application{}, nil, err + } + textMapPropagator := common.NewDefaultTextMapPropagator() + globalInitializer := common.GlobalInitializer{ + Logger: logger, + MeterProvider: meterProvider, + TracerProvider: tracerProvider, + TextMapPropagator: textMapPropagator, + } + postgresConfig := conf.Postgres + meter := common.NewMeter(meterProvider, commonMetadata) + driver, cleanup4, err := common.NewPostgresDriver(ctx, postgresConfig, meterProvider, meter, tracerProvider, logger) + if err != nil { + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + db := common.NewDB(driver) + entPostgresDriver, cleanup5 := common.NewEntPostgresDriver(db, logger) + client := common.NewEntClient(entPostgresDriver) + migrator := common.Migrator{ + Config: postgresConfig, + Client: client, + Logger: logger, + } + eventsConfiguration := conf.Events + billingConfiguration := conf.Billing + ingestConfiguration := conf.Ingest + kafkaIngestConfiguration := ingestConfiguration.Kafka + kafkaConfiguration := kafkaIngestConfiguration.KafkaConfiguration + brokerOptions := common.NewBrokerConfiguration(kafkaConfiguration, logTelemetryConfig, commonMetadata, logger, meter) + subscriber, err := common.BillingWorkerSubscriber(billingConfiguration, brokerOptions) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + v := common.BillingWorkerProvisionTopics(billingConfiguration) + adminClient, err := common.NewKafkaAdminClient(kafkaConfiguration) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + topicProvisionerConfig := kafkaIngestConfiguration.TopicProvisionerConfig + kafkaTopicProvisionerConfig := common.NewKafkaTopicProvisionerConfig(adminClient, logger, meter, topicProvisionerConfig) + topicProvisioner, err := common.NewKafkaTopicProvisioner(kafkaTopicProvisionerConfig) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + publisherOptions := kafka.PublisherOptions{ + Broker: brokerOptions, + ProvisionTopics: v, + TopicProvisioner: topicProvisioner, + } + publisher, cleanup6, err := common.NewPublisher(ctx, publisherOptions, logger) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + billingWorkerConfiguration := billingConfiguration.Worker + consumerConfiguration := billingWorkerConfiguration.ConsumerConfiguration + options := router.Options{ + Subscriber: subscriber, + Publisher: publisher, + Logger: logger, + MetricMeter: meter, + Config: consumerConfiguration, + } + eventbusPublisher, err := common.NewEventBusPublisher(publisher, eventsConfiguration, logger) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + appsConfiguration := conf.Apps + service, err := common.NewAppService(logger, client, appsConfiguration) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + customerService, err := common.NewCustomerService(logger, client) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + secretService, err := common.NewSecretService(logger, client) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + appstripeService, err := common.NewAppStripeService(logger, client, appsConfiguration, service, customerService, secretService) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + v2 := conf.Meters + inMemoryRepository := common.NewMeterRepository(v2) + featureConnector := common.NewFeatureConnector(logger, client, inMemoryRepository) + aggregationConfiguration := conf.Aggregation + clickHouseAggregationConfiguration := aggregationConfiguration.ClickHouse + v3, err := common.NewClickHouse(clickHouseAggregationConfiguration) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + connector, err := common.NewStreamingConnector(ctx, aggregationConfiguration, v3, inMemoryRepository, logger) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + billingService, err := common.BillingService(logger, client, service, appstripeService, billingConfiguration, customerService, featureConnector, inMemoryRepository, connector) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + workerOptions := common.NewBillingWorkerOptions(eventsConfiguration, options, eventbusPublisher, billingService, logger) + worker, err := common.NewBillingWorker(workerOptions) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + health := common.NewHealthChecker(logger) + telemetryHandler := common.NewTelemetryHandler(metricsTelemetryConfig, health) + v4, cleanup7 := common.NewTelemetryServer(telemetryConfig, telemetryHandler) + group := common.BillingWorkerGroup(ctx, worker, v4) + runner := common.Runner{ + Group: group, + Logger: logger, + } + namespacedTopicResolver, err := common.NewNamespacedTopicResolver(conf) + if err != nil { + cleanup7() + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + namespaceHandler, err := common.NewKafkaNamespaceHandler(namespacedTopicResolver, topicProvisioner, kafkaIngestConfiguration) + if err != nil { + cleanup7() + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + v5 := common.NewNamespaceHandlers(namespaceHandler, connector) + namespaceConfiguration := conf.Namespace + manager, err := common.NewNamespaceManager(v5, namespaceConfiguration) + if err != nil { + cleanup7() + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + appSandboxProvisioner, err := common.NewAppSandboxProvisioner(ctx, logger, appsConfiguration, service, manager) + if err != nil { + cleanup7() + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + application := Application{ + GlobalInitializer: globalInitializer, + Migrator: migrator, + Runner: runner, + App: service, + AppStripe: appstripeService, + AppSandboxProvisioner: appSandboxProvisioner, + Logger: logger, + } + return application, func() { + cleanup7() + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + }, nil +} + +// wire.go: + +type Application struct { + common.GlobalInitializer + common.Migrator + common.Runner + + App app.Service + AppStripe appstripe.Service + AppSandboxProvisioner common.AppSandboxProvisioner + Logger *slog.Logger +} + +func metadata(conf config.Configuration) common.Metadata { + return common.NewMetadata(conf, version, "billing-worker") +} diff --git a/cmd/notification-service/main.go b/cmd/notification-service/main.go index 5c0c49f85..67774ec5d 100644 --- a/cmd/notification-service/main.go +++ b/cmd/notification-service/main.go @@ -15,10 +15,6 @@ import ( "github.com/openmeterio/openmeter/app/config" "github.com/openmeterio/openmeter/openmeter/notification/consumer" - notificationrepository "github.com/openmeterio/openmeter/openmeter/notification/repository" - notificationservice "github.com/openmeterio/openmeter/openmeter/notification/service" - notificationwebhook "github.com/openmeterio/openmeter/openmeter/notification/webhook" - registrybuilder "github.com/openmeterio/openmeter/openmeter/registry/builder" watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka" "github.com/openmeterio/openmeter/openmeter/watermill/router" ) @@ -103,47 +99,6 @@ func main() { os.Exit(1) } - // Dependencies: entitlement - entitlementConnRegistry := registrybuilder.GetEntitlementRegistry(registrybuilder.EntitlementOptions{ - DatabaseClient: app.EntClient, - StreamingConnector: app.StreamingConnector, - MeterRepository: app.MeterRepository, - Logger: logger, - Publisher: app.EventPublisher, - }) - - // Dependencies: notification - notificationRepo, err := notificationrepository.New(notificationrepository.Config{ - Client: app.EntClient, - Logger: logger.WithGroup("notification.postgres"), - }) - if err != nil { - logger.Error("failed to initialize notification repository", "error", err) - os.Exit(1) - } - - notificationWebhook, err := notificationwebhook.New(notificationwebhook.Config{ - SvixConfig: conf.Svix, - RegistrationTimeout: conf.Notification.Webhook.EventTypeRegistrationTimeout, - SkipRegistrationOnError: conf.Notification.Webhook.SkipEventTypeRegistrationOnError, - Logger: logger.WithGroup("notification.webhook"), - }) - if err != nil { - logger.Error("failed to initialize notification repository", "error", err) - os.Exit(1) - } - - notificationService, err := notificationservice.New(notificationservice.Config{ - Repository: notificationRepo, - Webhook: notificationWebhook, - FeatureConnector: entitlementConnRegistry.Feature, - Logger: logger.With(slog.String("subsystem", "notification")), - }) - if err != nil { - logger.Error("failed to initialize notification service", "error", err) - os.Exit(1) - } - // Initialize consumer consumerOptions := consumer.Options{ SystemEventsTopic: conf.Events.SystemEvents.Topic, @@ -157,7 +112,7 @@ func main() { }, Marshaler: app.EventPublisher.Marshaler(), - Notification: notificationService, + Notification: app.Notification, Logger: logger, } diff --git a/cmd/notification-service/wire.go b/cmd/notification-service/wire.go index bb7bef6eb..c1914060c 100644 --- a/cmd/notification-service/wire.go +++ b/cmd/notification-service/wire.go @@ -15,6 +15,8 @@ import ( "github.com/openmeterio/openmeter/app/config" "github.com/openmeterio/openmeter/openmeter/ent/db" "github.com/openmeterio/openmeter/openmeter/meter" + "github.com/openmeterio/openmeter/openmeter/notification" + "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" "github.com/openmeterio/openmeter/openmeter/streaming" watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka" "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" @@ -24,33 +26,36 @@ type Application struct { common.GlobalInitializer common.Migrator - Metadata common.Metadata - - StreamingConnector streaming.Connector - MeterRepository meter.Repository - EntClient *db.Client - TelemetryServer common.TelemetryServer BrokerOptions watermillkafka.BrokerOptions - MessagePublisher message.Publisher EventPublisher eventbus.Publisher - - Logger *slog.Logger - Meter metric.Meter + EntClient *db.Client + FeatureConnector feature.FeatureConnector + Logger *slog.Logger + MessagePublisher message.Publisher + Meter metric.Meter + Metadata common.Metadata + MeterRepository meter.Repository + Notification notification.Service + StreamingConnector streaming.Connector + TelemetryServer common.TelemetryServer } func initializeApplication(ctx context.Context, conf config.Configuration) (Application, func(), error) { wire.Build( metadata, common.Config, - common.Framework, - common.Telemetry, - common.NewDefaultTextMapPropagator, - common.Database, common.ClickHouse, + common.Database, + common.Feature, + common.Framework, common.KafkaTopic, + common.NewDefaultTextMapPropagator, + common.Notification, common.NotificationServiceProvisionTopics, - common.Watermill, + common.Svix, common.OpenMeter, + common.Telemetry, + common.Watermill, wire.Struct(new(Application), "*"), ) return Application{}, nil, nil diff --git a/cmd/notification-service/wire_gen.go b/cmd/notification-service/wire_gen.go index 5b3769f42..ee2ee462d 100644 --- a/cmd/notification-service/wire_gen.go +++ b/cmd/notification-service/wire_gen.go @@ -13,6 +13,8 @@ import ( "github.com/openmeterio/openmeter/app/config" "github.com/openmeterio/openmeter/openmeter/ent/db" "github.com/openmeterio/openmeter/openmeter/meter" + "github.com/openmeterio/openmeter/openmeter/notification" + "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" "github.com/openmeterio/openmeter/openmeter/streaming" "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka" "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" @@ -69,9 +71,13 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl Client: client, Logger: logger, } - aggregationConfiguration := conf.Aggregation - clickHouseAggregationConfiguration := aggregationConfiguration.ClickHouse - v, err := common.NewClickHouse(clickHouseAggregationConfiguration) + ingestConfiguration := conf.Ingest + kafkaIngestConfiguration := ingestConfiguration.Kafka + kafkaConfiguration := kafkaIngestConfiguration.KafkaConfiguration + brokerOptions := common.NewBrokerConfiguration(kafkaConfiguration, logTelemetryConfig, commonMetadata, logger, meter) + notificationConfiguration := conf.Notification + v := common.NotificationServiceProvisionTopics(notificationConfiguration) + adminClient, err := common.NewKafkaAdminClient(kafkaConfiguration) if err != nil { cleanup5() cleanup4() @@ -80,9 +86,9 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - v2 := conf.Meters - inMemoryRepository := common.NewMeterRepository(v2) - connector, err := common.NewStreamingConnector(ctx, aggregationConfiguration, v, inMemoryRepository, logger) + topicProvisionerConfig := kafkaIngestConfiguration.TopicProvisionerConfig + kafkaTopicProvisionerConfig := common.NewKafkaTopicProvisionerConfig(adminClient, logger, meter, topicProvisionerConfig) + topicProvisioner, err := common.NewKafkaTopicProvisioner(kafkaTopicProvisionerConfig) if err != nil { cleanup5() cleanup4() @@ -91,18 +97,13 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - health := common.NewHealthChecker(logger) - telemetryHandler := common.NewTelemetryHandler(metricsTelemetryConfig, health) - v3, cleanup6 := common.NewTelemetryServer(telemetryConfig, telemetryHandler) - ingestConfiguration := conf.Ingest - kafkaIngestConfiguration := ingestConfiguration.Kafka - kafkaConfiguration := kafkaIngestConfiguration.KafkaConfiguration - brokerOptions := common.NewBrokerConfiguration(kafkaConfiguration, logTelemetryConfig, commonMetadata, logger, meter) - notificationConfiguration := conf.Notification - v4 := common.NotificationServiceProvisionTopics(notificationConfiguration) - adminClient, err := common.NewKafkaAdminClient(kafkaConfiguration) + publisherOptions := kafka.PublisherOptions{ + Broker: brokerOptions, + ProvisionTopics: v, + TopicProvisioner: topicProvisioner, + } + publisher, cleanup6, err := common.NewPublisher(ctx, publisherOptions, logger) if err != nil { - cleanup6() cleanup5() cleanup4() cleanup3() @@ -110,9 +111,8 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - topicProvisionerConfig := kafkaIngestConfiguration.TopicProvisionerConfig - kafkaTopicProvisionerConfig := common.NewKafkaTopicProvisionerConfig(adminClient, logger, meter, topicProvisionerConfig) - topicProvisioner, err := common.NewKafkaTopicProvisioner(kafkaTopicProvisionerConfig) + eventsConfiguration := conf.Events + eventbusPublisher, err := common.NewEventBusPublisher(publisher, eventsConfiguration, logger) if err != nil { cleanup6() cleanup5() @@ -122,12 +122,23 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - publisherOptions := kafka.PublisherOptions{ - Broker: brokerOptions, - ProvisionTopics: v4, - TopicProvisioner: topicProvisioner, + v2 := conf.Meters + inMemoryRepository := common.NewMeterRepository(v2) + featureConnector := common.NewFeatureConnector(logger, client, inMemoryRepository) + v3 := conf.Svix + service, err := common.NewNotificationService(logger, client, notificationConfiguration, v3, featureConnector) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err } - publisher, cleanup7, err := common.NewPublisher(ctx, publisherOptions, logger) + aggregationConfiguration := conf.Aggregation + clickHouseAggregationConfiguration := aggregationConfiguration.ClickHouse + v4, err := common.NewClickHouse(clickHouseAggregationConfiguration) if err != nil { cleanup6() cleanup5() @@ -137,10 +148,8 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - eventsConfiguration := conf.Events - eventbusPublisher, err := common.NewEventBusPublisher(publisher, eventsConfiguration, logger) + connector, err := common.NewStreamingConnector(ctx, aggregationConfiguration, v4, inMemoryRepository, logger) if err != nil { - cleanup7() cleanup6() cleanup5() cleanup4() @@ -149,19 +158,24 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } + health := common.NewHealthChecker(logger) + telemetryHandler := common.NewTelemetryHandler(metricsTelemetryConfig, health) + v5, cleanup7 := common.NewTelemetryServer(telemetryConfig, telemetryHandler) application := Application{ GlobalInitializer: globalInitializer, Migrator: migrator, - Metadata: commonMetadata, - StreamingConnector: connector, - MeterRepository: inMemoryRepository, - EntClient: client, - TelemetryServer: v3, BrokerOptions: brokerOptions, - MessagePublisher: publisher, EventPublisher: eventbusPublisher, + EntClient: client, + FeatureConnector: featureConnector, Logger: logger, + MessagePublisher: publisher, Meter: meter, + Metadata: commonMetadata, + MeterRepository: inMemoryRepository, + Notification: service, + StreamingConnector: connector, + TelemetryServer: v5, } return application, func() { cleanup7() @@ -180,18 +194,18 @@ type Application struct { common.GlobalInitializer common.Migrator - Metadata common.Metadata - - StreamingConnector streaming.Connector - MeterRepository meter.Repository - EntClient *db.Client - TelemetryServer common.TelemetryServer BrokerOptions kafka.BrokerOptions - MessagePublisher message.Publisher EventPublisher eventbus.Publisher - - Logger *slog.Logger - Meter metric.Meter + EntClient *db.Client + FeatureConnector feature.FeatureConnector + Logger *slog.Logger + MessagePublisher message.Publisher + Meter metric.Meter + Metadata common.Metadata + MeterRepository meter.Repository + Notification notification.Service + StreamingConnector streaming.Connector + TelemetryServer common.TelemetryServer } func metadata(conf config.Configuration) common.Metadata { diff --git a/cmd/server/main.go b/cmd/server/main.go index df8500f51..4f5891b7e 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -16,44 +16,15 @@ import ( "github.com/spf13/viper" "github.com/openmeterio/openmeter/app/config" - apppkg "github.com/openmeterio/openmeter/openmeter/app" - appadapter "github.com/openmeterio/openmeter/openmeter/app/adapter" - appsandbox "github.com/openmeterio/openmeter/openmeter/app/sandbox" - appservice "github.com/openmeterio/openmeter/openmeter/app/service" - appstripe "github.com/openmeterio/openmeter/openmeter/app/stripe" - appstripeadapter "github.com/openmeterio/openmeter/openmeter/app/stripe/adapter" - appstripeservice "github.com/openmeterio/openmeter/openmeter/app/stripe/service" - "github.com/openmeterio/openmeter/openmeter/billing" - billingadapter "github.com/openmeterio/openmeter/openmeter/billing/adapter" - billingservice "github.com/openmeterio/openmeter/openmeter/billing/service" - "github.com/openmeterio/openmeter/openmeter/customer" - customeradapter "github.com/openmeterio/openmeter/openmeter/customer/adapter" - customerservice "github.com/openmeterio/openmeter/openmeter/customer/service" "github.com/openmeterio/openmeter/openmeter/debug" "github.com/openmeterio/openmeter/openmeter/ingest" "github.com/openmeterio/openmeter/openmeter/ingest/ingestdriver" "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest" "github.com/openmeterio/openmeter/openmeter/namespace" "github.com/openmeterio/openmeter/openmeter/namespace/namespacedriver" - "github.com/openmeterio/openmeter/openmeter/notification" - notificationrepository "github.com/openmeterio/openmeter/openmeter/notification/repository" - notificationservice "github.com/openmeterio/openmeter/openmeter/notification/service" - notificationwebhook "github.com/openmeterio/openmeter/openmeter/notification/webhook" - plan "github.com/openmeterio/openmeter/openmeter/productcatalog/plan" - planadapter "github.com/openmeterio/openmeter/openmeter/productcatalog/plan/adapter" - planservice "github.com/openmeterio/openmeter/openmeter/productcatalog/plan/service" - plansubscription "github.com/openmeterio/openmeter/openmeter/productcatalog/subscription" - "github.com/openmeterio/openmeter/openmeter/registry" - registrybuilder "github.com/openmeterio/openmeter/openmeter/registry/builder" - secretadapter "github.com/openmeterio/openmeter/openmeter/secret/adapter" - secretservice "github.com/openmeterio/openmeter/openmeter/secret/service" "github.com/openmeterio/openmeter/openmeter/server" "github.com/openmeterio/openmeter/openmeter/server/authenticator" "github.com/openmeterio/openmeter/openmeter/server/router" - "github.com/openmeterio/openmeter/openmeter/subscription" - subscriptionentitlement "github.com/openmeterio/openmeter/openmeter/subscription/adapters/entitlement" - subscriptionrepo "github.com/openmeterio/openmeter/openmeter/subscription/repo" - subscriptionservice "github.com/openmeterio/openmeter/openmeter/subscription/service" "github.com/openmeterio/openmeter/pkg/errorsx" ) @@ -105,7 +76,10 @@ func main() { if err != nil { slog.Error("failed to initialize application", "error", err) - cleanup() + // Call cleanup function is may not set yet + if cleanup != nil { + cleanup() + } os.Exit(1) } @@ -156,264 +130,20 @@ func main() { } } + // Initialize debug connector debugConnector := debug.NewDebugConnector(app.StreamingConnector) - entitlementConnRegistry := ®istry.Entitlement{} + // Migrate database if err := app.Migrate(ctx); err != nil { logger.Error("failed to initialize database", "error", err) os.Exit(1) } - if conf.Entitlements.Enabled { - entitlementConnRegistry = registrybuilder.GetEntitlementRegistry(registrybuilder.EntitlementOptions{ - DatabaseClient: app.EntClient, - StreamingConnector: app.StreamingConnector, - MeterRepository: app.MeterRepository, - Logger: logger, - Publisher: app.EventPublisher, - }) - } - - // Initialize Customer - var customerService customer.CustomerService - - if app.EntClient != nil { - var customerAdapter customer.Adapter - customerAdapter, err = customeradapter.New(customeradapter.Config{ - Client: app.EntClient, - Logger: logger.WithGroup("customer.postgres"), - }) - if err != nil { - logger.Error("failed to initialize customer repository", "error", err) - os.Exit(1) - } - - customerService, err = customerservice.New(customerservice.Config{ - Adapter: customerAdapter, - }) - if err != nil { - logger.Error("failed to initialize customer service", "error", err) - os.Exit(1) - } - } - - // Initialize Secret - secretService, err := secretservice.New(secretservice.Config{ - Adapter: secretadapter.New(), - }) - if err != nil { - logger.Error("failed to initialize secret service", "error", err) - os.Exit(1) - } - - // Initialize App - var appService apppkg.Service - - if conf.Apps.Enabled { - var appAdapter apppkg.Adapter - appAdapter, err = appadapter.New(appadapter.Config{ - Client: app.EntClient, - BaseURL: conf.StripeApp.IncomingWebhook.BaseURL, - }) - if err != nil { - logger.Error("failed to initialize app repository", "error", err) - os.Exit(1) - } - - appService, err = appservice.New(appservice.Config{ - Adapter: appAdapter, - }) - if err != nil { - logger.Error("failed to initialize app service", "error", err) - os.Exit(1) - } - } - - // Initialize AppStripe - var appStripeService appstripe.Service - + // Provision sandbox app if conf.Apps.Enabled { - var appStripeAdapter appstripe.Adapter - appStripeAdapter, err = appstripeadapter.New(appstripeadapter.Config{ - Client: app.EntClient, - AppService: appService, - CustomerService: customerService, - SecretService: secretService, - }) - if err != nil { - logger.Error("failed to initialize app stripe repository", "error", err) - os.Exit(1) - } - - appStripeService, err = appstripeservice.New(appstripeservice.Config{ - Adapter: appStripeAdapter, - AppService: appService, - SecretService: secretService, - }) - if err != nil { - logger.Error("failed to initialize app stripe service", "error", err) - os.Exit(1) - } - } - - // Initialize AppSandbox - if conf.Apps.Enabled { - _, err = appsandbox.NewFactory(appsandbox.Config{ - AppService: appService, - }) - if err != nil { - logger.Error("failed to initialize app sandbox factory", "error", err) - os.Exit(1) - } - - app, err := appsandbox.AutoProvision(ctx, appsandbox.AutoProvisionInput{ - Namespace: app.NamespaceManager.GetDefaultNamespace(), - AppService: appService, - }) - if err != nil { - logger.Error("failed to auto-provision sandbox app", "error", err) - os.Exit(1) - } - - logger.Info("sandbox app auto-provisioned", "app_id", app.GetID().ID) - } - - // Initialize Notification - var notificationService notification.Service - - if conf.Notification.Enabled { - if !conf.Entitlements.Enabled { - logger.Error("failed to initialize notification service: entitlements must be enabled") - os.Exit(1) - } - - // CreatingPG client is done as part of entitlements initialization - if app.EntClient == nil { - logger.Error("failed to initialize notification service: postgres client is not initialized") - os.Exit(1) - } - - var notificationRepo notification.Repository - notificationRepo, err = notificationrepository.New(notificationrepository.Config{ - Client: app.EntClient, - Logger: logger.WithGroup("notification.postgres"), - }) - if err != nil { - logger.Error("failed to initialize notification repository", "error", err) - os.Exit(1) - } - - var notificationWebhook notificationwebhook.Handler - notificationWebhook, err = notificationwebhook.New(notificationwebhook.Config{ - SvixConfig: conf.Svix, - RegistrationTimeout: conf.Notification.Webhook.EventTypeRegistrationTimeout, - SkipRegistrationOnError: conf.Notification.Webhook.SkipEventTypeRegistrationOnError, - Logger: logger.WithGroup("notification.webhook"), - }) - if err != nil { - logger.Error("failed to initialize notification webhook handler", "error", err) - os.Exit(1) - } - - notificationService, err = notificationservice.New(notificationservice.Config{ - Repository: notificationRepo, - Webhook: notificationWebhook, - FeatureConnector: entitlementConnRegistry.Feature, - Logger: logger.With(slog.String("subsystem", "notification")), - }) - if err != nil { - logger.Error("failed to initialize notification service", "error", err) - os.Exit(1) - } - defer func() { - if err = notificationService.Close(); err != nil { - logger.Error("failed to close notification service", "error", err) - } - }() - } - - // Initialize plans & subscriptions - var planService plan.Service - if conf.ProductCatalog.Enabled { - adapter, err := planadapter.New(planadapter.Config{ - Client: app.EntClient, - Logger: logger.With("subsystem", "productcatalog.plan"), - }) - if err != nil { - logger.Error("failed to initialize plan adapter", "error", err) - os.Exit(1) - } - - planService, err = planservice.New(planservice.Config{ - Feature: entitlementConnRegistry.Feature, - Adapter: adapter, - Logger: logger.With("subsystem", "productcatalog.plan"), - }) - if err != nil { - logger.Error("failed to initialize plan service", "error", err) - os.Exit(1) - } - } - - // Initialize subscriptions - var subscriptionService subscription.Service - var subscriptionWorkflowService subscription.WorkflowService - var planSubscriptionAdapter plansubscription.Adapter - if conf.ProductCatalog.Enabled { - subscriptionRepo := subscriptionrepo.NewSubscriptionRepo(app.EntClient) - subscriptionPhaseRepo := subscriptionrepo.NewSubscriptionPhaseRepo(app.EntClient) - subscriptionItemRepo := subscriptionrepo.NewSubscriptionItemRepo(app.EntClient) - - subscriptionEntitlementAdapter := subscriptionentitlement.NewSubscriptionEntitlementAdapter( - entitlementConnRegistry.Entitlement, - subscriptionItemRepo, - subscriptionItemRepo, - ) - - planSubscriptionAdapter = plansubscription.NewPlanSubscriptionAdapter(plansubscription.PlanSubscriptionAdapterConfig{ - PlanService: planService, - Logger: logger.With("subsystem", "subscription.plan.adapter"), - }) - - subscriptionService = subscriptionservice.New(subscriptionservice.ServiceConfig{ - SubscriptionRepo: subscriptionRepo, - SubscriptionPhaseRepo: subscriptionPhaseRepo, - SubscriptionItemRepo: subscriptionItemRepo, - CustomerService: customerService, - EntitlementAdapter: subscriptionEntitlementAdapter, - TransactionManager: subscriptionRepo, - }) - - subscriptionWorkflowService = subscriptionservice.NewWorkflowService(subscriptionservice.WorkflowServiceConfig{ - Service: subscriptionService, - CustomerService: customerService, - TransactionManager: subscriptionRepo, - }) - } - - // Initialize billing - var billingService billing.Service - if conf.Billing.Enabled { - adapter, err := billingadapter.New(billingadapter.Config{ - Client: app.EntClient, - Logger: logger.With("subsystem", "billing.adapter"), - }) - if err != nil { - logger.Error("failed to initialize billing adapter", "error", err) - os.Exit(1) - } - - billingService, err = billingservice.New(billingservice.Config{ - Adapter: adapter, - CustomerService: customerService, - AppService: appService, - Logger: logger.With("subsystem", "billing.service"), - FeatureService: entitlementConnRegistry.Feature, - MeterRepo: app.MeterRepository, - StreamingConnector: app.StreamingConnector, - }) + err = app.AppSandboxProvisioner() if err != nil { - logger.Error("failed to initialize billing service", "error", err) + logger.Error("failed to provision sandbox app", "error", err) os.Exit(1) } } @@ -428,22 +158,22 @@ func main() { PortalCORSEnabled: conf.Portal.CORS.Enabled, ErrorHandler: errorsx.NewSlogHandler(logger), // deps - App: appService, - AppStripe: appStripeService, - Billing: billingService, - Customer: customerService, + App: app.App, + AppStripe: app.AppStripe, + Billing: app.Billing, + Customer: app.Customer, DebugConnector: debugConnector, - EntitlementBalanceConnector: entitlementConnRegistry.MeteredEntitlement, - EntitlementConnector: entitlementConnRegistry.Entitlement, - SubscriptionService: subscriptionService, - SubscriptionWorkflowService: subscriptionWorkflowService, - SubscriptionPlanAdapter: planSubscriptionAdapter, + EntitlementBalanceConnector: app.EntitlementRegistry.MeteredEntitlement, + EntitlementConnector: app.EntitlementRegistry.Entitlement, + SubscriptionService: app.Subscription.Service, + SubscriptionWorkflowService: app.Subscription.WorkflowService, + SubscriptionPlanAdapter: app.SubscriptionPlanAdapter, Logger: logger, - FeatureConnector: entitlementConnRegistry.Feature, - GrantConnector: entitlementConnRegistry.Grant, - GrantRepo: entitlementConnRegistry.GrantRepo, - Notification: notificationService, - Plan: planService, + FeatureConnector: app.EntitlementRegistry.Feature, + GrantConnector: app.EntitlementRegistry.Grant, + GrantRepo: app.EntitlementRegistry.GrantRepo, + Notification: app.Notification, + Plan: app.Plan, // modules EntitlementsEnabled: conf.Entitlements.Enabled, NotificationEnabled: conf.Notification.Enabled, diff --git a/cmd/server/wire.go b/cmd/server/wire.go index 6a7d0445a..bd14272dd 100644 --- a/cmd/server/wire.go +++ b/cmd/server/wire.go @@ -14,10 +14,20 @@ import ( "github.com/openmeterio/openmeter/app/common" "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/app" + appstripe "github.com/openmeterio/openmeter/openmeter/app/stripe" + "github.com/openmeterio/openmeter/openmeter/billing" + "github.com/openmeterio/openmeter/openmeter/customer" "github.com/openmeterio/openmeter/openmeter/ent/db" "github.com/openmeterio/openmeter/openmeter/ingest" "github.com/openmeterio/openmeter/openmeter/meter" "github.com/openmeterio/openmeter/openmeter/namespace" + "github.com/openmeterio/openmeter/openmeter/notification" + "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" + "github.com/openmeterio/openmeter/openmeter/productcatalog/plan" + plansubscription "github.com/openmeterio/openmeter/openmeter/productcatalog/subscription" + "github.com/openmeterio/openmeter/openmeter/registry" + "github.com/openmeterio/openmeter/openmeter/secret" "github.com/openmeterio/openmeter/openmeter/streaming" "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" kafkametrics "github.com/openmeterio/openmeter/pkg/kafka/metrics" @@ -27,40 +37,57 @@ type Application struct { common.GlobalInitializer common.Migrator - StreamingConnector streaming.Connector - MeterRepository meter.Repository - EntClient *db.Client - TelemetryServer common.TelemetryServer - KafkaProducer *kafka.Producer - KafkaMetrics *kafkametrics.Metrics - EventPublisher eventbus.Publisher - - IngestCollector ingest.Collector - - NamespaceHandlers []namespace.Handler - NamespaceManager *namespace.Manager - - Logger *slog.Logger - Meter metric.Meter - - RouterHook func(chi.Router) + App app.Service + AppStripe appstripe.Service + AppSandboxProvisioner common.AppSandboxProvisioner + Customer customer.Service + Billing billing.Service + EntClient *db.Client + EventPublisher eventbus.Publisher + EntitlementRegistry *registry.Entitlement + FeatureConnector feature.FeatureConnector + IngestCollector ingest.Collector + KafkaProducer *kafka.Producer + KafkaMetrics *kafkametrics.Metrics + Logger *slog.Logger + MeterRepository meter.Repository + NamespaceHandlers []namespace.Handler + NamespaceManager *namespace.Manager + Notification notification.Service + Meter metric.Meter + Plan plan.Service + Subscription common.SubscriptionServiceWithWorkflow + SubscriptionPlanAdapter plansubscription.Adapter + RouterHook func(chi.Router) + Secret secret.Service + StreamingConnector streaming.Connector + TelemetryServer common.TelemetryServer } func initializeApplication(ctx context.Context, conf config.Configuration) (Application, func(), error) { wire.Build( metadata, + common.App, + common.Billing, + common.ClickHouse, common.Config, + common.Customer, + common.Database, + common.Entitlement, common.Framework, - common.Telemetry, + common.Kafka, common.NewDefaultTextMapPropagator, + common.NewServerPublisher, common.NewTelemetryRouterHook, - common.Database, - common.ClickHouse, - common.Kafka, + common.Notification, + common.OpenMeter, + common.ProductCatalog, + common.Subscription, + common.Svix, + common.Secret, common.ServerProvisionTopics, + common.Telemetry, common.WatermillNoPublisher, - common.NewServerPublisher, - common.OpenMeter, wire.Struct(new(Application), "*"), ) diff --git a/cmd/server/wire_gen.go b/cmd/server/wire_gen.go index 53720d00a..a44734df4 100644 --- a/cmd/server/wire_gen.go +++ b/cmd/server/wire_gen.go @@ -12,10 +12,20 @@ import ( "github.com/go-chi/chi/v5" "github.com/openmeterio/openmeter/app/common" "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/app" + "github.com/openmeterio/openmeter/openmeter/app/stripe" + "github.com/openmeterio/openmeter/openmeter/billing" + "github.com/openmeterio/openmeter/openmeter/customer" "github.com/openmeterio/openmeter/openmeter/ent/db" "github.com/openmeterio/openmeter/openmeter/ingest" "github.com/openmeterio/openmeter/openmeter/meter" "github.com/openmeterio/openmeter/openmeter/namespace" + "github.com/openmeterio/openmeter/openmeter/notification" + "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" + "github.com/openmeterio/openmeter/openmeter/productcatalog/plan" + "github.com/openmeterio/openmeter/openmeter/productcatalog/subscription" + "github.com/openmeterio/openmeter/openmeter/registry" + "github.com/openmeterio/openmeter/openmeter/secret" "github.com/openmeterio/openmeter/openmeter/streaming" "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka" "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" @@ -73,9 +83,8 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl Client: client, Logger: logger, } - aggregationConfiguration := conf.Aggregation - clickHouseAggregationConfiguration := aggregationConfiguration.ClickHouse - v, err := common.NewClickHouse(clickHouseAggregationConfiguration) + appsConfiguration := conf.Apps + service, err := common.NewAppService(logger, client, appsConfiguration) if err != nil { cleanup5() cleanup4() @@ -84,9 +93,7 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - v2 := conf.Meters - inMemoryRepository := common.NewMeterRepository(v2) - connector, err := common.NewStreamingConnector(ctx, aggregationConfiguration, v, inMemoryRepository, logger) + customerService, err := common.NewCustomerService(logger, client) if err != nil { cleanup5() cleanup4() @@ -95,12 +102,8 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - health := common.NewHealthChecker(logger) - telemetryHandler := common.NewTelemetryHandler(metricsTelemetryConfig, health) - v3, cleanup6 := common.NewTelemetryServer(telemetryConfig, telemetryHandler) - producer, err := common.NewKafkaProducer(conf, logger) + secretService, err := common.NewSecretService(logger, client) if err != nil { - cleanup6() cleanup5() cleanup4() cleanup3() @@ -108,9 +111,17 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - metrics, err := common.NewKafkaMetrics(meter) + appstripeService, err := common.NewAppStripeService(logger, client, appsConfiguration, service, customerService, secretService) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + namespacedTopicResolver, err := common.NewNamespacedTopicResolver(conf) if err != nil { - cleanup6() cleanup5() cleanup4() cleanup3() @@ -118,15 +129,11 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - eventsConfiguration := conf.Events ingestConfiguration := conf.Ingest kafkaIngestConfiguration := ingestConfiguration.Kafka kafkaConfiguration := kafkaIngestConfiguration.KafkaConfiguration - brokerOptions := common.NewBrokerConfiguration(kafkaConfiguration, logTelemetryConfig, commonMetadata, logger, meter) - v4 := common.ServerProvisionTopics(eventsConfiguration) adminClient, err := common.NewKafkaAdminClient(kafkaConfiguration) if err != nil { - cleanup6() cleanup5() cleanup4() cleanup3() @@ -138,7 +145,6 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl kafkaTopicProvisionerConfig := common.NewKafkaTopicProvisionerConfig(adminClient, logger, meter, topicProvisionerConfig) topicProvisioner, err := common.NewKafkaTopicProvisioner(kafkaTopicProvisionerConfig) if err != nil { - cleanup6() cleanup5() cleanup4() cleanup3() @@ -146,14 +152,78 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } + namespaceHandler, err := common.NewKafkaNamespaceHandler(namespacedTopicResolver, topicProvisioner, kafkaIngestConfiguration) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + aggregationConfiguration := conf.Aggregation + clickHouseAggregationConfiguration := aggregationConfiguration.ClickHouse + v, err := common.NewClickHouse(clickHouseAggregationConfiguration) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + v2 := conf.Meters + inMemoryRepository := common.NewMeterRepository(v2) + connector, err := common.NewStreamingConnector(ctx, aggregationConfiguration, v, inMemoryRepository, logger) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + v3 := common.NewNamespaceHandlers(namespaceHandler, connector) + namespaceConfiguration := conf.Namespace + manager, err := common.NewNamespaceManager(v3, namespaceConfiguration) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + appSandboxProvisioner, err := common.NewAppSandboxProvisioner(ctx, logger, appsConfiguration, service, manager) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + billingConfiguration := conf.Billing + featureConnector := common.NewFeatureConnector(logger, client, inMemoryRepository) + billingService, err := common.BillingService(logger, client, service, appstripeService, billingConfiguration, customerService, featureConnector, inMemoryRepository, connector) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + eventsConfiguration := conf.Events + brokerOptions := common.NewBrokerConfiguration(kafkaConfiguration, logTelemetryConfig, commonMetadata, logger, meter) + v4 := common.ServerProvisionTopics(eventsConfiguration) publisherOptions := kafka.PublisherOptions{ Broker: brokerOptions, ProvisionTopics: v4, TopicProvisioner: topicProvisioner, } - publisher, cleanup7, err := common.NewServerPublisher(ctx, eventsConfiguration, publisherOptions, logger) + publisher, cleanup6, err := common.NewServerPublisher(ctx, eventsConfiguration, publisherOptions, logger) if err != nil { - cleanup6() cleanup5() cleanup4() cleanup3() @@ -163,7 +233,6 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl } eventbusPublisher, err := common.NewEventBusPublisher(publisher, eventsConfiguration, logger) if err != nil { - cleanup7() cleanup6() cleanup5() cleanup4() @@ -172,9 +241,10 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - namespacedTopicResolver, err := common.NewNamespacedTopicResolver(conf) + entitlementsConfiguration := conf.Entitlements + entitlement := common.NewEntitlementRegistry(logger, client, entitlementsConfiguration, connector, inMemoryRepository, eventbusPublisher) + producer, err := common.NewKafkaProducer(conf, logger) if err != nil { - cleanup7() cleanup6() cleanup5() cleanup4() @@ -184,6 +254,26 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl return Application{}, nil, err } collector, err := common.NewKafkaIngestCollector(kafkaIngestConfiguration, producer, namespacedTopicResolver, topicProvisioner) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + ingestCollector, cleanup7, err := common.NewIngestCollector(conf, collector, logger, meter) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + metrics, err := common.NewKafkaMetrics(meter) if err != nil { cleanup7() cleanup6() @@ -194,7 +284,9 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - ingestCollector, cleanup8, err := common.NewIngestCollector(conf, collector, logger, meter) + notificationConfiguration := conf.Notification + v5 := conf.Svix + notificationService, err := common.NewNotificationService(logger, client, notificationConfiguration, v5, featureConnector) if err != nil { cleanup7() cleanup6() @@ -205,9 +297,9 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - namespaceHandler, err := common.NewKafkaNamespaceHandler(namespacedTopicResolver, topicProvisioner, kafkaIngestConfiguration) + productCatalogConfiguration := conf.ProductCatalog + planService, err := common.NewPlanService(logger, client, productCatalogConfiguration, featureConnector) if err != nil { - cleanup8() cleanup7() cleanup6() cleanup5() @@ -217,11 +309,8 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - v5 := common.NewNamespaceHandlers(namespaceHandler, connector) - namespaceConfiguration := conf.Namespace - manager, err := common.NewNamespaceManager(v5, namespaceConfiguration) + subscriptionServiceWithWorkflow, err := common.NewSubscriptionService(logger, client, productCatalogConfiguration, entitlementsConfiguration, featureConnector, entitlement, customerService, planService) if err != nil { - cleanup8() cleanup7() cleanup6() cleanup5() @@ -231,23 +320,39 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } + adapter := common.NewPlanSubscriptionAdapter(logger, client, planService) v6 := common.NewTelemetryRouterHook(meterProvider, tracerProvider) + health := common.NewHealthChecker(logger) + telemetryHandler := common.NewTelemetryHandler(metricsTelemetryConfig, health) + v7, cleanup8 := common.NewTelemetryServer(telemetryConfig, telemetryHandler) application := Application{ - GlobalInitializer: globalInitializer, - Migrator: migrator, - StreamingConnector: connector, - MeterRepository: inMemoryRepository, - EntClient: client, - TelemetryServer: v3, - KafkaProducer: producer, - KafkaMetrics: metrics, - EventPublisher: eventbusPublisher, - IngestCollector: ingestCollector, - NamespaceHandlers: v5, - NamespaceManager: manager, - Logger: logger, - Meter: meter, - RouterHook: v6, + GlobalInitializer: globalInitializer, + Migrator: migrator, + App: service, + AppStripe: appstripeService, + AppSandboxProvisioner: appSandboxProvisioner, + Customer: customerService, + Billing: billingService, + EntClient: client, + EventPublisher: eventbusPublisher, + EntitlementRegistry: entitlement, + FeatureConnector: featureConnector, + IngestCollector: ingestCollector, + KafkaProducer: producer, + KafkaMetrics: metrics, + Logger: logger, + MeterRepository: inMemoryRepository, + NamespaceHandlers: v3, + NamespaceManager: manager, + Notification: notificationService, + Meter: meter, + Plan: planService, + Subscription: subscriptionServiceWithWorkflow, + SubscriptionPlanAdapter: adapter, + RouterHook: v6, + Secret: secretService, + StreamingConnector: connector, + TelemetryServer: v7, } return application, func() { cleanup8() @@ -267,23 +372,31 @@ type Application struct { common.GlobalInitializer common.Migrator - StreamingConnector streaming.Connector - MeterRepository meter.Repository - EntClient *db.Client - TelemetryServer common.TelemetryServer - KafkaProducer *kafka2.Producer - KafkaMetrics *metrics.Metrics - EventPublisher eventbus.Publisher - - IngestCollector ingest.Collector - - NamespaceHandlers []namespace.Handler - NamespaceManager *namespace.Manager - - Logger *slog.Logger - Meter metric.Meter - - RouterHook func(chi.Router) + App app.Service + AppStripe appstripe.Service + AppSandboxProvisioner common.AppSandboxProvisioner + Customer customer.Service + Billing billing.Service + EntClient *db.Client + EventPublisher eventbus.Publisher + EntitlementRegistry *registry.Entitlement + FeatureConnector feature.FeatureConnector + IngestCollector ingest.Collector + KafkaProducer *kafka2.Producer + KafkaMetrics *metrics.Metrics + Logger *slog.Logger + MeterRepository meter.Repository + NamespaceHandlers []namespace.Handler + NamespaceManager *namespace.Manager + Notification notification.Service + Meter metric.Meter + Plan plan.Service + Subscription common.SubscriptionServiceWithWorkflow + SubscriptionPlanAdapter plansubscription.Adapter + RouterHook func(chi.Router) + Secret secret.Service + StreamingConnector streaming.Connector + TelemetryServer common.TelemetryServer } func metadata(conf config.Configuration) common.Metadata { diff --git a/cmd/sink-worker/main.go b/cmd/sink-worker/main.go index e7c346a19..9d84e47f8 100644 --- a/cmd/sink-worker/main.go +++ b/cmd/sink-worker/main.go @@ -79,7 +79,10 @@ func main() { if err != nil { slog.Error("failed to initialize application", "error", err) - cleanup() + // Call cleanup function is may not set yet + if cleanup != nil { + cleanup() + } os.Exit(1) } diff --git a/openmeter/billing/worker/worker.go b/openmeter/billing/worker/worker.go new file mode 100644 index 000000000..c9876455b --- /dev/null +++ b/openmeter/billing/worker/worker.go @@ -0,0 +1,119 @@ +package billingworker + +import ( + "context" + "fmt" + "log/slog" + + "github.com/ThreeDotsLabs/watermill/message" + + "github.com/openmeterio/openmeter/openmeter/billing" + "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" + "github.com/openmeterio/openmeter/openmeter/watermill/grouphandler" + "github.com/openmeterio/openmeter/openmeter/watermill/router" +) + +type WorkerOptions struct { + SystemEventsTopic string + + Router router.Options + EventBus eventbus.Publisher + + Logger *slog.Logger + + BillingService billing.Service + // External connectors +} + +func (w WorkerOptions) Validate() error { + if w.SystemEventsTopic == "" { + return fmt.Errorf("system events topic is required") + } + + if err := w.Router.Validate(); err != nil { + return fmt.Errorf("router: %w", err) + } + + if w.EventBus == nil { + return fmt.Errorf("event bus is required") + } + + if w.Logger == nil { + return fmt.Errorf("logger is required") + } + + if w.BillingService == nil { + return fmt.Errorf("billing service is required") + } + + return nil +} + +type Worker struct { + router *message.Router + + billingService billing.Service +} + +func New(opts WorkerOptions) (*Worker, error) { + if err := opts.Validate(); err != nil { + return nil, err + } + + worker := &Worker{ + billingService: opts.BillingService, + } + + router, err := router.NewDefaultRouter(opts.Router) + if err != nil { + return nil, err + } + + worker.router = router + + eventHandler, err := worker.eventHandler(opts) + if err != nil { + return nil, err + } + + router.AddNoPublisherHandler( + "billing_worker_system_events", + opts.SystemEventsTopic, + opts.Router.Subscriber, + eventHandler, + ) + + return worker, nil +} + +func (w *Worker) eventHandler(opts WorkerOptions) (message.NoPublishHandlerFunc, error) { + return grouphandler.NewNoPublishingHandler( + opts.EventBus.Marshaler(), + opts.Router.MetricMeter, + + /* TODO: + + // Entitlement created event + grouphandler.NewGroupEventHandler(func(ctx context.Context, event *entitlement.EntitlementCreatedEvent) error { + return w.opts.EventBus. + WithContext(ctx). + PublishIfNoError(w.handleEntitlementEvent( + ctx, + NamespacedID{Namespace: event.Namespace.ID, ID: event.ID}, + metadata.ComposeResourcePath(event.Namespace.ID, metadata.EntityEntitlement, event.ID), + )) + }), */ + ) +} + +func (w *Worker) Run(ctx context.Context) error { + return w.router.Run(ctx) +} + +func (w *Worker) Close() error { + if err := w.router.Close(); err != nil { + return err + } + + return nil +}