Skip to content

Commit

Permalink
Merge pull request kroxylicious#1020 from robobario/resilient-kms-fix
Browse files Browse the repository at this point in the history
Bugfix: KMS retry logic failing with Null Pointers
  • Loading branch information
robobario authored Feb 21, 2024
2 parents ca82d17 + 36a64eb commit 53144df
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Please enumerate **all user-facing** changes using format `<githib issue/pr numb

## 0.5.0

* [#1020](https://github.com/kroxylicious/kroxylicious/pull/1020): KMS retry logic failing with Null Pointers
* [#1019](https://github.com/kroxylicious/kroxylicious/pull/1019): Stop logging license header as part of the startup banner.
* [#1004](https://github.com/kroxylicious/kroxylicious/pull/1004): Publish images to Quay kroxylicious/kroxylicious rather than kroxylicious-developer
* [#997](https://github.com/kroxylicious/kroxylicious/issues/997): Add hardcoded maximum frame size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package io.kroxylicious.filter.encryption;

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;

Expand Down Expand Up @@ -38,6 +39,11 @@
@Plugin(configType = EnvelopeEncryption.Config.class)
public class EnvelopeEncryption<K, E> implements FilterFactory<EnvelopeEncryption.Config, SharedEncryptionContext<K, E>> {

static final ScheduledExecutorService RETRY_POOL = Executors.newSingleThreadScheduledExecutor(r -> {
Thread retryThread = new Thread(r, "kmsRetry");
retryThread.setDaemon(true);
return retryThread;
});
private static KmsMetrics kmsMetrics = MicrometerKmsMetrics.create(Metrics.globalRegistry);

record Config(
Expand Down Expand Up @@ -105,7 +111,7 @@ private static <K, E> Kms<K, E> buildKms(FilterFactoryContext context, Config co
kms = InstrumentedKms.wrap(kms, kmsMetrics);
ExponentialJitterBackoffStrategy backoffStrategy = new ExponentialJitterBackoffStrategy(Duration.ofMillis(500), Duration.ofSeconds(5), 2d,
ThreadLocalRandom.current());
kms = ResilientKms.wrap(kms, context.eventLoop(), backoffStrategy, 3);
kms = ResilientKms.wrap(kms, RETRY_POOL, backoffStrategy, 3);
return wrapWithCachingKms(configuration, kms);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;

import static java.util.Objects.requireNonNull;
import static java.util.Optional.ofNullable;

public class ResilientKms<K, E> implements Kms<K, E> {
Expand All @@ -39,19 +40,19 @@ public class ResilientKms<K, E> implements Kms<K, E> {
private final BackoffStrategy strategy;
private final int retries;

private ResilientKms(Kms<K, E> inner,
ScheduledExecutorService executorService,
BackoffStrategy backoffStrategy,
private ResilientKms(@NonNull Kms<K, E> inner,
@NonNull ScheduledExecutorService executorService,
@NonNull BackoffStrategy backoffStrategy,
int retries) {
this.inner = inner;
this.executorService = executorService;
strategy = backoffStrategy;
this.inner = requireNonNull(inner);
this.executorService = requireNonNull(executorService);
strategy = requireNonNull(backoffStrategy);
this.retries = retries;
}

public static <K, E> Kms<K, E> wrap(Kms<K, E> delegate,
ScheduledExecutorService executorService,
BackoffStrategy strategy,
public static <K, E> Kms<K, E> wrap(@NonNull Kms<K, E> delegate,
@NonNull ScheduledExecutorService executorService,
@NonNull BackoffStrategy strategy,
int retries) {
return new ResilientKms<>(delegate, executorService,
strategy, retries);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package io.kroxylicious.filter.encryption;

import java.time.Duration;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -57,4 +58,13 @@ void testKmsCacheConfigDefaults() {
assertThat(config.resolvedAliasExpireAfterWriteDuration()).isEqualTo(Duration.ofMinutes(10));
assertThat(config.resolvedAliasRefreshAfterWriteDuration()).isEqualTo(Duration.ofMinutes(8));
}

@Test
void testRetryPool() {
Future<Thread> thread = EnvelopeEncryption.RETRY_POOL.submit(Thread::currentThread);
assertThat(thread).succeedsWithin(Duration.ofSeconds(5)).satisfies(thread1 -> {
assertThat(thread1.getName()).isEqualTo("kmsRetry");
assertThat(thread1.isDaemon()).isTrue();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
Expand Down Expand Up @@ -360,6 +361,34 @@ void testGetEdekSerde() {
assertThat(serde).isSameAs(mockSerde);
}

@Test
void testExecutorNotNullable() {
Kms<Long, Long> kms = Mockito.mock(Kms.class);
Serde mockSerde = mock(Serde.class);
when(kms.edekSerde()).thenReturn(mockSerde);
BackoffStrategy backoffStrategy = mock(BackoffStrategy.class);
assertThatThrownBy(() -> ResilientKms.wrap(kms, null, backoffStrategy, 3))
.isInstanceOf(NullPointerException.class);
}

@Test
void testInnerKmsNotNullable() {
BackoffStrategy backoffStrategy = mock(BackoffStrategy.class);
ScheduledExecutorService executor = getMockExecutor();
assertThatThrownBy(() -> ResilientKms.wrap(null, executor, backoffStrategy, 3))
.isInstanceOf(NullPointerException.class);
}

@Test
void testBackoffStrategyNotNullable() {
Kms<Long, Long> kms = Mockito.mock(Kms.class);
Serde mockSerde = mock(Serde.class);
when(kms.edekSerde()).thenReturn(mockSerde);
ScheduledExecutorService executor = getMockExecutor();
assertThatThrownBy(() -> ResilientKms.wrap(kms, executor, null, 3))
.isInstanceOf(NullPointerException.class);
}

@NonNull
private static ScheduledExecutorService getMockExecutor() {
ScheduledExecutorService mockExecutor = Mockito.mock(ScheduledExecutorService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,21 +147,27 @@ void encryptionDistinguishedByName() {

static ByteBuffer encode(String topicName, ByteBuffer in) {
var out = ByteBuffer.allocate(in.limit());
byte rot = (byte) (topicName.hashCode() % Byte.MAX_VALUE);
for (int index = 0; index < in.limit(); index++) {
byte rot = getRot(topicName, index);
byte b = in.get(index);
byte rotated = (byte) (b + rot);
out.put(index, rotated);
}
return out;
}

private static byte getRot(String topicName, int index) {
char c = topicName.charAt(index % topicName.length());
int i = topicName.hashCode() + c;
return (byte) (i % Byte.MAX_VALUE);
}

static ByteBuffer decode(String topicName, ByteBuffer in) {
var out = ByteBuffer.allocate(in.limit());
out.limit(in.limit());
byte rot = (byte) -(topicName.hashCode() % Byte.MAX_VALUE);
for (int index = 0; index < in.limit(); index++) {
byte b = in.get(index);
byte rot = (byte) -getRot(topicName, index);
byte rotated = (byte) (b + rot);
out.put(index, rotated);
}
Expand Down

0 comments on commit 53144df

Please sign in to comment.