Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add command to recalculate balance snapshots #1323

Merged
merged 3 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,19 @@
"--telemetry-address",
":10003"
]
}
},
{
"name": "Launch recalculate entitlement snapshots",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/cmd/jobs",
"args": [
"--config",
"${workspaceFolder}/config.yaml",
"entitlement",
"recalculate-balance-snapshots"
]
},
]
}
46 changes: 46 additions & 0 deletions cmd/jobs/config/loader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package config

import (
"errors"

"github.com/spf13/pflag"
"github.com/spf13/viper"

"github.com/openmeterio/openmeter/config"
)

func LoadConfig(fileName string) (config.Configuration, error) {
v, flags := viper.NewWithOptions(viper.WithDecodeHook(config.DecodeHook())), pflag.NewFlagSet("OpenMeter", pflag.ExitOnError)

config.SetViperDefaults(v, flags)
if fileName != "" {
v.SetConfigFile(fileName)
}

err := v.ReadInConfig()
if err != nil && !errors.As(err, &viper.ConfigFileNotFoundError{}) {
return config.Configuration{}, err
}

var conf config.Configuration
err = v.Unmarshal(&conf)
if err != nil {
return conf, err
}

return conf, conf.Validate()
}

var defaultConfig *config.Configuration

func GetConfig() (config.Configuration, error) {
if defaultConfig == nil {
return config.Configuration{}, errors.New("config not set")
}

return *defaultConfig, nil
}

func SetConfig(c config.Configuration) {
defaultConfig = &c
}
111 changes: 111 additions & 0 deletions cmd/jobs/entitlement/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package entitlement

import (
"context"
"fmt"
"log/slog"

"github.com/ClickHouse/clickhouse-go/v2"
"go.opentelemetry.io/otel/metric"

"github.com/openmeterio/openmeter/config"
"github.com/openmeterio/openmeter/internal/ent/db"
"github.com/openmeterio/openmeter/internal/meter"
"github.com/openmeterio/openmeter/internal/registry"
registrybuilder "github.com/openmeterio/openmeter/internal/registry/builder"
"github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector"
watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka"
"github.com/openmeterio/openmeter/internal/watermill/eventbus"
"github.com/openmeterio/openmeter/pkg/framework/entutils"
"github.com/openmeterio/openmeter/pkg/models"
"github.com/openmeterio/openmeter/pkg/slicesx"
)

type entitlementConnectors struct {
Registry *registry.Entitlement
EventBus eventbus.Publisher
Shutdown func()
}

func initEntitlements(ctx context.Context, conf config.Configuration, logger *slog.Logger, metricMeter metric.Meter, otelName string) (*entitlementConnectors, error) {
// Postgresql
entDriver, err := entutils.GetPGDriver(conf.Postgres.URL)
if err != nil {
return nil, fmt.Errorf("failed to init postgres driver: %w", err)
}

dbClient := db.NewClient(db.Driver(entDriver))

// Meter repository
meterRepository := meter.NewInMemoryRepository(slicesx.Map(conf.Meters, func(meter *models.Meter) models.Meter {
return *meter
}))

// streaming connector
clickHouseClient, err := clickhouse.Open(conf.Aggregation.ClickHouse.GetClientOptions())
if err != nil {
return nil, fmt.Errorf("failed to initialize clickhouse client: %w", err)
}

streamingConnector, err := clickhouse_connector.NewClickhouseConnector(clickhouse_connector.ClickhouseConnectorConfig{
Logger: logger,
ClickHouse: clickHouseClient,
Database: conf.Aggregation.ClickHouse.Database,
Meters: meterRepository,
CreateOrReplaceMeter: conf.Aggregation.CreateOrReplaceMeter,
PopulateMeter: conf.Aggregation.PopulateMeter,
})
if err != nil {
return nil, fmt.Errorf("init clickhouse streaming: %w", err)
}

// event publishing
eventPublisherDriver, err := watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{
Broker: watermillkafka.BrokerOptions{
KafkaConfig: conf.Ingest.Kafka.KafkaConfiguration,
ClientID: otelName,
Logger: logger,
MetricMeter: metricMeter,
DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug,
},
})
if err != nil {
return nil, fmt.Errorf("failed to initialize event publisher driver: %w", err)
}

eventPublisher, err := eventbus.New(eventbus.Options{
Publisher: eventPublisherDriver,
Config: conf.Events,
Logger: logger,
MarshalerTransformFunc: watermillkafka.AddPartitionKeyFromSubject,
})
if err != nil {
return nil, fmt.Errorf("failed to initialize event publisher: %w", err)
}

entitlementRegistry := registrybuilder.GetEntitlementRegistry(registry.EntitlementOptions{
DatabaseClient: dbClient,
StreamingConnector: streamingConnector,
MeterRepository: meterRepository,
Logger: logger,
Publisher: eventPublisher,
})

return &entitlementConnectors{
Registry: entitlementRegistry,
EventBus: eventPublisher,
Shutdown: func() {
if err := dbClient.Close(); err != nil {
logger.Error("failed to close entitlement db client", "error", err)
}

if err := clickHouseClient.Close(); err != nil {
logger.Error("failed to close clickhouse client", "error", err)
}

if err := eventPublisherDriver.Close(); err != nil {
logger.Error("failed to close event publisher", "error", err)
}
},
}, nil
}
54 changes: 54 additions & 0 deletions cmd/jobs/entitlement/recalculatesnapshots.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package entitlement

import (
"log/slog"

"github.com/spf13/cobra"
"go.opentelemetry.io/otel"

"github.com/openmeterio/openmeter/cmd/jobs/config"
"github.com/openmeterio/openmeter/internal/entitlement/balanceworker"
)

const (
otelNameRecalculateBalanceSnapshot = "openmeter.io/jobs/entitlement/recalculate-balance-snapshots"
)

func NewRecalculateBalanceSnapshotsCommand() *cobra.Command {
return &cobra.Command{
Use: "recalculate-balance-snapshots",
Short: "Recalculate balance snapshots and send the resulting events into the eventbus",
RunE: func(cmd *cobra.Command, args []string) error {
conf, err := config.GetConfig()
if err != nil {
return err
}

logger := slog.Default()

entitlementConnectors, err := initEntitlements(
cmd.Context(),
conf,
logger,
otel.GetMeterProvider().Meter(otelNameRecalculateBalanceSnapshot),
otelNameRecalculateBalanceSnapshot,
)
if err != nil {
return err
}

defer entitlementConnectors.Shutdown()

recalculator, err := balanceworker.NewRecalculator(balanceworker.RecalculatorOptions{
Entitlement: entitlementConnectors.Registry,
Namespace: "default",
EventBus: entitlementConnectors.EventBus,
})
if err != nil {
return err
}

return recalculator.Recalculate(cmd.Context())
},
}
}
14 changes: 14 additions & 0 deletions cmd/jobs/entitlement/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package entitlement

import "github.com/spf13/cobra"

func RootCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "entitlement",
Short: "Entitlement related jobs",
}

cmd.AddCommand(NewRecalculateBalanceSnapshotsCommand())

return cmd
}
55 changes: 55 additions & 0 deletions cmd/jobs/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package main

import (
"log/slog"
"os"

"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/openmeterio/openmeter/cmd/jobs/config"
"github.com/openmeterio/openmeter/cmd/jobs/entitlement"
"github.com/openmeterio/openmeter/cmd/jobs/service"
)

const (
otelName = "openmeter.io/jobs"
)

func main() {
var telemetry *service.Telemetry

defer func() {
if telemetry != nil && telemetry.Shutdown != nil {
telemetry.Shutdown()
}
}()

rootCmd := cobra.Command{
Use: "jobs",
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
conf, err := config.LoadConfig(cmd.Flag("config").Value.String())
if err != nil {
return err
}

config.SetConfig(conf)

telemetry, err = service.NewTelemetry(cmd.Context(), conf.Telemetry, conf.Environment, version, otelName)
return err
},
}

var configFileName string

rootCmd.PersistentFlags().StringVarP(&configFileName, "config", "", "config.yaml", "config file (default is config.yaml)")
_ = viper.BindPFlag("config", rootCmd.PersistentFlags().Lookup("config"))

rootCmd.AddCommand(versionCommand())
rootCmd.AddCommand(entitlement.RootCommand())

if err := rootCmd.Execute(); err != nil {
slog.Default().Error("failed to execute command", "error", err)
os.Exit(1)
}
}
Loading
Loading