diff --git a/kafka-topic-creator/src/main/go/main.go b/kafka-topic-creator/src/main/go/main.go
index dd4a561..6d68743 100644
--- a/kafka-topic-creator/src/main/go/main.go
+++ b/kafka-topic-creator/src/main/go/main.go
@@ -7,6 +7,7 @@ import (
 	"os"
 	"slices"
 	"strconv"
+	"strings"
 	"time"
 
 	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
@@ -23,12 +24,17 @@ type Config struct {
 	Address                        string                `yaml:"address"`
 	Topics                         map[string]KafkaTopic `yaml:"topics"`
 	MinValueOverrideForTopicConfig map[string]int64      `yaml:"minValueOverrideForTopicConfig"`
+	InferChangelogPartitions       bool                  `yaml:"inferChangelogPartitions"`
 }
 
 const timeoutDuration = 60 * time.Second
 
 var adminRequestTimeout = kafka.SetAdminRequestTimeout(timeoutDuration)
 
+func isChangelogTopic(topic string) bool {
+	return strings.HasSuffix(topic, "-changelog")
+}
+
 func (config *Config) LoadConfiguration(file string) *Config {
 	yamlFile, err := os.ReadFile(file)
 	if err != nil {
@@ -61,7 +67,7 @@ func GetTopicConfigs(a *kafka.AdminClient, topics []string) map[string]map[strin
 
 	results, err := a.DescribeConfigs(context.Background(), configs, adminRequestTimeout)
 	if err != nil {
-		log.Panicf("Failed to describe topics: %v\n", err)
+		log.Panicf("Failed to describe topic configs: %v\n", err)
 	}
 
 	output := make(map[string]map[string]string)
@@ -75,6 +81,18 @@ func GetTopicConfigs(a *kafka.AdminClient, topics []string) map[string]map[strin
 	return output
 }
 
+func GetPartitionCounts(a *kafka.AdminClient, topics []string) map[string]int {
+	describeTopicResult, err := a.DescribeTopics(context.Background(), kafka.NewTopicCollectionOfTopicNames(topics), adminRequestTimeout)
+	if err != nil {
+		log.Panicf("Failed to describe topics: %v\n", err)
+	}
+	partitionCounts := make(map[string]int)
+	for _, information := range describeTopicResult.TopicDescriptions {
+		partitionCounts[information.Name] = len(information.Partitions)
+	}
+	return partitionCounts
+}
+
 func CreateTopics(a *kafka.AdminClient, topics []kafka.TopicSpecification) {
 	results, err := a.CreateTopics(
 		context.Background(),
@@ -171,7 +189,7 @@ func main() {
 	flag.Parse()
 	log.Printf("configpath: %s\n", *configpath)
 
-	var config Config
+	config := Config{InferChangelogPartitions: false}
 	config.LoadConfiguration(*configpath)
 
 	a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": config.Address})
@@ -183,6 +201,17 @@ func main() {
 
 	var newTopics []kafka.TopicSpecification
 	var existingTopics []string
+	var changeLogTopics []string
+
+	maxNonChangelogPartitions := 0
+
+	for topicName, topicConfig := range config.Topics {
+		if isChangelogTopic(topicName) {
+			changeLogTopics = append(changeLogTopics, topicName)
+		} else {
+			maxNonChangelogPartitions = max(maxNonChangelogPartitions, topicConfig.NumPartitions)
+		}
+	}
 
 	for topicName, topicConfig := range config.Topics {
 		if slices.Contains(topicList, topicName) {
@@ -194,7 +223,11 @@ func main() {
 					topicConfig.Configs["cleanup.policy"] = "compact,delete"
 				}
 			}
-			newTopics = append(newTopics, kafka.TopicSpecification{Topic: topicName, NumPartitions: topicConfig.NumPartitions, ReplicationFactor: topicConfig.ReplicationFactor, Config: topicConfig.Configs})
+			if isChangelogTopic(topicName) {
+				newTopics = append(newTopics, kafka.TopicSpecification{Topic: topicName, NumPartitions: maxNonChangelogPartitions, ReplicationFactor: topicConfig.ReplicationFactor, Config: topicConfig.Configs})
+			} else {
+				newTopics = append(newTopics, kafka.TopicSpecification{Topic: topicName, NumPartitions: topicConfig.NumPartitions, ReplicationFactor: topicConfig.ReplicationFactor, Config: topicConfig.Configs})
+			}
 		}
 	}
 
@@ -203,6 +236,26 @@ func main() {
 		CreateTopics(a, newTopics)
 	}
 
+	if config.InferChangelogPartitions {
+		log.Printf("[WARNING] infer changelog topic partitions is enabled, setting partitions to %v...\n", maxNonChangelogPartitions)
+		changelogTopicPartitions := GetPartitionCounts(a, changeLogTopics)
+		var updatePartitionSpecs []kafka.PartitionsSpecification
+		for topic, partitionCnt := range changelogTopicPartitions {
+			if partitionCnt < maxNonChangelogPartitions {
+				log.Printf("updating partition count for [%v] from %v to %v\n", topic, partitionCnt, maxNonChangelogPartitions)
+				updatePartitionSpecs = append(updatePartitionSpecs, kafka.PartitionsSpecification{
+					Topic:      topic,
+					IncreaseTo: maxNonChangelogPartitions,
+				})
+			}
+		}
+		_, err := a.CreatePartitions(context.Background(), updatePartitionSpecs, adminRequestTimeout)
+		if err != nil {
+			log.Panicf("Failed to update partitions: %v\n", err)
+		}
+		log.Printf("Finished updating partitions of %v changelog topics\n", len(updatePartitionSpecs))
+	}
+
 	log.Printf("Number of existing topics: %d\n", len(existingTopics))
 	if len(existingTopics) > 0 {
 		existingTopicConfigs := GetTopicConfigs(a, existingTopics)
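
Not part of the patch: a minimal, self-contained sketch of the partition-inference rule the new code applies. The topic names and partition counts below are hypothetical, invented only for illustration; the isChangelogTopic helper and the max-partition scan are copied from the diff so the resulting target count can be seen in isolation.

package main

import (
	"fmt"
	"strings"
)

// isChangelogTopic mirrors the helper added in the diff: Kafka Streams
// changelog topics are identified by the "-changelog" suffix.
func isChangelogTopic(topic string) bool {
	return strings.HasSuffix(topic, "-changelog")
}

func main() {
	// Hypothetical topic -> configured partition count, assumed for illustration only.
	topics := map[string]int{
		"orders":                     12,
		"payments":                   6,
		"orders-agg-store-changelog": 1,
	}

	// Same scan as in main(): collect changelog topics and track the largest
	// partition count among the non-changelog topics.
	maxNonChangelogPartitions := 0
	var changelogTopics []string
	for name, partitions := range topics {
		if isChangelogTopic(name) {
			changelogTopics = append(changelogTopics, name)
		} else if partitions > maxNonChangelogPartitions {
			maxNonChangelogPartitions = partitions
		}
	}

	// With inferChangelogPartitions enabled, each changelog topic would be
	// created with, or grown to, maxNonChangelogPartitions (12 here).
	fmt.Println(changelogTopics, maxNonChangelogPartitions)
}

In this sketch the changelog topic's own numPartitions value (1) is ignored: it would be created with 12 partitions, and if it already existed with fewer, CreatePartitions would grow it to 12.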