From 8004ba67c2a85e761a03bdae7794b0f259a02c0e Mon Sep 17 00:00:00 2001 From: Erik Godding Boye Date: Sat, 26 Oct 2024 12:27:08 +0200 Subject: [PATCH] feat: add opt-in to terminate non-empty topics matching props --- .../ApplicationProperties.java | 17 +++++++ .../k3a/topicterminator/TopicTerminator.java | 16 ++++-- .../strategy/AndOperation.java | 31 ++++++++++++ .../ReservedIfTopicNotMatchingProps.java | 50 +++++++++++++++++++ .../k3a/topicterminator/ApplicationTest.java | 13 ++++- src/test/resources/config/application.yaml | 2 + 6 files changed, 124 insertions(+), 5 deletions(-) create mode 100644 src/main/java/io/statnett/k3a/topicterminator/strategy/AndOperation.java create mode 100644 src/main/java/io/statnett/k3a/topicterminator/strategy/ReservedIfTopicNotMatchingProps.java diff --git a/src/main/java/io/statnett/k3a/topicterminator/ApplicationProperties.java b/src/main/java/io/statnett/k3a/topicterminator/ApplicationProperties.java index ab2d013..215b2fc 100644 --- a/src/main/java/io/statnett/k3a/topicterminator/ApplicationProperties.java +++ b/src/main/java/io/statnett/k3a/topicterminator/ApplicationProperties.java @@ -5,6 +5,7 @@ import org.springframework.validation.annotation.Validated; import java.util.Collection; +import java.util.Map; import java.util.regex.Pattern; @ConfigurationProperties("app") @@ -35,6 +36,14 @@ public class ApplicationProperties { */ private Collection blessedTopics; + /** + * Can be used to terminate topics even if the topic contains data + * (destructive operation) if the topic is otherwise considered unused. + * The supplied properties will be matched against topic configuration, + * and all properties must match! + */ + private Map nonEmptyTopicsMatchingProps; + public String getFixedRateString() { return fixedRateString; } @@ -58,4 +67,12 @@ public Collection getBlessedTopics() { public void setBlessedTopics(Collection blessedTopics) { this.blessedTopics = blessedTopics; } + + public Map getNonEmptyTopicsMatchingProps() { + return nonEmptyTopicsMatchingProps; + } + + public void setNonEmptyTopicsMatchingProps(Map nonEmptyTopicsMatchingProps) { + this.nonEmptyTopicsMatchingProps = nonEmptyTopicsMatchingProps; + } } diff --git a/src/main/java/io/statnett/k3a/topicterminator/TopicTerminator.java b/src/main/java/io/statnett/k3a/topicterminator/TopicTerminator.java index 8266267..fcac8ec 100644 --- a/src/main/java/io/statnett/k3a/topicterminator/TopicTerminator.java +++ b/src/main/java/io/statnett/k3a/topicterminator/TopicTerminator.java @@ -2,11 +2,13 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.MeterRegistry; +import io.statnett.k3a.topicterminator.strategy.AndOperation; import io.statnett.k3a.topicterminator.strategy.BlessedTopic; import io.statnett.k3a.topicterminator.strategy.ConsumedTopic; import io.statnett.k3a.topicterminator.strategy.InternalTopic; import io.statnett.k3a.topicterminator.strategy.NonEmptyTopic; import io.statnett.k3a.topicterminator.strategy.ReservedTopic; +import io.statnett.k3a.topicterminator.strategy.ReservedIfTopicNotMatchingProps; import org.apache.kafka.clients.admin.AdminClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,6 +20,7 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.regex.Pattern; @@ -48,7 +51,10 @@ public void terminateUnusedTopics() throws ExecutionException, InterruptedExcept .remove(internalTopics()) .remove(blessedTopics(props.getBlessedTopics())) .remove(consumedTopics()) - .remove(nonEmptyTopics()) + .remove( + nonEmptyTopics(), + reservedTopicsNotMatchingProps(props.getNonEmptyTopicsMatchingProps()) + ) .terminate(); } @@ -93,8 +99,8 @@ public void terminate() throws ExecutionException, InterruptedException { } } - public TopicTerminatorChain remove(ReservedTopic reservedTopic) { - reservedTopics.add(reservedTopic); + public TopicTerminatorChain remove(ReservedTopic... reservedTopic) { + reservedTopics.add(new AndOperation(reservedTopic)); return this; } } @@ -129,4 +135,8 @@ private static ConsumedTopic consumedTopics() { private static NonEmptyTopic nonEmptyTopics() { return new NonEmptyTopic(); } + + private static ReservedIfTopicNotMatchingProps reservedTopicsNotMatchingProps(Map props) { + return new ReservedIfTopicNotMatchingProps(props); + } } diff --git a/src/main/java/io/statnett/k3a/topicterminator/strategy/AndOperation.java b/src/main/java/io/statnett/k3a/topicterminator/strategy/AndOperation.java new file mode 100644 index 0000000..71c038c --- /dev/null +++ b/src/main/java/io/statnett/k3a/topicterminator/strategy/AndOperation.java @@ -0,0 +1,31 @@ +package io.statnett.k3a.topicterminator.strategy; + +import org.apache.kafka.clients.admin.AdminClient; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutionException; + + +public class AndOperation implements ReservedTopic { + + private final ReservedTopic[] parts; + + public AndOperation(ReservedTopic... parts) { + this.parts = parts; + } + + @Override + public Set filter(AdminClient client, Set topicNames) throws ExecutionException, InterruptedException { + if (parts.length == 0) { + return topicNames; + } + + HashSet reserved = new HashSet<>(); + for (ReservedTopic reservedTopic : parts) { + reserved.addAll(reservedTopic.filter(client, new HashSet<>(topicNames))); + } + topicNames.retainAll(reserved); + return topicNames; + } +} diff --git a/src/main/java/io/statnett/k3a/topicterminator/strategy/ReservedIfTopicNotMatchingProps.java b/src/main/java/io/statnett/k3a/topicterminator/strategy/ReservedIfTopicNotMatchingProps.java new file mode 100644 index 0000000..c62230c --- /dev/null +++ b/src/main/java/io/statnett/k3a/topicterminator/strategy/ReservedIfTopicNotMatchingProps.java @@ -0,0 +1,50 @@ +package io.statnett.k3a.topicterminator.strategy; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.common.config.ConfigResource; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static java.util.Collections.emptySet; +import static java.util.Collections.unmodifiableSet; + +public class ReservedIfTopicNotMatchingProps implements ReservedTopic { + private final Set> matchingProps; + + public ReservedIfTopicNotMatchingProps(Map matchingProps) { + if (matchingProps == null) { + this.matchingProps = emptySet(); + } else { + this.matchingProps = unmodifiableSet(matchingProps.entrySet()); + } + } + + @Override + public Set filter(AdminClient client, Set topicNames) throws ExecutionException, InterruptedException { + if (matchingProps.isEmpty()) { + return emptySet(); + } + + List topicResources = topicNames.stream() + .map(t -> new ConfigResource(ConfigResource.Type.TOPIC, t)) + .toList(); + + Map> topicProps = client.describeConfigs(topicResources).all().get().entrySet().stream() + .collect(Collectors.toMap( + entry -> entry.getKey().name(), + entry -> entry.getValue().entries().stream().collect( + Collectors.toMap(ConfigEntry::name, ConfigEntry::value) + ) + ) + ); + + return topicNames.stream() + .filter(t -> topicProps.get(t).entrySet().containsAll(matchingProps)) + .collect(Collectors.toSet()); + } +} diff --git a/src/test/java/io/statnett/k3a/topicterminator/ApplicationTest.java b/src/test/java/io/statnett/k3a/topicterminator/ApplicationTest.java index c20a160..ebbb289 100644 --- a/src/test/java/io/statnett/k3a/topicterminator/ApplicationTest.java +++ b/src/test/java/io/statnett/k3a/topicterminator/ApplicationTest.java @@ -35,6 +35,7 @@ public class ApplicationTest { public static final String TOPIC_INTERNAL = "_schemas"; public static final String TOPIC_UNUSED = "topic-unused"; public static final String TOPIC_WITH_DATA = "topic-with-data"; + public static final String TOPIC_WITH_DATA_COMPACT = "topic-with-data-compact"; public static final String TOPIC_BLESSED_BY_REGEX = "blessed-topic"; public static final String TOPIC_BLESSED_BY_NAME = "topic-foo"; @@ -54,6 +55,7 @@ public class ApplicationTest { void testTerminateUnusedTopics() throws Exception { // Put some data on topic-with-data topic kafkaTemplate.send(TOPIC_WITH_DATA, "foo").get(); + kafkaTemplate.send(TOPIC_WITH_DATA_COMPACT, "key", "value").get(); // Wait until consumer is started and registered in cluster try (AdminClient client1 = AdminClient.create(kafkaAdmin.getConfigurationProperties())) { @@ -69,13 +71,13 @@ void testTerminateUnusedTopics() throws Exception { assertThat(allTopics) .contains(TOPIC_CONSUMED, TOPIC_INTERNAL, TOPIC_WITH_DATA, TOPIC_BLESSED_BY_REGEX, TOPIC_BLESSED_BY_NAME) - .doesNotContain(TOPIC_UNUSED); + .doesNotContain(TOPIC_UNUSED, TOPIC_WITH_DATA_COMPACT); } // Assert delete of topic increases metrics counter assertThat(meterRegistry.find("topic.deleted.total").counter()) .isNotNull() - .matches(counter -> counter.count() == 1); + .matches(counter -> counter.count() == 2); } @TestConfiguration @@ -109,6 +111,13 @@ public NewTopic topicWithData() { .build(); } + @Bean + public NewTopic topicWithDataCompact() { + return TopicBuilder.name(TOPIC_WITH_DATA_COMPACT) + .compact() + .build(); + } + @Bean public NewTopic topicBlessedByRegex() { return TopicBuilder.name(TOPIC_BLESSED_BY_REGEX) diff --git a/src/test/resources/config/application.yaml b/src/test/resources/config/application.yaml index ea307fb..9bfb687 100644 --- a/src/test/resources/config/application.yaml +++ b/src/test/resources/config/application.yaml @@ -1,3 +1,5 @@ app: dry-run: false blessed-topics: ^blessed.*,topic-foo + non-empty-topics-matching-props: + cleanup.policy: compact