Skip to content

Commit

Permalink
Merge pull request #1278 from openmeterio/feat/add-entitlements-initi…
Browse files Browse the repository at this point in the history
…alization-v2

feat: add entitlements initialization - v2
  • Loading branch information
turip authored Jul 31, 2024
2 parents 4b2c692 + 0a40202 commit 8c95b0a
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 151 deletions.
113 changes: 101 additions & 12 deletions cmd/balance-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"syscall"
"time"

"entgo.io/ent/dialect/sql"
health "github.com/AppsFlyer/go-sundheit"
healthhttp "github.com/AppsFlyer/go-sundheit/http"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/IBM/sarama"
"github.com/ThreeDotsLabs/watermill"
wmkafka "github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka"
Expand All @@ -33,15 +35,22 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"

"github.com/openmeterio/openmeter/config"
"github.com/openmeterio/openmeter/internal/ent/db"
"github.com/openmeterio/openmeter/internal/entitlement/balanceworker"
"github.com/openmeterio/openmeter/internal/event/publisher"
"github.com/openmeterio/openmeter/internal/ingest/kafkaingest"
omwatermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka"
"github.com/openmeterio/openmeter/internal/meter"
"github.com/openmeterio/openmeter/internal/registry"
"github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector"
watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka"
"github.com/openmeterio/openmeter/pkg/contextx"
"github.com/openmeterio/openmeter/pkg/framework/entutils"
"github.com/openmeterio/openmeter/pkg/framework/operation"
"github.com/openmeterio/openmeter/pkg/gosundheit"
pkgkafka "github.com/openmeterio/openmeter/pkg/kafka"
kafkametrics "github.com/openmeterio/openmeter/pkg/kafka/metrics"
"github.com/openmeterio/openmeter/pkg/models"
"github.com/openmeterio/openmeter/pkg/slicesx"
)

const (
Expand Down Expand Up @@ -177,6 +186,43 @@ func main() {

var group run.Group

// Initialize the data sources (entitlements, productcatalog, etc.)
// Dependencies: meters
meterRepository := meter.NewInMemoryRepository(slicesx.Map(conf.Meters, func(meter *models.Meter) models.Meter {
return *meter
}))

// Dependencies: clickhouse
clickHouseClient, err := clickhouse.Open(conf.Aggregation.ClickHouse.GetClientOptions())
if err != nil {
logger.Error("failed to initialize clickhouse client", "error", err)
os.Exit(1)
}

// Dependencies: streamingConnector
clickhouseStreamingConnector, err := clickhouse_connector.NewClickhouseConnector(clickhouse_connector.ClickhouseConnectorConfig{
Logger: logger,
ClickHouse: clickHouseClient,
Database: conf.Aggregation.ClickHouse.Database,
Meters: meterRepository,
CreateOrReplaceMeter: conf.Aggregation.CreateOrReplaceMeter,
PopulateMeter: conf.Aggregation.PopulateMeter,
})
if err != nil {
logger.Error("failed to initialize clickhouse aggregation", "error", err)
os.Exit(1)
}

// Dependencies: postgresql
pgClients, err := initPGClients(conf.Postgres)
if err != nil {
logger.Error("failed to initialize postgres clients", "error", err)
os.Exit(1)
}
defer pgClients.driver.Close()

logger.Info("Postgres clients initialized")

// Create subscriber
wmSubscriber, err := initKafkaSubscriber(conf, logger)
if err != nil {
Expand All @@ -191,22 +237,32 @@ func main() {
os.Exit(1)
}

wmPublisher, err := initEventPublisher(ctx, logger, conf, kafkaPublisher)
publishers, err := initEventPublisher(ctx, logger, conf, kafkaPublisher)
if err != nil {
logger.Error("failed to initialize event publisher", slog.String("error", err.Error()))
os.Exit(1)
}

// Dependencies: entitlement
entitlementConnectors := registry.GetEntitlementRegistry(registry.EntitlementOptions{
DatabaseClient: pgClients.client,
StreamingConnector: clickhouseStreamingConnector,
MeterRepository: meterRepository,
Logger: logger,
Publisher: publishers.eventPublisher.ForTopic(conf.Events.SystemEvents.Topic),
})

// Initialize worker
workerOptions := balanceworker.WorkerOptions{
SystemEventsTopic: conf.Events.SystemEvents.Topic,
// TODO: IngestEventsTopic
Subscriber: wmSubscriber,

TargetTopic: conf.Events.SystemEvents.Topic,
Publisher: wmPublisher.publisher,
Publisher: publishers.watermillPublisher,
Marshaler: publishers.marshaler,

Marshaler: wmPublisher.marshaler,
Entitlement: entitlementConnectors,

Logger: logger,
}
Expand Down Expand Up @@ -296,13 +352,14 @@ func initKafkaSubscriber(conf config.Configuration, logger *slog.Logger) (messag
return subscriber, nil
}

type eventPublisher struct {
publisher message.Publisher
marshaler publisher.CloudEventMarshaler
type eventPublishers struct {
watermillPublisher message.Publisher
marshaler publisher.CloudEventMarshaler
eventPublisher publisher.Publisher
}

func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, kafkaProducer *kafka.Producer) (*eventPublisher, error) {
eventDriver := omwatermillkafka.NewPublisher(kafkaProducer)
func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, kafkaProducer *kafka.Producer) (*eventPublishers, error) {
eventDriver := watermillkafka.NewPublisher(kafkaProducer)

if conf.BalanceWorker.PoisionQueue.AutoProvision.Enabled {
adminClient, err := kafka.NewAdminClientFromProducer(kafkaProducer)
Expand All @@ -321,9 +378,18 @@ func initEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Co
}
}

return &eventPublisher{
publisher: eventDriver,
marshaler: publisher.NewCloudEventMarshaler(omwatermillkafka.AddPartitionKeyFromSubject),
eventPublisher, err := publisher.NewPublisher(publisher.PublisherOptions{
Publisher: eventDriver,
Transform: watermillkafka.AddPartitionKeyFromSubject,
})
if err != nil {
return nil, fmt.Errorf("failed to create event publisher: %w", err)
}

return &eventPublishers{
watermillPublisher: eventDriver,
marshaler: publisher.NewCloudEventMarshaler(watermillkafka.AddPartitionKeyFromSubject),
eventPublisher: eventPublisher,
}, nil
}

Expand All @@ -350,3 +416,26 @@ func initKafkaProducer(ctx context.Context, config config.Configuration, logger
slog.Debug("connected to Kafka")
return producer, nil
}

type pgClients struct {
driver *sql.Driver
client *db.Client
}

func initPGClients(config config.PostgresConfig) (
*pgClients,
error,
) {
if err := config.Validate(); err != nil {
return nil, fmt.Errorf("invalid postgres config: %w", err)
}
driver, err := entutils.GetPGDriver(config.URL)
if err != nil {
return nil, fmt.Errorf("failed to init postgres driver: %w", err)
}

return &pgClients{
driver: driver,
client: db.NewClient(db.Driver(driver)),
}, nil
}
123 changes: 17 additions & 106 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -35,15 +34,8 @@ import (
"go.opentelemetry.io/otel/trace"

"github.com/openmeterio/openmeter/config"
"github.com/openmeterio/openmeter/internal/credit"
creditpgadapter "github.com/openmeterio/openmeter/internal/credit/postgresadapter"
"github.com/openmeterio/openmeter/internal/debug"
"github.com/openmeterio/openmeter/internal/ent/db"
"github.com/openmeterio/openmeter/internal/entitlement"
booleanentitlement "github.com/openmeterio/openmeter/internal/entitlement/boolean"
meteredentitlement "github.com/openmeterio/openmeter/internal/entitlement/metered"
entitlementpgadapter "github.com/openmeterio/openmeter/internal/entitlement/postgresadapter"
staticentitlement "github.com/openmeterio/openmeter/internal/entitlement/static"
"github.com/openmeterio/openmeter/internal/event/publisher"
"github.com/openmeterio/openmeter/internal/ingest"
"github.com/openmeterio/openmeter/internal/ingest/ingestdriver"
Expand All @@ -52,8 +44,7 @@ import (
"github.com/openmeterio/openmeter/internal/meter"
"github.com/openmeterio/openmeter/internal/namespace"
"github.com/openmeterio/openmeter/internal/namespace/namespacedriver"
"github.com/openmeterio/openmeter/internal/productcatalog"
productcatalogpgadapter "github.com/openmeterio/openmeter/internal/productcatalog/postgresadapter"
"github.com/openmeterio/openmeter/internal/registry"
"github.com/openmeterio/openmeter/internal/server"
"github.com/openmeterio/openmeter/internal/server/authenticator"
"github.com/openmeterio/openmeter/internal/server/router"
Expand Down Expand Up @@ -208,7 +199,7 @@ func main() {
var namespaceHandlers []namespace.Handler

// Initialize ClickHouse Client
clickHouseClient, err := initClickHouseClient(conf)
clickHouseClient, err := clickhouse.Open(conf.Aggregation.ClickHouse.GetClientOptions())
if err != nil {
logger.Error("failed to initialize clickhouse client", "error", err)
os.Exit(1)
Expand Down Expand Up @@ -305,11 +296,7 @@ func main() {
}

debugConnector := debug.NewDebugConnector(streamingConnector)
var entitlementConnector entitlement.Connector
var meteredEntitlementConnector meteredentitlement.Connector
var featureConnector productcatalog.FeatureConnector
var creditGrantConnector credit.GrantConnector
var driver *entDialectSQL.Driver
entitlementConnRegistry := &registry.Entitlement{}

// Initialize Postgres
if conf.Entitlements.Enabled {
Expand All @@ -318,63 +305,17 @@ func main() {
logger.Error("failed to initialize postgres clients", "error", err)
os.Exit(1)
}
driver = pgClients.driver
logger.Info("Postgres clients initialized")

entitlementsTopicPublisher := eventPublisher.ForTopic(conf.Events.SystemEvents.Topic)

// db adapters
featureRepo := productcatalogpgadapter.NewPostgresFeatureRepo(pgClients.client, logger)
entitlementRepo := entitlementpgadapter.NewPostgresEntitlementRepo(pgClients.client)
usageResetRepo := entitlementpgadapter.NewPostgresUsageResetRepo(pgClients.client)
grantRepo := creditpgadapter.NewPostgresGrantRepo(pgClients.client)
balanceSnashotRepo := creditpgadapter.NewPostgresBalanceSnapshotRepo(pgClients.client)

// connectors
featureConnector = productcatalog.NewFeatureConnector(featureRepo, meterRepository)
entitlementOwnerConnector := meteredentitlement.NewEntitlementGrantOwnerAdapter(
featureRepo,
entitlementRepo,
usageResetRepo,
meterRepository,
logger,
)
creditBalanceConnector := credit.NewBalanceConnector(
grantRepo,
balanceSnashotRepo,
entitlementOwnerConnector,
streamingConnector,
logger,
)
creditGrantConnector = credit.NewGrantConnector(
entitlementOwnerConnector,
grantRepo,
balanceSnashotRepo,
time.Minute,
entitlementsTopicPublisher,
)
meteredEntitlementConnector = meteredentitlement.NewMeteredEntitlementConnector(
streamingConnector,
entitlementOwnerConnector,
creditBalanceConnector,
creditGrantConnector,
entitlementRepo,
entitlementsTopicPublisher,
)
staticEntitlementConnector := staticentitlement.NewStaticEntitlementConnector()
booleanEntitlementConnector := booleanentitlement.NewBooleanEntitlementConnector()
entitlementConnector = entitlement.NewEntitlementConnector(
entitlementRepo,
featureConnector,
meterRepository,
meteredEntitlementConnector,
staticEntitlementConnector,
booleanEntitlementConnector,
entitlementsTopicPublisher,
)
}

defer driver.Close()
defer pgClients.client.Close()

entitlementConnRegistry = registry.GetEntitlementRegistry(registry.EntitlementOptions{
DatabaseClient: pgClients.client,
StreamingConnector: streamingConnector,
MeterRepository: meterRepository,
Logger: logger,
Publisher: eventPublisher.ForTopic(conf.Events.SystemEvents.Topic),
})
}

s, err := server.NewServer(&server.Config{
RouterConfig: router.Config{
Expand All @@ -387,10 +328,10 @@ func main() {
ErrorHandler: errorsx.NewAppHandler(errorsx.NewSlogHandler(logger)),
// deps
DebugConnector: debugConnector,
FeatureConnector: featureConnector,
EntitlementConnector: entitlementConnector,
EntitlementBalanceConnector: meteredEntitlementConnector,
GrantConnector: creditGrantConnector,
FeatureConnector: entitlementConnRegistry.Feature,
EntitlementConnector: entitlementConnRegistry.Entitlement,
EntitlementBalanceConnector: entitlementConnRegistry.MeteredEntitlement,
GrantConnector: entitlementConnRegistry.Grant,
// modules
EntitlementsEnabled: conf.Entitlements.Enabled,
},
Expand Down Expand Up @@ -560,36 +501,6 @@ func initKafkaIngest(producer *kafka.Producer, config config.Configuration, logg
return collector, namespaceHandler, nil
}

func initClickHouseClient(config config.Configuration) (clickhouse.Conn, error) {
options := &clickhouse.Options{
Addr: []string{config.Aggregation.ClickHouse.Address},
Auth: clickhouse.Auth{
Database: config.Aggregation.ClickHouse.Database,
Username: config.Aggregation.ClickHouse.Username,
Password: config.Aggregation.ClickHouse.Password,
},
DialTimeout: time.Duration(10) * time.Second,
MaxOpenConns: 5,
MaxIdleConns: 5,
ConnMaxLifetime: time.Duration(10) * time.Minute,
ConnOpenStrategy: clickhouse.ConnOpenInOrder,
BlockBufferSize: 10,
}
// This minimal TLS.Config is normally sufficient to connect to the secure native port (normally 9440) on a ClickHouse server.
// See: https://clickhouse.com/docs/en/integrations/go#using-tls
if config.Aggregation.ClickHouse.TLS {
options.TLS = &tls.Config{}
}

// Initialize ClickHouse
clickHouseClient, err := clickhouse.Open(options)
if err != nil {
return nil, fmt.Errorf("init clickhouse client: %w", err)
}

return clickHouseClient, nil
}

func initClickHouseStreaming(config config.Configuration, clickHouseClient clickhouse.Conn, meterRepository meter.Repository, logger *slog.Logger) (*clickhouse_connector.ClickhouseConnector, error) {
streamingConnector, err := clickhouse_connector.NewClickhouseConnector(clickhouse_connector.ClickhouseConnectorConfig{
Logger: logger,
Expand Down
Loading

0 comments on commit 8c95b0a

Please sign in to comment.