Skip to content

Commit

Permalink
feat: add balance snapshot recalculation job
Browse files Browse the repository at this point in the history
This job can be used to trigger a recalculation of all the entitlmenet
snapshots if needed.
  • Loading branch information
turip committed Aug 9, 2024
1 parent c451c79 commit 26ca984
Show file tree
Hide file tree
Showing 13 changed files with 668 additions and 15 deletions.
15 changes: 14 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,19 @@
"--telemetry-address",
":10003"
]
}
},
{
"name": "Launch recalculate entitlement snapshots",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/cmd/jobs",
"args": [
"--config",
"${workspaceFolder}/config.yaml",
"entitlement",
"recalculate-balance-snapshots"
]
},
]
}
46 changes: 46 additions & 0 deletions cmd/jobs/config/loader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package config

import (
"errors"

"github.com/spf13/pflag"
"github.com/spf13/viper"

"github.com/openmeterio/openmeter/config"
)

func LoadConfig(fileName string) (config.Configuration, error) {
v, flags := viper.NewWithOptions(viper.WithDecodeHook(config.DecodeHook())), pflag.NewFlagSet("OpenMeter", pflag.ExitOnError)

config.SetViperDefaults(v, flags)
if fileName != "" {
v.SetConfigFile(fileName)
}

err := v.ReadInConfig()
if err != nil && !errors.As(err, &viper.ConfigFileNotFoundError{}) {
return config.Configuration{}, err
}

var conf config.Configuration
err = v.Unmarshal(&conf)
if err != nil {
return conf, err
}

return conf, conf.Validate()
}

var defaultConfig *config.Configuration

func GetConfig() (config.Configuration, error) {
if defaultConfig == nil {
return config.Configuration{}, errors.New("config not set")
}

return *defaultConfig, nil
}

func SetConfig(c config.Configuration) {
defaultConfig = &c
}
111 changes: 111 additions & 0 deletions cmd/jobs/entitlement/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package entitlement

import (
"context"
"fmt"
"log/slog"

"github.com/ClickHouse/clickhouse-go/v2"
"go.opentelemetry.io/otel/metric"

"github.com/openmeterio/openmeter/config"
"github.com/openmeterio/openmeter/internal/ent/db"
"github.com/openmeterio/openmeter/internal/meter"
"github.com/openmeterio/openmeter/internal/registry"
registrybuilder "github.com/openmeterio/openmeter/internal/registry/builder"
"github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector"
watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka"
"github.com/openmeterio/openmeter/internal/watermill/eventbus"
"github.com/openmeterio/openmeter/pkg/framework/entutils"
"github.com/openmeterio/openmeter/pkg/models"
"github.com/openmeterio/openmeter/pkg/slicesx"
)

type entitlementConnectors struct {
Registry *registry.Entitlement
EventBus eventbus.Publisher
Shutdown func()
}

func initEntitlements(ctx context.Context, conf config.Configuration, logger *slog.Logger, metricMeter metric.Meter, otelName string) (*entitlementConnectors, error) {
// Postgresql
entDriver, err := entutils.GetPGDriver(conf.Postgres.URL)
if err != nil {
return nil, fmt.Errorf("failed to init postgres driver: %w", err)
}

dbClient := db.NewClient(db.Driver(entDriver))

// Meter repository
meterRepository := meter.NewInMemoryRepository(slicesx.Map(conf.Meters, func(meter *models.Meter) models.Meter {
return *meter
}))

// streaming connector
clickHouseClient, err := clickhouse.Open(conf.Aggregation.ClickHouse.GetClientOptions())
if err != nil {
return nil, fmt.Errorf("failed to initialize clickhouse client: %w", err)
}

streamingConnector, err := clickhouse_connector.NewClickhouseConnector(clickhouse_connector.ClickhouseConnectorConfig{
Logger: logger,
ClickHouse: clickHouseClient,
Database: conf.Aggregation.ClickHouse.Database,
Meters: meterRepository,
CreateOrReplaceMeter: conf.Aggregation.CreateOrReplaceMeter,
PopulateMeter: conf.Aggregation.PopulateMeter,
})
if err != nil {
return nil, fmt.Errorf("init clickhouse streaming: %w", err)
}

// event publishing
eventPublisherDriver, err := watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{
Broker: watermillkafka.BrokerOptions{
KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration,
ClientID: otelName,
Logger: logger,
MetricMeter: metricMeter,
DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug,
},
})
if err != nil {
return nil, fmt.Errorf("failed to initialize event publisher driver: %w", err)
}

eventPublisher, err := eventbus.New(eventbus.Options{
Publisher: eventPublisherDriver,
Config: conf.Events,
Logger: logger,
MarshalerTransformFunc: watermillkafka.AddPartitionKeyFromSubject,
})
if err != nil {
return nil, fmt.Errorf("failed to initialize event publisher: %w", err)
}

entitlementRegistry := registrybuilder.GetEntitlementRegistry(registry.EntitlementOptions{
DatabaseClient: dbClient,
StreamingConnector: streamingConnector,
MeterRepository: meterRepository,
Logger: logger,
Publisher: eventPublisher,
})

return &entitlementConnectors{
Registry: entitlementRegistry,
EventBus: eventPublisher,
Shutdown: func() {
if err := dbClient.Close(); err != nil {
logger.Error("failed to close entitlement db client", "error", err)
}

if err := clickHouseClient.Close(); err != nil {
logger.Error("failed to close clickhouse client", "error", err)
}

if err := eventPublisherDriver.Close(); err != nil {
logger.Error("failed to close event publisher", "error", err)
}
},
}, nil
}
54 changes: 54 additions & 0 deletions cmd/jobs/entitlement/recalculatesnapshots.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package entitlement

import (
"log/slog"

"github.com/spf13/cobra"
"go.opentelemetry.io/otel"

"github.com/openmeterio/openmeter/cmd/jobs/config"
"github.com/openmeterio/openmeter/internal/entitlement/balanceworker"
)

const (
otelNameRecalculateBalanceSnapshot = "openmeter.io/jobs/entitlement/recalculate-balance-snapshots"
)

func NewRecalculateBalanceSnapshotsCommand() *cobra.Command {
return &cobra.Command{
Use: "recalculate-balance-snapshots",
Short: "Recalculate balance snapshots and send the resulting events into the eventbus",
RunE: func(cmd *cobra.Command, args []string) error {
conf, err := config.GetConfig()
if err != nil {
return err
}

logger := slog.Default()

entitlementConnectors, err := initEntitlements(
cmd.Context(),
conf,
logger,
otel.GetMeterProvider().Meter(otelNameRecalculateBalanceSnapshot),
otelNameRecalculateBalanceSnapshot,
)
if err != nil {
return err
}

defer entitlementConnectors.Shutdown()

recalculator, err := balanceworker.NewRecalculator(balanceworker.RecalculatorOptions{
Entitlement: entitlementConnectors.Registry,
Namespace: "default",
EventBus: entitlementConnectors.EventBus,
})
if err != nil {
return err
}

return recalculator.Recalculate(cmd.Context())
},
}
}
14 changes: 14 additions & 0 deletions cmd/jobs/entitlement/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package entitlement

import "github.com/spf13/cobra"

func RootCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "entitlement",
Short: "Entitlement related jobs",
}

cmd.AddCommand(NewRecalculateBalanceSnapshotsCommand())

return cmd
}
55 changes: 55 additions & 0 deletions cmd/jobs/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package main

import (
"log/slog"
"os"

"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/openmeterio/openmeter/cmd/jobs/config"
"github.com/openmeterio/openmeter/cmd/jobs/entitlement"
"github.com/openmeterio/openmeter/cmd/jobs/service"
)

const (
otelName = "openmeter.io/jobs"
)

func main() {
var telemetry *service.Telemetry

defer func() {
if telemetry != nil && telemetry.Shutdown != nil {
telemetry.Shutdown()
}
}()

rootCmd := cobra.Command{
Use: "jobs",
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
conf, err := config.LoadConfig(cmd.Flag("config").Value.String())
if err != nil {
return err
}

config.SetConfig(conf)

telemetry, err = service.NewTelemetry(cmd.Context(), conf.Telemetry, conf.Environment, version, otelName)
return err
},
}

var configFileName string

rootCmd.PersistentFlags().StringVarP(&configFileName, "config", "", "config.yaml", "config file (default is config.yaml)")
_ = viper.BindPFlag("config", rootCmd.PersistentFlags().Lookup("config"))

rootCmd.AddCommand(versionCommand())
rootCmd.AddCommand(entitlement.RootCommand())

if err := rootCmd.Execute(); err != nil {
slog.Default().Error("failed to execute command", "error", err)
os.Exit(1)
}
}
Loading

0 comments on commit 26ca984

Please sign in to comment.