diff --git a/src/main/java/io/statnett/k3a/topicterminator/ApplicationProperties.java b/src/main/java/io/statnett/k3a/topicterminator/ApplicationProperties.java index 215b2fc..2b67c71 100644 --- a/src/main/java/io/statnett/k3a/topicterminator/ApplicationProperties.java +++ b/src/main/java/io/statnett/k3a/topicterminator/ApplicationProperties.java @@ -5,7 +5,6 @@ import org.springframework.validation.annotation.Validated; import java.util.Collection; -import java.util.Map; import java.util.regex.Pattern; @ConfigurationProperties("app") @@ -39,10 +38,9 @@ public class ApplicationProperties { /** * Can be used to terminate topics even if the topic contains data * (destructive operation) if the topic is otherwise considered unused. - * The supplied properties will be matched against topic configuration, - * and all properties must match! + * By default, no topics with data will be deleted. */ - private Map nonEmptyTopicsMatchingProps; + private NonEmptyTopicCharacteristics nonEmptyTopics; public String getFixedRateString() { return fixedRateString; @@ -68,11 +66,26 @@ public void setBlessedTopics(Collection blessedTopics) { this.blessedTopics = blessedTopics; } - public Map getNonEmptyTopicsMatchingProps() { - return nonEmptyTopicsMatchingProps; + public NonEmptyTopicCharacteristics getNonEmptyTopics() { + return nonEmptyTopics; } - public void setNonEmptyTopicsMatchingProps(Map nonEmptyTopicsMatchingProps) { - this.nonEmptyTopicsMatchingProps = nonEmptyTopicsMatchingProps; + public void setNonEmptyTopics(NonEmptyTopicCharacteristics nonEmptyTopics) { + this.nonEmptyTopics = nonEmptyTopics; + } + + public static class NonEmptyTopicCharacteristics { + /** + * If set to ´true´, topics without time retention can be terminated. + */ + private boolean withoutTimeRetention; + + public boolean isWithoutTimeRetention() { + return withoutTimeRetention; + } + + public void setWithoutTimeRetention(boolean withoutTimeRetention) { + this.withoutTimeRetention = withoutTimeRetention; + } } } diff --git a/src/main/java/io/statnett/k3a/topicterminator/TopicTerminator.java b/src/main/java/io/statnett/k3a/topicterminator/TopicTerminator.java index 49e6900..fe75257 100644 --- a/src/main/java/io/statnett/k3a/topicterminator/TopicTerminator.java +++ b/src/main/java/io/statnett/k3a/topicterminator/TopicTerminator.java @@ -10,6 +10,7 @@ import io.statnett.k3a.topicterminator.strategy.ReservedIfTopicNotMatchingProps; import io.statnett.k3a.topicterminator.strategy.ReservedTopic; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.common.config.TopicConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.core.KafkaAdmin; @@ -25,6 +26,7 @@ import java.util.concurrent.ExecutionException; import java.util.regex.Pattern; + @Component public class TopicTerminator { private final Logger log = LoggerFactory.getLogger(this.getClass()); @@ -47,16 +49,19 @@ public void terminateUnusedTopics() throws ExecutionException, InterruptedExcept .setMessage("Terminating unused topics") .addKeyValue("dry-run", props.isDryRun()) .log(); + + ReservedTopic nonEmptyTopic = nonEmptyTopics(); + if (props.getNonEmptyTopics().isWithoutTimeRetention()) { + nonEmptyTopic = and(nonEmptyTopic, reservedTopicsNotMatchingProps( + Map.of(TopicConfig.RETENTION_MS_CONFIG, "-1") + )); + } + from(allTopics()) .remove(internalTopics()) .remove(blessedTopics(props.getBlessedTopics())) .remove(consumedTopics()) - .remove( - and( - nonEmptyTopics(), - reservedTopicsNotMatchingProps(props.getNonEmptyTopicsMatchingProps()) - ) - ) + .remove(nonEmptyTopic) .terminate(); } diff --git a/src/test/java/io/statnett/k3a/topicterminator/ApplicationTest.java b/src/test/java/io/statnett/k3a/topicterminator/ApplicationTest.java index ebbb289..b83493b 100644 --- a/src/test/java/io/statnett/k3a/topicterminator/ApplicationTest.java +++ b/src/test/java/io/statnett/k3a/topicterminator/ApplicationTest.java @@ -7,6 +7,7 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.ListTopicsOptions; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.config.TopicConfig; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.actuate.observability.AutoConfigureObservability; @@ -35,7 +36,7 @@ public class ApplicationTest { public static final String TOPIC_INTERNAL = "_schemas"; public static final String TOPIC_UNUSED = "topic-unused"; public static final String TOPIC_WITH_DATA = "topic-with-data"; - public static final String TOPIC_WITH_DATA_COMPACT = "topic-with-data-compact"; + public static final String TOPIC_WITH_DATA_WITHOUT_TIME_RETENTION = "topic-with-data-without-time-retention"; public static final String TOPIC_BLESSED_BY_REGEX = "blessed-topic"; public static final String TOPIC_BLESSED_BY_NAME = "topic-foo"; @@ -55,7 +56,7 @@ public class ApplicationTest { void testTerminateUnusedTopics() throws Exception { // Put some data on topic-with-data topic kafkaTemplate.send(TOPIC_WITH_DATA, "foo").get(); - kafkaTemplate.send(TOPIC_WITH_DATA_COMPACT, "key", "value").get(); + kafkaTemplate.send(TOPIC_WITH_DATA_WITHOUT_TIME_RETENTION, "key", "value").get(); // Wait until consumer is started and registered in cluster try (AdminClient client1 = AdminClient.create(kafkaAdmin.getConfigurationProperties())) { @@ -71,7 +72,7 @@ void testTerminateUnusedTopics() throws Exception { assertThat(allTopics) .contains(TOPIC_CONSUMED, TOPIC_INTERNAL, TOPIC_WITH_DATA, TOPIC_BLESSED_BY_REGEX, TOPIC_BLESSED_BY_NAME) - .doesNotContain(TOPIC_UNUSED, TOPIC_WITH_DATA_COMPACT); + .doesNotContain(TOPIC_UNUSED, TOPIC_WITH_DATA_WITHOUT_TIME_RETENTION); } // Assert delete of topic increases metrics counter @@ -112,9 +113,9 @@ public NewTopic topicWithData() { } @Bean - public NewTopic topicWithDataCompact() { - return TopicBuilder.name(TOPIC_WITH_DATA_COMPACT) - .compact() + public NewTopic topicWithDataWithoutTimeRetention() { + return TopicBuilder.name(TOPIC_WITH_DATA_WITHOUT_TIME_RETENTION) + .config(TopicConfig.RETENTION_MS_CONFIG, "-1") .build(); } diff --git a/src/test/resources/config/application.yaml b/src/test/resources/config/application.yaml index 9bfb687..3527646 100644 --- a/src/test/resources/config/application.yaml +++ b/src/test/resources/config/application.yaml @@ -1,5 +1,5 @@ app: dry-run: false blessed-topics: ^blessed.*,topic-foo - non-empty-topics-matching-props: - cleanup.policy: compact + non-empty-topics: + without-time-retention: true