Skip to content

Commit

Permalink
feat: billing worker
Browse files Browse the repository at this point in the history
  • Loading branch information
turip committed Dec 12, 2024
1 parent eba9258 commit 1313361
Show file tree
Hide file tree
Showing 13 changed files with 751 additions and 0 deletions.
151 changes: 151 additions & 0 deletions app/common/billing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package common

import (
"context"
"fmt"
"log/slog"
"syscall"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/google/wire"
"github.com/oklog/run"

"github.com/openmeterio/openmeter/app/config"
"github.com/openmeterio/openmeter/openmeter/app"
"github.com/openmeterio/openmeter/openmeter/billing"
billingadapter "github.com/openmeterio/openmeter/openmeter/billing/adapter"
billingservice "github.com/openmeterio/openmeter/openmeter/billing/service"
billingworker "github.com/openmeterio/openmeter/openmeter/billing/worker"
"github.com/openmeterio/openmeter/openmeter/customer"
entdb "github.com/openmeterio/openmeter/openmeter/ent/db"
"github.com/openmeterio/openmeter/openmeter/meter"
"github.com/openmeterio/openmeter/openmeter/productcatalog/feature"
"github.com/openmeterio/openmeter/openmeter/streaming"
watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka"
"github.com/openmeterio/openmeter/openmeter/watermill/eventbus"
"github.com/openmeterio/openmeter/openmeter/watermill/router"
pkgkafka "github.com/openmeterio/openmeter/pkg/kafka"
)

func BillingWorkerProvisionTopics(conf config.BillingConfiguration) []pkgkafka.TopicConfig {
var provisionTopics []pkgkafka.TopicConfig

if conf.Worker.DLQ.AutoProvision.Enabled {
provisionTopics = append(provisionTopics, pkgkafka.TopicConfig{
Name: conf.Worker.DLQ.Topic,
Partitions: conf.Worker.DLQ.AutoProvision.Partitions,
RetentionTime: pkgkafka.TimeDurationMilliSeconds(conf.Worker.DLQ.AutoProvision.Retention),
})
}

return provisionTopics
}

// no closer function: the subscriber is closed by the router/worker
func BillingWorkerSubscriber(conf config.BillingConfiguration, brokerOptions watermillkafka.BrokerOptions) (message.Subscriber, error) {
subscriber, err := watermillkafka.NewSubscriber(watermillkafka.SubscriberOptions{
Broker: brokerOptions,
ConsumerGroupName: conf.Worker.ConsumerGroupName,
})
if err != nil {
return nil, fmt.Errorf("failed to initialize Kafka subscriber: %w", err)
}

return subscriber, nil
}

func NewBillingWorkerOptions(
eventConfig config.EventsConfiguration,
routerOptions router.Options,
eventBus eventbus.Publisher,
billingService billing.Service,
logger *slog.Logger,
) billingworker.WorkerOptions {
return billingworker.WorkerOptions{
SystemEventsTopic: eventConfig.SystemEvents.Topic,

Router: routerOptions,
EventBus: eventBus,
BillingService: billingService,
Logger: logger,
}
}

func NewBillingWorker(workerOptions billingworker.WorkerOptions) (*billingworker.Worker, error) {
worker, err := billingworker.New(workerOptions)
if err != nil {
return nil, fmt.Errorf("failed to initialize worker: %w", err)
}

return worker, nil
}

func BillingWorkerGroup(
ctx context.Context,
worker *billingworker.Worker,
telemetryServer TelemetryServer,
) run.Group {
var group run.Group

group.Add(
func() error { return telemetryServer.ListenAndServe() },
func(err error) { _ = telemetryServer.Shutdown(ctx) },
)

group.Add(
func() error { return worker.Run(ctx) },
func(err error) { _ = worker.Close() },
)

group.Add(run.SignalHandler(ctx, syscall.SIGINT, syscall.SIGTERM))

return group
}

func BillingService(
logger *slog.Logger,
db *entdb.Client,
billingConfig config.BillingConfiguration,
customerService customer.Service,
appService app.Service,
featureConnector feature.FeatureConnector,
meterRepo meter.Repository,
streamingConnector streaming.Connector,
) (billing.Service, error) {
if !billingConfig.Enabled {
return nil, nil
}

adapter, err := billingadapter.New(billingadapter.Config{
Client: db,
Logger: logger,
})
if err != nil {
return nil, fmt.Errorf("creating billing adapter: %w", err)
}

return billingservice.New(billingservice.Config{
Adapter: adapter,
CustomerService: customerService,
AppService: appService,
Logger: logger,
FeatureService: featureConnector,
MeterRepo: meterRepo,
StreamingConnector: streamingConnector,
})
}

var BillingWorker = wire.NewSet(
wire.FieldsOf(new(config.BillingWorkerConfiguration), "ConsumerConfiguration"),
wire.FieldsOf(new(config.BillingConfiguration), "Worker"),

BillingWorkerProvisionTopics,
BillingWorkerSubscriber,

NewCustomerService,
BillingService,

NewBillingWorkerOptions,
NewBillingWorker,
BillingWorkerGroup,
)
1 change: 1 addition & 0 deletions app/common/customer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package common
20 changes: 20 additions & 0 deletions app/common/openmeter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log/slog"
"os"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ThreeDotsLabs/watermill/message"
Expand All @@ -12,6 +13,10 @@ import (
"go.opentelemetry.io/otel/metric"

"github.com/openmeterio/openmeter/app/config"
"github.com/openmeterio/openmeter/openmeter/customer"
customeradapter "github.com/openmeterio/openmeter/openmeter/customer/adapter"
customerservice "github.com/openmeterio/openmeter/openmeter/customer/service"
entdb "github.com/openmeterio/openmeter/openmeter/ent/db"
"github.com/openmeterio/openmeter/openmeter/ingest"
"github.com/openmeterio/openmeter/openmeter/ingest/ingestadapter"
"github.com/openmeterio/openmeter/openmeter/ingest/kafkaingest"
Expand Down Expand Up @@ -251,3 +256,18 @@ func NewFlushHandler(

return flushHandlerMux, nil
}

func NewCustomerService(logger *slog.Logger, db *entdb.Client) (customer.Service, error) {
customerAdapter, err := customeradapter.New(customeradapter.Config{
Client: db,
Logger: logger.WithGroup("customer.postgres"),
})
if err != nil {
logger.Error("failed to initialize customer repository", "error", err)
os.Exit(1)
}

return customerservice.New(customerservice.Config{
Adapter: customerAdapter,
})
}
1 change: 1 addition & 0 deletions app/common/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var Config = wire.NewSet(
wire.FieldsOf(new(config.Configuration), "Namespace"),
wire.FieldsOf(new(config.Configuration), "Events"),
wire.FieldsOf(new(config.Configuration), "BalanceWorker"),
wire.FieldsOf(new(config.Configuration), "Billing"),
wire.FieldsOf(new(config.Configuration), "Notification"),
wire.FieldsOf(new(config.Configuration), "Sink"),
)
Expand Down
1 change: 1 addition & 0 deletions app/config/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import "github.com/spf13/viper"

type BillingConfiguration struct {
Enabled bool
Worker BillingWorkerConfiguration
}

func (c BillingConfiguration) Validate() error {
Expand Down
29 changes: 29 additions & 0 deletions app/config/billingworker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package config

import (
"errors"

"github.com/spf13/viper"

"github.com/openmeterio/openmeter/pkg/errorsx"
)

type BillingWorkerConfiguration struct {
ConsumerConfiguration `mapstructure:",squash"`
}

func (c BillingWorkerConfiguration) Validate() error {
var errs []error

if err := c.ConsumerConfiguration.Validate(); err != nil {
errs = append(errs, errorsx.WithPrefix(err, "consumer"))
}

return errors.Join(errs...)
}

func ConfigureBillingWorker(v *viper.Viper) {
ConfigureConsumer(v, "billingWorker")
v.SetDefault("billingWorker.dlq.topic", "om_sys.billing_worker_dlq")
v.SetDefault("billingWorker.consumerGroupName", "om_billing_worker")
}
1 change: 1 addition & 0 deletions app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func SetViperDefaults(v *viper.Viper, flags *pflag.FlagSet) {
ConfigurePortal(v)
ConfigureEvents(v)
ConfigureBalanceWorker(v)
ConfigureBillingWorker(v)
ConfigureNotification(v)
ConfigureStripe(v)
ConfigureBilling(v)
Expand Down
44 changes: 44 additions & 0 deletions cmd/billing-worker/.air.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
root = "."
testdata_dir = "testdata"
tmp_dir = "tmp"

[build]
args_bin = ["--config", "./config.yaml", "--telemetry-address", ":10002"]
bin = "./tmp/openmeter-balance-worker"
cmd = "go build -tags dynamic -o ./tmp/openmeter-balance-worker ./cmd/balance-worker"
delay = 0
exclude_dir = ["assets", "ci", "deploy", "docs", "examples", "testdata", "quickstart", "tmp", "vendor", "api/client", "node_modules"]
exclude_file = []
exclude_regex = ["_test.go"]
exclude_unchanged = false
follow_symlink = false
full_bin = ""
include_dir = []
include_ext = ["go", "tpl", "tmpl", "html", "yml", "yaml", "sql", "json"]
include_file = []
kill_delay = "0s"
log = "build-errors.log"
poll = false
poll_interval = 0
rerun = false
rerun_delay = 500
send_interrupt = false
stop_on_error = false

[color]
app = ""
build = "yellow"
main = "magenta"
runner = "green"
watcher = "cyan"

[log]
main_only = false
time = false

[misc]
clean_on_exit = false

[screen]
clear_on_rebuild = false
keep_scroll = true
87 changes: 87 additions & 0 deletions cmd/billing-worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package main

import (
"context"
"errors"
"fmt"
"log/slog"
"os"

"github.com/spf13/pflag"
"github.com/spf13/viper"

"github.com/openmeterio/openmeter/app/config"
)

func main() {
v, flags := viper.NewWithOptions(viper.WithDecodeHook(config.DecodeHook())), pflag.NewFlagSet("OpenMeter", pflag.ExitOnError)
ctx := context.Background()

config.SetViperDefaults(v, flags)

flags.String("config", "", "Configuration file")
flags.Bool("version", false, "Show version information")
flags.Bool("validate", false, "Validate configuration and exit")

_ = flags.Parse(os.Args[1:])

if v, _ := flags.GetBool("version"); v {
fmt.Printf("%s version %s (%s) built on %s\n", "Open Meter", version, revision, revisionDate)

os.Exit(0)
}

if c, _ := flags.GetString("config"); c != "" {
v.SetConfigFile(c)
}

err := v.ReadInConfig()
if err != nil && !errors.As(err, &viper.ConfigFileNotFoundError{}) {
panic(err)
}

var conf config.Configuration
err = v.Unmarshal(&conf)
if err != nil {
panic(err)
}

err = conf.Validate()
if err != nil {
println("configuration error:")
println(err.Error())
os.Exit(1)
}

if v, _ := flags.GetBool("validate"); v {
os.Exit(0)
}

app, cleanup, err := initializeApplication(ctx, conf)
if err != nil {
slog.Error("failed to initialize application", "error", err)

cleanup()

os.Exit(1)
}
defer cleanup()

app.SetGlobals()

logger := app.Logger

// Validate service prerequisites

if !conf.Events.Enabled {
logger.Error("events are disabled, exiting")
os.Exit(1)
}

if err := app.Migrate(ctx); err != nil {
logger.Error("failed to initialize database", "error", err)
os.Exit(1)
}

app.Run()
}
Loading

0 comments on commit 1313361

Please sign in to comment.