
Merge pull request #1600 from openmeterio/feat/kafka-topic-provisoner
feat: kafka topic provisoner
chrisgacsal authored Oct 9, 2024
2 parents 29897de + 9132428 commit a32a84a
Showing 19 changed files with 483 additions and 202 deletions.
53 changes: 44 additions & 9 deletions cmd/balance-worker/main.go
@@ -10,6 +10,7 @@ import (
"syscall"

"github.com/ThreeDotsLabs/watermill/message"
confluentkafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/oklog/run"
"github.com/spf13/pflag"
"github.com/spf13/viper"
@@ -26,6 +27,7 @@ import (
watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka"
"github.com/openmeterio/openmeter/openmeter/watermill/eventbus"
"github.com/openmeterio/openmeter/openmeter/watermill/router"
pkgkafka "github.com/openmeterio/openmeter/pkg/kafka"
)

const (
@@ -123,8 +125,15 @@ func main() {
os.Exit(1)
}

// Initialize Kafka Topic Provisioner
topicProvisioner, err := initTopicProvisioner(conf, logger, app.Meter)
if err != nil {
logger.Error("failed to initialize kafka topic provisioner", "error", err)
os.Exit(1)
}

// Create publisher
eventPublisherDriver, err := initEventPublisherDriver(ctx, wmBrokerConfig, conf)
eventPublisherDriver, err := initEventPublisherDriver(ctx, wmBrokerConfig, conf, topicProvisioner)
if err != nil {
logger.Error("failed to initialize event publisher", slog.String("error", err.Error()))
os.Exit(1)
@@ -225,18 +234,44 @@ func wmBrokerConfiguration(conf config.Configuration, logger *slog.Logger, metri
}
}

func initEventPublisherDriver(ctx context.Context, broker watermillkafka.BrokerOptions, conf config.Configuration) (message.Publisher, error) {
provisionTopics := []watermillkafka.AutoProvisionTopic{}
func initEventPublisherDriver(ctx context.Context, broker watermillkafka.BrokerOptions, conf config.Configuration, topicProvisioner pkgkafka.TopicProvisioner) (message.Publisher, error) {
var provisionTopics []pkgkafka.TopicConfig
if conf.BalanceWorker.DLQ.AutoProvision.Enabled {
provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{
Topic: conf.BalanceWorker.DLQ.Topic,
NumPartitions: int32(conf.BalanceWorker.DLQ.AutoProvision.Partitions),
Retention: conf.BalanceWorker.DLQ.AutoProvision.Retention,
provisionTopics = append(provisionTopics, pkgkafka.TopicConfig{
Name: conf.BalanceWorker.DLQ.Topic,
Partitions: conf.BalanceWorker.DLQ.AutoProvision.Partitions,
RetentionTime: pkgkafka.TimeDurationMilliSeconds(conf.BalanceWorker.DLQ.AutoProvision.Retention),
})
}

return watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{
Broker: broker,
ProvisionTopics: provisionTopics,
Broker: broker,
ProvisionTopics: provisionTopics,
TopicProvisioner: topicProvisioner,
})
}

func initTopicProvisioner(conf config.Configuration, logger *slog.Logger, meter metric.Meter) (pkgkafka.TopicProvisioner, error) {
kafkaConfigMap := conf.Ingest.Kafka.CreateKafkaConfig()
// NOTE(chrisgacsal): remove 'go.logs.channel.enable' configuration parameter as it is not supported by AdminClient
// and initializing the client fails if this parameter is set.
delete(kafkaConfigMap, "go.logs.channel.enable")

adminClient, err := confluentkafka.NewAdminClient(&kafkaConfigMap)
if err != nil {
return nil, fmt.Errorf("failed to initialize Kafka admin client: %w", err)
}

topicProvisioner, err := pkgkafka.NewTopicProvisioner(pkgkafka.TopicProvisionerConfig{
AdminClient: adminClient,
Logger: logger,
Meter: meter,
CacheSize: conf.Ingest.CacheSize,
CacheTTL: conf.Ingest.CacheTTL,
})
if err != nil {
return nil, fmt.Errorf("failed to initialize topic provisioner: %w", err)
}

return topicProvisioner, nil
}
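
Each worker repeats the same three-step wiring, so it is worth reading as one flow: build a Kafka AdminClient from the ingest settings (minus the Go-client-only `go.logs.channel.enable` option), wrap it in the new topic provisioner together with the cache limits, and hand the provisioner plus the per-topic `TopicConfig` entries to the Watermill publisher. Below is a condensed sketch of that flow for the balance worker, using only calls that appear in this diff; the exact `pkg/kafka` signatures are inferred from these call sites rather than from the package itself.

```go
package main

import (
	"context"
	"fmt"
	"log/slog"

	"github.com/ThreeDotsLabs/watermill/message"
	confluentkafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
	"go.opentelemetry.io/otel/metric"

	"github.com/openmeterio/openmeter/config"
	watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka"
	pkgkafka "github.com/openmeterio/openmeter/pkg/kafka"
)

// newDLQPublisher is a condensed sketch of the wiring above, not a drop-in
// replacement for the worker's main.go.
func newDLQPublisher(
	ctx context.Context,
	conf config.Configuration,
	broker watermillkafka.BrokerOptions,
	logger *slog.Logger,
	meter metric.Meter,
) (message.Publisher, error) {
	// 1. AdminClient built from the same librdkafka settings used for ingestion.
	//    'go.logs.channel.enable' is a Go-client-only option the AdminClient rejects.
	kafkaConfigMap := conf.Ingest.Kafka.CreateKafkaConfig()
	delete(kafkaConfigMap, "go.logs.channel.enable")

	adminClient, err := confluentkafka.NewAdminClient(&kafkaConfigMap)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize Kafka admin client: %w", err)
	}

	// 2. Topic provisioner with the LRU/TTL cache limits from the ingest config,
	//    presumably so already-provisioned topics are not re-checked on every call.
	topicProvisioner, err := pkgkafka.NewTopicProvisioner(pkgkafka.TopicProvisionerConfig{
		AdminClient: adminClient,
		Logger:      logger,
		Meter:       meter,
		CacheSize:   conf.Ingest.CacheSize,
		CacheTTL:    conf.Ingest.CacheTTL,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to initialize topic provisioner: %w", err)
	}

	// 3. The publisher receives the provisioner plus the topics it should auto-provision.
	var provisionTopics []pkgkafka.TopicConfig
	if conf.BalanceWorker.DLQ.AutoProvision.Enabled {
		provisionTopics = append(provisionTopics, pkgkafka.TopicConfig{
			Name:          conf.BalanceWorker.DLQ.Topic,
			Partitions:    conf.BalanceWorker.DLQ.AutoProvision.Partitions,
			RetentionTime: pkgkafka.TimeDurationMilliSeconds(conf.BalanceWorker.DLQ.AutoProvision.Retention),
		})
	}

	return watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{
		Broker:           broker,
		ProvisionTopics:  provisionTopics,
		TopicProvisioner: topicProvisioner,
	})
}
```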
2 changes: 1 addition & 1 deletion cmd/notification-service/.air.toml
@@ -3,7 +3,7 @@ testdata_dir = "testdata"
tmp_dir = "tmp"

[build]
args_bin = ["--config", "./config.yaml", "--telemetry-address", ":10002"]
args_bin = ["--config", "./config.yaml", "--telemetry-address", ":10003"]
bin = "./tmp/openmeter-notification-service"
cmd = "go build -tags dynamic -o ./tmp/openmeter-notification-service ./cmd/notification-service"
delay = 0
53 changes: 44 additions & 9 deletions cmd/notification-service/main.go
@@ -10,6 +10,7 @@ import (
"syscall"

"github.com/ThreeDotsLabs/watermill/message"
confluentkafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/oklog/run"
"github.com/spf13/pflag"
"github.com/spf13/viper"
@@ -28,6 +29,7 @@ import (
watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka"
"github.com/openmeterio/openmeter/openmeter/watermill/eventbus"
"github.com/openmeterio/openmeter/openmeter/watermill/router"
pkgkafka "github.com/openmeterio/openmeter/pkg/kafka"
)

const (
@@ -124,8 +126,15 @@ func main() {
os.Exit(1)
}

// Initialize Kafka Topic Provisioner
topicProvisioner, err := initTopicProvisioner(conf, logger, app.Meter)
if err != nil {
logger.Error("failed to initialize kafka topic provisioner", "error", err)
os.Exit(1)
}

// Create publisher
eventPublisherDriver, err := initEventPublisherDriver(ctx, logger, conf, app.Meter)
eventPublisherDriver, err := initEventPublisherDriver(ctx, logger, conf, app.Meter, topicProvisioner)
if err != nil {
logger.Error("failed to initialize event publisher", slog.String("error", err.Error()))
os.Exit(1)
@@ -254,18 +263,44 @@ func wmBrokerConfiguration(conf config.Configuration, logger *slog.Logger, metri
}
}

func initEventPublisherDriver(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (message.Publisher, error) {
provisionTopics := []watermillkafka.AutoProvisionTopic{}
func initEventPublisherDriver(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter, topicProvisioner pkgkafka.TopicProvisioner) (message.Publisher, error) {
var provisionTopics []pkgkafka.TopicConfig
if conf.Notification.Consumer.DLQ.AutoProvision.Enabled {
provisionTopics = append(provisionTopics, watermillkafka.AutoProvisionTopic{
Topic: conf.Notification.Consumer.DLQ.Topic,
NumPartitions: int32(conf.Notification.Consumer.DLQ.AutoProvision.Partitions),
Retention: conf.BalanceWorker.DLQ.AutoProvision.Retention,
provisionTopics = append(provisionTopics, pkgkafka.TopicConfig{
Name: conf.Notification.Consumer.DLQ.Topic,
Partitions: conf.Notification.Consumer.DLQ.AutoProvision.Partitions,
RetentionTime: pkgkafka.TimeDurationMilliSeconds(conf.Notification.Consumer.DLQ.AutoProvision.Retention),
})
}

return watermillkafka.NewPublisher(ctx, watermillkafka.PublisherOptions{
Broker: wmBrokerConfiguration(conf, logger, metricMeter),
ProvisionTopics: provisionTopics,
Broker: wmBrokerConfiguration(conf, logger, metricMeter),
ProvisionTopics: provisionTopics,
TopicProvisioner: topicProvisioner,
})
}

func initTopicProvisioner(conf config.Configuration, logger *slog.Logger, meter metric.Meter) (pkgkafka.TopicProvisioner, error) {
kafkaConfigMap := conf.Ingest.Kafka.CreateKafkaConfig()
// NOTE(chrisgacsal): remove 'go.logs.channel.enable' configuration parameter as it is not supported by AdminClient
// and initializing the client fails if this parameter is set.
delete(kafkaConfigMap, "go.logs.channel.enable")

adminClient, err := confluentkafka.NewAdminClient(&kafkaConfigMap)
if err != nil {
return nil, fmt.Errorf("failed to initialize Kafka admin client: %w", err)
}

topicProvisioner, err := pkgkafka.NewTopicProvisioner(pkgkafka.TopicProvisionerConfig{
AdminClient: adminClient,
Logger: logger,
Meter: meter,
CacheSize: conf.Ingest.CacheSize,
CacheTTL: conf.Ingest.CacheTTL,
})
if err != nil {
return nil, fmt.Errorf("failed to initialize topic provisioner: %w", err)
}

return topicProvisioner, nil
}
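
One detail both workers share: the DLQ retention arrives from configuration as a `time.Duration`, while Kafka topics express retention as `retention.ms`, a whole number of milliseconds. The `pkgkafka.TimeDurationMilliSeconds(...)` conversion presumably bridges that gap. Its definition lives in `pkg/kafka` and is not part of this diff, so the following is only a plausible shape, not the actual type.

```go
package kafka

import (
	"strconv"
	"time"
)

// TimeDurationMilliSeconds is a guess at the wrapper used for TopicConfig.RetentionTime:
// a time.Duration rendered as the whole-millisecond value Kafka expects for the
// retention.ms topic setting. The real type in pkg/kafka may differ.
type TimeDurationMilliSeconds time.Duration

func (d TimeDurationMilliSeconds) String() string {
	return strconv.FormatInt(time.Duration(d).Milliseconds(), 10)
}

// Example: a 90-day DLQ retention becomes "7776000000".
func retentionMS(retention time.Duration) string {
	return TimeDurationMilliSeconds(retention).String()
}
```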
1 change: 1 addition & 0 deletions cmd/server/wire.go
@@ -59,6 +59,7 @@ func initializeApplication(ctx context.Context, conf config.Configuration, logge
wire.Value(app.WatermillClientID(otelName)),
wire.Struct(new(Application), "*"),
)

return Application{}, nil, nil
}

14 changes: 12 additions & 2 deletions cmd/server/wire_gen.go

Some generated files are not rendered by default.

46 changes: 38 additions & 8 deletions cmd/sink-worker/main.go
@@ -9,7 +9,7 @@ import (
"syscall"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
confluentkafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/oklog/run"
"github.com/sagikazarmark/slog-shim"
"github.com/spf13/pflag"
@@ -107,8 +107,15 @@ func main() {

var group run.Group

// Initialize Kafka Topic Provisioner
topicProvisioner, err := initTopicProvisioner(conf, logger, app.Meter)
if err != nil {
logger.Error("failed to initialize kafka topic provisioner", "error", err)
os.Exit(1)
}

// initialize system event producer
ingestEventFlushHandler, err := initIngestEventPublisher(ctx, logger, conf, app.Meter)
ingestEventFlushHandler, err := initIngestEventPublisher(ctx, logger, conf, app.Meter, topicProvisioner)
if err != nil {
logger.Error("failed to initialize event publisher", "error", err)
os.Exit(1)
@@ -154,7 +161,7 @@ func main() {
}
}

func initIngestEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter) (flushhandler.FlushEventHandler, error) {
func initIngestEventPublisher(ctx context.Context, logger *slog.Logger, conf config.Configuration, metricMeter metric.Meter, topicProvisioner pkgkafka.TopicProvisioner) (flushhandler.FlushEventHandler, error) {
if !conf.Events.Enabled {
return nil, nil
}
@@ -167,13 +174,13 @@ func initIngestEventPublisher(ctx context.Context, logger *slog.Logger, conf con
DebugLogging: conf.Telemetry.Log.Level == slog.LevelDebug,
MetricMeter: metricMeter,
},

ProvisionTopics: []watermillkafka.AutoProvisionTopic{
ProvisionTopics: []pkgkafka.TopicConfig{
{
Topic: conf.Events.IngestEvents.Topic,
NumPartitions: int32(conf.Events.IngestEvents.AutoProvision.Partitions),
Name: conf.Events.IngestEvents.Topic,
Partitions: conf.Events.IngestEvents.AutoProvision.Partitions,
},
},
TopicProvisioner: topicProvisioner,
})
if err != nil {
return nil, err
@@ -259,7 +266,7 @@ func initSink(config config.Configuration, logger *slog.Logger, metricMeter metr
return nil, fmt.Errorf("failed to generate Kafka configuration map: %w", err)
}

consumer, err := kafka.NewConsumer(&consumerConfigMap)
consumer, err := confluentkafka.NewConsumer(&consumerConfigMap)
if err != nil {
return nil, fmt.Errorf("failed to initialize kafka consumer: %s", err)
}
@@ -289,3 +296,26 @@ func initSink(config config.Configuration, logger *slog.Logger, metricMeter metr

return sink.NewSink(sinkConfig)
}

func initTopicProvisioner(conf config.Configuration, logger *slog.Logger, meter metric.Meter) (pkgkafka.TopicProvisioner, error) {
kafkaConfigMap := conf.Ingest.Kafka.CreateKafkaConfig()
delete(kafkaConfigMap, "go.logs.channel.enable")

adminClient, err := confluentkafka.NewAdminClient(&kafkaConfigMap)
if err != nil {
return nil, fmt.Errorf("failed to initialize Kafka admin client: %w", err)
}

topicProvisioner, err := pkgkafka.NewTopicProvisioner(pkgkafka.TopicProvisionerConfig{
AdminClient: adminClient,
Logger: logger,
Meter: meter,
CacheSize: conf.Ingest.CacheSize,
CacheTTL: conf.Ingest.CacheTTL,
})
if err != nil {
return nil, fmt.Errorf("failed to initialize topic provisioner: %w", err)
}

return topicProvisioner, nil
}
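
All three binaries end up handing `watermillkafka.NewPublisher` the same pair: a `TopicProvisioner` and the list of `TopicConfig` values it should ensure exist. The interface itself is defined in `pkg/kafka` and never appears in this diff; purely for orientation, a contract along the following lines would satisfy every call site shown here. Treat it as an assumption (reusing the hypothetical `TimeDurationMilliSeconds` sketch from above), not the actual definition.

```go
package kafka

import "context"

// TopicConfig mirrors the fields used at the call sites in this PR.
type TopicConfig struct {
	Name          string
	Partitions    int
	RetentionTime TimeDurationMilliSeconds
}

// TopicProvisioner is a hypothetical reconstruction of the contract the
// publishers depend on: ensure the given topics exist, creating them with the
// requested partition count and retention if they are missing, before any
// messages are produced to them.
type TopicProvisioner interface {
	Provision(ctx context.Context, topics ...TopicConfig) error
}
```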
6 changes: 6 additions & 0 deletions config.example.yaml
@@ -21,6 +21,12 @@ telemetry:
# debugContexts:
# - broker
# - topic
# # The maximum number of entries kept in the topic cache; once the limit is reached, the least recently used entry is evicted.
# # Setting it to 0 makes the cache size unlimited.
# cacheSize: 250
# # The maximum time an entry is kept in the cache before being evicted.
# # Setting it to 0 disables cache entry expiration.
# cacheTTL: 5m

# dedupe:
# enabled: true
2 changes: 2 additions & 0 deletions config/config_test.go
@@ -102,6 +102,8 @@ func TestComplete(t *testing.T) {
Partitions: 1,
EventsTopicTemplate: "om_%s_events",
},
CacheSize: 200,
CacheTTL: 15 * time.Minute,
},
Aggregation: AggregationConfiguration{
ClickHouse: ClickHouseAggregationConfiguration{
9 changes: 9 additions & 0 deletions config/ingest.go
@@ -14,6 +14,13 @@ import (

type IngestConfiguration struct {
Kafka KafkaIngestConfiguration

// The maximum number of entries kept in the topic cache; once the limit is reached, the least recently used entry is evicted.
// Setting size to 0 makes it unlimited.
CacheSize int

// The maximum time an entry is kept in the cache before being evicted.
CacheTTL time.Duration
}

// Validate validates the configuration.
@@ -160,4 +167,6 @@ func ConfigureIngest(v *viper.Viper) {
v.SetDefault("ingest.kafka.saslPassword", "")
v.SetDefault("ingest.kafka.partitions", 1)
v.SetDefault("ingest.kafka.eventsTopicTemplate", "om_%s_events")
v.SetDefault("ingest.cacheSize", 250)
v.SetDefault("ingest.cacheTTL", "5m")
}
2 changes: 2 additions & 0 deletions config/testdata/complete.yaml
@@ -43,6 +43,8 @@ ingest:
- broker
- topic
- consumer
cacheSize: 200
cacheTTL: 15m

aggregation:
clickhouse:
25 changes: 25 additions & 0 deletions openmeter/app/kafka.go
@@ -42,3 +42,28 @@ func NewKafkaMetrics(meter metric.Meter) (*kafkametrics.Metrics, error) {

return metrics, nil
}

func NewKafkaTopicProvisioner(conf config.IngestConfiguration, logger *slog.Logger, meter metric.Meter) (pkgkafka.TopicProvisioner, error) {
kafkaConfigMap := conf.Kafka.CreateKafkaConfig()
// NOTE(chrisgacsal): remove 'go.logs.channel.enable' configuration parameter as it is not supported by AdminClient
// and initializing the client fails if this parameter is set.
delete(kafkaConfigMap, "go.logs.channel.enable")

adminClient, err := kafka.NewAdminClient(&kafkaConfigMap)
if err != nil {
return nil, fmt.Errorf("failed to initialize Kafka admin client: %w", err)
}

topicProvisioner, err := pkgkafka.NewTopicProvisioner(pkgkafka.TopicProvisionerConfig{
AdminClient: adminClient,
Logger: logger,
Meter: meter,
CacheSize: conf.CacheSize,
CacheTTL: conf.CacheTTL,
})
if err != nil {
return nil, fmt.Errorf("failed to initialize topic provisioner: %w", err)
}

return topicProvisioner, nil
}
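
The helper added to `openmeter/app/kafka.go` is the shared counterpart of the `initTopicProvisioner` functions duplicated in the worker binaries; the server obtains it through the wire changes in `cmd/server/wire.go` and `wire_gen.go`. Called directly, it would look roughly like the sketch below; the `openmeter/app` import path is inferred from the file path, and the no-op meter stands in for the real telemetry setup.

```go
package main

import (
	"log/slog"
	"os"

	"go.opentelemetry.io/otel/metric/noop"

	"github.com/openmeterio/openmeter/config"
	"github.com/openmeterio/openmeter/openmeter/app"
)

func main() {
	logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))

	// conf would normally come from the viper-loaded configuration; a zero-value
	// Configuration is used here only to keep the sketch short.
	var conf config.Configuration

	provisioner, err := app.NewKafkaTopicProvisioner(conf.Ingest, logger, noop.NewMeterProvider().Meter("example"))
	if err != nil {
		logger.Error("failed to initialize kafka topic provisioner", "error", err)
		os.Exit(1)
	}

	_ = provisioner // handed to watermillkafka.PublisherOptions.TopicProvisioner in the workers above
}
```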
