Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: billing worker #1958

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ build-balance-worker: ## Build balance-worker binary
$(call print-target)
go build -o build/balance-worker ./cmd/balance-worker

.PHONY: build-billing-worker
build-billing-worker: ## Build billing-worker binary
$(call print-target)
go build -o build/billing-worker ./cmd/billing-worker

.PHONY: build-notification-service
build-notification-service: ## Build notification-service binary
$(call print-target)
Expand All @@ -86,6 +91,12 @@ balance-worker: ## Run balance-worker
$(call print-target)
air -c ./cmd/balance-worker/.air.toml

.PHONY: billing-worker
billing-worker: ## Run billing-worker
@ if [ config.yaml -ot config.example.yaml ]; then diff -u config.yaml config.example.yaml || (echo "!!! The configuration example changed. Please update your config.yaml file accordingly (or at least touch it). !!!" && false); fi
$(call print-target)
air -c ./cmd/billing-worker/.air.toml

.PHONY: notification-service
notification-service: ## Run notification-service
@ if [ config.yaml -ot config.example.yaml ]; then diff -u config.yaml config.example.yaml || (echo "!!! The configuration example changed. Please update your config.yaml file accordingly (or at least touch it). !!!" && false); fi
Expand Down
110 changes: 84 additions & 26 deletions app/common/app.go
Original file line number Diff line number Diff line change
@@ -1,44 +1,102 @@
package common

import (
"errors"
"context"
"fmt"
"log/slog"
"net/http"

"github.com/oklog/run"
"github.com/google/wire"

"github.com/openmeterio/openmeter/app/config"
"github.com/openmeterio/openmeter/openmeter/app"
appadapter "github.com/openmeterio/openmeter/openmeter/app/adapter"
appsandbox "github.com/openmeterio/openmeter/openmeter/app/sandbox"
appservice "github.com/openmeterio/openmeter/openmeter/app/service"
appstripe "github.com/openmeterio/openmeter/openmeter/app/stripe"
appstripeadapter "github.com/openmeterio/openmeter/openmeter/app/stripe/adapter"
appstripeservice "github.com/openmeterio/openmeter/openmeter/app/stripe/service"
"github.com/openmeterio/openmeter/openmeter/customer"
entdb "github.com/openmeterio/openmeter/openmeter/ent/db"
"github.com/openmeterio/openmeter/openmeter/namespace"
"github.com/openmeterio/openmeter/openmeter/secret"
)

// Metadata provides information about the service to components that need it (eg. telemetry).
type Metadata struct {
ServiceName string
Version string
Environment string
OpenTelemetryName string
}
var App = wire.NewSet(
wire.FieldsOf(new(config.Configuration), "Apps"),

NewAppService,
NewAppStripeService,
NewAppSandbox,
)

func NewAppService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration) (app.Service, error) {
// TODO: remove this check after enabled by default
if !appsConfig.Enabled || db == nil {
return nil, nil
}

func NewMetadata(conf config.Configuration, version string, serviceName string) Metadata {
return Metadata{
ServiceName: fmt.Sprintf("openmeter-%s", serviceName),
Version: version,
Environment: conf.Environment,
OpenTelemetryName: fmt.Sprintf("openmeter.io/%s", serviceName),
appAdapter, err := appadapter.New(appadapter.Config{
Client: db,
BaseURL: appsConfig.BaseURL,
})
if err != nil {
return nil, fmt.Errorf("failed to create app adapter: %w", err)
}

return appservice.New(appservice.Config{
Adapter: appAdapter,
})
}

// Runner is a helper struct that runs a group of services.
type Runner struct {
Group run.Group
Logger *slog.Logger
func NewAppStripeService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration, appService app.Service, customerService customer.Service, secretService secret.Service) (appstripe.Service, error) {
// TODO: remove this check after enabled by default
if !appsConfig.Enabled || db == nil {
return nil, nil
}

appStripeAdapter, err := appstripeadapter.New(appstripeadapter.Config{
Client: db,
AppService: appService,
CustomerService: customerService,
SecretService: secretService,
})
if err != nil {
return nil, fmt.Errorf("failed to create appstripe adapter: %w", err)
}

return appstripeservice.New(appstripeservice.Config{
Adapter: appStripeAdapter,
AppService: appService,
SecretService: secretService,
})
}

func (r Runner) Run() {
err := r.Group.Run()
if e := (run.SignalError{}); errors.As(err, &e) {
r.Logger.Info("received signal: shutting down", slog.String("signal", e.Signal.String()))
} else if !errors.Is(err, http.ErrServerClosed) {
r.Logger.Error("application stopped due to error", slog.String("error", err.Error()))
func NewAppSandbox(ctx context.Context, logger *slog.Logger, appsConfig config.AppsConfiguration, appService app.Service, namespaceManager *namespace.Manager) (*appsandbox.App, error) {
if !appsConfig.Enabled {
return nil, nil
}

_, err := appsandbox.NewFactory(appsandbox.Config{
AppService: appService,
})
if err != nil {
return nil, fmt.Errorf("failed to initialize app sandbox factory: %w", err)
}

app, err := appsandbox.AutoProvision(ctx, appsandbox.AutoProvisionInput{
Namespace: namespaceManager.GetDefaultNamespace(),
AppService: appService,
})
if err != nil {
return nil, fmt.Errorf("failed to auto-provision sandbox app: %w", err)
}

logger.Info("sandbox app auto-provisioned", "app_id", app.GetID().ID)

appSandbox, ok := app.(appsandbox.App)
if !ok {
return nil, fmt.Errorf("failed to cast app to sandbox app")
}

return &appSandbox, nil
}
60 changes: 60 additions & 0 deletions app/common/billing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package common

import (
"fmt"
"log/slog"

"github.com/google/wire"

"github.com/openmeterio/openmeter/app/config"
"github.com/openmeterio/openmeter/openmeter/app"
appsandbox "github.com/openmeterio/openmeter/openmeter/app/sandbox"
appstripe "github.com/openmeterio/openmeter/openmeter/app/stripe"
"github.com/openmeterio/openmeter/openmeter/billing"
billingadapter "github.com/openmeterio/openmeter/openmeter/billing/adapter"
billingservice "github.com/openmeterio/openmeter/openmeter/billing/service"
"github.com/openmeterio/openmeter/openmeter/customer"
entdb "github.com/openmeterio/openmeter/openmeter/ent/db"
"github.com/openmeterio/openmeter/openmeter/meter"
"github.com/openmeterio/openmeter/openmeter/productcatalog/feature"
"github.com/openmeterio/openmeter/openmeter/streaming"
)

var Billing = wire.NewSet(
BillingService,
)

func BillingService(
logger *slog.Logger,
db *entdb.Client,
appService app.Service,
appStripeService appstripe.Service,
appSandbox *appsandbox.App,
billingConfig config.BillingConfiguration,
customerService customer.Service,
featureConnector feature.FeatureConnector,
meterRepo meter.Repository,
streamingConnector streaming.Connector,
) (billing.Service, error) {
if !billingConfig.Enabled {
return nil, nil
}

adapter, err := billingadapter.New(billingadapter.Config{
Client: db,
Logger: logger,
})
if err != nil {
return nil, fmt.Errorf("creating billing adapter: %w", err)
}

return billingservice.New(billingservice.Config{
Adapter: adapter,
AppService: appService,
CustomerService: customerService,
FeatureService: featureConnector,
Logger: logger,
MeterRepo: meterRepo,
StreamingConnector: streamingConnector,
})
}
36 changes: 36 additions & 0 deletions app/common/customer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package common

import (
"fmt"
"log/slog"

"github.com/google/wire"

"github.com/openmeterio/openmeter/openmeter/customer"
customeradapter "github.com/openmeterio/openmeter/openmeter/customer/adapter"
customerservice "github.com/openmeterio/openmeter/openmeter/customer/service"
entdb "github.com/openmeterio/openmeter/openmeter/ent/db"
)

var Customer = wire.NewSet(
NewCustomerService,
)

func NewCustomerService(logger *slog.Logger, db *entdb.Client) (customer.Service, error) {
// TODO: remove this check after enabled by default
if db == nil {
return nil, nil
}

customerAdapter, err := customeradapter.New(customeradapter.Config{
Client: db,
Logger: logger.WithGroup("customer.postgres"),
})
if err != nil {
return nil, fmt.Errorf("failed to create customer adapter: %w", err)
}

return customerservice.New(customerservice.Config{
Adapter: customerAdapter,
})
}
36 changes: 36 additions & 0 deletions app/common/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import (
"go.opentelemetry.io/otel/metric"

"github.com/openmeterio/openmeter/app/config"
"github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest"
"github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest/topicresolver"
"github.com/openmeterio/openmeter/openmeter/namespace"
"github.com/openmeterio/openmeter/openmeter/streaming"
pkgkafka "github.com/openmeterio/openmeter/pkg/kafka"
kafkametrics "github.com/openmeterio/openmeter/pkg/kafka/metrics"
)
Expand Down Expand Up @@ -83,3 +87,35 @@ func NewKafkaTopicProvisioner(conf pkgkafka.TopicProvisionerConfig) (pkgkafka.To

return topicProvisioner, nil
}

func NewNamespaceHandlers(
kafkaHandler *kafkaingest.NamespaceHandler,
clickHouseHandler streaming.Connector,
) []namespace.Handler {
return []namespace.Handler{
kafkaHandler,
clickHouseHandler,
}
}

func NewKafkaNamespaceHandler(
topicResolver topicresolver.Resolver,
topicProvisioner pkgkafka.TopicProvisioner,
conf config.KafkaIngestConfiguration,
) (*kafkaingest.NamespaceHandler, error) {
return &kafkaingest.NamespaceHandler{
TopicResolver: topicResolver,
TopicProvisioner: topicProvisioner,
Partitions: conf.Partitions,
DeletionEnabled: conf.NamespaceDeletionEnabled,
}, nil
}

func NewNamespacedTopicResolver(config config.Configuration) (*topicresolver.NamespacedTopicResolver, error) {
topicResolver, err := topicresolver.NewNamespacedTopicResolver(config.Ingest.Kafka.EventsTopicTemplate)
if err != nil {
return nil, fmt.Errorf("failed to create topic name resolver: %w", err)
}

return topicResolver, nil
}
24 changes: 24 additions & 0 deletions app/common/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package common

import (
"fmt"

"github.com/openmeterio/openmeter/app/config"
)

// Metadata provides information about the service to components that need it (eg. telemetry).
type Metadata struct {
ServiceName string
Version string
Environment string
OpenTelemetryName string
}

func NewMetadata(conf config.Configuration, version string, serviceName string) Metadata {
return Metadata{
ServiceName: fmt.Sprintf("openmeter-%s", serviceName),
Version: version,
Environment: conf.Environment,
OpenTelemetryName: fmt.Sprintf("openmeter.io/%s", serviceName),
}
}
13 changes: 13 additions & 0 deletions app/common/meter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package common

import (
"github.com/samber/lo"

"github.com/openmeterio/openmeter/openmeter/meter"
"github.com/openmeterio/openmeter/pkg/models"
"github.com/openmeterio/openmeter/pkg/slicesx"
)

func NewMeterRepository(meters []*models.Meter) *meter.InMemoryRepository {
return meter.NewInMemoryRepository(slicesx.Map(meters, lo.FromPtr[models.Meter]))
}
24 changes: 24 additions & 0 deletions app/common/namespace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package common

import (
"fmt"

"github.com/openmeterio/openmeter/app/config"
"github.com/openmeterio/openmeter/openmeter/namespace"
)

func NewNamespaceManager(
handlers []namespace.Handler,
conf config.NamespaceConfiguration,
) (*namespace.Manager, error) {
manager, err := namespace.NewManager(namespace.ManagerConfig{
Handlers: handlers,
DefaultNamespace: conf.Default,
DisableManagement: conf.DisableManagement,
})
if err != nil {
return nil, fmt.Errorf("create namespace manager: %v", err)
}

return manager, nil
}
Loading
Loading