diff --git a/cmd/balance-worker/main.go b/cmd/balance-worker/main.go index a77ffa2cb..b5e38d993 100644 --- a/cmd/balance-worker/main.go +++ b/cmd/balance-worker/main.go @@ -11,8 +11,10 @@ import ( "syscall" "time" + "entgo.io/ent/dialect/sql" health "github.com/AppsFlyer/go-sundheit" healthhttp "github.com/AppsFlyer/go-sundheit/http" + "github.com/ClickHouse/clickhouse-go/v2" "github.com/IBM/sarama" "github.com/ThreeDotsLabs/watermill" wmkafka "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka" @@ -33,15 +35,22 @@ import ( semconv "go.opentelemetry.io/otel/semconv/v1.17.0" "github.com/openmeterio/openmeter/config" + "github.com/openmeterio/openmeter/internal/ent/db" "github.com/openmeterio/openmeter/internal/entitlement/balanceworker" "github.com/openmeterio/openmeter/internal/event/publisher" "github.com/openmeterio/openmeter/internal/ingest/kafkaingest" - omwatermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka" + "github.com/openmeterio/openmeter/internal/meter" + "github.com/openmeterio/openmeter/internal/registry" + "github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector" + watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka" "github.com/openmeterio/openmeter/pkg/contextx" + "github.com/openmeterio/openmeter/pkg/framework/entutils" "github.com/openmeterio/openmeter/pkg/framework/operation" "github.com/openmeterio/openmeter/pkg/gosundheit" pkgkafka "github.com/openmeterio/openmeter/pkg/kafka" kafkametrics "github.com/openmeterio/openmeter/pkg/kafka/metrics" + "github.com/openmeterio/openmeter/pkg/models" + "github.com/openmeterio/openmeter/pkg/slicesx" ) const ( @@ -177,6 +186,43 @@ func main() { var group run.Group + // Initialize the data sources (entitlements, productcatalog, etc.) + // Dependencies: meters + meterRepository := meter.NewInMemoryRepository(slicesx.Map(conf.Meters, func(meter *models.Meter) models.Meter { + return *meter + })) + + // Dependencies: clickhouse + clickHouseClient, err := clickhouse.Open(conf.Aggregation.ClickHouse.GetClientOptions()) + if err != nil { + logger.Error("failed to initialize clickhouse client", "error", err) + os.Exit(1) + } + + // Dependencies: streamingConnector + clickhouseStreamingConnector, err := clickhouse_connector.NewClickhouseConnector(clickhouse_connector.ClickhouseConnectorConfig{ + Logger: logger, + ClickHouse: clickHouseClient, + Database: conf.Aggregation.ClickHouse.Database, + Meters: meterRepository, + CreateOrReplaceMeter: conf.Aggregation.CreateOrReplaceMeter, + PopulateMeter: conf.Aggregation.PopulateMeter, + }) + if err != nil { + logger.Error("failed to initialize clickhouse aggregation", "error", err) + os.Exit(1) + } + + // Dependencies: postgresql + pgClients, err := initPGClients(conf.Postgres) + if err != nil { + logger.Error("failed to initialize postgres clients", "error", err) + os.Exit(1) + } + defer pgClients.driver.Close() + + logger.Info("Postgres clients initialized") + // Create subscriber wmSubscriber, err := initKafkaSubscriber(conf, logger) if err != nil { @@ -191,12 +237,21 @@ func main() { os.Exit(1) } - wmPublisher, err := initEventPublisher(ctx, logger, conf, kafkaPublisher) + publishers, err := initEventPublisher(ctx, logger, conf, kafkaPublisher) if err != nil { logger.Error("failed to initialize event publisher", slog.String("error", err.Error())) os.Exit(1) } + // Dependencies: entitlement + entitlementConnectors := registry.GetEntitlementRegistry(registry.EntitlementOptions{ + DatabaseClient: pgClients.client, + StreamingConnector: clickhouseStreamingConnector, + MeterRepository: meterRepository, + Logger: logger, + Publisher: publishers.eventPublisher.ForTopic(conf.Events.SystemEvents.Topic), + }) + // Initialize worker workerOptions := balanceworker.WorkerOptions{ SystemEventsTopic: conf.Events.SystemEvents.Topic, @@ -204,9 +259,10 @@ func main() { Subscriber: wmSubscriber, TargetTopic: conf.Events.SystemEvents.Topic, - Publisher: wmPublisher.publisher, + Publisher: publishers.watermillPublisher, + Marshaler: publishers.marshaler, - Marshaler: wmPublisher.marshaler, + Entitlement: entitlementConnectors, Logger: logger, } @@ -296,13 +352,14 @@ func initKafkaSubscriber(conf config.Configuration, logger *slog.Logger) (messag return subscriber, nil } -type eventPublisher struct { - publisher message.Publisher - marshaler publisher.CloudEventMarshaler +type eventPublishers struct { + watermillPublisher message.Publisher + marshaler publisher.CloudEventMarshaler + eventPublisher publisher.Publisher } -func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, kafkaProducer *kafka.Producer) (*eventPublisher, error) { - eventDriver := omwatermillkafka.NewPublisher(kafkaProducer) +func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, kafkaProducer *kafka.Producer) (*eventPublishers, error) { + eventDriver := watermillkafka.NewPublisher(kafkaProducer) if conf.BalanceWorker.PoisionQueue.AutoProvision.Enabled { adminClient, err := kafka.NewAdminClientFromProducer(kafkaProducer) @@ -321,9 +378,18 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co } } - return &eventPublisher{ - publisher: eventDriver, - marshaler: publisher.NewCloudEventMarshaler(omwatermillkafka.AddPartitionKeyFromSubject), + eventPublisher, err := publisher.NewPublisher(publisher.PublisherOptions{ + Publisher: eventDriver, + Transform: watermillkafka.AddPartitionKeyFromSubject, + }) + if err != nil { + return nil, fmt.Errorf("failed to create event publisher: %w", err) + } + + return &eventPublishers{ + watermillPublisher: eventDriver, + marshaler: publisher.NewCloudEventMarshaler(watermillkafka.AddPartitionKeyFromSubject), + eventPublisher: eventPublisher, }, nil } @@ -350,3 +416,26 @@ func initKafkaProducer(ctx context.Context, config config.Configuration, logger slog.Debug("connected to Kafka") return producer, nil } + +type pgClients struct { + driver *sql.Driver + client *db.Client +} + +func initPGClients(config config.PostgresConfig) ( + *pgClients, + error, +) { + if err := config.Validate(); err != nil { + return nil, fmt.Errorf("invalid postgres config: %w", err) + } + driver, err := entutils.GetPGDriver(config.URL) + if err != nil { + return nil, fmt.Errorf("failed to init postgres driver: %w", err) + } + + return &pgClients{ + driver: driver, + client: db.NewClient(db.Driver(driver)), + }, nil +} diff --git a/cmd/server/main.go b/cmd/server/main.go index eb3bcf773..b955974b7 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -2,7 +2,6 @@ package main import ( "context" - "crypto/tls" "encoding/json" "errors" "fmt" @@ -35,15 +34,8 @@ import ( "go.opentelemetry.io/otel/trace" "github.com/openmeterio/openmeter/config" - "github.com/openmeterio/openmeter/internal/credit" - creditpgadapter "github.com/openmeterio/openmeter/internal/credit/postgresadapter" "github.com/openmeterio/openmeter/internal/debug" "github.com/openmeterio/openmeter/internal/ent/db" - "github.com/openmeterio/openmeter/internal/entitlement" - booleanentitlement "github.com/openmeterio/openmeter/internal/entitlement/boolean" - meteredentitlement "github.com/openmeterio/openmeter/internal/entitlement/metered" - entitlementpgadapter "github.com/openmeterio/openmeter/internal/entitlement/postgresadapter" - staticentitlement "github.com/openmeterio/openmeter/internal/entitlement/static" "github.com/openmeterio/openmeter/internal/event/publisher" "github.com/openmeterio/openmeter/internal/ingest" "github.com/openmeterio/openmeter/internal/ingest/ingestdriver" @@ -52,8 +44,7 @@ import ( "github.com/openmeterio/openmeter/internal/meter" "github.com/openmeterio/openmeter/internal/namespace" "github.com/openmeterio/openmeter/internal/namespace/namespacedriver" - "github.com/openmeterio/openmeter/internal/productcatalog" - productcatalogpgadapter "github.com/openmeterio/openmeter/internal/productcatalog/postgresadapter" + "github.com/openmeterio/openmeter/internal/registry" "github.com/openmeterio/openmeter/internal/server" "github.com/openmeterio/openmeter/internal/server/authenticator" "github.com/openmeterio/openmeter/internal/server/router" @@ -208,7 +199,7 @@ func main() { var namespaceHandlers []namespace.Handler // Initialize ClickHouse Client - clickHouseClient, err := initClickHouseClient(conf) + clickHouseClient, err := clickhouse.Open(conf.Aggregation.ClickHouse.GetClientOptions()) if err != nil { logger.Error("failed to initialize clickhouse client", "error", err) os.Exit(1) @@ -305,11 +296,7 @@ func main() { } debugConnector := debug.NewDebugConnector(streamingConnector) - var entitlementConnector entitlement.Connector - var meteredEntitlementConnector meteredentitlement.Connector - var featureConnector productcatalog.FeatureConnector - var creditGrantConnector credit.GrantConnector - var driver *entDialectSQL.Driver + entitlementConnRegistry := ®istry.Entitlement{} // Initialize Postgres if conf.Entitlements.Enabled { @@ -318,63 +305,17 @@ func main() { logger.Error("failed to initialize postgres clients", "error", err) os.Exit(1) } - driver = pgClients.driver - logger.Info("Postgres clients initialized") - - entitlementsTopicPublisher := eventPublisher.ForTopic(conf.Events.SystemEvents.Topic) - - // db adapters - featureRepo := productcatalogpgadapter.NewPostgresFeatureRepo(pgClients.client, logger) - entitlementRepo := entitlementpgadapter.NewPostgresEntitlementRepo(pgClients.client) - usageResetRepo := entitlementpgadapter.NewPostgresUsageResetRepo(pgClients.client) - grantRepo := creditpgadapter.NewPostgresGrantRepo(pgClients.client) - balanceSnashotRepo := creditpgadapter.NewPostgresBalanceSnapshotRepo(pgClients.client) - - // connectors - featureConnector = productcatalog.NewFeatureConnector(featureRepo, meterRepository) - entitlementOwnerConnector := meteredentitlement.NewEntitlementGrantOwnerAdapter( - featureRepo, - entitlementRepo, - usageResetRepo, - meterRepository, - logger, - ) - creditBalanceConnector := credit.NewBalanceConnector( - grantRepo, - balanceSnashotRepo, - entitlementOwnerConnector, - streamingConnector, - logger, - ) - creditGrantConnector = credit.NewGrantConnector( - entitlementOwnerConnector, - grantRepo, - balanceSnashotRepo, - time.Minute, - entitlementsTopicPublisher, - ) - meteredEntitlementConnector = meteredentitlement.NewMeteredEntitlementConnector( - streamingConnector, - entitlementOwnerConnector, - creditBalanceConnector, - creditGrantConnector, - entitlementRepo, - entitlementsTopicPublisher, - ) - staticEntitlementConnector := staticentitlement.NewStaticEntitlementConnector() - booleanEntitlementConnector := booleanentitlement.NewBooleanEntitlementConnector() - entitlementConnector = entitlement.NewEntitlementConnector( - entitlementRepo, - featureConnector, - meterRepository, - meteredEntitlementConnector, - staticEntitlementConnector, - booleanEntitlementConnector, - entitlementsTopicPublisher, - ) - } - defer driver.Close() + defer pgClients.client.Close() + + entitlementConnRegistry = registry.GetEntitlementRegistry(registry.EntitlementOptions{ + DatabaseClient: pgClients.client, + StreamingConnector: streamingConnector, + MeterRepository: meterRepository, + Logger: logger, + Publisher: eventPublisher.ForTopic(conf.Events.SystemEvents.Topic), + }) + } s, err := server.NewServer(&server.Config{ RouterConfig: router.Config{ @@ -387,10 +328,10 @@ func main() { ErrorHandler: errorsx.NewAppHandler(errorsx.NewSlogHandler(logger)), // deps DebugConnector: debugConnector, - FeatureConnector: featureConnector, - EntitlementConnector: entitlementConnector, - EntitlementBalanceConnector: meteredEntitlementConnector, - GrantConnector: creditGrantConnector, + FeatureConnector: entitlementConnRegistry.Feature, + EntitlementConnector: entitlementConnRegistry.Entitlement, + EntitlementBalanceConnector: entitlementConnRegistry.MeteredEntitlement, + GrantConnector: entitlementConnRegistry.Grant, // modules EntitlementsEnabled: conf.Entitlements.Enabled, }, @@ -560,36 +501,6 @@ func initKafkaIngest(producer *kafka.Producer, config config.Configuration, logg return collector, namespaceHandler, nil } -func initClickHouseClient(config config.Configuration) (clickhouse.Conn, error) { - options := &clickhouse.Options{ - Addr: []string{config.Aggregation.ClickHouse.Address}, - Auth: clickhouse.Auth{ - Database: config.Aggregation.ClickHouse.Database, - Username: config.Aggregation.ClickHouse.Username, - Password: config.Aggregation.ClickHouse.Password, - }, - DialTimeout: time.Duration(10) * time.Second, - MaxOpenConns: 5, - MaxIdleConns: 5, - ConnMaxLifetime: time.Duration(10) * time.Minute, - ConnOpenStrategy: clickhouse.ConnOpenInOrder, - BlockBufferSize: 10, - } - // This minimal TLS.Config is normally sufficient to connect to the secure native port (normally 9440) on a ClickHouse server. - // See: https://clickhouse.com/docs/en/integrations/go#using-tls - if config.Aggregation.ClickHouse.TLS { - options.TLS = &tls.Config{} - } - - // Initialize ClickHouse - clickHouseClient, err := clickhouse.Open(options) - if err != nil { - return nil, fmt.Errorf("init clickhouse client: %w", err) - } - - return clickHouseClient, nil -} - func initClickHouseStreaming(config config.Configuration, clickHouseClient clickhouse.Conn, meterRepository meter.Repository, logger *slog.Logger) (*clickhouse_connector.ClickhouseConnector, error) { streamingConnector, err := clickhouse_connector.NewClickhouseConnector(clickhouse_connector.ClickhouseConnectorConfig{ Logger: logger, diff --git a/cmd/sink-worker/main.go b/cmd/sink-worker/main.go index 6b675eed1..dffe5f75d 100644 --- a/cmd/sink-worker/main.go +++ b/cmd/sink-worker/main.go @@ -2,14 +2,12 @@ package main import ( "context" - "crypto/tls" "errors" "fmt" "net" "net/http" "os" "syscall" - "time" health "github.com/AppsFlyer/go-sundheit" healthhttp "github.com/AppsFlyer/go-sundheit/http" @@ -230,36 +228,6 @@ func main() { } } -func initClickHouseClient(config config.Configuration) (clickhouse.Conn, error) { - options := &clickhouse.Options{ - Addr: []string{config.Aggregation.ClickHouse.Address}, - Auth: clickhouse.Auth{ - Database: config.Aggregation.ClickHouse.Database, - Username: config.Aggregation.ClickHouse.Username, - Password: config.Aggregation.ClickHouse.Password, - }, - DialTimeout: time.Duration(10) * time.Second, - MaxOpenConns: 5, - MaxIdleConns: 5, - ConnMaxLifetime: time.Duration(10) * time.Minute, - ConnOpenStrategy: clickhouse.ConnOpenInOrder, - BlockBufferSize: 10, - } - // This minimal TLS.Config is normally sufficient to connect to the secure native port (normally 9440) on a ClickHouse server. - // See: https://clickhouse.com/docs/en/integrations/go#using-tls - if config.Aggregation.ClickHouse.TLS { - options.TLS = &tls.Config{} - } - - // Initialize ClickHouse - clickHouseClient, err := clickhouse.Open(options) - if err != nil { - return nil, fmt.Errorf("init clickhouse client: %w", err) - } - - return clickHouseClient, nil -} - func initKafkaProducer(ctx context.Context, config config.Configuration, logger *slog.Logger, metricMeter metric.Meter, group *run.Group) (*kafka.Producer, error) { // Initialize Kafka Admin Client kafkaConfig := config.Ingest.Kafka.CreateKafkaConfig() @@ -331,7 +299,7 @@ func initIngestEventPublisher(ctx context.Context, logger *slog.Logger, conf con } func initSink(config config.Configuration, logger *slog.Logger, metricMeter metric.Meter, tracer trace.Tracer, meterRepository meter.Repository, flushHandler flushhandler.FlushEventHandler) (*sink.Sink, error) { - clickhouseClient, err := initClickHouseClient(config) + clickhouseClient, err := clickhouse.Open(config.Aggregation.ClickHouse.GetClientOptions()) if err != nil { return nil, fmt.Errorf("init clickhouse client: %w", err) } diff --git a/config/aggregation.go b/config/aggregation.go index f4982aebe..2829e2aa7 100644 --- a/config/aggregation.go +++ b/config/aggregation.go @@ -1,9 +1,12 @@ package config import ( + "crypto/tls" "errors" "fmt" + "time" + "github.com/ClickHouse/clickhouse-go/v2" "github.com/spf13/viper" ) @@ -42,6 +45,30 @@ func (c ClickHouseAggregationConfiguration) Validate() error { return nil } +func (c ClickHouseAggregationConfiguration) GetClientOptions() *clickhouse.Options { + options := &clickhouse.Options{ + Addr: []string{c.Address}, + Auth: clickhouse.Auth{ + Database: c.Database, + Username: c.Username, + Password: c.Password, + }, + DialTimeout: time.Duration(10) * time.Second, + MaxOpenConns: 5, + MaxIdleConns: 5, + ConnMaxLifetime: time.Duration(10) * time.Minute, + ConnOpenStrategy: clickhouse.ConnOpenInOrder, + BlockBufferSize: 10, + } + // This minimal TLS.Config is normally sufficient to connect to the secure native port (normally 9440) on a ClickHouse server. + // See: https://clickhouse.com/docs/en/integrations/go#using-tls + if c.TLS { + options.TLS = &tls.Config{} + } + + return options +} + // ConfigureAggregation configures some defaults in the Viper instance. func ConfigureAggregation(v *viper.Viper) { v.SetDefault("aggregation.clickhouse.address", "127.0.0.1:9000") diff --git a/internal/entitlement/balanceworker/worker.go b/internal/entitlement/balanceworker/worker.go index a3b981d96..b300a4c55 100644 --- a/internal/entitlement/balanceworker/worker.go +++ b/internal/entitlement/balanceworker/worker.go @@ -14,6 +14,7 @@ import ( "github.com/openmeterio/openmeter/internal/entitlement" "github.com/openmeterio/openmeter/internal/event/publisher" "github.com/openmeterio/openmeter/internal/event/spec" + "github.com/openmeterio/openmeter/internal/registry" ) type WorkerOptions struct { @@ -27,6 +28,8 @@ type WorkerOptions struct { Marshaler publisher.CloudEventMarshaler + Entitlement *registry.Entitlement + Logger *slog.Logger } diff --git a/internal/registry/entitlement.go b/internal/registry/entitlement.go new file mode 100644 index 000000000..418376cef --- /dev/null +++ b/internal/registry/entitlement.go @@ -0,0 +1,96 @@ +package registry + +import ( + "log/slog" + "time" + + "github.com/openmeterio/openmeter/internal/ent/db" + "github.com/openmeterio/openmeter/internal/meter" + "github.com/openmeterio/openmeter/openmeter/credit" + creditpgadapter "github.com/openmeterio/openmeter/openmeter/credit/postgresadapter" + "github.com/openmeterio/openmeter/openmeter/entitlement" + booleanentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/boolean" + meteredentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/metered" + entitlementpgadapter "github.com/openmeterio/openmeter/openmeter/entitlement/postgresadapter" + staticentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/static" + "github.com/openmeterio/openmeter/openmeter/event/publisher" + "github.com/openmeterio/openmeter/openmeter/productcatalog" + productcatalogpgadapter "github.com/openmeterio/openmeter/openmeter/productcatalog/postgresadapter" + "github.com/openmeterio/openmeter/openmeter/streaming" +) + +type Entitlement struct { + Feature productcatalog.FeatureConnector + EntitlementOwner credit.OwnerConnector + CreditBalance credit.BalanceConnector + Grant credit.GrantConnector + MeteredEntitlement meteredentitlement.Connector + Entitlement entitlement.EntitlementConnector +} + +type EntitlementOptions struct { + DatabaseClient *db.Client + StreamingConnector streaming.Connector + Logger *slog.Logger + MeterRepository meter.Repository + Publisher publisher.TopicPublisher +} + +func GetEntitlementRegistry(opts EntitlementOptions) *Entitlement { + // Initialize database adapters + featureDBAdapter := productcatalogpgadapter.NewPostgresFeatureDBAdapter(opts.DatabaseClient, opts.Logger) + entitlementDBAdapter := entitlementpgadapter.NewPostgresEntitlementDBAdapter(opts.DatabaseClient) + usageResetDBAdapter := entitlementpgadapter.NewPostgresUsageResetDBAdapter(opts.DatabaseClient) + grantDBAdapter := creditpgadapter.NewPostgresGrantDBAdapter(opts.DatabaseClient) + balanceSnashotDBAdapter := creditpgadapter.NewPostgresBalanceSnapshotDBAdapter(opts.DatabaseClient) + + // Initialize connectors + featureConnector := productcatalog.NewFeatureConnector(featureDBAdapter, opts.MeterRepository) + entitlementOwnerConnector := meteredentitlement.NewEntitlementGrantOwnerAdapter( + featureDBAdapter, + entitlementDBAdapter, + usageResetDBAdapter, + opts.MeterRepository, + opts.Logger, + ) + creditBalanceConnector := credit.NewBalanceConnector( + grantDBAdapter, + balanceSnashotDBAdapter, + entitlementOwnerConnector, + opts.StreamingConnector, + opts.Logger, + ) + grantConnector := credit.NewGrantConnector( + entitlementOwnerConnector, + grantDBAdapter, + balanceSnashotDBAdapter, + time.Minute, + opts.Publisher, + ) + meteredEntitlementConnector := meteredentitlement.NewMeteredEntitlementConnector( + opts.StreamingConnector, + entitlementOwnerConnector, + creditBalanceConnector, + grantConnector, + entitlementDBAdapter, + opts.Publisher, + ) + entitlementConnector := entitlement.NewEntitlementConnector( + entitlementDBAdapter, + featureConnector, + opts.MeterRepository, + meteredEntitlementConnector, + staticentitlement.NewStaticEntitlementConnector(), + booleanentitlement.NewBooleanEntitlementConnector(), + opts.Publisher, + ) + + return &Entitlement{ + Feature: featureConnector, + EntitlementOwner: entitlementOwnerConnector, + CreditBalance: creditBalanceConnector, + Grant: grantConnector, + MeteredEntitlement: meteredEntitlementConnector, + Entitlement: entitlementConnector, + } +}