Skip to content

Commit

Permalink
infer changelog topic partitions (#42)
Browse files Browse the repository at this point in the history
* infer changelog topic partitions

* update

* update
  • Loading branch information
Kishan Sairam Adapa authored Oct 11, 2024
1 parent e5c9fe4 commit 1ddde41
Showing 1 changed file with 56 additions and 3 deletions.
59 changes: 56 additions & 3 deletions kafka-topic-creator/src/main/go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"slices"
"strconv"
"strings"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
Expand All @@ -23,12 +24,17 @@ type Config struct {
Address string `yaml:"address"`
Topics map[string]KafkaTopic `yaml:"topics"`
MinValueOverrideForTopicConfig map[string]int64 `yaml:"minValueOverrideForTopicConfig"`
InferChangelogPartitions bool `yaml:"inferChangelogPartitions"`
}

// timeoutDuration is the timeout used for Kafka admin requests (60 seconds).
const timeoutDuration = 60 * time.Second

// adminRequestTimeout is the admin option built from timeoutDuration. It is
// passed to the admin client calls in this file (for example DescribeConfigs,
// DescribeTopics and CreatePartitions).
var adminRequestTimeout = kafka.SetAdminRequestTimeout(timeoutDuration)

// isChangelogTopic reports whether the topic name ends with the "-changelog"
// suffix, which is how this file recognises changelog topics.
func isChangelogTopic(topic string) bool {
	const changelogSuffix = "-changelog"
	matches := strings.HasSuffix(topic, changelogSuffix)
	return matches
}

func (config *Config) LoadConfiguration(file string) *Config {
yamlFile, err := os.ReadFile(file)
if err != nil {
Expand Down Expand Up @@ -61,7 +67,7 @@ func GetTopicConfigs(a *kafka.AdminClient, topics []string) map[string]map[strin

results, err := a.DescribeConfigs(context.Background(), configs, adminRequestTimeout)
if err != nil {
log.Panicf("Failed to describe topics: %v\n", err)
log.Panicf("Failed to describe topic configs: %v\n", err)
}

output := make(map[string]map[string]string)
Expand All @@ -75,6 +81,18 @@ func GetTopicConfigs(a *kafka.AdminClient, topics []string) map[string]map[strin
return output
}

// GetPartitionCounts describes the given topics through the admin client and
// returns a map from topic name to the number of partitions reported for it.
//
// It panics (via log.Panicf) if the DescribeTopics request itself fails.
//
// NOTE(review): per-topic errors inside the returned descriptions are not
// inspected here; such a topic would presumably be recorded with zero
// partitions — confirm whether callers need that handled.
func GetPartitionCounts(a *kafka.AdminClient, topics []string) map[string]int {
	describeTopicResult, err := a.DescribeTopics(context.Background(), kafka.NewTopicCollectionOfTopicNames(topics), adminRequestTimeout)
	if err != nil {
		log.Panicf("Failed to describe topics: %v\n", err)
	}
	// The map must be allocated before it is written to: assigning into a
	// nil map panics at runtime.
	partitionCounts := make(map[string]int, len(describeTopicResult.TopicDescriptions))
	for _, information := range describeTopicResult.TopicDescriptions {
		partitionCounts[information.Name] = len(information.Partitions)
	}
	return partitionCounts
}

func CreateTopics(a *kafka.AdminClient, topics []kafka.TopicSpecification) {
results, err := a.CreateTopics(
context.Background(),
Expand Down Expand Up @@ -171,7 +189,7 @@ func main() {
flag.Parse()
log.Printf("configpath: %s\n", *configpath)

var config Config
config := Config{InferChangelogPartitions: false}
config.LoadConfiguration(*configpath)

a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": config.Address})
Expand All @@ -183,6 +201,17 @@ func main() {

var newTopics []kafka.TopicSpecification
var existingTopics []string
var changeLogTopics []string

maxNonChangelogPartitions := 0

for topicName, topicConfig := range config.Topics {
if isChangelogTopic(topicName) {
changeLogTopics = append(changeLogTopics, topicName)
} else {
maxNonChangelogPartitions = max(maxNonChangelogPartitions, topicConfig.NumPartitions)
}
}

for topicName, topicConfig := range config.Topics {
if slices.Contains(topicList, topicName) {
Expand All @@ -194,7 +223,11 @@ func main() {
topicConfig.Configs["cleanup.policy"] = "compact,delete"
}
}
newTopics = append(newTopics, kafka.TopicSpecification{Topic: topicName, NumPartitions: topicConfig.NumPartitions, ReplicationFactor: topicConfig.ReplicationFactor, Config: topicConfig.Configs})
if isChangelogTopic(topicName) {
newTopics = append(newTopics, kafka.TopicSpecification{Topic: topicName, NumPartitions: maxNonChangelogPartitions, ReplicationFactor: topicConfig.ReplicationFactor, Config: topicConfig.Configs})
} else {
newTopics = append(newTopics, kafka.TopicSpecification{Topic: topicName, NumPartitions: topicConfig.NumPartitions, ReplicationFactor: topicConfig.ReplicationFactor, Config: topicConfig.Configs})
}
}
}

Expand All @@ -203,6 +236,26 @@ func main() {
CreateTopics(a, newTopics)
}

if config.InferChangelogPartitions {
log.Printf("[WARNING] infer changelog topic partitions is enabled, setting partitions to %v...\n", maxNonChangelogPartitions)
changelogTopicPartitions := GetPartitionCounts(a, changeLogTopics)
var updatePartitionSpecs []kafka.PartitionsSpecification
for topic, partitionCnt := range changelogTopicPartitions {
if partitionCnt < maxNonChangelogPartitions {
log.Printf("updating partition count for [%v] from %v to %v\n", topic, partitionCnt, maxNonChangelogPartitions)
updatePartitionSpecs = append(updatePartitionSpecs, kafka.PartitionsSpecification{
Topic: topic,
IncreaseTo: maxNonChangelogPartitions,
})
}
}
_, err := a.CreatePartitions(context.Background(), updatePartitionSpecs, adminRequestTimeout)
if err != nil {
log.Panic("failed to update partitions")
}
log.Printf("Finished updating partitions of %v changelog topics\n", len(updatePartitionSpecs))
}

log.Printf("Number of existing topics: %d\n", len(existingTopics))
if len(existingTopics) > 0 {
existingTopicConfigs := GetTopicConfigs(a, existingTopics)
Expand Down

0 comments on commit 1ddde41

Please sign in to comment.