From 13133616975ef0e2c1de41dfdb3bd2319a302bdf Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Thu, 12 Dec 2024 15:11:29 +0100 Subject: [PATCH 1/4] feat: billing worker --- app/common/billing.go | 151 ++++++++++++++++++++ app/common/customer.go | 1 + app/common/openmeter.go | 20 +++ app/common/wire.go | 1 + app/config/billing.go | 1 + app/config/billingworker.go | 29 ++++ app/config/config.go | 1 + cmd/billing-worker/.air.toml | 44 ++++++ cmd/billing-worker/main.go | 87 ++++++++++++ cmd/billing-worker/version.go | 34 +++++ cmd/billing-worker/wire.go | 45 ++++++ cmd/billing-worker/wire_gen.go | 218 +++++++++++++++++++++++++++++ openmeter/billing/worker/worker.go | 119 ++++++++++++++++ 13 files changed, 751 insertions(+) create mode 100644 app/common/billing.go create mode 100644 app/common/customer.go create mode 100644 app/config/billingworker.go create mode 100644 cmd/billing-worker/.air.toml create mode 100644 cmd/billing-worker/main.go create mode 100644 cmd/billing-worker/version.go create mode 100644 cmd/billing-worker/wire.go create mode 100644 cmd/billing-worker/wire_gen.go create mode 100644 openmeter/billing/worker/worker.go diff --git a/app/common/billing.go b/app/common/billing.go new file mode 100644 index 000000000..b78575c00 --- /dev/null +++ b/app/common/billing.go @@ -0,0 +1,151 @@ +package common + +import ( + "context" + "fmt" + "log/slog" + "syscall" + + "github.com/ThreeDotsLabs/watermill/message" + "github.com/google/wire" + "github.com/oklog/run" + + "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/app" + "github.com/openmeterio/openmeter/openmeter/billing" + billingadapter "github.com/openmeterio/openmeter/openmeter/billing/adapter" + billingservice "github.com/openmeterio/openmeter/openmeter/billing/service" + billingworker "github.com/openmeterio/openmeter/openmeter/billing/worker" + "github.com/openmeterio/openmeter/openmeter/customer" + entdb "github.com/openmeterio/openmeter/openmeter/ent/db" + "github.com/openmeterio/openmeter/openmeter/meter" + "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" + "github.com/openmeterio/openmeter/openmeter/streaming" + watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka" + "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" + "github.com/openmeterio/openmeter/openmeter/watermill/router" + pkgkafka "github.com/openmeterio/openmeter/pkg/kafka" +) + +func BillingWorkerProvisionTopics(conf config.BillingConfiguration) []pkgkafka.TopicConfig { + var provisionTopics []pkgkafka.TopicConfig + + if conf.Worker.DLQ.AutoProvision.Enabled { + provisionTopics = append(provisionTopics, pkgkafka.TopicConfig{ + Name: conf.Worker.DLQ.Topic, + Partitions: conf.Worker.DLQ.AutoProvision.Partitions, + RetentionTime: pkgkafka.TimeDurationMilliSeconds(conf.Worker.DLQ.AutoProvision.Retention), + }) + } + + return provisionTopics +} + +// no closer function: the subscriber is closed by the router/worker +func BillingWorkerSubscriber(conf config.BillingConfiguration, brokerOptions watermillkafka.BrokerOptions) (message.Subscriber, error) { + subscriber, err := watermillkafka.NewSubscriber(watermillkafka.SubscriberOptions{ + Broker: brokerOptions, + ConsumerGroupName: conf.Worker.ConsumerGroupName, + }) + if err != nil { + return nil, fmt.Errorf("failed to initialize Kafka subscriber: %w", err) + } + + return subscriber, nil +} + +func NewBillingWorkerOptions( + eventConfig config.EventsConfiguration, + routerOptions router.Options, + eventBus eventbus.Publisher, + billingService billing.Service, + logger *slog.Logger, +) billingworker.WorkerOptions { + return billingworker.WorkerOptions{ + SystemEventsTopic: eventConfig.SystemEvents.Topic, + + Router: routerOptions, + EventBus: eventBus, + BillingService: billingService, + Logger: logger, + } +} + +func NewBillingWorker(workerOptions billingworker.WorkerOptions) (*billingworker.Worker, error) { + worker, err := billingworker.New(workerOptions) + if err != nil { + return nil, fmt.Errorf("failed to initialize worker: %w", err) + } + + return worker, nil +} + +func BillingWorkerGroup( + ctx context.Context, + worker *billingworker.Worker, + telemetryServer TelemetryServer, +) run.Group { + var group run.Group + + group.Add( + func() error { return telemetryServer.ListenAndServe() }, + func(err error) { _ = telemetryServer.Shutdown(ctx) }, + ) + + group.Add( + func() error { return worker.Run(ctx) }, + func(err error) { _ = worker.Close() }, + ) + + group.Add(run.SignalHandler(ctx, syscall.SIGINT, syscall.SIGTERM)) + + return group +} + +func BillingService( + logger *slog.Logger, + db *entdb.Client, + billingConfig config.BillingConfiguration, + customerService customer.Service, + appService app.Service, + featureConnector feature.FeatureConnector, + meterRepo meter.Repository, + streamingConnector streaming.Connector, +) (billing.Service, error) { + if !billingConfig.Enabled { + return nil, nil + } + + adapter, err := billingadapter.New(billingadapter.Config{ + Client: db, + Logger: logger, + }) + if err != nil { + return nil, fmt.Errorf("creating billing adapter: %w", err) + } + + return billingservice.New(billingservice.Config{ + Adapter: adapter, + CustomerService: customerService, + AppService: appService, + Logger: logger, + FeatureService: featureConnector, + MeterRepo: meterRepo, + StreamingConnector: streamingConnector, + }) +} + +var BillingWorker = wire.NewSet( + wire.FieldsOf(new(config.BillingWorkerConfiguration), "ConsumerConfiguration"), + wire.FieldsOf(new(config.BillingConfiguration), "Worker"), + + BillingWorkerProvisionTopics, + BillingWorkerSubscriber, + + NewCustomerService, + BillingService, + + NewBillingWorkerOptions, + NewBillingWorker, + BillingWorkerGroup, +) diff --git a/app/common/customer.go b/app/common/customer.go new file mode 100644 index 000000000..805d0c79a --- /dev/null +++ b/app/common/customer.go @@ -0,0 +1 @@ +package common diff --git a/app/common/openmeter.go b/app/common/openmeter.go index bf60d27ad..2e8860490 100644 --- a/app/common/openmeter.go +++ b/app/common/openmeter.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "os" "github.com/ClickHouse/clickhouse-go/v2" "github.com/ThreeDotsLabs/watermill/message" @@ -12,6 +13,10 @@ import ( "go.opentelemetry.io/otel/metric" "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/customer" + customeradapter "github.com/openmeterio/openmeter/openmeter/customer/adapter" + customerservice "github.com/openmeterio/openmeter/openmeter/customer/service" + entdb "github.com/openmeterio/openmeter/openmeter/ent/db" "github.com/openmeterio/openmeter/openmeter/ingest" "github.com/openmeterio/openmeter/openmeter/ingest/ingestadapter" "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest" @@ -251,3 +256,18 @@ func NewFlushHandler( return flushHandlerMux, nil } + +func NewCustomerService(logger *slog.Logger, db *entdb.Client) (customer.Service, error) { + customerAdapter, err := customeradapter.New(customeradapter.Config{ + Client: db, + Logger: logger.WithGroup("customer.postgres"), + }) + if err != nil { + logger.Error("failed to initialize customer repository", "error", err) + os.Exit(1) + } + + return customerservice.New(customerservice.Config{ + Adapter: customerAdapter, + }) +} diff --git a/app/common/wire.go b/app/common/wire.go index 275322793..d00ea879e 100644 --- a/app/common/wire.go +++ b/app/common/wire.go @@ -39,6 +39,7 @@ var Config = wire.NewSet( wire.FieldsOf(new(config.Configuration), "Namespace"), wire.FieldsOf(new(config.Configuration), "Events"), wire.FieldsOf(new(config.Configuration), "BalanceWorker"), + wire.FieldsOf(new(config.Configuration), "Billing"), wire.FieldsOf(new(config.Configuration), "Notification"), wire.FieldsOf(new(config.Configuration), "Sink"), ) diff --git a/app/config/billing.go b/app/config/billing.go index 3ff38cb8e..a15c4e1c9 100644 --- a/app/config/billing.go +++ b/app/config/billing.go @@ -4,6 +4,7 @@ import "github.com/spf13/viper" type BillingConfiguration struct { Enabled bool + Worker BillingWorkerConfiguration } func (c BillingConfiguration) Validate() error { diff --git a/app/config/billingworker.go b/app/config/billingworker.go new file mode 100644 index 000000000..8d01dd3c7 --- /dev/null +++ b/app/config/billingworker.go @@ -0,0 +1,29 @@ +package config + +import ( + "errors" + + "github.com/spf13/viper" + + "github.com/openmeterio/openmeter/pkg/errorsx" +) + +type BillingWorkerConfiguration struct { + ConsumerConfiguration `mapstructure:",squash"` +} + +func (c BillingWorkerConfiguration) Validate() error { + var errs []error + + if err := c.ConsumerConfiguration.Validate(); err != nil { + errs = append(errs, errorsx.WithPrefix(err, "consumer")) + } + + return errors.Join(errs...) +} + +func ConfigureBillingWorker(v *viper.Viper) { + ConfigureConsumer(v, "billingWorker") + v.SetDefault("billingWorker.dlq.topic", "om_sys.billing_worker_dlq") + v.SetDefault("billingWorker.consumerGroupName", "om_billing_worker") +} diff --git a/app/config/config.go b/app/config/config.go index 1861b10c3..172074f66 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -165,6 +165,7 @@ func SetViperDefaults(v *viper.Viper, flags *pflag.FlagSet) { ConfigurePortal(v) ConfigureEvents(v) ConfigureBalanceWorker(v) + ConfigureBillingWorker(v) ConfigureNotification(v) ConfigureStripe(v) ConfigureBilling(v) diff --git a/cmd/billing-worker/.air.toml b/cmd/billing-worker/.air.toml new file mode 100644 index 000000000..cb007a3db --- /dev/null +++ b/cmd/billing-worker/.air.toml @@ -0,0 +1,44 @@ +root = "." +testdata_dir = "testdata" +tmp_dir = "tmp" + +[build] + args_bin = ["--config", "./config.yaml", "--telemetry-address", ":10002"] + bin = "./tmp/openmeter-balance-worker" + cmd = "go build -tags dynamic -o ./tmp/openmeter-balance-worker ./cmd/balance-worker" + delay = 0 + exclude_dir = ["assets", "ci", "deploy", "docs", "examples", "testdata", "quickstart", "tmp", "vendor", "api/client", "node_modules"] + exclude_file = [] + exclude_regex = ["_test.go"] + exclude_unchanged = false + follow_symlink = false + full_bin = "" + include_dir = [] + include_ext = ["go", "tpl", "tmpl", "html", "yml", "yaml", "sql", "json"] + include_file = [] + kill_delay = "0s" + log = "build-errors.log" + poll = false + poll_interval = 0 + rerun = false + rerun_delay = 500 + send_interrupt = false + stop_on_error = false + +[color] + app = "" + build = "yellow" + main = "magenta" + runner = "green" + watcher = "cyan" + +[log] + main_only = false + time = false + +[misc] + clean_on_exit = false + +[screen] + clear_on_rebuild = false + keep_scroll = true diff --git a/cmd/billing-worker/main.go b/cmd/billing-worker/main.go new file mode 100644 index 000000000..6ac4dc6c9 --- /dev/null +++ b/cmd/billing-worker/main.go @@ -0,0 +1,87 @@ +package main + +import ( + "context" + "errors" + "fmt" + "log/slog" + "os" + + "github.com/spf13/pflag" + "github.com/spf13/viper" + + "github.com/openmeterio/openmeter/app/config" +) + +func main() { + v, flags := viper.NewWithOptions(viper.WithDecodeHook(config.DecodeHook())), pflag.NewFlagSet("OpenMeter", pflag.ExitOnError) + ctx := context.Background() + + config.SetViperDefaults(v, flags) + + flags.String("config", "", "Configuration file") + flags.Bool("version", false, "Show version information") + flags.Bool("validate", false, "Validate configuration and exit") + + _ = flags.Parse(os.Args[1:]) + + if v, _ := flags.GetBool("version"); v { + fmt.Printf("%s version %s (%s) built on %s\n", "Open Meter", version, revision, revisionDate) + + os.Exit(0) + } + + if c, _ := flags.GetString("config"); c != "" { + v.SetConfigFile(c) + } + + err := v.ReadInConfig() + if err != nil && !errors.As(err, &viper.ConfigFileNotFoundError{}) { + panic(err) + } + + var conf config.Configuration + err = v.Unmarshal(&conf) + if err != nil { + panic(err) + } + + err = conf.Validate() + if err != nil { + println("configuration error:") + println(err.Error()) + os.Exit(1) + } + + if v, _ := flags.GetBool("validate"); v { + os.Exit(0) + } + + app, cleanup, err := initializeApplication(ctx, conf) + if err != nil { + slog.Error("failed to initialize application", "error", err) + + cleanup() + + os.Exit(1) + } + defer cleanup() + + app.SetGlobals() + + logger := app.Logger + + // Validate service prerequisites + + if !conf.Events.Enabled { + logger.Error("events are disabled, exiting") + os.Exit(1) + } + + if err := app.Migrate(ctx); err != nil { + logger.Error("failed to initialize database", "error", err) + os.Exit(1) + } + + app.Run() +} diff --git a/cmd/billing-worker/version.go b/cmd/billing-worker/version.go new file mode 100644 index 000000000..24582ea13 --- /dev/null +++ b/cmd/billing-worker/version.go @@ -0,0 +1,34 @@ +package main + +import "runtime/debug" + +// Provisioned by ldflags. +var version string + +//nolint:gochecknoglobals +var ( + revision string + revisionDate string +) + +//nolint:gochecknoinits,goconst +func init() { + if version == "" { + version = "unknown" + } + + buildInfo, _ := debug.ReadBuildInfo() + + revision = "unknown" + revisionDate = "unknown" + + for _, setting := range buildInfo.Settings { + if setting.Key == "vcs.revision" { + revision = setting.Value + } + + if setting.Key == "vcs.time" { + revisionDate = setting.Value + } + } +} diff --git a/cmd/billing-worker/wire.go b/cmd/billing-worker/wire.go new file mode 100644 index 000000000..2cdf3d3e2 --- /dev/null +++ b/cmd/billing-worker/wire.go @@ -0,0 +1,45 @@ +//go:build wireinject +// +build wireinject + +package main + +import ( + "context" + "log/slog" + + "github.com/google/wire" + + "github.com/openmeterio/openmeter/app/common" + "github.com/openmeterio/openmeter/app/config" +) + +type Application struct { + common.GlobalInitializer + common.Migrator + common.Runner + + Logger *slog.Logger +} + +func initializeApplication(ctx context.Context, conf config.Configuration) (Application, func(), error) { + wire.Build( + metadata, + common.Config, + common.Framework, + common.Telemetry, + common.NewDefaultTextMapPropagator, + common.Database, + common.ClickHouse, + common.KafkaTopic, + common.Watermill, + common.WatermillRouter, + common.OpenMeter, + common.BillingWorker, + wire.Struct(new(Application), "*"), + ) + return Application{}, nil, nil +} + +func metadata(conf config.Configuration) common.Metadata { + return common.NewMetadata(conf, version, "billing-worker") +} diff --git a/cmd/billing-worker/wire_gen.go b/cmd/billing-worker/wire_gen.go new file mode 100644 index 000000000..87c05c09e --- /dev/null +++ b/cmd/billing-worker/wire_gen.go @@ -0,0 +1,218 @@ +// Code generated by Wire. DO NOT EDIT. + +//go:generate go run -mod=mod github.com/google/wire/cmd/wire +//go:build !wireinject +// +build !wireinject + +package main + +import ( + "context" + "github.com/openmeterio/openmeter/app/common" + "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/registry/builder" + "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka" + "github.com/openmeterio/openmeter/openmeter/watermill/router" + "log/slog" +) + +// Injectors from wire.go: + +func initializeApplication(ctx context.Context, conf config.Configuration) (Application, func(), error) { + telemetryConfig := conf.Telemetry + logTelemetryConfig := telemetryConfig.Log + commonMetadata := metadata(conf) + resource := common.NewTelemetryResource(commonMetadata) + loggerProvider, cleanup, err := common.NewLoggerProvider(ctx, logTelemetryConfig, resource) + if err != nil { + return Application{}, nil, err + } + logger := common.NewLogger(logTelemetryConfig, resource, loggerProvider, commonMetadata) + metricsTelemetryConfig := telemetryConfig.Metrics + meterProvider, cleanup2, err := common.NewMeterProvider(ctx, metricsTelemetryConfig, resource, logger) + if err != nil { + cleanup() + return Application{}, nil, err + } + traceTelemetryConfig := telemetryConfig.Trace + tracerProvider, cleanup3, err := common.NewTracerProvider(ctx, traceTelemetryConfig, resource, logger) + if err != nil { + cleanup2() + cleanup() + return Application{}, nil, err + } + textMapPropagator := common.NewDefaultTextMapPropagator() + globalInitializer := common.GlobalInitializer{ + Logger: logger, + MeterProvider: meterProvider, + TracerProvider: tracerProvider, + TextMapPropagator: textMapPropagator, + } + postgresConfig := conf.Postgres + meter := common.NewMeter(meterProvider, commonMetadata) + driver, cleanup4, err := common.NewPostgresDriver(ctx, postgresConfig, meterProvider, meter, tracerProvider, logger) + if err != nil { + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + db := common.NewDB(driver) + entPostgresDriver, cleanup5 := common.NewEntPostgresDriver(db, logger) + client := common.NewEntClient(entPostgresDriver) + migrator := common.Migrator{ + Config: postgresConfig, + Client: client, + Logger: logger, + } + eventsConfiguration := conf.Events + balanceWorkerConfiguration := conf.BalanceWorker + ingestConfiguration := conf.Ingest + kafkaIngestConfiguration := ingestConfiguration.Kafka + kafkaConfiguration := kafkaIngestConfiguration.KafkaConfiguration + brokerOptions := common.NewBrokerConfiguration(kafkaConfiguration, logTelemetryConfig, commonMetadata, logger, meter) + subscriber, err := common.BalanceWorkerSubscriber(balanceWorkerConfiguration, brokerOptions) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + v := common.BalanceWorkerProvisionTopics(balanceWorkerConfiguration) + adminClient, err := common.NewKafkaAdminClient(kafkaConfiguration) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + topicProvisionerConfig := kafkaIngestConfiguration.TopicProvisionerConfig + kafkaTopicProvisionerConfig := common.NewKafkaTopicProvisionerConfig(adminClient, logger, meter, topicProvisionerConfig) + topicProvisioner, err := common.NewKafkaTopicProvisioner(kafkaTopicProvisionerConfig) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + publisherOptions := kafka.PublisherOptions{ + Broker: brokerOptions, + ProvisionTopics: v, + TopicProvisioner: topicProvisioner, + } + publisher, cleanup6, err := common.NewPublisher(ctx, publisherOptions, logger) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + consumerConfiguration := balanceWorkerConfiguration.ConsumerConfiguration + options := router.Options{ + Subscriber: subscriber, + Publisher: publisher, + Logger: logger, + MetricMeter: meter, + Config: consumerConfiguration, + } + eventbusPublisher, err := common.NewEventBusPublisher(publisher, eventsConfiguration, logger) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + aggregationConfiguration := conf.Aggregation + clickHouseAggregationConfiguration := aggregationConfiguration.ClickHouse + v2, err := common.NewClickHouse(clickHouseAggregationConfiguration) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + v3 := conf.Meters + inMemoryRepository := common.NewMeterRepository(v3) + connector, err := common.NewStreamingConnector(ctx, aggregationConfiguration, v2, inMemoryRepository, logger) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + entitlementOptions := registrybuilder.EntitlementOptions{ + DatabaseClient: client, + StreamingConnector: connector, + Logger: logger, + MeterRepository: inMemoryRepository, + Publisher: eventbusPublisher, + } + entitlement := registrybuilder.GetEntitlementRegistry(entitlementOptions) + entitlementRepo := common.NewEntitlementRepo(client) + subjectResolver := common.SubjectResolver() + workerOptions := common.NewBalanceWorkerOptions(eventsConfiguration, options, eventbusPublisher, entitlement, entitlementRepo, subjectResolver, logger) + worker, err := common.NewBalanceWorker(workerOptions) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + health := common.NewHealthChecker(logger) + telemetryHandler := common.NewTelemetryHandler(metricsTelemetryConfig, health) + v4, cleanup7 := common.NewTelemetryServer(telemetryConfig, telemetryHandler) + group := common.BalanceWorkerGroup(ctx, worker, v4) + runner := common.Runner{ + Group: group, + Logger: logger, + } + application := Application{ + GlobalInitializer: globalInitializer, + Migrator: migrator, + Runner: runner, + Logger: logger, + } + return application, func() { + cleanup7() + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + }, nil +} + +// wire.go: + +type Application struct { + common.GlobalInitializer + common.Migrator + common.Runner + + Logger *slog.Logger +} + +func metadata(conf config.Configuration) common.Metadata { + return common.NewMetadata(conf, version, "balance-worker") +} diff --git a/openmeter/billing/worker/worker.go b/openmeter/billing/worker/worker.go new file mode 100644 index 000000000..c9876455b --- /dev/null +++ b/openmeter/billing/worker/worker.go @@ -0,0 +1,119 @@ +package billingworker + +import ( + "context" + "fmt" + "log/slog" + + "github.com/ThreeDotsLabs/watermill/message" + + "github.com/openmeterio/openmeter/openmeter/billing" + "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" + "github.com/openmeterio/openmeter/openmeter/watermill/grouphandler" + "github.com/openmeterio/openmeter/openmeter/watermill/router" +) + +type WorkerOptions struct { + SystemEventsTopic string + + Router router.Options + EventBus eventbus.Publisher + + Logger *slog.Logger + + BillingService billing.Service + // External connectors +} + +func (w WorkerOptions) Validate() error { + if w.SystemEventsTopic == "" { + return fmt.Errorf("system events topic is required") + } + + if err := w.Router.Validate(); err != nil { + return fmt.Errorf("router: %w", err) + } + + if w.EventBus == nil { + return fmt.Errorf("event bus is required") + } + + if w.Logger == nil { + return fmt.Errorf("logger is required") + } + + if w.BillingService == nil { + return fmt.Errorf("billing service is required") + } + + return nil +} + +type Worker struct { + router *message.Router + + billingService billing.Service +} + +func New(opts WorkerOptions) (*Worker, error) { + if err := opts.Validate(); err != nil { + return nil, err + } + + worker := &Worker{ + billingService: opts.BillingService, + } + + router, err := router.NewDefaultRouter(opts.Router) + if err != nil { + return nil, err + } + + worker.router = router + + eventHandler, err := worker.eventHandler(opts) + if err != nil { + return nil, err + } + + router.AddNoPublisherHandler( + "billing_worker_system_events", + opts.SystemEventsTopic, + opts.Router.Subscriber, + eventHandler, + ) + + return worker, nil +} + +func (w *Worker) eventHandler(opts WorkerOptions) (message.NoPublishHandlerFunc, error) { + return grouphandler.NewNoPublishingHandler( + opts.EventBus.Marshaler(), + opts.Router.MetricMeter, + + /* TODO: + + // Entitlement created event + grouphandler.NewGroupEventHandler(func(ctx context.Context, event *entitlement.EntitlementCreatedEvent) error { + return w.opts.EventBus. + WithContext(ctx). + PublishIfNoError(w.handleEntitlementEvent( + ctx, + NamespacedID{Namespace: event.Namespace.ID, ID: event.ID}, + metadata.ComposeResourcePath(event.Namespace.ID, metadata.EntityEntitlement, event.ID), + )) + }), */ + ) +} + +func (w *Worker) Run(ctx context.Context) error { + return w.router.Run(ctx) +} + +func (w *Worker) Close() error { + if err := w.router.Close(); err != nil { + return err + } + + return nil +} From 72b9c342f45adf2adcf6a044077201cce3f2d63f Mon Sep 17 00:00:00 2001 From: Peter Marton Date: Thu, 12 Dec 2024 16:19:36 +0100 Subject: [PATCH 2/4] fix(billing): wire --- app/common/apps.go | 87 +++++++++++++++++++++++ app/common/billing.go | 16 ++++- app/common/customer.go | 24 +++++++ app/common/openmeter.go | 20 ------ app/common/productcatalog.go | 16 +++++ app/common/secret.go | 19 +++++ app/common/wire.go | 1 + app/config/apps.go | 21 +++++- app/config/config.go | 6 -- app/config/config_test.go | 10 ++- app/config/stripe.go | 45 ------------ cmd/billing-worker/wire_gen.go | 122 ++++++++++++++++++++++++++++----- cmd/server/main.go | 2 +- 13 files changed, 287 insertions(+), 102 deletions(-) create mode 100644 app/common/apps.go create mode 100644 app/common/productcatalog.go create mode 100644 app/common/secret.go delete mode 100644 app/config/stripe.go diff --git a/app/common/apps.go b/app/common/apps.go new file mode 100644 index 000000000..afc656e77 --- /dev/null +++ b/app/common/apps.go @@ -0,0 +1,87 @@ +package common + +import ( + "context" + "fmt" + "log/slog" + + "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/app" + appadapter "github.com/openmeterio/openmeter/openmeter/app/adapter" + appsandbox "github.com/openmeterio/openmeter/openmeter/app/sandbox" + appservice "github.com/openmeterio/openmeter/openmeter/app/service" + appstripe "github.com/openmeterio/openmeter/openmeter/app/stripe" + appstripeadapter "github.com/openmeterio/openmeter/openmeter/app/stripe/adapter" + appstripeservice "github.com/openmeterio/openmeter/openmeter/app/stripe/service" + "github.com/openmeterio/openmeter/openmeter/customer" + entdb "github.com/openmeterio/openmeter/openmeter/ent/db" + "github.com/openmeterio/openmeter/openmeter/namespace" + "github.com/openmeterio/openmeter/openmeter/secret" +) + +func NewAppService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration) (app.Service, error) { + appAdapter, err := appadapter.New(appadapter.Config{ + Client: db, + BaseURL: appsConfig.BaseURL, + }) + if err != nil { + return nil, fmt.Errorf("failed to create app adapter: %w", err) + } + + return appservice.New(appservice.Config{ + Adapter: appAdapter, + }) +} + +func NewAppStripeService(logger *slog.Logger, db *entdb.Client, appService app.Service, customerService customer.Service, secretService secret.Service) (appstripe.Service, error) { + appStripeAdapter, err := appstripeadapter.New(appstripeadapter.Config{ + Client: db, + AppService: appService, + CustomerService: customerService, + SecretService: secretService, + }) + if err != nil { + return nil, fmt.Errorf("failed to create appstripe adapter: %w", err) + } + + _, err = appstripeservice.New(appstripeservice.Config{ + Adapter: appStripeAdapter, + AppService: appService, + SecretService: secretService, + }) + if err != nil { + return nil, fmt.Errorf("failed to create appstripe service: %w", err) + } + + return appstripeservice.New(appstripeservice.Config{ + Adapter: appStripeAdapter, + AppService: appService, + SecretService: secretService, + }) +} + +func NewAppSandbox(ctx context.Context, logger *slog.Logger, db *entdb.Client, appService app.Service, namespaceManager *namespace.Manager) (appsandbox.App, error) { + _, err := appsandbox.NewFactory(appsandbox.Config{ + AppService: appService, + }) + if err != nil { + return appsandbox.App{}, fmt.Errorf("failed to initialize app sandbox factory: %w", err) + } + + app, err := appsandbox.AutoProvision(ctx, appsandbox.AutoProvisionInput{ + Namespace: namespaceManager.GetDefaultNamespace(), + AppService: appService, + }) + if err != nil { + return appsandbox.App{}, fmt.Errorf("failed to auto-provision sandbox app: %w", err) + } + + logger.Info("sandbox app auto-provisioned", "app_id", app.GetID().ID) + + appSandbox, ok := app.(appsandbox.App) + if !ok { + return appsandbox.App{}, fmt.Errorf("failed to cast app to sandbox app") + } + + return appSandbox, nil +} diff --git a/app/common/billing.go b/app/common/billing.go index b78575c00..d612278de 100644 --- a/app/common/billing.go +++ b/app/common/billing.go @@ -12,6 +12,8 @@ import ( "github.com/openmeterio/openmeter/app/config" "github.com/openmeterio/openmeter/openmeter/app" + appsandbox "github.com/openmeterio/openmeter/openmeter/app/sandbox" + appstripe "github.com/openmeterio/openmeter/openmeter/app/stripe" "github.com/openmeterio/openmeter/openmeter/billing" billingadapter "github.com/openmeterio/openmeter/openmeter/billing/adapter" billingservice "github.com/openmeterio/openmeter/openmeter/billing/service" @@ -105,9 +107,11 @@ func BillingWorkerGroup( func BillingService( logger *slog.Logger, db *entdb.Client, + appService app.Service, + appStripeService appstripe.Service, + appSandbox appsandbox.App, billingConfig config.BillingConfiguration, customerService customer.Service, - appService app.Service, featureConnector feature.FeatureConnector, meterRepo meter.Repository, streamingConnector streaming.Connector, @@ -126,10 +130,10 @@ func BillingService( return billingservice.New(billingservice.Config{ Adapter: adapter, - CustomerService: customerService, AppService: appService, - Logger: logger, + CustomerService: customerService, FeatureService: featureConnector, + Logger: logger, MeterRepo: meterRepo, StreamingConnector: streamingConnector, }) @@ -143,6 +147,12 @@ var BillingWorker = wire.NewSet( BillingWorkerSubscriber, NewCustomerService, + NewAppService, + NewAppStripeService, + NewAppSandbox, + NewFeatureConnector, + NewSecretService, + BillingService, NewBillingWorkerOptions, diff --git a/app/common/customer.go b/app/common/customer.go index 805d0c79a..d4a501b47 100644 --- a/app/common/customer.go +++ b/app/common/customer.go @@ -1 +1,25 @@ package common + +import ( + "fmt" + "log/slog" + + "github.com/openmeterio/openmeter/openmeter/customer" + customeradapter "github.com/openmeterio/openmeter/openmeter/customer/adapter" + customerservice "github.com/openmeterio/openmeter/openmeter/customer/service" + entdb "github.com/openmeterio/openmeter/openmeter/ent/db" +) + +func NewCustomerService(logger *slog.Logger, db *entdb.Client) (customer.Service, error) { + customerAdapter, err := customeradapter.New(customeradapter.Config{ + Client: db, + Logger: logger.WithGroup("customer.postgres"), + }) + if err != nil { + return nil, fmt.Errorf("failed to create customer adapter: %w", err) + } + + return customerservice.New(customerservice.Config{ + Adapter: customerAdapter, + }) +} diff --git a/app/common/openmeter.go b/app/common/openmeter.go index 2e8860490..bf60d27ad 100644 --- a/app/common/openmeter.go +++ b/app/common/openmeter.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "log/slog" - "os" "github.com/ClickHouse/clickhouse-go/v2" "github.com/ThreeDotsLabs/watermill/message" @@ -13,10 +12,6 @@ import ( "go.opentelemetry.io/otel/metric" "github.com/openmeterio/openmeter/app/config" - "github.com/openmeterio/openmeter/openmeter/customer" - customeradapter "github.com/openmeterio/openmeter/openmeter/customer/adapter" - customerservice "github.com/openmeterio/openmeter/openmeter/customer/service" - entdb "github.com/openmeterio/openmeter/openmeter/ent/db" "github.com/openmeterio/openmeter/openmeter/ingest" "github.com/openmeterio/openmeter/openmeter/ingest/ingestadapter" "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest" @@ -256,18 +251,3 @@ func NewFlushHandler( return flushHandlerMux, nil } - -func NewCustomerService(logger *slog.Logger, db *entdb.Client) (customer.Service, error) { - customerAdapter, err := customeradapter.New(customeradapter.Config{ - Client: db, - Logger: logger.WithGroup("customer.postgres"), - }) - if err != nil { - logger.Error("failed to initialize customer repository", "error", err) - os.Exit(1) - } - - return customerservice.New(customerservice.Config{ - Adapter: customerAdapter, - }) -} diff --git a/app/common/productcatalog.go b/app/common/productcatalog.go new file mode 100644 index 000000000..23162f398 --- /dev/null +++ b/app/common/productcatalog.go @@ -0,0 +1,16 @@ +package common + +import ( + "log/slog" + + "github.com/openmeterio/openmeter/app/config" + entdb "github.com/openmeterio/openmeter/openmeter/ent/db" + "github.com/openmeterio/openmeter/openmeter/meter" + productcatalogpgadapter "github.com/openmeterio/openmeter/openmeter/productcatalog/adapter" + "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" +) + +func NewFeatureConnector(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration, meterRepo meter.Repository) feature.FeatureConnector { + featureRepo := productcatalogpgadapter.NewPostgresFeatureRepo(db, logger) + return feature.NewFeatureConnector(featureRepo, meterRepo) +} diff --git a/app/common/secret.go b/app/common/secret.go new file mode 100644 index 000000000..2d7d9d803 --- /dev/null +++ b/app/common/secret.go @@ -0,0 +1,19 @@ +package common + +import ( + "log/slog" + + "github.com/openmeterio/openmeter/app/config" + entdb "github.com/openmeterio/openmeter/openmeter/ent/db" + "github.com/openmeterio/openmeter/openmeter/secret" + secretadapter "github.com/openmeterio/openmeter/openmeter/secret/adapter" + secretservice "github.com/openmeterio/openmeter/openmeter/secret/service" +) + +func NewSecretService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration) (secret.Service, error) { + secretAdapter := secretadapter.New() + + return secretservice.New(secretservice.Config{ + Adapter: secretAdapter, + }) +} diff --git a/app/common/wire.go b/app/common/wire.go index d00ea879e..47edbcc7b 100644 --- a/app/common/wire.go +++ b/app/common/wire.go @@ -22,6 +22,7 @@ import ( ) var Config = wire.NewSet( + wire.FieldsOf(new(config.Configuration), "Apps"), wire.FieldsOf(new(config.Configuration), "Aggregation"), wire.FieldsOf(new(config.AggregationConfiguration), "ClickHouse"), diff --git a/app/config/apps.go b/app/config/apps.go index fd9db6f75..eb5581363 100644 --- a/app/config/apps.go +++ b/app/config/apps.go @@ -1,15 +1,32 @@ package config -import "github.com/spf13/viper" +import ( + "errors" + + "github.com/spf13/viper" +) type AppsConfiguration struct { Enabled bool + // BaseURL is the base URL for the Stripe webhook. + BaseURL string `yaml:"baseURL"` } func (c AppsConfiguration) Validate() error { - return nil + var errs []error + + if !c.Enabled { + return nil + } + + if c.BaseURL == "" { + errs = append(errs, errors.New("base URL is required")) + } + + return errors.Join(errs...) } func ConfigureApps(v *viper.Viper) { v.SetDefault("apps.enabled", false) + v.SetDefault("apps.baseURL", "https://example.com") } diff --git a/app/config/config.go b/app/config/config.go index 172074f66..57fc47a23 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -35,7 +35,6 @@ type Configuration struct { ProductCatalog ProductCatalogConfiguration Billing BillingConfiguration Apps AppsConfiguration - StripeApp StripeAppConfig Svix SvixConfig } @@ -111,10 +110,6 @@ func (c Configuration) Validate() error { } } - if err := c.StripeApp.Validate(); err != nil { - errs = append(errs, errorsx.WithPrefix(err, "stripe app")) - } - if err := c.ProductCatalog.Validate(); err != nil { errs = append(errs, errorsx.WithPrefix(err, "product catalog")) } @@ -167,7 +162,6 @@ func SetViperDefaults(v *viper.Viper, flags *pflag.FlagSet) { ConfigureBalanceWorker(v) ConfigureBillingWorker(v) ConfigureNotification(v) - ConfigureStripe(v) ConfigureBilling(v) ConfigureProductCatalog(v) ConfigureApps(v) diff --git a/app/config/config_test.go b/app/config/config_test.go index 4909c5a42..2bde498f9 100644 --- a/app/config/config_test.go +++ b/app/config/config_test.go @@ -43,7 +43,10 @@ func TestComplete(t *testing.T) { Postgres: PostgresConfig{ AutoMigrate: AutoMigrateEnt, }, - Address: "127.0.0.1:8888", + Address: "127.0.0.1:8888", + Apps: AppsConfiguration{ + BaseURL: "https://example.com", + }, Environment: "local", Telemetry: TelemetryConfig{ Address: "127.0.0.1:10000", @@ -307,11 +310,6 @@ func TestComplete(t *testing.T) { ServerURL: "http://127.0.0.1:8071", Debug: true, }, - StripeApp: StripeAppConfig{ - IncomingWebhook: StripeAppIncomingWebhookConfig{ - BaseURL: "https://example.com", - }, - }, } assert.Equal(t, expected, actual) diff --git a/app/config/stripe.go b/app/config/stripe.go deleted file mode 100644 index cc12efca6..000000000 --- a/app/config/stripe.go +++ /dev/null @@ -1,45 +0,0 @@ -package config - -import ( - "errors" - "fmt" - - "github.com/spf13/viper" -) - -// StripeAppConfig is the configuration for Stripe. -type StripeAppConfig struct { - IncomingWebhook StripeAppIncomingWebhookConfig `yaml:"incomingWebhook"` -} - -// Validate validates the configuration. -func (c StripeAppConfig) Validate() error { - var errs []error - - if err := c.IncomingWebhook.Validate(); err != nil { - errs = append(errs, fmt.Errorf("incoming webhook: %w", err)) - } - - return errors.Join(errs...) -} - -// StripeAppIncomingWebhookConfig is the configuration for the Stripe webhook. -type StripeAppIncomingWebhookConfig struct { - // BaseURL is the base URL for the Stripe webhook. - BaseURL string `yaml:"baseURL"` -} - -func (c StripeAppIncomingWebhookConfig) Validate() error { - var errs []error - - if c.BaseURL == "" { - errs = append(errs, errors.New("base URL is required")) - } - - return errors.Join(errs...) -} - -// ConfigureStripe configures the default values for Stripe. -func ConfigureStripe(v *viper.Viper) { - v.SetDefault("stripeApp.incomingWebhook.baseURL", "https://example.com") -} diff --git a/cmd/billing-worker/wire_gen.go b/cmd/billing-worker/wire_gen.go index 87c05c09e..d7f19e74c 100644 --- a/cmd/billing-worker/wire_gen.go +++ b/cmd/billing-worker/wire_gen.go @@ -10,7 +10,6 @@ import ( "context" "github.com/openmeterio/openmeter/app/common" "github.com/openmeterio/openmeter/app/config" - "github.com/openmeterio/openmeter/openmeter/registry/builder" "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka" "github.com/openmeterio/openmeter/openmeter/watermill/router" "log/slog" @@ -66,12 +65,12 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl Logger: logger, } eventsConfiguration := conf.Events - balanceWorkerConfiguration := conf.BalanceWorker + billingConfiguration := conf.Billing ingestConfiguration := conf.Ingest kafkaIngestConfiguration := ingestConfiguration.Kafka kafkaConfiguration := kafkaIngestConfiguration.KafkaConfiguration brokerOptions := common.NewBrokerConfiguration(kafkaConfiguration, logTelemetryConfig, commonMetadata, logger, meter) - subscriber, err := common.BalanceWorkerSubscriber(balanceWorkerConfiguration, brokerOptions) + subscriber, err := common.BillingWorkerSubscriber(billingConfiguration, brokerOptions) if err != nil { cleanup5() cleanup4() @@ -80,7 +79,7 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - v := common.BalanceWorkerProvisionTopics(balanceWorkerConfiguration) + v := common.BillingWorkerProvisionTopics(billingConfiguration) adminClient, err := common.NewKafkaAdminClient(kafkaConfiguration) if err != nil { cleanup5() @@ -115,7 +114,8 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - consumerConfiguration := balanceWorkerConfiguration.ConsumerConfiguration + billingWorkerConfiguration := billingConfiguration.Worker + consumerConfiguration := billingWorkerConfiguration.ConsumerConfiguration options := router.Options{ Subscriber: subscriber, Publisher: publisher, @@ -133,6 +133,67 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } + appsConfiguration := conf.Apps + service, err := common.NewAppService(logger, client, appsConfiguration) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + customerService, err := common.NewCustomerService(logger, client) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + secretService, err := common.NewSecretService(logger, client, appsConfiguration) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + appstripeService, err := common.NewAppStripeService(logger, client, service, customerService, secretService) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + namespacedTopicResolver, err := common.NewNamespacedTopicResolver(conf) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + namespaceHandler, err := common.NewKafkaNamespaceHandler(namespacedTopicResolver, topicProvisioner, kafkaIngestConfiguration) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } aggregationConfiguration := conf.Aggregation clickHouseAggregationConfiguration := aggregationConfiguration.ClickHouse v2, err := common.NewClickHouse(clickHouseAggregationConfiguration) @@ -157,18 +218,41 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - entitlementOptions := registrybuilder.EntitlementOptions{ - DatabaseClient: client, - StreamingConnector: connector, - Logger: logger, - MeterRepository: inMemoryRepository, - Publisher: eventbusPublisher, + v4 := common.NewNamespaceHandlers(namespaceHandler, connector) + namespaceConfiguration := conf.Namespace + manager, err := common.NewNamespaceManager(v4, namespaceConfiguration) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + app, err := common.NewAppSandbox(ctx, logger, client, service, manager) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + featureConnector := common.NewFeatureConnector(logger, client, appsConfiguration, inMemoryRepository) + billingService, err := common.BillingService(logger, client, service, appstripeService, app, billingConfiguration, customerService, featureConnector, inMemoryRepository, connector) + if err != nil { + cleanup6() + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err } - entitlement := registrybuilder.GetEntitlementRegistry(entitlementOptions) - entitlementRepo := common.NewEntitlementRepo(client) - subjectResolver := common.SubjectResolver() - workerOptions := common.NewBalanceWorkerOptions(eventsConfiguration, options, eventbusPublisher, entitlement, entitlementRepo, subjectResolver, logger) - worker, err := common.NewBalanceWorker(workerOptions) + workerOptions := common.NewBillingWorkerOptions(eventsConfiguration, options, eventbusPublisher, billingService, logger) + worker, err := common.NewBillingWorker(workerOptions) if err != nil { cleanup6() cleanup5() @@ -180,8 +264,8 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl } health := common.NewHealthChecker(logger) telemetryHandler := common.NewTelemetryHandler(metricsTelemetryConfig, health) - v4, cleanup7 := common.NewTelemetryServer(telemetryConfig, telemetryHandler) - group := common.BalanceWorkerGroup(ctx, worker, v4) + v5, cleanup7 := common.NewTelemetryServer(telemetryConfig, telemetryHandler) + group := common.BillingWorkerGroup(ctx, worker, v5) runner := common.Runner{ Group: group, Logger: logger, @@ -214,5 +298,5 @@ type Application struct { } func metadata(conf config.Configuration) common.Metadata { - return common.NewMetadata(conf, version, "balance-worker") + return common.NewMetadata(conf, version, "billing-worker") } diff --git a/cmd/server/main.go b/cmd/server/main.go index df8500f51..7bb06bdf2 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -213,7 +213,7 @@ func main() { var appAdapter apppkg.Adapter appAdapter, err = appadapter.New(appadapter.Config{ Client: app.EntClient, - BaseURL: conf.StripeApp.IncomingWebhook.BaseURL, + BaseURL: conf.Apps.BaseURL, }) if err != nil { logger.Error("failed to initialize app repository", "error", err) From 68c706cebc83b6add66ab32e14f5d097c454a14b Mon Sep 17 00:00:00 2001 From: Peter Marton Date: Fri, 13 Dec 2024 14:31:06 +0100 Subject: [PATCH 3/4] feat(billing): wires --- Makefile | 11 ++ app/common/app.go | 110 ++++++++--- app/common/apps.go | 87 -------- app/common/billing.go | 109 +--------- app/common/customer.go | 11 ++ app/common/kafka.go | 36 ++++ app/common/metadata.go | 24 +++ app/common/meter.go | 13 ++ app/common/namespace.go | 24 +++ app/common/openmeter.go | 253 ------------------------ app/common/openmeter_billingworker.go | 114 +++++++++++ app/common/openmeter_notification.go | 20 ++ app/common/openmeter_provisiontopics.go | 44 ----- app/common/openmeter_server.go | 105 ++++++++++ app/common/openmeter_sinkworker.go | 70 +++++++ app/common/productcatalog.go | 53 ++++- app/common/runner.go | 24 +++ app/common/secret.go | 9 +- app/common/streaming.go | 67 +++++++ app/common/wire.go | 1 - app/config/billing.go | 2 + app/config/billingworker.go | 7 +- app/config/config.go | 1 - cmd/billing-worker/.air.toml | 4 +- cmd/billing-worker/main.go | 5 + cmd/billing-worker/wire_gen.go | 8 +- cmd/server/main.go | 176 +---------------- cmd/server/wire.go | 57 ++++-- cmd/server/wire_gen.go | 186 +++++++++++------ 29 files changed, 856 insertions(+), 775 deletions(-) delete mode 100644 app/common/apps.go create mode 100644 app/common/metadata.go create mode 100644 app/common/meter.go create mode 100644 app/common/namespace.go delete mode 100644 app/common/openmeter.go create mode 100644 app/common/openmeter_billingworker.go create mode 100644 app/common/openmeter_notification.go delete mode 100644 app/common/openmeter_provisiontopics.go create mode 100644 app/common/openmeter_server.go create mode 100644 app/common/openmeter_sinkworker.go create mode 100644 app/common/runner.go create mode 100644 app/common/streaming.go diff --git a/Makefile b/Makefile index 10074c4d4..c1c60fed6 100644 --- a/Makefile +++ b/Makefile @@ -60,6 +60,11 @@ build-balance-worker: ## Build balance-worker binary $(call print-target) go build -o build/balance-worker ./cmd/balance-worker +.PHONY: build-billing-worker +build-billing-worker: ## Build billing-worker binary + $(call print-target) + go build -o build/billing-worker ./cmd/billing-worker + .PHONY: build-notification-service build-notification-service: ## Build notification-service binary $(call print-target) @@ -86,6 +91,12 @@ balance-worker: ## Run balance-worker $(call print-target) air -c ./cmd/balance-worker/.air.toml +.PHONY: billing-worker +billing-worker: ## Run billing-worker + @ if [ config.yaml -ot config.example.yaml ]; then diff -u config.yaml config.example.yaml || (echo "!!! The configuration example changed. Please update your config.yaml file accordingly (or at least touch it). !!!" && false); fi + $(call print-target) + air -c ./cmd/billing-worker/.air.toml + .PHONY: notification-service notification-service: ## Run notification-service @ if [ config.yaml -ot config.example.yaml ]; then diff -u config.yaml config.example.yaml || (echo "!!! The configuration example changed. Please update your config.yaml file accordingly (or at least touch it). !!!" && false); fi diff --git a/app/common/app.go b/app/common/app.go index 6cc70d880..066ad10e5 100644 --- a/app/common/app.go +++ b/app/common/app.go @@ -1,44 +1,102 @@ package common import ( - "errors" + "context" "fmt" "log/slog" - "net/http" - "github.com/oklog/run" + "github.com/google/wire" "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/app" + appadapter "github.com/openmeterio/openmeter/openmeter/app/adapter" + appsandbox "github.com/openmeterio/openmeter/openmeter/app/sandbox" + appservice "github.com/openmeterio/openmeter/openmeter/app/service" + appstripe "github.com/openmeterio/openmeter/openmeter/app/stripe" + appstripeadapter "github.com/openmeterio/openmeter/openmeter/app/stripe/adapter" + appstripeservice "github.com/openmeterio/openmeter/openmeter/app/stripe/service" + "github.com/openmeterio/openmeter/openmeter/customer" + entdb "github.com/openmeterio/openmeter/openmeter/ent/db" + "github.com/openmeterio/openmeter/openmeter/namespace" + "github.com/openmeterio/openmeter/openmeter/secret" ) -// Metadata provides information about the service to components that need it (eg. telemetry). -type Metadata struct { - ServiceName string - Version string - Environment string - OpenTelemetryName string -} +var App = wire.NewSet( + wire.FieldsOf(new(config.Configuration), "Apps"), + + NewAppService, + NewAppStripeService, + NewAppSandbox, +) + +func NewAppService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration) (app.Service, error) { + // TODO: remove this check after enabled by default + if !appsConfig.Enabled || db == nil { + return nil, nil + } -func NewMetadata(conf config.Configuration, version string, serviceName string) Metadata { - return Metadata{ - ServiceName: fmt.Sprintf("openmeter-%s", serviceName), - Version: version, - Environment: conf.Environment, - OpenTelemetryName: fmt.Sprintf("openmeter.io/%s", serviceName), + appAdapter, err := appadapter.New(appadapter.Config{ + Client: db, + BaseURL: appsConfig.BaseURL, + }) + if err != nil { + return nil, fmt.Errorf("failed to create app adapter: %w", err) } + + return appservice.New(appservice.Config{ + Adapter: appAdapter, + }) } -// Runner is a helper struct that runs a group of services. -type Runner struct { - Group run.Group - Logger *slog.Logger +func NewAppStripeService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration, appService app.Service, customerService customer.Service, secretService secret.Service) (appstripe.Service, error) { + // TODO: remove this check after enabled by default + if !appsConfig.Enabled || db == nil { + return nil, nil + } + + appStripeAdapter, err := appstripeadapter.New(appstripeadapter.Config{ + Client: db, + AppService: appService, + CustomerService: customerService, + SecretService: secretService, + }) + if err != nil { + return nil, fmt.Errorf("failed to create appstripe adapter: %w", err) + } + + return appstripeservice.New(appstripeservice.Config{ + Adapter: appStripeAdapter, + AppService: appService, + SecretService: secretService, + }) } -func (r Runner) Run() { - err := r.Group.Run() - if e := (run.SignalError{}); errors.As(err, &e) { - r.Logger.Info("received signal: shutting down", slog.String("signal", e.Signal.String())) - } else if !errors.Is(err, http.ErrServerClosed) { - r.Logger.Error("application stopped due to error", slog.String("error", err.Error())) +func NewAppSandbox(ctx context.Context, logger *slog.Logger, appsConfig config.AppsConfiguration, appService app.Service, namespaceManager *namespace.Manager) (*appsandbox.App, error) { + if !appsConfig.Enabled { + return nil, nil + } + + _, err := appsandbox.NewFactory(appsandbox.Config{ + AppService: appService, + }) + if err != nil { + return nil, fmt.Errorf("failed to initialize app sandbox factory: %w", err) + } + + app, err := appsandbox.AutoProvision(ctx, appsandbox.AutoProvisionInput{ + Namespace: namespaceManager.GetDefaultNamespace(), + AppService: appService, + }) + if err != nil { + return nil, fmt.Errorf("failed to auto-provision sandbox app: %w", err) } + + logger.Info("sandbox app auto-provisioned", "app_id", app.GetID().ID) + + appSandbox, ok := app.(appsandbox.App) + if !ok { + return nil, fmt.Errorf("failed to cast app to sandbox app") + } + + return &appSandbox, nil } diff --git a/app/common/apps.go b/app/common/apps.go deleted file mode 100644 index afc656e77..000000000 --- a/app/common/apps.go +++ /dev/null @@ -1,87 +0,0 @@ -package common - -import ( - "context" - "fmt" - "log/slog" - - "github.com/openmeterio/openmeter/app/config" - "github.com/openmeterio/openmeter/openmeter/app" - appadapter "github.com/openmeterio/openmeter/openmeter/app/adapter" - appsandbox "github.com/openmeterio/openmeter/openmeter/app/sandbox" - appservice "github.com/openmeterio/openmeter/openmeter/app/service" - appstripe "github.com/openmeterio/openmeter/openmeter/app/stripe" - appstripeadapter "github.com/openmeterio/openmeter/openmeter/app/stripe/adapter" - appstripeservice "github.com/openmeterio/openmeter/openmeter/app/stripe/service" - "github.com/openmeterio/openmeter/openmeter/customer" - entdb "github.com/openmeterio/openmeter/openmeter/ent/db" - "github.com/openmeterio/openmeter/openmeter/namespace" - "github.com/openmeterio/openmeter/openmeter/secret" -) - -func NewAppService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration) (app.Service, error) { - appAdapter, err := appadapter.New(appadapter.Config{ - Client: db, - BaseURL: appsConfig.BaseURL, - }) - if err != nil { - return nil, fmt.Errorf("failed to create app adapter: %w", err) - } - - return appservice.New(appservice.Config{ - Adapter: appAdapter, - }) -} - -func NewAppStripeService(logger *slog.Logger, db *entdb.Client, appService app.Service, customerService customer.Service, secretService secret.Service) (appstripe.Service, error) { - appStripeAdapter, err := appstripeadapter.New(appstripeadapter.Config{ - Client: db, - AppService: appService, - CustomerService: customerService, - SecretService: secretService, - }) - if err != nil { - return nil, fmt.Errorf("failed to create appstripe adapter: %w", err) - } - - _, err = appstripeservice.New(appstripeservice.Config{ - Adapter: appStripeAdapter, - AppService: appService, - SecretService: secretService, - }) - if err != nil { - return nil, fmt.Errorf("failed to create appstripe service: %w", err) - } - - return appstripeservice.New(appstripeservice.Config{ - Adapter: appStripeAdapter, - AppService: appService, - SecretService: secretService, - }) -} - -func NewAppSandbox(ctx context.Context, logger *slog.Logger, db *entdb.Client, appService app.Service, namespaceManager *namespace.Manager) (appsandbox.App, error) { - _, err := appsandbox.NewFactory(appsandbox.Config{ - AppService: appService, - }) - if err != nil { - return appsandbox.App{}, fmt.Errorf("failed to initialize app sandbox factory: %w", err) - } - - app, err := appsandbox.AutoProvision(ctx, appsandbox.AutoProvisionInput{ - Namespace: namespaceManager.GetDefaultNamespace(), - AppService: appService, - }) - if err != nil { - return appsandbox.App{}, fmt.Errorf("failed to auto-provision sandbox app: %w", err) - } - - logger.Info("sandbox app auto-provisioned", "app_id", app.GetID().ID) - - appSandbox, ok := app.(appsandbox.App) - if !ok { - return appsandbox.App{}, fmt.Errorf("failed to cast app to sandbox app") - } - - return appSandbox, nil -} diff --git a/app/common/billing.go b/app/common/billing.go index d612278de..ae7bc3653 100644 --- a/app/common/billing.go +++ b/app/common/billing.go @@ -1,14 +1,10 @@ package common import ( - "context" "fmt" "log/slog" - "syscall" - "github.com/ThreeDotsLabs/watermill/message" "github.com/google/wire" - "github.com/oklog/run" "github.com/openmeterio/openmeter/app/config" "github.com/openmeterio/openmeter/openmeter/app" @@ -17,99 +13,23 @@ import ( "github.com/openmeterio/openmeter/openmeter/billing" billingadapter "github.com/openmeterio/openmeter/openmeter/billing/adapter" billingservice "github.com/openmeterio/openmeter/openmeter/billing/service" - billingworker "github.com/openmeterio/openmeter/openmeter/billing/worker" "github.com/openmeterio/openmeter/openmeter/customer" entdb "github.com/openmeterio/openmeter/openmeter/ent/db" "github.com/openmeterio/openmeter/openmeter/meter" "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" "github.com/openmeterio/openmeter/openmeter/streaming" - watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka" - "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" - "github.com/openmeterio/openmeter/openmeter/watermill/router" - pkgkafka "github.com/openmeterio/openmeter/pkg/kafka" ) -func BillingWorkerProvisionTopics(conf config.BillingConfiguration) []pkgkafka.TopicConfig { - var provisionTopics []pkgkafka.TopicConfig - - if conf.Worker.DLQ.AutoProvision.Enabled { - provisionTopics = append(provisionTopics, pkgkafka.TopicConfig{ - Name: conf.Worker.DLQ.Topic, - Partitions: conf.Worker.DLQ.AutoProvision.Partitions, - RetentionTime: pkgkafka.TimeDurationMilliSeconds(conf.Worker.DLQ.AutoProvision.Retention), - }) - } - - return provisionTopics -} - -// no closer function: the subscriber is closed by the router/worker -func BillingWorkerSubscriber(conf config.BillingConfiguration, brokerOptions watermillkafka.BrokerOptions) (message.Subscriber, error) { - subscriber, err := watermillkafka.NewSubscriber(watermillkafka.SubscriberOptions{ - Broker: brokerOptions, - ConsumerGroupName: conf.Worker.ConsumerGroupName, - }) - if err != nil { - return nil, fmt.Errorf("failed to initialize Kafka subscriber: %w", err) - } - - return subscriber, nil -} - -func NewBillingWorkerOptions( - eventConfig config.EventsConfiguration, - routerOptions router.Options, - eventBus eventbus.Publisher, - billingService billing.Service, - logger *slog.Logger, -) billingworker.WorkerOptions { - return billingworker.WorkerOptions{ - SystemEventsTopic: eventConfig.SystemEvents.Topic, - - Router: routerOptions, - EventBus: eventBus, - BillingService: billingService, - Logger: logger, - } -} - -func NewBillingWorker(workerOptions billingworker.WorkerOptions) (*billingworker.Worker, error) { - worker, err := billingworker.New(workerOptions) - if err != nil { - return nil, fmt.Errorf("failed to initialize worker: %w", err) - } - - return worker, nil -} - -func BillingWorkerGroup( - ctx context.Context, - worker *billingworker.Worker, - telemetryServer TelemetryServer, -) run.Group { - var group run.Group - - group.Add( - func() error { return telemetryServer.ListenAndServe() }, - func(err error) { _ = telemetryServer.Shutdown(ctx) }, - ) - - group.Add( - func() error { return worker.Run(ctx) }, - func(err error) { _ = worker.Close() }, - ) - - group.Add(run.SignalHandler(ctx, syscall.SIGINT, syscall.SIGTERM)) - - return group -} +var Billing = wire.NewSet( + BillingService, +) func BillingService( logger *slog.Logger, db *entdb.Client, appService app.Service, appStripeService appstripe.Service, - appSandbox appsandbox.App, + appSandbox *appsandbox.App, billingConfig config.BillingConfiguration, customerService customer.Service, featureConnector feature.FeatureConnector, @@ -138,24 +58,3 @@ func BillingService( StreamingConnector: streamingConnector, }) } - -var BillingWorker = wire.NewSet( - wire.FieldsOf(new(config.BillingWorkerConfiguration), "ConsumerConfiguration"), - wire.FieldsOf(new(config.BillingConfiguration), "Worker"), - - BillingWorkerProvisionTopics, - BillingWorkerSubscriber, - - NewCustomerService, - NewAppService, - NewAppStripeService, - NewAppSandbox, - NewFeatureConnector, - NewSecretService, - - BillingService, - - NewBillingWorkerOptions, - NewBillingWorker, - BillingWorkerGroup, -) diff --git a/app/common/customer.go b/app/common/customer.go index d4a501b47..ae67366ce 100644 --- a/app/common/customer.go +++ b/app/common/customer.go @@ -4,13 +4,24 @@ import ( "fmt" "log/slog" + "github.com/google/wire" + "github.com/openmeterio/openmeter/openmeter/customer" customeradapter "github.com/openmeterio/openmeter/openmeter/customer/adapter" customerservice "github.com/openmeterio/openmeter/openmeter/customer/service" entdb "github.com/openmeterio/openmeter/openmeter/ent/db" ) +var Customer = wire.NewSet( + NewCustomerService, +) + func NewCustomerService(logger *slog.Logger, db *entdb.Client) (customer.Service, error) { + // TODO: remove this check after enabled by default + if db == nil { + return nil, nil + } + customerAdapter, err := customeradapter.New(customeradapter.Config{ Client: db, Logger: logger.WithGroup("customer.postgres"), diff --git a/app/common/kafka.go b/app/common/kafka.go index c6a842f3f..227258f48 100644 --- a/app/common/kafka.go +++ b/app/common/kafka.go @@ -8,6 +8,10 @@ import ( "go.opentelemetry.io/otel/metric" "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest" + "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/topicresolver" + "github.com/openmeterio/openmeter/openmeter/namespace" + "github.com/openmeterio/openmeter/openmeter/streaming" pkgkafka "github.com/openmeterio/openmeter/pkg/kafka" kafkametrics "github.com/openmeterio/openmeter/pkg/kafka/metrics" ) @@ -83,3 +87,35 @@ func NewKafkaTopicProvisioner(conf pkgkafka.TopicProvisionerConfig) (pkgkafka.To return topicProvisioner, nil } + +func NewNamespaceHandlers( + kafkaHandler *kafkaingest.NamespaceHandler, + clickHouseHandler streaming.Connector, +) []namespace.Handler { + return []namespace.Handler{ + kafkaHandler, + clickHouseHandler, + } +} + +func NewKafkaNamespaceHandler( + topicResolver topicresolver.Resolver, + topicProvisioner pkgkafka.TopicProvisioner, + conf config.KafkaIngestConfiguration, +) (*kafkaingest.NamespaceHandler, error) { + return &kafkaingest.NamespaceHandler{ + TopicResolver: topicResolver, + TopicProvisioner: topicProvisioner, + Partitions: conf.Partitions, + DeletionEnabled: conf.NamespaceDeletionEnabled, + }, nil +} + +func NewNamespacedTopicResolver(config config.Configuration) (*topicresolver.NamespacedTopicResolver, error) { + topicResolver, err := topicresolver.NewNamespacedTopicResolver(config.Ingest.Kafka.EventsTopicTemplate) + if err != nil { + return nil, fmt.Errorf("failed to create topic name resolver: %w", err) + } + + return topicResolver, nil +} diff --git a/app/common/metadata.go b/app/common/metadata.go new file mode 100644 index 000000000..fd25429a3 --- /dev/null +++ b/app/common/metadata.go @@ -0,0 +1,24 @@ +package common + +import ( + "fmt" + + "github.com/openmeterio/openmeter/app/config" +) + +// Metadata provides information about the service to components that need it (eg. telemetry). +type Metadata struct { + ServiceName string + Version string + Environment string + OpenTelemetryName string +} + +func NewMetadata(conf config.Configuration, version string, serviceName string) Metadata { + return Metadata{ + ServiceName: fmt.Sprintf("openmeter-%s", serviceName), + Version: version, + Environment: conf.Environment, + OpenTelemetryName: fmt.Sprintf("openmeter.io/%s", serviceName), + } +} diff --git a/app/common/meter.go b/app/common/meter.go new file mode 100644 index 000000000..5f3dfc762 --- /dev/null +++ b/app/common/meter.go @@ -0,0 +1,13 @@ +package common + +import ( + "github.com/samber/lo" + + "github.com/openmeterio/openmeter/openmeter/meter" + "github.com/openmeterio/openmeter/pkg/models" + "github.com/openmeterio/openmeter/pkg/slicesx" +) + +func NewMeterRepository(meters []*models.Meter) *meter.InMemoryRepository { + return meter.NewInMemoryRepository(slicesx.Map(meters, lo.FromPtr[models.Meter])) +} diff --git a/app/common/namespace.go b/app/common/namespace.go new file mode 100644 index 000000000..c9648729f --- /dev/null +++ b/app/common/namespace.go @@ -0,0 +1,24 @@ +package common + +import ( + "fmt" + + "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/namespace" +) + +func NewNamespaceManager( + handlers []namespace.Handler, + conf config.NamespaceConfiguration, +) (*namespace.Manager, error) { + manager, err := namespace.NewManager(namespace.ManagerConfig{ + Handlers: handlers, + DefaultNamespace: conf.Default, + DisableManagement: conf.DisableManagement, + }) + if err != nil { + return nil, fmt.Errorf("create namespace manager: %v", err) + } + + return manager, nil +} diff --git a/app/common/openmeter.go b/app/common/openmeter.go deleted file mode 100644 index bf60d27ad..000000000 --- a/app/common/openmeter.go +++ /dev/null @@ -1,253 +0,0 @@ -package common - -import ( - "context" - "fmt" - "log/slog" - - "github.com/ClickHouse/clickhouse-go/v2" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/confluentinc/confluent-kafka-go/v2/kafka" - "github.com/samber/lo" - "go.opentelemetry.io/otel/metric" - - "github.com/openmeterio/openmeter/app/config" - "github.com/openmeterio/openmeter/openmeter/ingest" - "github.com/openmeterio/openmeter/openmeter/ingest/ingestadapter" - "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest" - "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/serializer" - "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/topicresolver" - "github.com/openmeterio/openmeter/openmeter/meter" - "github.com/openmeterio/openmeter/openmeter/namespace" - "github.com/openmeterio/openmeter/openmeter/sink/flushhandler" - "github.com/openmeterio/openmeter/openmeter/sink/flushhandler/ingestnotification" - "github.com/openmeterio/openmeter/openmeter/streaming" - "github.com/openmeterio/openmeter/openmeter/streaming/clickhouse/materialized_view" - "github.com/openmeterio/openmeter/openmeter/streaming/clickhouse/raw_events" - watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka" - "github.com/openmeterio/openmeter/openmeter/watermill/driver/noop" - "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" - pkgkafka "github.com/openmeterio/openmeter/pkg/kafka" - "github.com/openmeterio/openmeter/pkg/models" - "github.com/openmeterio/openmeter/pkg/slicesx" -) - -func NewMeterRepository(meters []*models.Meter) *meter.InMemoryRepository { - return meter.NewInMemoryRepository(slicesx.Map(meters, lo.FromPtr[models.Meter])) -} - -func NewStreamingConnector( - ctx context.Context, - conf config.AggregationConfiguration, - clickHouse clickhouse.Conn, - meterRepository meter.Repository, - logger *slog.Logger, -) (streaming.Connector, error) { - var ( - connector streaming.Connector - err error - ) - - switch conf.Engine { - case config.AggregationEngineClickHouseRaw: - connector, err = raw_events.NewConnector(ctx, raw_events.ConnectorConfig{ - ClickHouse: clickHouse, - Database: conf.ClickHouse.Database, - EventsTableName: conf.EventsTableName, - Logger: logger, - AsyncInsert: conf.AsyncInsert, - AsyncInsertWait: conf.AsyncInsertWait, - InsertQuerySettings: conf.InsertQuerySettings, - }) - if err != nil { - return nil, fmt.Errorf("init clickhouse raw engine: %w", err) - } - - case config.AggregationEngineClickHouseMV: - connector, err = materialized_view.NewConnector(ctx, materialized_view.ConnectorConfig{ - ClickHouse: clickHouse, - Database: conf.ClickHouse.Database, - EventsTableName: conf.EventsTableName, - Logger: logger, - AsyncInsert: conf.AsyncInsert, - AsyncInsertWait: conf.AsyncInsertWait, - InsertQuerySettings: conf.InsertQuerySettings, - - Meters: meterRepository, - PopulateMeter: conf.PopulateMeter, - CreateOrReplaceMeter: conf.CreateOrReplaceMeter, - QueryRawEvents: conf.QueryRawEvents, - }) - if err != nil { - return nil, fmt.Errorf("init clickhouse mv engine: %w", err) - } - default: - return nil, fmt.Errorf("invalid aggregation engine: %s", conf.Engine) - } - - return connector, nil -} - -func NewNamespacedTopicResolver(config config.Configuration) (*topicresolver.NamespacedTopicResolver, error) { - topicResolver, err := topicresolver.NewNamespacedTopicResolver(config.Ingest.Kafka.EventsTopicTemplate) - if err != nil { - return nil, fmt.Errorf("failed to create topic name resolver: %w", err) - } - - return topicResolver, nil -} - -func NewKafkaIngestCollector( - config config.KafkaIngestConfiguration, - producer *kafka.Producer, - topicResolver topicresolver.Resolver, - topicProvisioner pkgkafka.TopicProvisioner, -) (*kafkaingest.Collector, error) { - collector, err := kafkaingest.NewCollector( - producer, - serializer.NewJSONSerializer(), - topicResolver, - topicProvisioner, - config.Partitions, - ) - if err != nil { - return nil, fmt.Errorf("failed to initialize kafka ingest: %w", err) - } - - return collector, nil -} - -func NewIngestCollector( - conf config.Configuration, - kafkaCollector *kafkaingest.Collector, - logger *slog.Logger, - meter metric.Meter, -) (ingest.Collector, func(), error) { - collector, err := ingestadapter.WithMetrics(kafkaCollector, meter) - if err != nil { - return nil, nil, fmt.Errorf("init kafka ingest: %w", err) - } - - if conf.Dedupe.Enabled { - deduplicator, err := conf.Dedupe.NewDeduplicator() - if err != nil { - return nil, nil, fmt.Errorf("failed to initialize deduplicator: %w", err) - } - - return ingest.DeduplicatingCollector{ - Collector: collector, - Deduplicator: deduplicator, - }, func() { - collector.Close() - - logger.Info("closing deduplicator") - - err := deduplicator.Close() - if err != nil { - logger.Error("failed to close deduplicator", "error", err) - } - }, nil - } - - // Note: closing function is called by dedupe as well - return collector, func() { collector.Close() }, nil -} - -func NewKafkaNamespaceHandler( - topicResolver topicresolver.Resolver, - topicProvisioner pkgkafka.TopicProvisioner, - conf config.KafkaIngestConfiguration, -) (*kafkaingest.NamespaceHandler, error) { - return &kafkaingest.NamespaceHandler{ - TopicResolver: topicResolver, - TopicProvisioner: topicProvisioner, - Partitions: conf.Partitions, - DeletionEnabled: conf.NamespaceDeletionEnabled, - }, nil -} - -func NewNamespaceHandlers( - kafkaHandler *kafkaingest.NamespaceHandler, - clickHouseHandler streaming.Connector, -) []namespace.Handler { - return []namespace.Handler{ - kafkaHandler, - clickHouseHandler, - } -} - -func NewNamespaceManager( - handlers []namespace.Handler, - conf config.NamespaceConfiguration, -) (*namespace.Manager, error) { - manager, err := namespace.NewManager(namespace.ManagerConfig{ - Handlers: handlers, - DefaultNamespace: conf.Default, - DisableManagement: conf.DisableManagement, - }) - if err != nil { - return nil, fmt.Errorf("create namespace manager: %v", err) - } - - return manager, nil -} - -// TODO: create a separate file or package for each application instead - -func NewServerPublisher( - ctx context.Context, - conf config.EventsConfiguration, - options watermillkafka.PublisherOptions, - logger *slog.Logger, -) (message.Publisher, func(), error) { - if !conf.Enabled { - return &noop.Publisher{}, func() {}, nil - } - - return NewPublisher(ctx, options, logger) -} - -// the sink-worker requires control over how the publisher is closed -func NewSinkWorkerPublisher( - ctx context.Context, - options watermillkafka.PublisherOptions, - logger *slog.Logger, -) (message.Publisher, func(), error) { - publisher, _, err := NewPublisher(ctx, options, logger) - - return publisher, func() {}, err -} - -func NewFlushHandler( - eventsConfig config.EventsConfiguration, - sinkConfig config.SinkConfiguration, - messagePublisher message.Publisher, - eventPublisher eventbus.Publisher, - logger *slog.Logger, - meter metric.Meter, -) (flushhandler.FlushEventHandler, error) { - if !eventsConfig.Enabled { - return nil, nil - } - - flushHandlerMux := flushhandler.NewFlushEventHandlers() - - // We should only close the producer once the ingest events are fully processed - flushHandlerMux.OnDrainComplete(func() { - logger.Info("shutting down kafka producer") - if err := messagePublisher.Close(); err != nil { - logger.Error("failed to close kafka producer", slog.String("error", err.Error())) - } - }) - - ingestNotificationHandler, err := ingestnotification.NewHandler(logger, meter, eventPublisher, ingestnotification.HandlerConfig{ - MaxEventsInBatch: sinkConfig.IngestNotifications.MaxEventsInBatch, - }) - if err != nil { - return nil, err - } - - flushHandlerMux.AddHandler(ingestNotificationHandler) - - return flushHandlerMux, nil -} diff --git a/app/common/openmeter_billingworker.go b/app/common/openmeter_billingworker.go new file mode 100644 index 000000000..68ce471db --- /dev/null +++ b/app/common/openmeter_billingworker.go @@ -0,0 +1,114 @@ +package common + +import ( + "context" + "fmt" + "log/slog" + "syscall" + + "github.com/ThreeDotsLabs/watermill/message" + "github.com/google/wire" + "github.com/oklog/run" + + "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/billing" + billingworker "github.com/openmeterio/openmeter/openmeter/billing/worker" + watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka" + "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" + "github.com/openmeterio/openmeter/openmeter/watermill/router" + pkgkafka "github.com/openmeterio/openmeter/pkg/kafka" +) + +var BillingWorker = wire.NewSet( + wire.FieldsOf(new(config.BillingWorkerConfiguration), "ConsumerConfiguration"), + wire.FieldsOf(new(config.BillingConfiguration), "Worker"), + + App, + Customer, + Secret, + + BillingWorkerProvisionTopics, + BillingWorkerSubscriber, + + NewFeatureConnector, + BillingService, + + NewBillingWorkerOptions, + NewBillingWorker, + BillingWorkerGroup, +) + +func BillingWorkerProvisionTopics(conf config.BillingConfiguration) []pkgkafka.TopicConfig { + var provisionTopics []pkgkafka.TopicConfig + + if conf.Worker.DLQ.AutoProvision.Enabled { + provisionTopics = append(provisionTopics, pkgkafka.TopicConfig{ + Name: conf.Worker.DLQ.Topic, + Partitions: conf.Worker.DLQ.AutoProvision.Partitions, + RetentionTime: pkgkafka.TimeDurationMilliSeconds(conf.Worker.DLQ.AutoProvision.Retention), + }) + } + + return provisionTopics +} + +// no closer function: the subscriber is closed by the router/worker +func BillingWorkerSubscriber(conf config.BillingConfiguration, brokerOptions watermillkafka.BrokerOptions) (message.Subscriber, error) { + subscriber, err := watermillkafka.NewSubscriber(watermillkafka.SubscriberOptions{ + Broker: brokerOptions, + ConsumerGroupName: conf.Worker.ConsumerGroupName, + }) + if err != nil { + return nil, fmt.Errorf("failed to initialize Kafka subscriber: %w", err) + } + + return subscriber, nil +} + +func NewBillingWorkerOptions( + eventConfig config.EventsConfiguration, + routerOptions router.Options, + eventBus eventbus.Publisher, + billingService billing.Service, + logger *slog.Logger, +) billingworker.WorkerOptions { + return billingworker.WorkerOptions{ + SystemEventsTopic: eventConfig.SystemEvents.Topic, + + Router: routerOptions, + EventBus: eventBus, + BillingService: billingService, + Logger: logger, + } +} + +func NewBillingWorker(workerOptions billingworker.WorkerOptions) (*billingworker.Worker, error) { + worker, err := billingworker.New(workerOptions) + if err != nil { + return nil, fmt.Errorf("failed to initialize worker: %w", err) + } + + return worker, nil +} + +func BillingWorkerGroup( + ctx context.Context, + worker *billingworker.Worker, + telemetryServer TelemetryServer, +) run.Group { + var group run.Group + + group.Add( + func() error { return telemetryServer.ListenAndServe() }, + func(err error) { _ = telemetryServer.Shutdown(ctx) }, + ) + + group.Add( + func() error { return worker.Run(ctx) }, + func(err error) { _ = worker.Close() }, + ) + + group.Add(run.SignalHandler(ctx, syscall.SIGINT, syscall.SIGTERM)) + + return group +} diff --git a/app/common/openmeter_notification.go b/app/common/openmeter_notification.go new file mode 100644 index 000000000..09026bf3c --- /dev/null +++ b/app/common/openmeter_notification.go @@ -0,0 +1,20 @@ +package common + +import ( + "github.com/openmeterio/openmeter/app/config" + pkgkafka "github.com/openmeterio/openmeter/pkg/kafka" +) + +func NotificationServiceProvisionTopics(conf config.NotificationConfiguration) []pkgkafka.TopicConfig { + var provisionTopics []pkgkafka.TopicConfig + + if conf.Consumer.DLQ.AutoProvision.Enabled { + provisionTopics = append(provisionTopics, pkgkafka.TopicConfig{ + Name: conf.Consumer.DLQ.Topic, + Partitions: conf.Consumer.DLQ.AutoProvision.Partitions, + RetentionTime: pkgkafka.TimeDurationMilliSeconds(conf.Consumer.DLQ.AutoProvision.Retention), + }) + } + + return provisionTopics +} diff --git a/app/common/openmeter_provisiontopics.go b/app/common/openmeter_provisiontopics.go deleted file mode 100644 index 746fb6960..000000000 --- a/app/common/openmeter_provisiontopics.go +++ /dev/null @@ -1,44 +0,0 @@ -package common - -import ( - "github.com/openmeterio/openmeter/app/config" - pkgkafka "github.com/openmeterio/openmeter/pkg/kafka" -) - -// TODO: create a separate file or package for each application instead - -func NotificationServiceProvisionTopics(conf config.NotificationConfiguration) []pkgkafka.TopicConfig { - var provisionTopics []pkgkafka.TopicConfig - - if conf.Consumer.DLQ.AutoProvision.Enabled { - provisionTopics = append(provisionTopics, pkgkafka.TopicConfig{ - Name: conf.Consumer.DLQ.Topic, - Partitions: conf.Consumer.DLQ.AutoProvision.Partitions, - RetentionTime: pkgkafka.TimeDurationMilliSeconds(conf.Consumer.DLQ.AutoProvision.Retention), - }) - } - - return provisionTopics -} - -func ServerProvisionTopics(conf config.EventsConfiguration) []pkgkafka.TopicConfig { - var provisionTopics []pkgkafka.TopicConfig - - if conf.SystemEvents.AutoProvision.Enabled { - provisionTopics = append(provisionTopics, pkgkafka.TopicConfig{ - Name: conf.SystemEvents.Topic, - Partitions: conf.SystemEvents.AutoProvision.Partitions, - }) - } - - return provisionTopics -} - -func SinkWorkerProvisionTopics(conf config.EventsConfiguration) []pkgkafka.TopicConfig { - return []pkgkafka.TopicConfig{ - { - Name: conf.IngestEvents.Topic, - Partitions: conf.IngestEvents.AutoProvision.Partitions, - }, - } -} diff --git a/app/common/openmeter_server.go b/app/common/openmeter_server.go new file mode 100644 index 000000000..5f1e3a31f --- /dev/null +++ b/app/common/openmeter_server.go @@ -0,0 +1,105 @@ +package common + +import ( + "context" + "fmt" + "log/slog" + + "github.com/ThreeDotsLabs/watermill/message" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "go.opentelemetry.io/otel/metric" + + "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/ingest" + "github.com/openmeterio/openmeter/openmeter/ingest/ingestadapter" + "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest" + "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/serializer" + "github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/topicresolver" + watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka" + "github.com/openmeterio/openmeter/openmeter/watermill/driver/noop" + pkgkafka "github.com/openmeterio/openmeter/pkg/kafka" +) + +func NewKafkaIngestCollector( + config config.KafkaIngestConfiguration, + producer *kafka.Producer, + topicResolver topicresolver.Resolver, + topicProvisioner pkgkafka.TopicProvisioner, +) (*kafkaingest.Collector, error) { + collector, err := kafkaingest.NewCollector( + producer, + serializer.NewJSONSerializer(), + topicResolver, + topicProvisioner, + config.Partitions, + ) + if err != nil { + return nil, fmt.Errorf("failed to initialize kafka ingest: %w", err) + } + + return collector, nil +} + +func NewIngestCollector( + conf config.Configuration, + kafkaCollector *kafkaingest.Collector, + logger *slog.Logger, + meter metric.Meter, +) (ingest.Collector, func(), error) { + collector, err := ingestadapter.WithMetrics(kafkaCollector, meter) + if err != nil { + return nil, nil, fmt.Errorf("init kafka ingest: %w", err) + } + + if conf.Dedupe.Enabled { + deduplicator, err := conf.Dedupe.NewDeduplicator() + if err != nil { + return nil, nil, fmt.Errorf("failed to initialize deduplicator: %w", err) + } + + return ingest.DeduplicatingCollector{ + Collector: collector, + Deduplicator: deduplicator, + }, func() { + collector.Close() + + logger.Info("closing deduplicator") + + err := deduplicator.Close() + if err != nil { + logger.Error("failed to close deduplicator", "error", err) + } + }, nil + } + + // Note: closing function is called by dedupe as well + return collector, func() { collector.Close() }, nil +} + +// TODO: create a separate file or package for each application instead + +func NewServerPublisher( + ctx context.Context, + conf config.EventsConfiguration, + options watermillkafka.PublisherOptions, + logger *slog.Logger, +) (message.Publisher, func(), error) { + if !conf.Enabled { + return &noop.Publisher{}, func() {}, nil + } + + return NewPublisher(ctx, options, logger) +} + +func ServerProvisionTopics(conf config.EventsConfiguration) []pkgkafka.TopicConfig { + var provisionTopics []pkgkafka.TopicConfig + + if conf.SystemEvents.AutoProvision.Enabled { + provisionTopics = append(provisionTopics, pkgkafka.TopicConfig{ + Name: conf.SystemEvents.Topic, + Partitions: conf.SystemEvents.AutoProvision.Partitions, + }) + } + + return provisionTopics +} diff --git a/app/common/openmeter_sinkworker.go b/app/common/openmeter_sinkworker.go new file mode 100644 index 000000000..b09a5ed15 --- /dev/null +++ b/app/common/openmeter_sinkworker.go @@ -0,0 +1,70 @@ +package common + +import ( + "context" + "log/slog" + + "github.com/ThreeDotsLabs/watermill/message" + "go.opentelemetry.io/otel/metric" + + "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/sink/flushhandler" + "github.com/openmeterio/openmeter/openmeter/sink/flushhandler/ingestnotification" + watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka" + "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" + pkgkafka "github.com/openmeterio/openmeter/pkg/kafka" +) + +// the sink-worker requires control over how the publisher is closed +func NewSinkWorkerPublisher( + ctx context.Context, + options watermillkafka.PublisherOptions, + logger *slog.Logger, +) (message.Publisher, func(), error) { + publisher, _, err := NewPublisher(ctx, options, logger) + + return publisher, func() {}, err +} + +func NewFlushHandler( + eventsConfig config.EventsConfiguration, + sinkConfig config.SinkConfiguration, + messagePublisher message.Publisher, + eventPublisher eventbus.Publisher, + logger *slog.Logger, + meter metric.Meter, +) (flushhandler.FlushEventHandler, error) { + if !eventsConfig.Enabled { + return nil, nil + } + + flushHandlerMux := flushhandler.NewFlushEventHandlers() + + // We should only close the producer once the ingest events are fully processed + flushHandlerMux.OnDrainComplete(func() { + logger.Info("shutting down kafka producer") + if err := messagePublisher.Close(); err != nil { + logger.Error("failed to close kafka producer", slog.String("error", err.Error())) + } + }) + + ingestNotificationHandler, err := ingestnotification.NewHandler(logger, meter, eventPublisher, ingestnotification.HandlerConfig{ + MaxEventsInBatch: sinkConfig.IngestNotifications.MaxEventsInBatch, + }) + if err != nil { + return nil, err + } + + flushHandlerMux.AddHandler(ingestNotificationHandler) + + return flushHandlerMux, nil +} + +func SinkWorkerProvisionTopics(conf config.EventsConfiguration) []pkgkafka.TopicConfig { + return []pkgkafka.TopicConfig{ + { + Name: conf.IngestEvents.Topic, + Partitions: conf.IngestEvents.AutoProvision.Partitions, + }, + } +} diff --git a/app/common/productcatalog.go b/app/common/productcatalog.go index 23162f398..3d9ee29c5 100644 --- a/app/common/productcatalog.go +++ b/app/common/productcatalog.go @@ -1,16 +1,67 @@ package common import ( + "fmt" "log/slog" + "github.com/google/wire" + "github.com/openmeterio/openmeter/app/config" entdb "github.com/openmeterio/openmeter/openmeter/ent/db" "github.com/openmeterio/openmeter/openmeter/meter" productcatalogpgadapter "github.com/openmeterio/openmeter/openmeter/productcatalog/adapter" "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" + "github.com/openmeterio/openmeter/openmeter/productcatalog/plan" + planadapter "github.com/openmeterio/openmeter/openmeter/productcatalog/plan/adapter" + planservice "github.com/openmeterio/openmeter/openmeter/productcatalog/plan/service" +) + +var ProductCatalog = wire.NewSet( + wire.FieldsOf(new(config.Configuration), "ProductCatalog"), + + Feature, + Plan, +) + +var Feature = wire.NewSet( + NewFeatureConnector, +) + +var Plan = wire.NewSet( + NewPlanService, ) -func NewFeatureConnector(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration, meterRepo meter.Repository) feature.FeatureConnector { +func NewFeatureConnector(logger *slog.Logger, db *entdb.Client, meterRepo meter.Repository) feature.FeatureConnector { + // TODO: remove this check after enabled by default + if db == nil { + return nil + } + featureRepo := productcatalogpgadapter.NewPostgresFeatureRepo(db, logger) return feature.NewFeatureConnector(featureRepo, meterRepo) } + +func NewPlanService(logger *slog.Logger, db *entdb.Client, productCatalogConf config.ProductCatalogConfiguration, featureConnector feature.FeatureConnector) (plan.Service, error) { + // TODO: remove this check after enabled by default + if db == nil { + return nil, nil + } + + if !productCatalogConf.Enabled { + return nil, nil + } + + adapter, err := planadapter.New(planadapter.Config{ + Client: db, + Logger: logger.With("subsystem", "productcatalog.plan"), + }) + if err != nil { + return nil, fmt.Errorf("failed to initialize plan adapter: %w", err) + } + + return planservice.New(planservice.Config{ + Feature: featureConnector, + Adapter: adapter, + Logger: logger.With("subsystem", "productcatalog.plan"), + }) +} diff --git a/app/common/runner.go b/app/common/runner.go new file mode 100644 index 000000000..7735320bb --- /dev/null +++ b/app/common/runner.go @@ -0,0 +1,24 @@ +package common + +import ( + "errors" + "log/slog" + "net/http" + + "github.com/oklog/run" +) + +// Runner is a helper struct that runs a group of services. +type Runner struct { + Group run.Group + Logger *slog.Logger +} + +func (r Runner) Run() { + err := r.Group.Run() + if e := (run.SignalError{}); errors.As(err, &e) { + r.Logger.Info("received signal: shutting down", slog.String("signal", e.Signal.String())) + } else if !errors.Is(err, http.ErrServerClosed) { + r.Logger.Error("application stopped due to error", slog.String("error", err.Error())) + } +} diff --git a/app/common/secret.go b/app/common/secret.go index 2d7d9d803..dcc989056 100644 --- a/app/common/secret.go +++ b/app/common/secret.go @@ -3,14 +3,19 @@ package common import ( "log/slog" - "github.com/openmeterio/openmeter/app/config" + "github.com/google/wire" + entdb "github.com/openmeterio/openmeter/openmeter/ent/db" "github.com/openmeterio/openmeter/openmeter/secret" secretadapter "github.com/openmeterio/openmeter/openmeter/secret/adapter" secretservice "github.com/openmeterio/openmeter/openmeter/secret/service" ) -func NewSecretService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration) (secret.Service, error) { +var Secret = wire.NewSet( + NewSecretService, +) + +func NewSecretService(logger *slog.Logger, db *entdb.Client) (secret.Service, error) { secretAdapter := secretadapter.New() return secretservice.New(secretservice.Config{ diff --git a/app/common/streaming.go b/app/common/streaming.go new file mode 100644 index 000000000..f839adb3a --- /dev/null +++ b/app/common/streaming.go @@ -0,0 +1,67 @@ +package common + +import ( + "context" + "fmt" + "log/slog" + + "github.com/ClickHouse/clickhouse-go/v2" + + "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/meter" + "github.com/openmeterio/openmeter/openmeter/streaming" + "github.com/openmeterio/openmeter/openmeter/streaming/clickhouse/materialized_view" + "github.com/openmeterio/openmeter/openmeter/streaming/clickhouse/raw_events" +) + +func NewStreamingConnector( + ctx context.Context, + conf config.AggregationConfiguration, + clickHouse clickhouse.Conn, + meterRepository meter.Repository, + logger *slog.Logger, +) (streaming.Connector, error) { + var ( + connector streaming.Connector + err error + ) + + switch conf.Engine { + case config.AggregationEngineClickHouseRaw: + connector, err = raw_events.NewConnector(ctx, raw_events.ConnectorConfig{ + ClickHouse: clickHouse, + Database: conf.ClickHouse.Database, + EventsTableName: conf.EventsTableName, + Logger: logger, + AsyncInsert: conf.AsyncInsert, + AsyncInsertWait: conf.AsyncInsertWait, + InsertQuerySettings: conf.InsertQuerySettings, + }) + if err != nil { + return nil, fmt.Errorf("init clickhouse raw engine: %w", err) + } + + case config.AggregationEngineClickHouseMV: + connector, err = materialized_view.NewConnector(ctx, materialized_view.ConnectorConfig{ + ClickHouse: clickHouse, + Database: conf.ClickHouse.Database, + EventsTableName: conf.EventsTableName, + Logger: logger, + AsyncInsert: conf.AsyncInsert, + AsyncInsertWait: conf.AsyncInsertWait, + InsertQuerySettings: conf.InsertQuerySettings, + + Meters: meterRepository, + PopulateMeter: conf.PopulateMeter, + CreateOrReplaceMeter: conf.CreateOrReplaceMeter, + QueryRawEvents: conf.QueryRawEvents, + }) + if err != nil { + return nil, fmt.Errorf("init clickhouse mv engine: %w", err) + } + default: + return nil, fmt.Errorf("invalid aggregation engine: %s", conf.Engine) + } + + return connector, nil +} diff --git a/app/common/wire.go b/app/common/wire.go index 47edbcc7b..d00ea879e 100644 --- a/app/common/wire.go +++ b/app/common/wire.go @@ -22,7 +22,6 @@ import ( ) var Config = wire.NewSet( - wire.FieldsOf(new(config.Configuration), "Apps"), wire.FieldsOf(new(config.Configuration), "Aggregation"), wire.FieldsOf(new(config.AggregationConfiguration), "ClickHouse"), diff --git a/app/config/billing.go b/app/config/billing.go index a15c4e1c9..2e3ebcd0d 100644 --- a/app/config/billing.go +++ b/app/config/billing.go @@ -13,4 +13,6 @@ func (c BillingConfiguration) Validate() error { func ConfigureBilling(v *viper.Viper) { v.SetDefault("billing.enabled", false) + + ConfigureBillingWorker(v) } diff --git a/app/config/billingworker.go b/app/config/billingworker.go index 8d01dd3c7..5e74c8a98 100644 --- a/app/config/billingworker.go +++ b/app/config/billingworker.go @@ -23,7 +23,8 @@ func (c BillingWorkerConfiguration) Validate() error { } func ConfigureBillingWorker(v *viper.Viper) { - ConfigureConsumer(v, "billingWorker") - v.SetDefault("billingWorker.dlq.topic", "om_sys.billing_worker_dlq") - v.SetDefault("billingWorker.consumerGroupName", "om_billing_worker") + v.SetDefault("billing.worker.dlq.topic", "om_sys.billing_worker_dlq") + v.SetDefault("billing.worker.consumerGroupName", "om_billing_worker") + + ConfigureConsumer(v, "billing.worker") } diff --git a/app/config/config.go b/app/config/config.go index 57fc47a23..ad47d41d4 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -160,7 +160,6 @@ func SetViperDefaults(v *viper.Viper, flags *pflag.FlagSet) { ConfigurePortal(v) ConfigureEvents(v) ConfigureBalanceWorker(v) - ConfigureBillingWorker(v) ConfigureNotification(v) ConfigureBilling(v) ConfigureProductCatalog(v) diff --git a/cmd/billing-worker/.air.toml b/cmd/billing-worker/.air.toml index cb007a3db..6f2097c5a 100644 --- a/cmd/billing-worker/.air.toml +++ b/cmd/billing-worker/.air.toml @@ -4,8 +4,8 @@ tmp_dir = "tmp" [build] args_bin = ["--config", "./config.yaml", "--telemetry-address", ":10002"] - bin = "./tmp/openmeter-balance-worker" - cmd = "go build -tags dynamic -o ./tmp/openmeter-balance-worker ./cmd/balance-worker" + bin = "./tmp/openmeter-billing-worker" + cmd = "go build -tags dynamic -o ./tmp/openmeter-billing-worker ./cmd/billing-worker" delay = 0 exclude_dir = ["assets", "ci", "deploy", "docs", "examples", "testdata", "quickstart", "tmp", "vendor", "api/client", "node_modules"] exclude_file = [] diff --git a/cmd/billing-worker/main.go b/cmd/billing-worker/main.go index 6ac4dc6c9..684eeaaaa 100644 --- a/cmd/billing-worker/main.go +++ b/cmd/billing-worker/main.go @@ -73,6 +73,11 @@ func main() { // Validate service prerequisites + if !conf.Billing.Enabled { + logger.Error("billing are disabled, exiting") + os.Exit(1) + } + if !conf.Events.Enabled { logger.Error("events are disabled, exiting") os.Exit(1) diff --git a/cmd/billing-worker/wire_gen.go b/cmd/billing-worker/wire_gen.go index d7f19e74c..6c233a973 100644 --- a/cmd/billing-worker/wire_gen.go +++ b/cmd/billing-worker/wire_gen.go @@ -154,7 +154,7 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - secretService, err := common.NewSecretService(logger, client, appsConfiguration) + secretService, err := common.NewSecretService(logger, client) if err != nil { cleanup6() cleanup5() @@ -164,7 +164,7 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - appstripeService, err := common.NewAppStripeService(logger, client, service, customerService, secretService) + appstripeService, err := common.NewAppStripeService(logger, client, appsConfiguration, service, customerService, secretService) if err != nil { cleanup6() cleanup5() @@ -230,7 +230,7 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - app, err := common.NewAppSandbox(ctx, logger, client, service, manager) + app, err := common.NewAppSandbox(ctx, logger, appsConfiguration, service, manager) if err != nil { cleanup6() cleanup5() @@ -240,7 +240,7 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - featureConnector := common.NewFeatureConnector(logger, client, appsConfiguration, inMemoryRepository) + featureConnector := common.NewFeatureConnector(logger, client, inMemoryRepository) billingService, err := common.BillingService(logger, client, service, appstripeService, app, billingConfiguration, customerService, featureConnector, inMemoryRepository, connector) if err != nil { cleanup6() diff --git a/cmd/server/main.go b/cmd/server/main.go index 7bb06bdf2..93d30f986 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -16,19 +16,6 @@ import ( "github.com/spf13/viper" "github.com/openmeterio/openmeter/app/config" - apppkg "github.com/openmeterio/openmeter/openmeter/app" - appadapter "github.com/openmeterio/openmeter/openmeter/app/adapter" - appsandbox "github.com/openmeterio/openmeter/openmeter/app/sandbox" - appservice "github.com/openmeterio/openmeter/openmeter/app/service" - appstripe "github.com/openmeterio/openmeter/openmeter/app/stripe" - appstripeadapter "github.com/openmeterio/openmeter/openmeter/app/stripe/adapter" - appstripeservice "github.com/openmeterio/openmeter/openmeter/app/stripe/service" - "github.com/openmeterio/openmeter/openmeter/billing" - billingadapter "github.com/openmeterio/openmeter/openmeter/billing/adapter" - billingservice "github.com/openmeterio/openmeter/openmeter/billing/service" - "github.com/openmeterio/openmeter/openmeter/customer" - customeradapter "github.com/openmeterio/openmeter/openmeter/customer/adapter" - customerservice "github.com/openmeterio/openmeter/openmeter/customer/service" "github.com/openmeterio/openmeter/openmeter/debug" "github.com/openmeterio/openmeter/openmeter/ingest" "github.com/openmeterio/openmeter/openmeter/ingest/ingestdriver" @@ -39,14 +26,9 @@ import ( notificationrepository "github.com/openmeterio/openmeter/openmeter/notification/repository" notificationservice "github.com/openmeterio/openmeter/openmeter/notification/service" notificationwebhook "github.com/openmeterio/openmeter/openmeter/notification/webhook" - plan "github.com/openmeterio/openmeter/openmeter/productcatalog/plan" - planadapter "github.com/openmeterio/openmeter/openmeter/productcatalog/plan/adapter" - planservice "github.com/openmeterio/openmeter/openmeter/productcatalog/plan/service" plansubscription "github.com/openmeterio/openmeter/openmeter/productcatalog/subscription" "github.com/openmeterio/openmeter/openmeter/registry" registrybuilder "github.com/openmeterio/openmeter/openmeter/registry/builder" - secretadapter "github.com/openmeterio/openmeter/openmeter/secret/adapter" - secretservice "github.com/openmeterio/openmeter/openmeter/secret/service" "github.com/openmeterio/openmeter/openmeter/server" "github.com/openmeterio/openmeter/openmeter/server/authenticator" "github.com/openmeterio/openmeter/openmeter/server/router" @@ -174,109 +156,11 @@ func main() { }) } - // Initialize Customer - var customerService customer.CustomerService - - if app.EntClient != nil { - var customerAdapter customer.Adapter - customerAdapter, err = customeradapter.New(customeradapter.Config{ - Client: app.EntClient, - Logger: logger.WithGroup("customer.postgres"), - }) - if err != nil { - logger.Error("failed to initialize customer repository", "error", err) - os.Exit(1) - } - - customerService, err = customerservice.New(customerservice.Config{ - Adapter: customerAdapter, - }) - if err != nil { - logger.Error("failed to initialize customer service", "error", err) - os.Exit(1) - } - } - - // Initialize Secret - secretService, err := secretservice.New(secretservice.Config{ - Adapter: secretadapter.New(), - }) - if err != nil { - logger.Error("failed to initialize secret service", "error", err) - os.Exit(1) - } - - // Initialize App - var appService apppkg.Service - - if conf.Apps.Enabled { - var appAdapter apppkg.Adapter - appAdapter, err = appadapter.New(appadapter.Config{ - Client: app.EntClient, - BaseURL: conf.Apps.BaseURL, - }) - if err != nil { - logger.Error("failed to initialize app repository", "error", err) - os.Exit(1) - } - - appService, err = appservice.New(appservice.Config{ - Adapter: appAdapter, - }) - if err != nil { - logger.Error("failed to initialize app service", "error", err) - os.Exit(1) - } - } - - // Initialize AppStripe - var appStripeService appstripe.Service - - if conf.Apps.Enabled { - var appStripeAdapter appstripe.Adapter - appStripeAdapter, err = appstripeadapter.New(appstripeadapter.Config{ - Client: app.EntClient, - AppService: appService, - CustomerService: customerService, - SecretService: secretService, - }) - if err != nil { - logger.Error("failed to initialize app stripe repository", "error", err) - os.Exit(1) - } - - appStripeService, err = appstripeservice.New(appstripeservice.Config{ - Adapter: appStripeAdapter, - AppService: appService, - SecretService: secretService, - }) - if err != nil { - logger.Error("failed to initialize app stripe service", "error", err) - os.Exit(1) - } - } - - // Initialize AppSandbox - if conf.Apps.Enabled { - _, err = appsandbox.NewFactory(appsandbox.Config{ - AppService: appService, - }) - if err != nil { - logger.Error("failed to initialize app sandbox factory", "error", err) - os.Exit(1) - } - - app, err := appsandbox.AutoProvision(ctx, appsandbox.AutoProvisionInput{ - Namespace: app.NamespaceManager.GetDefaultNamespace(), - AppService: appService, - }) - if err != nil { - logger.Error("failed to auto-provision sandbox app", "error", err) - os.Exit(1) - } - - logger.Info("sandbox app auto-provisioned", "app_id", app.GetID().ID) - } + appService := app.App + appStripeService := app.AppStripe + customerService := app.Customer + billingService := app.Billing + planService := app.Plan // Initialize Notification var notificationService notification.Service @@ -332,29 +216,6 @@ func main() { }() } - // Initialize plans & subscriptions - var planService plan.Service - if conf.ProductCatalog.Enabled { - adapter, err := planadapter.New(planadapter.Config{ - Client: app.EntClient, - Logger: logger.With("subsystem", "productcatalog.plan"), - }) - if err != nil { - logger.Error("failed to initialize plan adapter", "error", err) - os.Exit(1) - } - - planService, err = planservice.New(planservice.Config{ - Feature: entitlementConnRegistry.Feature, - Adapter: adapter, - Logger: logger.With("subsystem", "productcatalog.plan"), - }) - if err != nil { - logger.Error("failed to initialize plan service", "error", err) - os.Exit(1) - } - } - // Initialize subscriptions var subscriptionService subscription.Service var subscriptionWorkflowService subscription.WorkflowService @@ -391,33 +252,6 @@ func main() { }) } - // Initialize billing - var billingService billing.Service - if conf.Billing.Enabled { - adapter, err := billingadapter.New(billingadapter.Config{ - Client: app.EntClient, - Logger: logger.With("subsystem", "billing.adapter"), - }) - if err != nil { - logger.Error("failed to initialize billing adapter", "error", err) - os.Exit(1) - } - - billingService, err = billingservice.New(billingservice.Config{ - Adapter: adapter, - CustomerService: customerService, - AppService: appService, - Logger: logger.With("subsystem", "billing.service"), - FeatureService: entitlementConnRegistry.Feature, - MeterRepo: app.MeterRepository, - StreamingConnector: app.StreamingConnector, - }) - if err != nil { - logger.Error("failed to initialize billing service", "error", err) - os.Exit(1) - } - } - s, err := server.NewServer(&server.Config{ RouterConfig: router.Config{ NamespaceManager: app.NamespaceManager, diff --git a/cmd/server/wire.go b/cmd/server/wire.go index 6a7d0445a..e8c34dfcd 100644 --- a/cmd/server/wire.go +++ b/cmd/server/wire.go @@ -14,10 +14,18 @@ import ( "github.com/openmeterio/openmeter/app/common" "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/app" + appsandbox "github.com/openmeterio/openmeter/openmeter/app/sandbox" + appstripe "github.com/openmeterio/openmeter/openmeter/app/stripe" + "github.com/openmeterio/openmeter/openmeter/billing" + "github.com/openmeterio/openmeter/openmeter/customer" "github.com/openmeterio/openmeter/openmeter/ent/db" "github.com/openmeterio/openmeter/openmeter/ingest" "github.com/openmeterio/openmeter/openmeter/meter" "github.com/openmeterio/openmeter/openmeter/namespace" + "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" + "github.com/openmeterio/openmeter/openmeter/productcatalog/plan" + "github.com/openmeterio/openmeter/openmeter/secret" "github.com/openmeterio/openmeter/openmeter/streaming" "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" kafkametrics "github.com/openmeterio/openmeter/pkg/kafka/metrics" @@ -27,40 +35,49 @@ type Application struct { common.GlobalInitializer common.Migrator - StreamingConnector streaming.Connector - MeterRepository meter.Repository + App app.Service + AppStripe appstripe.Service + AppSandbox *appsandbox.App + Customer customer.Service + Billing billing.Service EntClient *db.Client - TelemetryServer common.TelemetryServer + EventPublisher eventbus.Publisher + FeatureConnector feature.FeatureConnector + IngestCollector ingest.Collector KafkaProducer *kafka.Producer KafkaMetrics *kafkametrics.Metrics - EventPublisher eventbus.Publisher - - IngestCollector ingest.Collector - - NamespaceHandlers []namespace.Handler - NamespaceManager *namespace.Manager - - Logger *slog.Logger - Meter metric.Meter - - RouterHook func(chi.Router) + Logger *slog.Logger + MeterRepository meter.Repository + NamespaceHandlers []namespace.Handler + NamespaceManager *namespace.Manager + Meter metric.Meter + Plan plan.Service + RouterHook func(chi.Router) + Secret secret.Service + StreamingConnector streaming.Connector + TelemetryServer common.TelemetryServer } func initializeApplication(ctx context.Context, conf config.Configuration) (Application, func(), error) { wire.Build( metadata, + common.App, + common.Billing, + common.ClickHouse, common.Config, + common.Customer, + common.Database, common.Framework, - common.Telemetry, + common.Kafka, common.NewDefaultTextMapPropagator, + common.NewServerPublisher, common.NewTelemetryRouterHook, - common.Database, - common.ClickHouse, - common.Kafka, + common.OpenMeter, + common.ProductCatalog, + common.Secret, common.ServerProvisionTopics, + common.Telemetry, common.WatermillNoPublisher, - common.NewServerPublisher, - common.OpenMeter, wire.Struct(new(Application), "*"), ) diff --git a/cmd/server/wire_gen.go b/cmd/server/wire_gen.go index 53720d00a..ce5e438f8 100644 --- a/cmd/server/wire_gen.go +++ b/cmd/server/wire_gen.go @@ -12,10 +12,18 @@ import ( "github.com/go-chi/chi/v5" "github.com/openmeterio/openmeter/app/common" "github.com/openmeterio/openmeter/app/config" + "github.com/openmeterio/openmeter/openmeter/app" + "github.com/openmeterio/openmeter/openmeter/app/sandbox" + "github.com/openmeterio/openmeter/openmeter/app/stripe" + "github.com/openmeterio/openmeter/openmeter/billing" + "github.com/openmeterio/openmeter/openmeter/customer" "github.com/openmeterio/openmeter/openmeter/ent/db" "github.com/openmeterio/openmeter/openmeter/ingest" "github.com/openmeterio/openmeter/openmeter/meter" "github.com/openmeterio/openmeter/openmeter/namespace" + "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" + "github.com/openmeterio/openmeter/openmeter/productcatalog/plan" + "github.com/openmeterio/openmeter/openmeter/secret" "github.com/openmeterio/openmeter/openmeter/streaming" "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka" "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" @@ -73,9 +81,8 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl Client: client, Logger: logger, } - aggregationConfiguration := conf.Aggregation - clickHouseAggregationConfiguration := aggregationConfiguration.ClickHouse - v, err := common.NewClickHouse(clickHouseAggregationConfiguration) + appsConfiguration := conf.Apps + service, err := common.NewAppService(logger, client, appsConfiguration) if err != nil { cleanup5() cleanup4() @@ -84,9 +91,7 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - v2 := conf.Meters - inMemoryRepository := common.NewMeterRepository(v2) - connector, err := common.NewStreamingConnector(ctx, aggregationConfiguration, v, inMemoryRepository, logger) + customerService, err := common.NewCustomerService(logger, client) if err != nil { cleanup5() cleanup4() @@ -95,12 +100,8 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - health := common.NewHealthChecker(logger) - telemetryHandler := common.NewTelemetryHandler(metricsTelemetryConfig, health) - v3, cleanup6 := common.NewTelemetryServer(telemetryConfig, telemetryHandler) - producer, err := common.NewKafkaProducer(conf, logger) + secretService, err := common.NewSecretService(logger, client) if err != nil { - cleanup6() cleanup5() cleanup4() cleanup3() @@ -108,9 +109,17 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - metrics, err := common.NewKafkaMetrics(meter) + appstripeService, err := common.NewAppStripeService(logger, client, appsConfiguration, service, customerService, secretService) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + namespacedTopicResolver, err := common.NewNamespacedTopicResolver(conf) if err != nil { - cleanup6() cleanup5() cleanup4() cleanup3() @@ -118,15 +127,11 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - eventsConfiguration := conf.Events ingestConfiguration := conf.Ingest kafkaIngestConfiguration := ingestConfiguration.Kafka kafkaConfiguration := kafkaIngestConfiguration.KafkaConfiguration - brokerOptions := common.NewBrokerConfiguration(kafkaConfiguration, logTelemetryConfig, commonMetadata, logger, meter) - v4 := common.ServerProvisionTopics(eventsConfiguration) adminClient, err := common.NewKafkaAdminClient(kafkaConfiguration) if err != nil { - cleanup6() cleanup5() cleanup4() cleanup3() @@ -138,7 +143,6 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl kafkaTopicProvisionerConfig := common.NewKafkaTopicProvisionerConfig(adminClient, logger, meter, topicProvisionerConfig) topicProvisioner, err := common.NewKafkaTopicProvisioner(kafkaTopicProvisionerConfig) if err != nil { - cleanup6() cleanup5() cleanup4() cleanup3() @@ -146,14 +150,78 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } + namespaceHandler, err := common.NewKafkaNamespaceHandler(namespacedTopicResolver, topicProvisioner, kafkaIngestConfiguration) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + aggregationConfiguration := conf.Aggregation + clickHouseAggregationConfiguration := aggregationConfiguration.ClickHouse + v, err := common.NewClickHouse(clickHouseAggregationConfiguration) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + v2 := conf.Meters + inMemoryRepository := common.NewMeterRepository(v2) + connector, err := common.NewStreamingConnector(ctx, aggregationConfiguration, v, inMemoryRepository, logger) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + v3 := common.NewNamespaceHandlers(namespaceHandler, connector) + namespaceConfiguration := conf.Namespace + manager, err := common.NewNamespaceManager(v3, namespaceConfiguration) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + app, err := common.NewAppSandbox(ctx, logger, appsConfiguration, service, manager) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + billingConfiguration := conf.Billing + featureConnector := common.NewFeatureConnector(logger, client, inMemoryRepository) + billingService, err := common.BillingService(logger, client, service, appstripeService, app, billingConfiguration, customerService, featureConnector, inMemoryRepository, connector) + if err != nil { + cleanup5() + cleanup4() + cleanup3() + cleanup2() + cleanup() + return Application{}, nil, err + } + eventsConfiguration := conf.Events + brokerOptions := common.NewBrokerConfiguration(kafkaConfiguration, logTelemetryConfig, commonMetadata, logger, meter) + v4 := common.ServerProvisionTopics(eventsConfiguration) publisherOptions := kafka.PublisherOptions{ Broker: brokerOptions, ProvisionTopics: v4, TopicProvisioner: topicProvisioner, } - publisher, cleanup7, err := common.NewServerPublisher(ctx, eventsConfiguration, publisherOptions, logger) + publisher, cleanup6, err := common.NewServerPublisher(ctx, eventsConfiguration, publisherOptions, logger) if err != nil { - cleanup6() cleanup5() cleanup4() cleanup3() @@ -163,7 +231,6 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl } eventbusPublisher, err := common.NewEventBusPublisher(publisher, eventsConfiguration, logger) if err != nil { - cleanup7() cleanup6() cleanup5() cleanup4() @@ -172,9 +239,8 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - namespacedTopicResolver, err := common.NewNamespacedTopicResolver(conf) + producer, err := common.NewKafkaProducer(conf, logger) if err != nil { - cleanup7() cleanup6() cleanup5() cleanup4() @@ -185,7 +251,6 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl } collector, err := common.NewKafkaIngestCollector(kafkaIngestConfiguration, producer, namespacedTopicResolver, topicProvisioner) if err != nil { - cleanup7() cleanup6() cleanup5() cleanup4() @@ -194,9 +259,8 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - ingestCollector, cleanup8, err := common.NewIngestCollector(conf, collector, logger, meter) + ingestCollector, cleanup7, err := common.NewIngestCollector(conf, collector, logger, meter) if err != nil { - cleanup7() cleanup6() cleanup5() cleanup4() @@ -205,9 +269,8 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - namespaceHandler, err := common.NewKafkaNamespaceHandler(namespacedTopicResolver, topicProvisioner, kafkaIngestConfiguration) + metrics, err := common.NewKafkaMetrics(meter) if err != nil { - cleanup8() cleanup7() cleanup6() cleanup5() @@ -217,11 +280,9 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - v5 := common.NewNamespaceHandlers(namespaceHandler, connector) - namespaceConfiguration := conf.Namespace - manager, err := common.NewNamespaceManager(v5, namespaceConfiguration) + productCatalogConfiguration := conf.ProductCatalog + planService, err := common.NewPlanService(logger, client, productCatalogConfiguration, featureConnector) if err != nil { - cleanup8() cleanup7() cleanup6() cleanup5() @@ -231,23 +292,34 @@ func initializeApplication(ctx context.Context, conf config.Configuration) (Appl cleanup() return Application{}, nil, err } - v6 := common.NewTelemetryRouterHook(meterProvider, tracerProvider) + v5 := common.NewTelemetryRouterHook(meterProvider, tracerProvider) + health := common.NewHealthChecker(logger) + telemetryHandler := common.NewTelemetryHandler(metricsTelemetryConfig, health) + v6, cleanup8 := common.NewTelemetryServer(telemetryConfig, telemetryHandler) application := Application{ GlobalInitializer: globalInitializer, Migrator: migrator, - StreamingConnector: connector, - MeterRepository: inMemoryRepository, + App: service, + AppStripe: appstripeService, + AppSandbox: app, + Customer: customerService, + Billing: billingService, EntClient: client, - TelemetryServer: v3, - KafkaProducer: producer, - KafkaMetrics: metrics, EventPublisher: eventbusPublisher, + FeatureConnector: featureConnector, IngestCollector: ingestCollector, - NamespaceHandlers: v5, - NamespaceManager: manager, + KafkaProducer: producer, + KafkaMetrics: metrics, Logger: logger, + MeterRepository: inMemoryRepository, + NamespaceHandlers: v3, + NamespaceManager: manager, Meter: meter, - RouterHook: v6, + Plan: planService, + RouterHook: v5, + Secret: secretService, + StreamingConnector: connector, + TelemetryServer: v6, } return application, func() { cleanup8() @@ -267,23 +339,27 @@ type Application struct { common.GlobalInitializer common.Migrator - StreamingConnector streaming.Connector - MeterRepository meter.Repository + App app.Service + AppStripe appstripe.Service + AppSandbox *appsandbox.App + Customer customer.Service + Billing billing.Service EntClient *db.Client - TelemetryServer common.TelemetryServer + EventPublisher eventbus.Publisher + FeatureConnector feature.FeatureConnector + IngestCollector ingest.Collector KafkaProducer *kafka2.Producer KafkaMetrics *metrics.Metrics - EventPublisher eventbus.Publisher - - IngestCollector ingest.Collector - - NamespaceHandlers []namespace.Handler - NamespaceManager *namespace.Manager - - Logger *slog.Logger - Meter metric.Meter - - RouterHook func(chi.Router) + Logger *slog.Logger + MeterRepository meter.Repository + NamespaceHandlers []namespace.Handler + NamespaceManager *namespace.Manager + Meter metric.Meter + Plan plan.Service + RouterHook func(chi.Router) + Secret secret.Service + StreamingConnector streaming.Connector + TelemetryServer common.TelemetryServer } func metadata(conf config.Configuration) common.Metadata { From 9bb58be31a701f13d1aa8d8fda1e068a257954e4 Mon Sep 17 00:00:00 2001 From: Peter Marton Date: Fri, 13 Dec 2024 15:39:14 +0100 Subject: [PATCH 4/4] test(config): fix --- app/config/config_test.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/app/config/config_test.go b/app/config/config_test.go index 2bde498f9..5cc0f91da 100644 --- a/app/config/config_test.go +++ b/app/config/config_test.go @@ -132,6 +132,29 @@ func TestComplete(t *testing.T) { AsyncInsert: false, AsyncInsertWait: false, }, + Billing: BillingConfiguration{ + Enabled: false, + Worker: BillingWorkerConfiguration{ + ConsumerConfiguration: ConsumerConfiguration{ + ProcessingTimeout: 30 * time.Second, + Retry: RetryConfiguration{ + InitialInterval: 10 * time.Millisecond, + MaxInterval: time.Second, + MaxElapsedTime: time.Minute, + }, + DLQ: DLQConfiguration{ + Enabled: true, + Topic: "om_sys.billing_worker_dlq", + AutoProvision: DLQAutoProvisionConfiguration{ + Enabled: true, + Partitions: 1, + Retention: 90 * 24 * time.Hour, + }, + }, + ConsumerGroupName: "om_billing_worker", + }, + }, + }, Sink: SinkConfiguration{ GroupId: "openmeter-sink-worker", MinCommitCount: 500,