From 32b7e8fc7257269318a4ed795c16824ace116c8c Mon Sep 17 00:00:00 2001 From: George Sun Date: Sun, 8 Dec 2024 17:24:16 +0800 Subject: [PATCH] Topic enforcer to generate Strimzi KafkaTopic resources. --- kafka_topic_enforcer/README.md | 15 ++++ .../java/com/tesla/data/topic/enforcer/BUILD | 4 + .../com/tesla/data/topic/enforcer/Main.java | 1 + .../data/topic/enforcer/StrimziCommand.java | 75 +++++++++++++++++++ 4 files changed, 95 insertions(+) create mode 100644 kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/StrimziCommand.java diff --git a/kafka_topic_enforcer/README.md b/kafka_topic_enforcer/README.md index f09a211..9a74b59 100644 --- a/kafka_topic_enforcer/README.md +++ b/kafka_topic_enforcer/README.md @@ -9,6 +9,8 @@ Kafka topic enforcer's goal is to automate Kafka topic management & hence remove * Self service, removed dependency on a human * Simple configuration +If you choose to use Strimzi Kafka operator, this command can also generate Strimzi KafkaTopic CRDs. + ## Dependencies * JVM @@ -51,6 +53,9 @@ Usage:
[options] [command] [command options] enforce Enforce given configuration Usage: enforce [options] path to a configuration file Options: + --cluster + a cluster name, if specified, consolidated (multi-cluster) + configuration file is expected --continuous, -c run enforcement continuously Default: false @@ -63,6 +68,16 @@ Usage:
[options] [command] [command options] --unsafemode run in unsafe mode, topic deletion is _only_ allowed in this mode Default: false + strimzi Generate the Strimzi KafkaTopic resources YAML on stdout from + the topic config. Certain information such as topic tags and + config comments would be missed. + Usage: strimzi [options] /path/to/a/configuration/file + Options: + --cluster + a cluster name, if specified, consolidated (multi-cluster) + configuration file is expected + * --kafka_name, -k + the name of the Strimzi Kafka cluster to be generated for ``` ## Configuration diff --git a/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/BUILD b/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/BUILD index 5ebdd18..537cb6c 100644 --- a/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/BUILD +++ b/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/BUILD @@ -10,8 +10,12 @@ java_library( "//3rdparty/jvm/com/fasterxml/jackson/core:jackson_databind", "//3rdparty/jvm/com/fasterxml/jackson/dataformat:jackson_dataformat_yaml", "//3rdparty/jvm/com/fasterxml/jackson/module:jackson_module_parameter_names", + "//3rdparty/jvm/io/fabric8:kubernetes_client_api", + "//3rdparty/jvm/io/fabric8:kubernetes_model_common", + "//3rdparty/jvm/io/fabric8:kubernetes_model_core", "//3rdparty/jvm/io/prometheus:simpleclient", "//3rdparty/jvm/io/prometheus:simpleclient_httpserver", + "//3rdparty/jvm/io/strimzi:api", "//3rdparty/jvm/org/apache/kafka:kafka_clients", "//3rdparty/jvm/org/slf4j:slf4j_api", "//3rdparty/jvm_shaded/com/google/guava_shaded", diff --git a/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/Main.java b/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/Main.java index 8dde211..6173a6e 100644 --- a/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/Main.java +++ b/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/Main.java @@ -48,6 +48,7 @@ public static void main(String[] args) { final Main main = new Main(); final Map> commands = new HashMap<>(); commands.put("validate", new BaseCommand<>()); + commands.put("strimzi", new StrimziCommand()); commands.put("dump", new DumpCommand()); commands.put("enforce", new TopicEnforceCommand()); diff --git a/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/StrimziCommand.java b/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/StrimziCommand.java new file mode 100644 index 0000000..a20123c --- /dev/null +++ b/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/StrimziCommand.java @@ -0,0 +1,75 @@ +/* + * Copyright (C) 2024 Tesla Motors, Inc. All rights reserved. + */ + +package com.tesla.data.topic.enforcer; + +import com.tesla.data.enforcer.BaseCommand; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import io.strimzi.api.kafka.model.topic.KafkaTopic; +import io.strimzi.api.kafka.model.topic.KafkaTopicBuilder; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Parameters(commandDescription = "Generate the Strimzi KafkaTopic resources YAML on stdout from the topic config. " + + "Certain information such as topic tags and config comments would be missed.") +public class StrimziCommand extends BaseCommand { + + @Parameter( + names = {"--kafka_name", "-k"}, + description = "the name of the Strimzi Kafka cluster to be generated for", + required = true) + protected String kafkaName; + + @Override + public int run() { + List configuredTopics = configuredEntities(ConfiguredTopic.class, "topics", "topicsFile"); + + ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory() + .configure(YAMLGenerator.Feature.WRITE_DOC_START_MARKER, true) + .configure(YAMLGenerator.Feature.LITERAL_BLOCK_STYLE, true) + .configure(YAMLGenerator.Feature.MINIMIZE_QUOTES, true) + .configure(YAMLGenerator.Feature.INDENT_ARRAYS, false) + .configure(YAMLGenerator.Feature.SPLIT_LINES, false) + ); + + try { + for (ConfiguredTopic topic : configuredTopics) { + System.out.println(yamlMapper.writeValueAsString(getKafkaTopic(topic))); + } + } catch (JsonProcessingException e) { + LOG.error("Failed to dump config", e); + return FAILURE; + } + + return SUCCESS; + } + + private KafkaTopic getKafkaTopic(ConfiguredTopic enforcerTopic) { + // There could be some Kafka topics under the same names but for different Kafka clusters. To avoid ambiguity, + // we prepend the Kafka cluster names in the KafkaTopic metadata name. + String resourceName = kafkaName + "." + enforcerTopic.getName(); + + return new KafkaTopicBuilder() + .withNewMetadata() + .withName(resourceName) + .withLabels(Map.of("strimzi.io/cluster", kafkaName)) + .endMetadata() + .withNewSpec() + .withTopicName(enforcerTopic.getName()) + .withPartitions(enforcerTopic.getPartitions()) + .withReplicas((int) enforcerTopic.getReplicationFactor()) + .withConfig(new HashMap<>(enforcerTopic.getConfig())) + .endSpec().build(); + } +} + +