Skip to content

Commit

Permalink
feat!: better UX to configure non-empty topic deletion (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgb authored Oct 29, 2024
1 parent 5466ace commit 8e8f491
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import org.springframework.validation.annotation.Validated;

import java.util.Collection;
import java.util.Map;
import java.util.regex.Pattern;

@ConfigurationProperties("app")
Expand Down Expand Up @@ -39,10 +38,9 @@ public class ApplicationProperties {
/**
* Can be used to terminate topics even if the topic contains data
* (destructive operation) if the topic is otherwise considered unused.
* The supplied properties will be matched against topic configuration,
* and all properties must match!
* By default, no topics with data will be deleted.
*/
private Map<String, String> nonEmptyTopicsMatchingProps;
private NonEmptyTopicCharacteristics nonEmptyTopics;

public String getFixedRateString() {
return fixedRateString;
Expand All @@ -68,11 +66,26 @@ public void setBlessedTopics(Collection<Pattern> blessedTopics) {
this.blessedTopics = blessedTopics;
}

public Map<String, String> getNonEmptyTopicsMatchingProps() {
return nonEmptyTopicsMatchingProps;
public NonEmptyTopicCharacteristics getNonEmptyTopics() {
return nonEmptyTopics;
}

public void setNonEmptyTopicsMatchingProps(Map<String, String> nonEmptyTopicsMatchingProps) {
this.nonEmptyTopicsMatchingProps = nonEmptyTopicsMatchingProps;
public void setNonEmptyTopics(NonEmptyTopicCharacteristics nonEmptyTopics) {
this.nonEmptyTopics = nonEmptyTopics;
}

public static class NonEmptyTopicCharacteristics {
/**
* If set to ´true´, topics without time retention can be terminated.
*/
private boolean withoutTimeRetention;

public boolean isWithoutTimeRetention() {
return withoutTimeRetention;
}

public void setWithoutTimeRetention(boolean withoutTimeRetention) {
this.withoutTimeRetention = withoutTimeRetention;
}
}
}
17 changes: 11 additions & 6 deletions src/main/java/io/statnett/k3a/topicterminator/TopicTerminator.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.statnett.k3a.topicterminator.strategy.ReservedIfTopicNotMatchingProps;
import io.statnett.k3a.topicterminator.strategy.ReservedTopic;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.config.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaAdmin;
Expand All @@ -25,6 +26,7 @@
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;


@Component
public class TopicTerminator {
private final Logger log = LoggerFactory.getLogger(this.getClass());
Expand All @@ -47,16 +49,19 @@ public void terminateUnusedTopics() throws ExecutionException, InterruptedExcept
.setMessage("Terminating unused topics")
.addKeyValue("dry-run", props.isDryRun())
.log();

ReservedTopic nonEmptyTopic = nonEmptyTopics();
if (props.getNonEmptyTopics().isWithoutTimeRetention()) {
nonEmptyTopic = and(nonEmptyTopic, reservedTopicsNotMatchingProps(
Map.of(TopicConfig.RETENTION_MS_CONFIG, "-1")
));
}

from(allTopics())
.remove(internalTopics())
.remove(blessedTopics(props.getBlessedTopics()))
.remove(consumedTopics())
.remove(
and(
nonEmptyTopics(),
reservedTopicsNotMatchingProps(props.getNonEmptyTopicsMatchingProps())
)
)
.remove(nonEmptyTopic)
.terminate();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.actuate.observability.AutoConfigureObservability;
Expand Down Expand Up @@ -35,7 +36,7 @@ public class ApplicationTest {
public static final String TOPIC_INTERNAL = "_schemas";
public static final String TOPIC_UNUSED = "topic-unused";
public static final String TOPIC_WITH_DATA = "topic-with-data";
public static final String TOPIC_WITH_DATA_COMPACT = "topic-with-data-compact";
public static final String TOPIC_WITH_DATA_WITHOUT_TIME_RETENTION = "topic-with-data-without-time-retention";
public static final String TOPIC_BLESSED_BY_REGEX = "blessed-topic";
public static final String TOPIC_BLESSED_BY_NAME = "topic-foo";

Expand All @@ -55,7 +56,7 @@ public class ApplicationTest {
void testTerminateUnusedTopics() throws Exception {
// Put some data on topic-with-data topic
kafkaTemplate.send(TOPIC_WITH_DATA, "foo").get();
kafkaTemplate.send(TOPIC_WITH_DATA_COMPACT, "key", "value").get();
kafkaTemplate.send(TOPIC_WITH_DATA_WITHOUT_TIME_RETENTION, "key", "value").get();

// Wait until consumer is started and registered in cluster
try (AdminClient client1 = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
Expand All @@ -71,7 +72,7 @@ void testTerminateUnusedTopics() throws Exception {

assertThat(allTopics)
.contains(TOPIC_CONSUMED, TOPIC_INTERNAL, TOPIC_WITH_DATA, TOPIC_BLESSED_BY_REGEX, TOPIC_BLESSED_BY_NAME)
.doesNotContain(TOPIC_UNUSED, TOPIC_WITH_DATA_COMPACT);
.doesNotContain(TOPIC_UNUSED, TOPIC_WITH_DATA_WITHOUT_TIME_RETENTION);
}

// Assert delete of topic increases metrics counter
Expand Down Expand Up @@ -112,9 +113,9 @@ public NewTopic topicWithData() {
}

@Bean
public NewTopic topicWithDataCompact() {
return TopicBuilder.name(TOPIC_WITH_DATA_COMPACT)
.compact()
public NewTopic topicWithDataWithoutTimeRetention() {
return TopicBuilder.name(TOPIC_WITH_DATA_WITHOUT_TIME_RETENTION)
.config(TopicConfig.RETENTION_MS_CONFIG, "-1")
.build();
}

Expand Down
4 changes: 2 additions & 2 deletions src/test/resources/config/application.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
app:
dry-run: false
blessed-topics: ^blessed.*,topic-foo
non-empty-topics-matching-props:
cleanup.policy: compact
non-empty-topics:
without-time-retention: true

0 comments on commit 8e8f491

Please sign in to comment.