From b07f7a0c40a571f798bbee4b63a8a2d8aaf54f4d Mon Sep 17 00:00:00 2001 From: Robert Young Date: Tue, 20 Feb 2024 16:23:00 +1300 Subject: [PATCH 1/3] Wire single threaded retry executor into Resilient KMS Why: With the refactor moving KMS construction to filter factory initialisation time, the eventloop became null and retries failed immediately with NPE. Signed-off-by: Robert Young --- CHANGELOG.md | 1 + .../filter/encryption/EnvelopeEncryption.java | 8 +++++++- .../filter/encryption/EnvelopeEncryptionTest.java | 10 ++++++++++ 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f3906a07d..7a7804e37 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ Please enumerate **all user-facing** changes using format ` implements FilterFactory> { + static final ScheduledExecutorService RETRY_POOL = Executors.newSingleThreadScheduledExecutor(r -> { + Thread retryThread = new Thread(r, "kmsRetry"); + retryThread.setDaemon(true); + return retryThread; + }); private static KmsMetrics kmsMetrics = MicrometerKmsMetrics.create(Metrics.globalRegistry); record Config( @@ -105,7 +111,7 @@ private static Kms buildKms(FilterFactoryContext context, Config co kms = InstrumentedKms.wrap(kms, kmsMetrics); ExponentialJitterBackoffStrategy backoffStrategy = new ExponentialJitterBackoffStrategy(Duration.ofMillis(500), Duration.ofSeconds(5), 2d, ThreadLocalRandom.current()); - kms = ResilientKms.wrap(kms, context.eventLoop(), backoffStrategy, 3); + kms = ResilientKms.wrap(kms, RETRY_POOL, backoffStrategy, 3); return wrapWithCachingKms(configuration, kms); } diff --git a/kroxylicious-filters/kroxylicious-encryption/src/test/java/io/kroxylicious/filter/encryption/EnvelopeEncryptionTest.java b/kroxylicious-filters/kroxylicious-encryption/src/test/java/io/kroxylicious/filter/encryption/EnvelopeEncryptionTest.java index b2bb14eec..00663e8cd 100644 --- a/kroxylicious-filters/kroxylicious-encryption/src/test/java/io/kroxylicious/filter/encryption/EnvelopeEncryptionTest.java +++ b/kroxylicious-filters/kroxylicious-encryption/src/test/java/io/kroxylicious/filter/encryption/EnvelopeEncryptionTest.java @@ -7,6 +7,7 @@ package io.kroxylicious.filter.encryption; import java.time.Duration; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import org.junit.jupiter.api.Test; @@ -57,4 +58,13 @@ void testKmsCacheConfigDefaults() { assertThat(config.resolvedAliasExpireAfterWriteDuration()).isEqualTo(Duration.ofMinutes(10)); assertThat(config.resolvedAliasRefreshAfterWriteDuration()).isEqualTo(Duration.ofMinutes(8)); } + + @Test + void testRetryPool() { + Future thread = EnvelopeEncryption.RETRY_POOL.submit(Thread::currentThread); + assertThat(thread).succeedsWithin(Duration.ofSeconds(5)).satisfies(thread1 -> { + assertThat(thread1.getName()).isEqualTo("kmsRetry"); + assertThat(thread1.isDaemon()).isTrue(); + }); + } } From 438dec6f6af05c1f75c5aad5c551a618324b0d01 Mon Sep 17 00:00:00 2001 From: Robert Young Date: Tue, 20 Feb 2024 16:27:40 +1300 Subject: [PATCH 2/3] Resilient KMS verifies it's components are not null Why: We had a problem where the executor was null and we didn't find out until we needed to retry. Checking the assumption early would have exposed it faster. Signed-off-by: Robert Young --- .../filter/encryption/ResilientKms.java | 19 ++++++------ .../filter/encryption/ResilientKmsTest.java | 29 +++++++++++++++++++ 2 files changed, 39 insertions(+), 9 deletions(-) diff --git a/kroxylicious-filters/kroxylicious-encryption/src/main/java/io/kroxylicious/filter/encryption/ResilientKms.java b/kroxylicious-filters/kroxylicious-encryption/src/main/java/io/kroxylicious/filter/encryption/ResilientKms.java index 4ce0aa694..603e6d4d1 100644 --- a/kroxylicious-filters/kroxylicious-encryption/src/main/java/io/kroxylicious/filter/encryption/ResilientKms.java +++ b/kroxylicious-filters/kroxylicious-encryption/src/main/java/io/kroxylicious/filter/encryption/ResilientKms.java @@ -29,6 +29,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; +import static java.util.Objects.requireNonNull; import static java.util.Optional.ofNullable; public class ResilientKms implements Kms { @@ -39,19 +40,19 @@ public class ResilientKms implements Kms { private final BackoffStrategy strategy; private final int retries; - private ResilientKms(Kms inner, - ScheduledExecutorService executorService, - BackoffStrategy backoffStrategy, + private ResilientKms(@NonNull Kms inner, + @NonNull ScheduledExecutorService executorService, + @NonNull BackoffStrategy backoffStrategy, int retries) { - this.inner = inner; - this.executorService = executorService; - strategy = backoffStrategy; + this.inner = requireNonNull(inner); + this.executorService = requireNonNull(executorService); + strategy = requireNonNull(backoffStrategy); this.retries = retries; } - public static Kms wrap(Kms delegate, - ScheduledExecutorService executorService, - BackoffStrategy strategy, + public static Kms wrap(@NonNull Kms delegate, + @NonNull ScheduledExecutorService executorService, + @NonNull BackoffStrategy strategy, int retries) { return new ResilientKms<>(delegate, executorService, strategy, retries); diff --git a/kroxylicious-filters/kroxylicious-encryption/src/test/java/io/kroxylicious/filter/encryption/ResilientKmsTest.java b/kroxylicious-filters/kroxylicious-encryption/src/test/java/io/kroxylicious/filter/encryption/ResilientKmsTest.java index ffb00c87a..e34125b01 100644 --- a/kroxylicious-filters/kroxylicious-encryption/src/test/java/io/kroxylicious/filter/encryption/ResilientKmsTest.java +++ b/kroxylicious-filters/kroxylicious-encryption/src/test/java/io/kroxylicious/filter/encryption/ResilientKmsTest.java @@ -29,6 +29,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -360,6 +361,34 @@ void testGetEdekSerde() { assertThat(serde).isSameAs(mockSerde); } + @Test + void testExecutorNotNullable() { + Kms kms = Mockito.mock(Kms.class); + Serde mockSerde = mock(Serde.class); + when(kms.edekSerde()).thenReturn(mockSerde); + BackoffStrategy backoffStrategy = mock(BackoffStrategy.class); + assertThatThrownBy(() -> ResilientKms.wrap(kms, null, backoffStrategy, 3)) + .isInstanceOf(NullPointerException.class); + } + + @Test + void testInnerKmsNotNullable() { + BackoffStrategy backoffStrategy = mock(BackoffStrategy.class); + ScheduledExecutorService executor = getMockExecutor(); + assertThatThrownBy(() -> ResilientKms.wrap(null, executor, backoffStrategy, 3)) + .isInstanceOf(NullPointerException.class); + } + + @Test + void testBackoffStrategyNotNullable() { + Kms kms = Mockito.mock(Kms.class); + Serde mockSerde = mock(Serde.class); + when(kms.edekSerde()).thenReturn(mockSerde); + ScheduledExecutorService executor = getMockExecutor(); + assertThatThrownBy(() -> ResilientKms.wrap(kms, executor, null, 3)) + .isInstanceOf(NullPointerException.class); + } + @NonNull private static ScheduledExecutorService getMockExecutor() { ScheduledExecutorService mockExecutor = Mockito.mock(ScheduledExecutorService.class); From 36a64eb5cc192ccff808253d6375a545e336ac30 Mon Sep 17 00:00:00 2001 From: Robert Young Date: Wed, 21 Feb 2024 12:16:34 +1300 Subject: [PATCH 3/3] Fix flaky unit test Why: The test wants to check that our test cipher produces different output for different topic names. The cipher rotated all the bytes by a fixed amount determined by the topic name hashcode. Meaning there was a ~0.0039 chance of the same rotation byte being picked and causing the test to fail. This change varies the rotation byte by using the characters of the topicname to modify the rotation for each index. Signed-off-by: Robert Young --- .../src/test/java/io/kroxylicious/proxy/FilterIT.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/FilterIT.java b/kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/FilterIT.java index ca9f2547a..2d2c0eddb 100644 --- a/kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/FilterIT.java +++ b/kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/FilterIT.java @@ -147,8 +147,8 @@ void encryptionDistinguishedByName() { static ByteBuffer encode(String topicName, ByteBuffer in) { var out = ByteBuffer.allocate(in.limit()); - byte rot = (byte) (topicName.hashCode() % Byte.MAX_VALUE); for (int index = 0; index < in.limit(); index++) { + byte rot = getRot(topicName, index); byte b = in.get(index); byte rotated = (byte) (b + rot); out.put(index, rotated); @@ -156,12 +156,18 @@ static ByteBuffer encode(String topicName, ByteBuffer in) { return out; } + private static byte getRot(String topicName, int index) { + char c = topicName.charAt(index % topicName.length()); + int i = topicName.hashCode() + c; + return (byte) (i % Byte.MAX_VALUE); + } + static ByteBuffer decode(String topicName, ByteBuffer in) { var out = ByteBuffer.allocate(in.limit()); out.limit(in.limit()); - byte rot = (byte) -(topicName.hashCode() % Byte.MAX_VALUE); for (int index = 0; index < in.limit(); index++) { byte b = in.get(index); + byte rot = (byte) -getRot(topicName, index); byte rotated = (byte) (b + rot); out.put(index, rotated); }