diff --git a/src/main/java/io/statnett/k3a/topicterminator/TopicTerminator.java b/src/main/java/io/statnett/k3a/topicterminator/TopicTerminator.java index 06a68d9..8266267 100644 --- a/src/main/java/io/statnett/k3a/topicterminator/TopicTerminator.java +++ b/src/main/java/io/statnett/k3a/topicterminator/TopicTerminator.java @@ -14,11 +14,13 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.regex.Pattern; @Component public class TopicTerminator { @@ -42,40 +44,89 @@ public void terminateUnusedTopics() throws ExecutionException, InterruptedExcept .setMessage("Terminating unused topics") .addKeyValue("dry-run", props.isDryRun()) .log(); - try (AdminClient client = AdminClient.create(kafkaAdmin.getConfigurationProperties())) { - final Set allTopics = client.listTopics().names().get(); + from(allTopics()) + .remove(internalTopics()) + .remove(blessedTopics(props.getBlessedTopics())) + .remove(consumedTopics()) + .remove(nonEmptyTopics()) + .terminate(); + } - final Set unusedTopics = new HashSet<>(allTopics); + private TopicTerminatorChain from(TopicProvider topicProvider) { + return new TopicTerminatorChain(topicProvider); + } - Collection reservedTopics = List.of( - new ConsumedTopic(), - new InternalTopic(allTopics), - new NonEmptyTopic(), - new BlessedTopic(allTopics, props.getBlessedTopics()) - ); + private class TopicTerminatorChain { + private final TopicProvider topicProvider; + private final List reservedTopics; - for (ReservedTopic reservedTopic : reservedTopics) { - unusedTopics.removeAll(reservedTopic.getNames(client)); - } + public TopicTerminatorChain(TopicProvider topicProvider) { + this.topicProvider = topicProvider; + this.reservedTopics = new ArrayList<>(); + } + + public void terminate() throws ExecutionException, InterruptedException { + try (AdminClient client = AdminClient.create(kafkaAdmin.getConfigurationProperties())) { + Set unusedTopics = topicProvider.getNames(client); + + for (ReservedTopic reservedTopic : reservedTopics) { + unusedTopics = reservedTopic.filter(client, unusedTopics); + } - log.atInfo() - .setMessage("Start deleting unused topics") - .addKeyValue("count", unusedTopics.size()) - .log(); - if (props.isDryRun()) { - unusedTopics.forEach(t -> log.atInfo() - .setMessage("NOT deleting unused topic in dry-run mode") - .addKeyValue("topic", t) - .log()); - } else { - unusedTopics.forEach(t -> log.atInfo() - .setMessage("Deleting unused topic") - .addKeyValue("topic", t) - .log()); - client.deleteTopics(unusedTopics); - deletedCounter.increment(unusedTopics.size()); + log.atInfo() + .setMessage("Start deleting unused topics") + .addKeyValue("count", unusedTopics.size()) + .log(); + if (props.isDryRun()) { + unusedTopics.forEach(t -> log.atInfo() + .setMessage("NOT deleting unused topic in dry-run mode") + .addKeyValue("topic", t) + .log()); + } else { + unusedTopics.forEach(t -> log.atInfo() + .setMessage("Deleting unused topic") + .addKeyValue("topic", t) + .log()); + client.deleteTopics(unusedTopics); + deletedCounter.increment(unusedTopics.size()); + } } } + + public TopicTerminatorChain remove(ReservedTopic reservedTopic) { + reservedTopics.add(reservedTopic); + return this; + } + } + + public interface TopicProvider { + Set getNames(AdminClient client) throws ExecutionException, InterruptedException; + } + + public static class AllTopicsProvider implements TopicProvider { + @Override + public Set getNames(AdminClient client) throws ExecutionException, InterruptedException { + return new HashSet<>(client.listTopics().names().get()); + } + } + + public static TopicProvider allTopics() { + return new AllTopicsProvider(); + } + + private static InternalTopic internalTopics() { + return new InternalTopic(); } + private static BlessedTopic blessedTopics(Collection blessedTopics) { + return new BlessedTopic(blessedTopics); + } + + private static ConsumedTopic consumedTopics() { + return new ConsumedTopic(); + } + + private static NonEmptyTopic nonEmptyTopics() { + return new NonEmptyTopic(); + } } diff --git a/src/main/java/io/statnett/k3a/topicterminator/strategy/BlessedTopic.java b/src/main/java/io/statnett/k3a/topicterminator/strategy/BlessedTopic.java index 5ec69f7..c5c9710 100644 --- a/src/main/java/io/statnett/k3a/topicterminator/strategy/BlessedTopic.java +++ b/src/main/java/io/statnett/k3a/topicterminator/strategy/BlessedTopic.java @@ -3,27 +3,26 @@ import org.apache.kafka.clients.admin.AdminClient; import java.util.Collection; -import java.util.Collections; import java.util.Set; import java.util.regex.Pattern; import java.util.stream.Collectors; +import static java.util.function.Predicate.not; + public class BlessedTopic implements ReservedTopic { - private final Set allNames; private final Collection patterns; - public BlessedTopic(Set allNames, Collection patterns) { - this.allNames = allNames; + public BlessedTopic(Collection patterns) { this.patterns = patterns; } @Override - public Set getNames(AdminClient client) { + public Set filter(AdminClient client, Set topicNames) { if (patterns == null || patterns.isEmpty()) { - return Collections.emptySet(); + return topicNames; } - return allNames.stream() - .filter(this::isBlessed) + return topicNames.stream() + .filter(not(this::isBlessed)) .collect(Collectors.toSet()); } diff --git a/src/main/java/io/statnett/k3a/topicterminator/strategy/ConsumedTopic.java b/src/main/java/io/statnett/k3a/topicterminator/strategy/ConsumedTopic.java index 9649c32..be507b8 100644 --- a/src/main/java/io/statnett/k3a/topicterminator/strategy/ConsumedTopic.java +++ b/src/main/java/io/statnett/k3a/topicterminator/strategy/ConsumedTopic.java @@ -5,7 +5,6 @@ import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; import org.apache.kafka.common.TopicPartition; -import java.util.HashSet; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -19,14 +18,11 @@ */ public class ConsumedTopic implements ReservedTopic { @Override - public Set getNames(AdminClient client) throws ExecutionException, InterruptedException { - final Set topics = new HashSet<>(); - + public Set filter(AdminClient client, Set topicNames) throws ExecutionException, InterruptedException { for (ConsumerGroupListing group : client.listConsumerGroups().all().get()) { - topics.addAll(getTopics(client.listConsumerGroupOffsets(group.groupId()))); + topicNames.removeAll(getTopics(client.listConsumerGroupOffsets(group.groupId()))); } - - return topics; + return topicNames; } private Set getTopics(ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult) throws ExecutionException, InterruptedException { diff --git a/src/main/java/io/statnett/k3a/topicterminator/strategy/InternalTopic.java b/src/main/java/io/statnett/k3a/topicterminator/strategy/InternalTopic.java index 944e292..2011816 100644 --- a/src/main/java/io/statnett/k3a/topicterminator/strategy/InternalTopic.java +++ b/src/main/java/io/statnett/k3a/topicterminator/strategy/InternalTopic.java @@ -3,23 +3,18 @@ import org.apache.kafka.clients.admin.AdminClient; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import static java.util.function.Predicate.not; + /** * Topic that is internal, i.e. used by Kafka components. */ public class InternalTopic implements ReservedTopic { - private final Set allNames; - - public InternalTopic(Set allNames) { - this.allNames = allNames; - } - @Override - public Set getNames(AdminClient client) { - return allNames.stream() - .filter(this::isInternal) + public Set filter(AdminClient client, Set topicNames) { + return topicNames.stream() + .filter(not(this::isInternal)) .collect(Collectors.toSet()); } diff --git a/src/main/java/io/statnett/k3a/topicterminator/strategy/NonEmptyTopic.java b/src/main/java/io/statnett/k3a/topicterminator/strategy/NonEmptyTopic.java index 8f37234..b1f716d 100644 --- a/src/main/java/io/statnett/k3a/topicterminator/strategy/NonEmptyTopic.java +++ b/src/main/java/io/statnett/k3a/topicterminator/strategy/NonEmptyTopic.java @@ -4,7 +4,6 @@ import org.apache.kafka.clients.admin.LogDirDescription; import org.apache.kafka.common.Node; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -14,11 +13,8 @@ * Topic that contains data */ public class NonEmptyTopic implements ReservedTopic { - @Override - public Set getNames(AdminClient client) throws ExecutionException, InterruptedException { - final Set topics = new HashSet<>(); - + public Set filter(AdminClient client, Set topicNames) throws ExecutionException, InterruptedException { final List brokers = client.describeCluster() .nodes().get().stream() .map(Node::id) @@ -30,10 +26,10 @@ public Set getNames(AdminClient client) throws ExecutionException, Inter .map(LogDirDescription::replicaInfos) .forEach(log -> log.forEach((topicPartition, replicaInfo) -> { if (replicaInfo.size() > 0) { - topics.add(topicPartition.topic()); + topicNames.remove(topicPartition.topic()); } })); - return topics; + return topicNames; } } diff --git a/src/main/java/io/statnett/k3a/topicterminator/strategy/ReservedTopic.java b/src/main/java/io/statnett/k3a/topicterminator/strategy/ReservedTopic.java index edc4ab5..4642b08 100644 --- a/src/main/java/io/statnett/k3a/topicterminator/strategy/ReservedTopic.java +++ b/src/main/java/io/statnett/k3a/topicterminator/strategy/ReservedTopic.java @@ -6,5 +6,5 @@ import java.util.concurrent.ExecutionException; public interface ReservedTopic { - Set getNames(AdminClient client) throws ExecutionException, InterruptedException; + Set filter(AdminClient client, Set topicNames) throws ExecutionException, InterruptedException; }