Skip to content

Commit

Permalink
Topic enforcer to generate Strimzi KafkaTopic resources.
Browse files Browse the repository at this point in the history
  • Loading branch information
shk3 committed Dec 8, 2024
1 parent c7a0fcd commit c820ff9
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 0 deletions.
15 changes: 15 additions & 0 deletions kafka_topic_enforcer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ Kafka topic enforcer's goal is to automate Kafka topic management & hence remove
* Self service, removed dependency on a human
* Simple configuration

If you choose to use Strimzi Kafka operator, this command can also generate Strimzi KafkaTopic CRDs.

## Dependencies

* JVM
Expand Down Expand Up @@ -51,6 +53,9 @@ Usage: <main class> [options] [command] [command options]
enforce Enforce given configuration
Usage: enforce [options] path to a configuration file
Options:
--cluster
a cluster name, if specified, consolidated (multi-cluster)
configuration file is expected
--continuous, -c
run enforcement continuously
Default: false
Expand All @@ -63,6 +68,16 @@ Usage: <main class> [options] [command] [command options]
--unsafemode
run in unsafe mode, topic deletion is _only_ allowed in this mode
Default: false
strimzi Generate the Strimzi KafkaTopic CRDs on stdout from the topic
config. Certain information such as topic tags and config comments
would be missed.
Usage: strimzi [options] /path/to/a/configuration/file
Options:
--cluster
a cluster name, if specified, consolidated (multi-cluster)
configuration file is expected
* --kafka_name, -k
the name of the Strimzi Kafka cluster to be generated for
```
## Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@ java_library(
"//3rdparty/jvm/com/fasterxml/jackson/core:jackson_databind",
"//3rdparty/jvm/com/fasterxml/jackson/dataformat:jackson_dataformat_yaml",
"//3rdparty/jvm/com/fasterxml/jackson/module:jackson_module_parameter_names",
"//3rdparty/jvm/io/fabric8:kubernetes_client_api",
"//3rdparty/jvm/io/fabric8:kubernetes_model_common",
"//3rdparty/jvm/io/fabric8:kubernetes_model_core",
"//3rdparty/jvm/io/prometheus:simpleclient",
"//3rdparty/jvm/io/prometheus:simpleclient_httpserver",
"//3rdparty/jvm/io/strimzi:api",
"//3rdparty/jvm/org/apache/kafka:kafka_clients",
"//3rdparty/jvm/org/slf4j:slf4j_api",
"//3rdparty/jvm_shaded/com/google/guava_shaded",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public static void main(String[] args) {
final Main main = new Main();
final Map<String, BaseCommand<ConfiguredTopic>> commands = new HashMap<>();
commands.put("validate", new BaseCommand<>());
commands.put("strimzi", new StrimziCommand());
commands.put("dump", new DumpCommand());
commands.put("enforce", new TopicEnforceCommand());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (C) 2024 Tesla Motors, Inc. All rights reserved.
*/

package com.tesla.data.topic.enforcer;

import com.tesla.data.enforcer.BaseCommand;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import io.strimzi.api.kafka.model.topic.KafkaTopic;
import io.strimzi.api.kafka.model.topic.KafkaTopicBuilder;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Parameters(commandDescription = "Generate the Strimzi KafkaTopic CRDs on stdout from the topic config. " +
"Certain information such as topic tags and config comments would be missed.")
public class StrimziCommand extends BaseCommand<ConfiguredTopic> {

@Parameter(
names = {"--kafka_name", "-k"},
description = "the name of the Strimzi Kafka cluster to be generated for",
required = true)
protected String kafkaName;

@Override
public int run() {
List<ConfiguredTopic> configuredTopics = configuredEntities(ConfiguredTopic.class, "topics", "topicsFile");

ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory()
.configure(YAMLGenerator.Feature.WRITE_DOC_START_MARKER, true)
.configure(YAMLGenerator.Feature.LITERAL_BLOCK_STYLE, true)
.configure(YAMLGenerator.Feature.MINIMIZE_QUOTES, true)
.configure(YAMLGenerator.Feature.INDENT_ARRAYS, false)
.configure(YAMLGenerator.Feature.SPLIT_LINES, false)
);

try {
for (ConfiguredTopic topic : configuredTopics) {
System.out.println(yamlMapper.writeValueAsString(getKafkaTopic(topic)));
}
} catch (JsonProcessingException e) {
LOG.error("Failed to dump config", e);
return FAILURE;
}

return SUCCESS;
}

private KafkaTopic getKafkaTopic(ConfiguredTopic enforcerTopic) {
// There could be some Kafka topics under the same names but for different Kafka clusters. To avoid ambiguity,
// we prepend the Kafka cluster names in the KafkaTopic metadata name.
String resourceName = kafkaName + "." + enforcerTopic.getName();

return new KafkaTopicBuilder()
.withNewMetadata()
.withName(resourceName)
.withLabels(Map.of("strimzi.io/cluster", kafkaName))
.endMetadata()
.withNewSpec()
.withTopicName(enforcerTopic.getName())
.withPartitions(enforcerTopic.getPartitions())
.withReplicas((int) enforcerTopic.getReplicationFactor())
.withConfig(new HashMap<>(enforcerTopic.getConfig()))
.endSpec().build();
}
}


0 comments on commit c820ff9

Please sign in to comment.