From 7363428c202ead1e851ef3acd545d6df514918cd Mon Sep 17 00:00:00 2001 From: Sam Barker Date: Thu, 6 Jul 2023 16:44:32 +1200 Subject: [PATCH 1/5] Switch to a Caffeine cache. --- kroxylicious-filter/pom.xml | 8 ++ .../kroxylicious/ContextCacheLoader.java | 16 ++++ .../kroxylicious/FetchDecryptFilter.java | 84 +++++++++++++------ .../kroxylicious/TopicEncryptionConfig.java | 29 ++++++- 4 files changed, 111 insertions(+), 26 deletions(-) create mode 100644 kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/ContextCacheLoader.java diff --git a/kroxylicious-filter/pom.xml b/kroxylicious-filter/pom.xml index f599cef..c316fd7 100644 --- a/kroxylicious-filter/pom.xml +++ b/kroxylicious-filter/pom.xml @@ -6,6 +6,9 @@ topic-encryption 0.0.1-SNAPSHOT + + 3.1.6 + 4.0.0 kroxylicious-filter Kroxylicious Filter @@ -46,6 +49,11 @@ kafka-clients 3.4.0 + + com.github.ben-manes.caffeine + caffeine + ${caffeine.version} + io.kroxylicious.testing testing-junit5-extension diff --git a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/ContextCacheLoader.java b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/ContextCacheLoader.java new file mode 100644 index 0000000..2304dad --- /dev/null +++ b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/ContextCacheLoader.java @@ -0,0 +1,16 @@ +package io.strimzi.kafka.topicenc.kroxylicious; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import org.apache.kafka.common.Uuid; + +import com.github.benmanes.caffeine.cache.AsyncCacheLoader; + +public class ContextCacheLoader implements AsyncCacheLoader { + + @Override + public CompletableFuture asyncLoad(Uuid key, Executor executor) throws Exception { + return null; + } +} diff --git a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java index 7d391e2..777d841 100644 --- a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java +++ b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java @@ -1,10 +1,13 @@ package io.strimzi.kafka.topicenc.kroxylicious; -import io.kroxylicious.proxy.filter.FetchRequestFilter; -import io.kroxylicious.proxy.filter.FetchResponseFilter; -import io.kroxylicious.proxy.filter.KrpcFilterContext; -import io.kroxylicious.proxy.filter.MetadataResponseFilter; -import io.strimzi.kafka.topicenc.EncryptionModule; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; + import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.FetchRequestData; import org.apache.kafka.common.message.FetchResponseData; @@ -17,11 +20,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletionStage; -import java.util.function.Predicate; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; + +import io.strimzi.kafka.topicenc.EncryptionModule; +import io.strimzi.kafka.topicenc.common.Strings; + +import io.kroxylicious.proxy.filter.FetchRequestFilter; +import io.kroxylicious.proxy.filter.FetchResponseFilter; +import io.kroxylicious.proxy.filter.KrpcFilterContext; +import io.kroxylicious.proxy.filter.MetadataResponseFilter; import static io.strimzi.kafka.topicenc.common.Strings.isNullOrEmpty; import static java.util.stream.Collectors.toSet; @@ -30,12 +37,13 @@ public class FetchDecryptFilter implements FetchRequestFilter, FetchResponseFilt private static final Logger log = LoggerFactory.getLogger(FetchDecryptFilter.class); public static final short METADATA_VERSION_SUPPORTING_TOPIC_IDS = (short) 12; - private final Map topicUuidToName = new HashMap<>(); private final EncryptionModule module; + private final AsyncLoadingCache topicUuidToNameCache; public FetchDecryptFilter(TopicEncryptionConfig config) { module = new EncryptionModule(config.getPolicyRepository()); + topicUuidToNameCache = config.getTopicUuidToNameCache(); } @Override @@ -54,8 +62,10 @@ public void onFetchResponse(short apiVersion, ResponseHeaderData header, FetchRe var unresolvedTopicIds = getUnresolvedTopicIds(response); if (unresolvedTopicIds.isEmpty()) { decryptFetchResponse(header, response, context); - } else { - log.warn("We did not know all topic names for {} topic ids within a fetch response, requesting metadata and returning error response", unresolvedTopicIds.size()); + } + else { + log.warn("We did not know all topic names for {} topic ids within a fetch response, requesting metadata and returning error response", + unresolvedTopicIds.size()); log.debug("We did not know all topic names for topic ids {} within a fetch response, requesting metadata and returning error response", unresolvedTopicIds); // we return an error rather than delaying the response to prevent out-of-order responses to the Consumer client. // The Filter API only supports synchronous work currently. @@ -100,12 +110,13 @@ private void decryptFetchResponse(ResponseHeaderData header, FetchResponseData r Uuid originalUuid = fetchResponse.topicId(); String originalName = fetchResponse.topic(); if (isNullOrEmpty(originalName)) { - fetchResponse.setTopic(topicUuidToName.get(originalUuid)); + fetchResponse.setTopic(getTopicNameForUuid(originalUuid)); fetchResponse.setTopicId(null); } try { module.decrypt(fetchResponse); - } catch (Exception e) { + } + catch (Exception e) { log.error("Failed to decrypt a fetchResponse for topic: " + fetchResponse.topic(), e); throw new RuntimeException(e); } @@ -115,19 +126,42 @@ private void decryptFetchResponse(ResponseHeaderData header, FetchResponseData r context.forwardResponse(header, response); } + private String getTopicNameForUuid(Uuid originalUuid) { + //TODO revisit error handling + try { + final CompletableFuture topicNameFuture = topicUuidToNameCache.getIfPresent(originalUuid); + return topicNameFuture != null ? topicNameFuture.get(5, TimeUnit.SECONDS) : null; + } + catch (InterruptedException e) { + log.warn("Caught thread interrupt", e); + Thread.currentThread().interrupt(); + } + catch (ExecutionException | TimeoutException e) { + log.warn("Failed to get ", e); + } + return null; + } + + @Override + public void onMetadataResponse(short apiVersion, ResponseHeaderData header, MetadataResponseData response, KrpcFilterContext context) { + cacheTopicIdToName(response, apiVersion); + context.forwardResponse(header, response); + } private boolean isResolvable(FetchResponseData.FetchableTopicResponse fetchableTopicResponse) { - return !isNullOrEmpty(fetchableTopicResponse.topic()) || topicUuidToName.containsKey(fetchableTopicResponse.topicId()); + return hasTopicName(fetchableTopicResponse.topicId(), fetchableTopicResponse.topic()); } private boolean isResolvable(FetchRequestData.FetchTopic fetchTopic) { - return !isNullOrEmpty(fetchTopic.topic()) || topicUuidToName.containsKey(fetchTopic.topicId()); + return hasTopicName(fetchTopic.topicId(), fetchTopic.topic()); } - @Override - public void onMetadataResponse(short apiVersion, ResponseHeaderData header, MetadataResponseData response, KrpcFilterContext context) { - cacheTopicIdToName(response, apiVersion); - context.forwardResponse(header, response); + private boolean hasTopicName(Uuid topicId, String topicName) { + if (!isNullOrEmpty(topicName)) { + final CompletableFuture futureTopicName = topicUuidToNameCache.getIfPresent(topicId); + return futureTopicName != null && futureTopicName.isDone() && !Strings.isNullOrEmpty(futureTopicName.getNow(null)); + } + return false; } private void cacheTopicIdToName(MetadataResponseData response, short apiVersion) { @@ -137,11 +171,13 @@ private void cacheTopicIdToName(MetadataResponseData response, short apiVersion) response.topics().forEach(topic -> { if (topic.errorCode() == 0) { if (topic.topicId() != null && !isNullOrEmpty(topic.name())) { - topicUuidToName.put(topic.topicId(), topic.name()); - } else { + topicUuidToNameCache.put(topic.topicId(), CompletableFuture.completedFuture(topic.name())); + } + else { log.info("not caching uuid to name because a component was null or empty, topic id {}, topic name {}", topic.topicId(), topic.name()); } - } else { + } + else { log.warn("error {} on metadata request for topic id {}, topic name {}", Errors.forCode(topic.errorCode()), topic.topicId(), topic.name()); } }); diff --git a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionConfig.java b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionConfig.java index 43e1cbc..6b61e1f 100644 --- a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionConfig.java +++ b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionConfig.java @@ -1,17 +1,30 @@ package io.strimzi.kafka.topicenc.kroxylicious; +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.kafka.common.Uuid; + import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import io.kroxylicious.proxy.config.BaseConfig; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Caffeine; + import io.strimzi.kafka.topicenc.policy.PolicyRepository; -import java.util.Objects; +import io.kroxylicious.proxy.config.BaseConfig; public class TopicEncryptionConfig extends BaseConfig { public static final String IN_MEMORY_POLICY_REPOSITORY_PROP_NAME = "inMemoryPolicyRepository"; private final InMemoryPolicyRepositoryConfig inMemoryPolicyRepository; + @JsonIgnore + private static final ConcurrentHashMap> virtualClusterToTopicUUIDToTopicNameCache = new ConcurrentHashMap<>(); + private final ContextCacheLoader contextCacheLoader = new ContextCacheLoader(); + @JsonCreator public TopicEncryptionConfig(@JsonProperty(value = IN_MEMORY_POLICY_REPOSITORY_PROP_NAME) InMemoryPolicyRepositoryConfig inMemoryPolicyRepository) { this.inMemoryPolicyRepository = inMemoryPolicyRepository; @@ -19,7 +32,19 @@ public TopicEncryptionConfig(@JsonProperty(value = IN_MEMORY_POLICY_REPOSITORY_P + " configuration is required as it is the only PolicyRepository implementation"); } + public ContextCacheLoader getContextCacheLoader() { + return contextCacheLoader; + } + public PolicyRepository getPolicyRepository() { return inMemoryPolicyRepository.getPolicyRepository(); } + + public AsyncLoadingCache getTopicUuidToNameCache() { + return virtualClusterToTopicUUIDToTopicNameCache.computeIfAbsent("VIRTUAL_CLUSTER_ID", (key) -> Caffeine + .newBuilder() + .expireAfterAccess(Duration.ofHours(1)) + .buildAsync(contextCacheLoader)); + } + } From 0b54d7575273f06b0e715a345674a78f4778da17 Mon Sep 17 00:00:00 2001 From: Sam Barker Date: Wed, 19 Jul 2023 17:07:53 +1200 Subject: [PATCH 2/5] Initial skeleton for separating out a TopicIdCache. --- kroxylicious-filter/pom.xml | 7 + .../kroxylicious/FetchDecryptFilter.java | 37 +---- .../kroxylicious/TopicEncryptionConfig.java | 9 +- .../topicenc/kroxylicious/TopicIdCache.java | 49 +++++++ .../kroxylicious/TopicIdCacheTest.java | 128 ++++++++++++++++++ 5 files changed, 191 insertions(+), 39 deletions(-) create mode 100644 kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicIdCache.java create mode 100644 kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/TopicIdCacheTest.java diff --git a/kroxylicious-filter/pom.xml b/kroxylicious-filter/pom.xml index c316fd7..499bc8e 100644 --- a/kroxylicious-filter/pom.xml +++ b/kroxylicious-filter/pom.xml @@ -8,6 +8,7 @@ 3.1.6 + 3.24.2 4.0.0 kroxylicious-filter @@ -84,6 +85,12 @@ 1.18.3 test + + org.assertj + assertj-core + ${assertj.version} + test + diff --git a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java index 777d841..fafbdad 100644 --- a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java +++ b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java @@ -2,7 +2,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -11,7 +10,6 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.FetchRequestData; import org.apache.kafka.common.message.FetchResponseData; -import org.apache.kafka.common.message.MetadataRequestData; import org.apache.kafka.common.message.MetadataResponseData; import org.apache.kafka.common.message.MetadataResponseDataJsonConverter; import org.apache.kafka.common.message.RequestHeaderData; @@ -20,10 +18,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.github.benmanes.caffeine.cache.AsyncLoadingCache; - import io.strimzi.kafka.topicenc.EncryptionModule; -import io.strimzi.kafka.topicenc.common.Strings; import io.kroxylicious.proxy.filter.FetchRequestFilter; import io.kroxylicious.proxy.filter.FetchResponseFilter; @@ -39,7 +34,7 @@ public class FetchDecryptFilter implements FetchRequestFilter, FetchResponseFilt public static final short METADATA_VERSION_SUPPORTING_TOPIC_IDS = (short) 12; private final EncryptionModule module; - private final AsyncLoadingCache topicUuidToNameCache; + private final TopicIdCache topicUuidToNameCache; public FetchDecryptFilter(TopicEncryptionConfig config) { module = new EncryptionModule(config.getPolicyRepository()); @@ -93,16 +88,7 @@ private void resolveTopicsAndReturnError(ResponseHeaderData header, KrpcFilterCo } private void resolveAndCache(KrpcFilterContext context, Set topicIdsToResolve) { - MetadataRequestData request = new MetadataRequestData(); - topicIdsToResolve.forEach(uuid -> { - MetadataRequestData.MetadataRequestTopic e = new MetadataRequestData.MetadataRequestTopic(); - e.setTopicId(uuid); - request.topics().add(e); - }); - // if the client is sending topic ids we will assume the broker can support at least the lowest metadata apiVersion - // supporting topicIds - CompletionStage stage = context.sendRequest(METADATA_VERSION_SUPPORTING_TOPIC_IDS, request); - stage.thenAccept(response -> cacheTopicIdToName(response, METADATA_VERSION_SUPPORTING_TOPIC_IDS)); + topicUuidToNameCache.resolveTopicNames(context, topicIdsToResolve); } private void decryptFetchResponse(ResponseHeaderData header, FetchResponseData response, KrpcFilterContext context) { @@ -129,7 +115,7 @@ private void decryptFetchResponse(ResponseHeaderData header, FetchResponseData r private String getTopicNameForUuid(Uuid originalUuid) { //TODO revisit error handling try { - final CompletableFuture topicNameFuture = topicUuidToNameCache.getIfPresent(originalUuid); + final CompletableFuture topicNameFuture = topicUuidToNameCache.getTopicName(originalUuid); return topicNameFuture != null ? topicNameFuture.get(5, TimeUnit.SECONDS) : null; } catch (InterruptedException e) { @@ -157,11 +143,7 @@ private boolean isResolvable(FetchRequestData.FetchTopic fetchTopic) { } private boolean hasTopicName(Uuid topicId, String topicName) { - if (!isNullOrEmpty(topicName)) { - final CompletableFuture futureTopicName = topicUuidToNameCache.getIfPresent(topicId); - return futureTopicName != null && futureTopicName.isDone() && !Strings.isNullOrEmpty(futureTopicName.getNow(null)); - } - return false; + return !isNullOrEmpty(topicName) || topicUuidToNameCache.hasResolvedTopic(topicId); } private void cacheTopicIdToName(MetadataResponseData response, short apiVersion) { @@ -169,17 +151,6 @@ private void cacheTopicIdToName(MetadataResponseData response, short apiVersion) log.trace("received metadata response: {}", MetadataResponseDataJsonConverter.write(response, apiVersion)); } response.topics().forEach(topic -> { - if (topic.errorCode() == 0) { - if (topic.topicId() != null && !isNullOrEmpty(topic.name())) { - topicUuidToNameCache.put(topic.topicId(), CompletableFuture.completedFuture(topic.name())); - } - else { - log.info("not caching uuid to name because a component was null or empty, topic id {}, topic name {}", topic.topicId(), topic.name()); - } - } - else { - log.warn("error {} on metadata request for topic id {}, topic name {}", Errors.forCode(topic.errorCode()), topic.topicId(), topic.name()); - } }); } } diff --git a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionConfig.java b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionConfig.java index 6b61e1f..47ad298 100644 --- a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionConfig.java +++ b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionConfig.java @@ -22,7 +22,7 @@ public class TopicEncryptionConfig extends BaseConfig { private final InMemoryPolicyRepositoryConfig inMemoryPolicyRepository; @JsonIgnore - private static final ConcurrentHashMap> virtualClusterToTopicUUIDToTopicNameCache = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap virtualClusterToTopicUUIDToTopicNameCache = new ConcurrentHashMap<>(); private final ContextCacheLoader contextCacheLoader = new ContextCacheLoader(); @JsonCreator @@ -40,11 +40,8 @@ public PolicyRepository getPolicyRepository() { return inMemoryPolicyRepository.getPolicyRepository(); } - public AsyncLoadingCache getTopicUuidToNameCache() { - return virtualClusterToTopicUUIDToTopicNameCache.computeIfAbsent("VIRTUAL_CLUSTER_ID", (key) -> Caffeine - .newBuilder() - .expireAfterAccess(Duration.ofHours(1)) - .buildAsync(contextCacheLoader)); + public TopicIdCache getTopicUuidToNameCache() { + return virtualClusterToTopicUUIDToTopicNameCache.computeIfAbsent("VIRTUAL_CLUSTER_ID", (key) -> new TopicIdCache()); } } diff --git a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicIdCache.java b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicIdCache.java new file mode 100644 index 0000000..34d2c13 --- /dev/null +++ b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicIdCache.java @@ -0,0 +1,49 @@ +package io.strimzi.kafka.topicenc.kroxylicious; + +import java.time.Duration; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Future; + +import org.apache.kafka.common.Uuid; + +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.annotations.VisibleForTesting; + +import io.kroxylicious.proxy.filter.KrpcFilterContext; + +public class TopicIdCache { + private final AsyncLoadingCache topicNamesById; + + public TopicIdCache() { + this(Caffeine.newBuilder().expireAfterAccess(Duration.ofMinutes(10)).buildAsync((key, executor) -> { + //TODO something clever. + return null; + })); + } + + TopicIdCache(AsyncLoadingCache topicNamesById) { + this.topicNamesById = topicNamesById; + } + + /** + * Exposes a future to avoid multiple clients triggering metadata requests for the same topicId. + * @param topicId to convert to a name + * @return the Future which will be completed when the topic name is resolved or null if the topic is not known (and is not currently being resolved) + */ + public CompletableFuture getTopicName(Uuid topicId) { + return topicNamesById.getIfPresent(topicId); + } + + public boolean hasResolvedTopic(Uuid topicId) { + final CompletableFuture topicNameFuture = topicNamesById.getIfPresent(topicId); + //Caffeine converts failed or cancelled futures to null internally, so we don't have to handle them explicitly + return topicNameFuture != null && topicNameFuture.isDone(); + } + + public void resolveTopicNames(KrpcFilterContext context, Set topicIdsToResolve) { + + } +} diff --git a/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/TopicIdCacheTest.java b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/TopicIdCacheTest.java new file mode 100644 index 0000000..5a50dc0 --- /dev/null +++ b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/TopicIdCacheTest.java @@ -0,0 +1,128 @@ +package io.strimzi.kafka.topicenc.kroxylicious; + +import java.util.concurrent.CompletableFuture; + +import org.apache.kafka.common.Uuid; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.github.benmanes.caffeine.cache.AsyncCache; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Caffeine; + +import static org.assertj.core.api.Assertions.assertThat; + +class TopicIdCacheTest { + + private static final Uuid UNKNOWN_TOPIC_ID = Uuid.randomUuid(); + private static final Uuid KNOWN_TOPIC_ID = Uuid.randomUuid(); + private static final Uuid PENDING_TOPIC_ID = Uuid.randomUuid(); + private static final String KNOWN_TOPIC_NAME = "TOPIC_WIBBLE"; + private TopicIdCache topicIdCache; + private AsyncLoadingCache underlyingCache; + + @BeforeEach + void setUp() { + underlyingCache = Caffeine.newBuilder().buildAsync((key, executor) -> null); + underlyingCache.put(KNOWN_TOPIC_ID, CompletableFuture.completedFuture(KNOWN_TOPIC_NAME)); + underlyingCache.put(PENDING_TOPIC_ID, new CompletableFuture<>()); + topicIdCache = new TopicIdCache(underlyingCache); + } + + @Test + void shouldReturnFalseForUnknownTopicId() { + //Given + + //When + final boolean isResolved = topicIdCache.hasResolvedTopic(UNKNOWN_TOPIC_ID); + + //Then + assertThat(isResolved).isFalse(); + } + + @Test + void shouldReturnFalseForTopicIdWhichIsStillInProgress() { + //Given + + //When + final boolean isResolved = topicIdCache.hasResolvedTopic(PENDING_TOPIC_ID); + + //Then + assertThat(isResolved).isFalse(); + } + + @Test + void shouldReturnFalseForTopicIdWhichHasFailed() { + //Given + underlyingCache.put(PENDING_TOPIC_ID, CompletableFuture.failedFuture(new IllegalStateException("boom boom boom"))); + + //When + final boolean isResolved = topicIdCache.hasResolvedTopic(PENDING_TOPIC_ID); + + //Then + assertThat(isResolved).isFalse(); + } + + @Test + void shouldReturnFalseForTopicIdOfWhichResolutionWasCancelled() { + //Given + final CompletableFuture cancelledFuture = new CompletableFuture<>(); + cancelledFuture.cancel(true); + underlyingCache.put(PENDING_TOPIC_ID, cancelledFuture); + + //When + final boolean isResolved = topicIdCache.hasResolvedTopic(PENDING_TOPIC_ID); + + //Then + assertThat(isResolved).isFalse(); + } + + @Test + void shouldReturnFalseForTopicIdOfWhichResolutionWasCancelledAfterCaching() { + //Given + final CompletableFuture cancelledFuture = new CompletableFuture<>(); + underlyingCache.put(PENDING_TOPIC_ID, cancelledFuture); + cancelledFuture.cancel(true); + + //When + final boolean isResolved = topicIdCache.hasResolvedTopic(PENDING_TOPIC_ID); + + //Then + assertThat(isResolved).isFalse(); + } + + @Test + void shouldReturnTrueForKnownTopicId() { + //Given + + //When + final boolean isResolved = topicIdCache.hasResolvedTopic(KNOWN_TOPIC_ID); + + //Then + assertThat(isResolved).isTrue(); + } + + @Test + void shouldReturnNameFromGetForKnownTopic() { + //Given + + //When + final CompletableFuture topicName = topicIdCache.getTopicName(KNOWN_TOPIC_ID); + + //Then + Assertions.assertThat(topicName).isCompletedWithValue(KNOWN_TOPIC_NAME); + } + + @Test + void shouldReturnNullFromGetForUnresolvedTopic() { + //Given + + //When + final CompletableFuture topicName = topicIdCache.getTopicName(UNKNOWN_TOPIC_ID); + + //Then + Assertions.assertThat(topicName).isNull(); + } + +} \ No newline at end of file From a7ad3fc4b0306714f840c46ccf1b83cf84ed22b9 Mon Sep 17 00:00:00 2001 From: Sam Barker Date: Thu, 20 Jul 2023 11:38:44 +1200 Subject: [PATCH 3/5] Implement cache loading. --- kroxylicious-filter/pom.xml | 13 +++ .../topicenc/kroxylicious/TopicIdCache.java | 35 ++++-- .../kroxylicious/TopicIdCacheTest.java | 106 +++++++++++++++++- 3 files changed, 142 insertions(+), 12 deletions(-) diff --git a/kroxylicious-filter/pom.xml b/kroxylicious-filter/pom.xml index 499bc8e..9b9517b 100644 --- a/kroxylicious-filter/pom.xml +++ b/kroxylicious-filter/pom.xml @@ -9,6 +9,7 @@ 3.1.6 3.24.2 + 5.4.0 4.0.0 kroxylicious-filter @@ -91,6 +92,18 @@ ${assertj.version} test + + org.mockito + mockito-core + ${mockito.version} + test + + + org.mockito + mockito-junit-jupiter + ${mockito.version} + test + diff --git a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicIdCache.java b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicIdCache.java index 34d2c13..3e09f97 100644 --- a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicIdCache.java +++ b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicIdCache.java @@ -1,30 +1,28 @@ package io.strimzi.kafka.topicenc.kroxylicious; import java.time.Duration; +import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.Future; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.MetadataResponseData; +import org.apache.kafka.common.requests.MetadataRequest; -import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.AsyncCache; import com.github.benmanes.caffeine.cache.Caffeine; -import com.google.common.annotations.VisibleForTesting; import io.kroxylicious.proxy.filter.KrpcFilterContext; public class TopicIdCache { - private final AsyncLoadingCache topicNamesById; + private final AsyncCache topicNamesById; public TopicIdCache() { - this(Caffeine.newBuilder().expireAfterAccess(Duration.ofMinutes(10)).buildAsync((key, executor) -> { - //TODO something clever. - return null; - })); + this(Caffeine.newBuilder().expireAfterAccess(Duration.ofMinutes(10)).buildAsync()); } - TopicIdCache(AsyncLoadingCache topicNamesById) { + TopicIdCache(AsyncCache topicNamesById) { this.topicNamesById = topicNamesById; } @@ -44,6 +42,21 @@ public boolean hasResolvedTopic(Uuid topicId) { } public void resolveTopicNames(KrpcFilterContext context, Set topicIdsToResolve) { - + final MetadataRequest.Builder builder = new MetadataRequest.Builder(List.copyOf(topicIdsToResolve)); + final MetadataRequest metadataRequest = builder.build(builder.latestAllowedVersion()); + topicIdsToResolve.forEach(uuid -> topicNamesById.put(uuid, new CompletableFuture<>())); + context. sendRequest(metadataRequest.version(), metadataRequest.data()) + .whenComplete((metadataResponseData, throwable) -> { + if (throwable != null) { + //TODO something sensible + } + else { + metadataResponseData.topics() + .forEach(metadataResponseTopic -> Objects.requireNonNull(topicNamesById.getIfPresent(metadataResponseTopic.topicId())) + .complete(metadataResponseTopic.name())); + //If we were to get null from getIfPresent it would imply we got a result for a topic we didn't expect + } + }); } + } diff --git a/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/TopicIdCacheTest.java b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/TopicIdCacheTest.java index 5a50dc0..7ca7e72 100644 --- a/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/TopicIdCacheTest.java +++ b/kroxylicious-filter/src/test/java/io/strimzi/kafka/topicenc/kroxylicious/TopicIdCacheTest.java @@ -1,26 +1,43 @@ package io.strimzi.kafka.topicenc.kroxylicious; +import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.MetadataRequestData; +import org.apache.kafka.common.message.MetadataResponseData; +import org.apache.kafka.common.protocol.ApiMessage; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; -import com.github.benmanes.caffeine.cache.AsyncCache; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; +import io.kroxylicious.proxy.filter.KrpcFilterContext; + import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyShort; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +@ExtendWith(MockitoExtension.class) class TopicIdCacheTest { private static final Uuid UNKNOWN_TOPIC_ID = Uuid.randomUuid(); private static final Uuid KNOWN_TOPIC_ID = Uuid.randomUuid(); private static final Uuid PENDING_TOPIC_ID = Uuid.randomUuid(); private static final String KNOWN_TOPIC_NAME = "TOPIC_WIBBLE"; + private static final String RESOLVED_TOPIC_NAME = "TOPIC_RESOLVED"; private TopicIdCache topicIdCache; private AsyncLoadingCache underlyingCache; + private KrpcFilterContext filterContext; + private CompletableFuture pendingFuture; @BeforeEach void setUp() { @@ -28,6 +45,10 @@ void setUp() { underlyingCache.put(KNOWN_TOPIC_ID, CompletableFuture.completedFuture(KNOWN_TOPIC_NAME)); underlyingCache.put(PENDING_TOPIC_ID, new CompletableFuture<>()); topicIdCache = new TopicIdCache(underlyingCache); + filterContext = mock(KrpcFilterContext.class); + pendingFuture = new CompletableFuture<>(); + + lenient().when(filterContext.sendRequest(anyShort(), any())).thenReturn(pendingFuture); } @Test @@ -125,4 +146,87 @@ void shouldReturnNullFromGetForUnresolvedTopic() { Assertions.assertThat(topicName).isNull(); } + @Test + void shouldSendMetadataRequestToResolveTopicNames() { + //Given + + //When + topicIdCache.resolveTopicNames(filterContext, Set.of(UNKNOWN_TOPIC_ID)); + + //Then + verify(filterContext).sendRequest(anyShort(), any(MetadataRequestData.class)); + } + + @Test + void shouldIncludeTopicId() { + //Given + + //When + topicIdCache.resolveTopicNames(filterContext, Set.of(UNKNOWN_TOPIC_ID)); + + //Then + verify(filterContext).sendRequest(anyShort(), Mockito.argThat(apiMessage -> { + assertThat(apiMessage).isInstanceOf(MetadataRequestData.class); + assertThat(((MetadataRequestData) apiMessage).topics()).anyMatch(metadataRequestTopic -> UNKNOWN_TOPIC_ID.equals(metadataRequestTopic.topicId())); + return true; + })); + } + + @Test + void shouldIncludeMultipleTopicId() { + //Given + + //When + topicIdCache.resolveTopicNames(filterContext, Set.of(UNKNOWN_TOPIC_ID, PENDING_TOPIC_ID)); + + //Then + verify(filterContext).sendRequest(anyShort(), Mockito.argThat(apiMessage -> { + assertThat(apiMessage).isInstanceOf(MetadataRequestData.class); + assertThat(((MetadataRequestData) apiMessage).topics()) + .allMatch( + metadataRequestTopic -> UNKNOWN_TOPIC_ID.equals(metadataRequestTopic.topicId()) || PENDING_TOPIC_ID.equals(metadataRequestTopic.topicId())); + return true; + })); + } + + @Test + void shouldReturnPendingFutures() { + //Given + + //When + topicIdCache.resolveTopicNames(filterContext, Set.of(UNKNOWN_TOPIC_ID, PENDING_TOPIC_ID)); + + //Then + assertThat(topicIdCache.getTopicName(UNKNOWN_TOPIC_ID)).isNotNull().isNotDone(); + assertThat(topicIdCache.getTopicName(PENDING_TOPIC_ID)).isNotNull().isNotDone(); + } + + @Test + void shouldCacheFutureForTopicId() { + //Given + topicIdCache.resolveTopicNames(filterContext, Set.of(UNKNOWN_TOPIC_ID)); + + //When + final CompletableFuture actualFuture = topicIdCache.getTopicName(UNKNOWN_TOPIC_ID); + + //Then + assertThat(actualFuture).isNotNull().isNotDone(); + } + + @Test + void shouldCompleteFutureWhenMetadataResponseDelivered() { + //Given + topicIdCache.resolveTopicNames(filterContext, Set.of(UNKNOWN_TOPIC_ID)); + final MetadataResponseData.MetadataResponseTopic responseTopic = new MetadataResponseData.MetadataResponseTopic(); + responseTopic.setTopicId(UNKNOWN_TOPIC_ID).setName(RESOLVED_TOPIC_NAME); + final MetadataResponseData.MetadataResponseTopicCollection metadataResponseTopics = new MetadataResponseData.MetadataResponseTopicCollection(); + metadataResponseTopics.add(responseTopic); + + //When + pendingFuture.complete(new MetadataResponseData().setTopics(metadataResponseTopics)); + + //Then + final CompletableFuture actualFuture = topicIdCache.getTopicName(UNKNOWN_TOPIC_ID); + assertThat(actualFuture).isCompletedWithValue(RESOLVED_TOPIC_NAME); + } } \ No newline at end of file From 224f2883ba6b2fdd8d7336bb9d3c8a6f38ea5c9b Mon Sep 17 00:00:00 2001 From: Sam Barker Date: Thu, 20 Jul 2023 11:47:04 +1200 Subject: [PATCH 4/5] Context cacheLoader can never help us as we can't trigger loading from a cache miss directly. --- .../topicenc/kroxylicious/TopicEncryptionConfig.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionConfig.java b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionConfig.java index 47ad298..85a8be7 100644 --- a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionConfig.java +++ b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/TopicEncryptionConfig.java @@ -1,16 +1,11 @@ package io.strimzi.kafka.topicenc.kroxylicious; -import java.time.Duration; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; -import org.apache.kafka.common.Uuid; - import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import com.github.benmanes.caffeine.cache.AsyncLoadingCache; -import com.github.benmanes.caffeine.cache.Caffeine; import io.strimzi.kafka.topicenc.policy.PolicyRepository; @@ -23,7 +18,6 @@ public class TopicEncryptionConfig extends BaseConfig { @JsonIgnore private static final ConcurrentHashMap virtualClusterToTopicUUIDToTopicNameCache = new ConcurrentHashMap<>(); - private final ContextCacheLoader contextCacheLoader = new ContextCacheLoader(); @JsonCreator public TopicEncryptionConfig(@JsonProperty(value = IN_MEMORY_POLICY_REPOSITORY_PROP_NAME) InMemoryPolicyRepositoryConfig inMemoryPolicyRepository) { @@ -32,10 +26,6 @@ public TopicEncryptionConfig(@JsonProperty(value = IN_MEMORY_POLICY_REPOSITORY_P + " configuration is required as it is the only PolicyRepository implementation"); } - public ContextCacheLoader getContextCacheLoader() { - return contextCacheLoader; - } - public PolicyRepository getPolicyRepository() { return inMemoryPolicyRepository.getPolicyRepository(); } From e130c3a729be4d5168ca29a39c09bcfe395ba35a Mon Sep 17 00:00:00 2001 From: Sam Barker Date: Thu, 20 Jul 2023 17:02:45 +1200 Subject: [PATCH 5/5] Don't block the event loop --- .../kroxylicious/FetchDecryptFilter.java | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java index fafbdad..a2026ed 100644 --- a/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java +++ b/kroxylicious-filter/src/main/java/io/strimzi/kafka/topicenc/kroxylicious/FetchDecryptFilter.java @@ -2,9 +2,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.function.Predicate; import org.apache.kafka.common.Uuid; @@ -114,18 +111,8 @@ private void decryptFetchResponse(ResponseHeaderData header, FetchResponseData r private String getTopicNameForUuid(Uuid originalUuid) { //TODO revisit error handling - try { - final CompletableFuture topicNameFuture = topicUuidToNameCache.getTopicName(originalUuid); - return topicNameFuture != null ? topicNameFuture.get(5, TimeUnit.SECONDS) : null; - } - catch (InterruptedException e) { - log.warn("Caught thread interrupt", e); - Thread.currentThread().interrupt(); - } - catch (ExecutionException | TimeoutException e) { - log.warn("Failed to get ", e); - } - return null; + final CompletableFuture topicNameFuture = topicUuidToNameCache.getTopicName(originalUuid); + return topicNameFuture != null ? topicNameFuture.getNow(null) : null; } @Override