Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add exponential backoff retry strategy #428

Merged
merged 4 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,21 @@ public class BaseCacheRetryTestClass {
protected static final Duration DEFAULT_TTL_SECONDS = Duration.ofSeconds(60);
protected static final Duration FIVE_SECONDS = Duration.ofSeconds(5);
protected static CacheClient cacheClient;
protected static CredentialProvider credentialProvider;
protected static TestRetryMetricsCollector testRetryMetricsCollector;
protected static TestRetryMetricsMiddlewareArgs testRetryMetricsMiddlewareArgs;
protected static TestRetryMetricsMiddleware testRetryMetricsMiddleware;
protected static Logger logger;

protected final String hostname =
Optional.ofNullable(System.getenv("MOMENTO_HOSTNAME")).orElse("127.0.0.1");
protected final int port =
Integer.parseInt(Optional.ofNullable(System.getenv("MOMENTO_PORT")).orElse("8080"));

@BeforeEach
void beforeEach() {
testRetryMetricsCollector = new TestRetryMetricsCollector();
logger = getLogger(BaseCacheRetryTestClass.class);
credentialProvider = CredentialProvider.fromEnvVar("MOMENTO_API_KEY");
final CredentialProvider credentialProvider = new MomentoLocalProvider(hostname, port);
testRetryMetricsMiddlewareArgs =
new TestRetryMetricsMiddlewareArgs.Builder(
logger, testRetryMetricsCollector, UUID.randomUUID().toString())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package momento.sdk.retry;

import static momento.sdk.retry.BaseCacheRetryTestClass.FIVE_SECONDS;
import static momento.sdk.retry.BaseCacheRetryTestClass.withCacheAndCacheClient;
import static org.assertj.core.api.Assertions.assertThat;
import static org.slf4j.LoggerFactory.getLogger;

import java.time.Duration;
import java.util.Collections;
import java.util.UUID;
import momento.sdk.exceptions.MomentoErrorCode;
import momento.sdk.exceptions.SdkException;
import momento.sdk.responses.cache.GetResponse;
import momento.sdk.responses.cache.IncrementResponse;
import momento.sdk.retry.utils.TestRetryMetricsCollector;
import momento.sdk.retry.utils.TestRetryMetricsMiddlewareArgs;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;

/**
 * Integration tests that drive a cache client configured with an {@link
 * ExponentialBackoffRetryStrategy} against a server instructed (via the test retry-metrics
 * middleware arguments) to fail selected RPCs with {@code SERVER_UNAVAILABLE}.
 */
public class ExponentialBackoffRetryStrategyIntegTest {

  /** Overall client-side timeout applied to every request issued by these tests. */
  private static final Duration CLIENT_TIMEOUT_MILLIS = Duration.ofMillis(2000L);

  /** Initial delay, in milliseconds, handed to the retry strategy under test. */
  private static final int INITIAL_DELAY_MILLIS = 100;

  /** Max backoff, in milliseconds, handed to the retry strategy under test. */
  private static final int MAX_BACKOFF_MILLIS = 500;

  private static Logger logger;
  private static TestRetryMetricsCollector testRetryMetricsCollector;

  @BeforeAll
  static void setup() {
    logger = getLogger(ExponentialBackoffRetryStrategyIntegTest.class);
    testRetryMetricsCollector = new TestRetryMetricsCollector();
  }

  /** Builds the strategy every test in this class uses. */
  private static ExponentialBackoffRetryStrategy newRetryStrategy() {
    return new ExponentialBackoffRetryStrategy(INITIAL_DELAY_MILLIS, MAX_BACKOFF_MILLIS);
  }

  /** A GET that always fails with a retryable error should end in a client timeout. */
  @Test
  void testRetryEligibleApi_shouldHitDeadline_WhenFullNetworkOutage() throws Exception {
    final ExponentialBackoffRetryStrategy strategy = newRetryStrategy();

    final String testId = UUID.randomUUID().toString();
    final TestRetryMetricsMiddlewareArgs middlewareArgs =
        new TestRetryMetricsMiddlewareArgs.Builder(logger, testRetryMetricsCollector, testId)
            .returnError(MomentoErrorCode.SERVER_UNAVAILABLE.name())
            .errorRpcList(Collections.singletonList(MomentoRpcMethod.GET.getRequestName()))
            .build();

    withCacheAndCacheClient(
        config -> config.withRetryStrategy(strategy).withTimeout(CLIENT_TIMEOUT_MILLIS),
        middlewareArgs,
        (cacheClient, cacheName) -> {
          assertThat(cacheClient.get(cacheName, "key"))
              .succeedsWithin(FIVE_SECONDS)
              .asInstanceOf(InstanceOfAssertFactories.type(GetResponse.Error.class))
              .extracting(SdkException::getErrorCode)
              .isEqualTo(MomentoErrorCode.TIMEOUT_ERROR);
        });
  }

  /** An INCREMENT that fails should surface the error directly with zero retries recorded. */
  @Test
  void testNonRetryEligibleApi_shouldMakeNoAttempts_WhenFullNetworkOutage() throws Exception {
    final ExponentialBackoffRetryStrategy strategy = newRetryStrategy();

    final String testId = UUID.randomUUID().toString();
    final TestRetryMetricsMiddlewareArgs middlewareArgs =
        new TestRetryMetricsMiddlewareArgs.Builder(logger, testRetryMetricsCollector, testId)
            .returnError(MomentoErrorCode.SERVER_UNAVAILABLE.name())
            .errorRpcList(Collections.singletonList(MomentoRpcMethod.INCREMENT.getRequestName()))
            .build();

    withCacheAndCacheClient(
        config -> config.withRetryStrategy(strategy).withTimeout(CLIENT_TIMEOUT_MILLIS),
        middlewareArgs,
        (cacheClient, cacheName) -> {
          assertThat(cacheClient.increment(cacheName, "key", 1))
              .succeedsWithin(FIVE_SECONDS)
              .asInstanceOf(InstanceOfAssertFactories.type(IncrementResponse.Error.class))
              .extracting(SdkException::getErrorCode)
              .isEqualTo(MomentoErrorCode.SERVER_UNAVAILABLE);

          final int incrementRetries =
              testRetryMetricsCollector.getTotalRetryCount(cacheName, MomentoRpcMethod.INCREMENT);
          assertThat(incrementRetries).isEqualTo(0);
        });
  }

  /** A GET that fails only twice should eventually resolve to a miss after at least one retry. */
  @Test
  void testRetryEligibleApi_shouldSucceed_WhenTemporaryNetworkOutage() throws Exception {
    final ExponentialBackoffRetryStrategy strategy = newRetryStrategy();

    final String testId = UUID.randomUUID().toString();
    final TestRetryMetricsMiddlewareArgs middlewareArgs =
        new TestRetryMetricsMiddlewareArgs.Builder(logger, testRetryMetricsCollector, testId)
            .returnError(MomentoErrorCode.SERVER_UNAVAILABLE.name())
            .errorRpcList(Collections.singletonList(MomentoRpcMethod.GET.getRequestName()))
            .errorCount(2)
            .build();

    withCacheAndCacheClient(
        config -> config.withRetryStrategy(strategy).withTimeout(CLIENT_TIMEOUT_MILLIS),
        middlewareArgs,
        (cacheClient, cacheName) -> {
          assertThat(cacheClient.get(cacheName, "key"))
              .succeedsWithin(FIVE_SECONDS)
              .isInstanceOf(GetResponse.Miss.class);

          final int getRetries =
              testRetryMetricsCollector.getTotalRetryCount(cacheName, MomentoRpcMethod.GET);
          assertThat(getRetries).isGreaterThan(0);
        });
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ void testRetryEligibleApi_shouldMakeMaxAttempts_WhenFullNetworkOutage() throws E

@Test
void testNonRetryEligibleApi_shouldMakeNoAttempts_WhenFullNetworkOutage() throws Exception {
RetryEligibilityStrategy eligibilityStrategy = (status, methodName) -> true;
RetryEligibilityStrategy eligibilityStrategy = (status, methodName) -> false;

FixedDelayRetryStrategy retryStrategy =
new FixedDelayRetryStrategy(maxAttempts, delayMillis, maxDelayMillis, eligibilityStrategy);
Expand All @@ -91,12 +91,12 @@ void testNonRetryEligibleApi_shouldMakeNoAttempts_WhenFullNetworkOutage() throws
assertThat(
testRetryMetricsCollector.getTotalRetryCount(
cacheName, MomentoRpcMethod.INCREMENT))
.isGreaterThanOrEqualTo(0);
.isEqualTo(0);
});
}

@Test
void testNonRetryEligibleApi_shouldMakeLessThanMaxAttempts_WhenTemporaryNetworkOutage()
void testRetryEligibleApi_shouldMakeLessThanMaxAttempts_WhenTemporaryNetworkOutage()
throws Exception {
RetryEligibilityStrategy eligibilityStrategy = (status, methodName) -> true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@
*
* @see RetryStrategy
* @see RetryEligibilityStrategy
* @param <ReqT> The type of the request message.
* @param <RespT> The type of the response message.
*/
final class RetryClientInterceptor implements ClientInterceptor {

Expand Down Expand Up @@ -115,6 +113,12 @@ public void onClose(Status status, Metadata trailers) {
return;
}

// If the deadline is expired, we don't want to retry
if (callOptions.getDeadline() != null && callOptions.getDeadline().isExpired()) {
super.onClose(Status.DEADLINE_EXCEEDED, trailers);
return;
}

// now we can safely start retrying

attemptNumber++;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package momento.sdk.retry;

import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;

/**
 * A retry strategy that retries a failed gRPC call an unlimited number of times until the gRPC
 * deadline, with a jittered, exponentially increasing delay between attempts.
 *
 * <p>For each attempt a base delay of {@code initialDelayMillis * 2^attempt} is computed and capped
 * at {@code maxBackoffMillis}. The delay actually returned is drawn uniformly from the half-open
 * range {@code [baseDelay, previousBaseDelay * 3)}, where the previous base delay is {@code
 * baseDelay / 2}, or the initial delay itself whenever the base delay equals the initial delay (as
 * on the first attempt). Consequently:
 *
 * <ul>
 *   <li>the first delay lies between the initial delay and the initial delay * 3;
 *   <li>while the base delay is still growing, the delay lies between the base delay and the base
 *       delay * 1.5;
 *   <li>once the base delay is capped, the delay lies between the max backoff and (max backoff / 2)
 *       * 3.
 * </ul>
 *
 * <p>This class never returns "stop retrying" for an eligible call; whether a call is eligible is
 * delegated entirely to the configured {@link RetryEligibilityStrategy}.
 */
public class ExponentialBackoffRetryStrategy implements RetryStrategy {
  /** Factor by which the base delay grows from one attempt to the next. */
  private static final int GROWTH_FACTOR = 2;

  /** Multiplier applied to the previous base delay to obtain the exclusive upper jitter bound. */
  private static final int JITTER_MULTIPLIER = 3;

  private final long initialDelayMillis;
  private final long maxBackoffMillis;
  private final RetryEligibilityStrategy retryEligibilityStrategy;

  /**
   * Constructs an `ExponentialBackoffRetryStrategy` with an initial delay of 1 millisecond and a
   * max backoff of 8 milliseconds. Including jitter, the first retry delay will be between 1 and 3
   * ms, and the maximum retry delay will be between 8 and 12 ms.
   */
  public ExponentialBackoffRetryStrategy() {
    this(1, 8, new DefaultRetryEligibilityStrategy());
  }

  /**
   * Constructs an `ExponentialBackoffRetryStrategy` with the given initial delay and max backoff
   * times, using a {@link DefaultRetryEligibilityStrategy}.
   *
   * @param initialDelayMillis The lower bound for the first retry delay. The initial delay range is
   *     between this value, and this value * 3.
   * @param maxBackoffMillis The cap on the base delay growth. Once capped, the delay range is
   *     between this value, and (this value / 2) * 3.
   */
  public ExponentialBackoffRetryStrategy(int initialDelayMillis, int maxBackoffMillis) {
    this(initialDelayMillis, maxBackoffMillis, new DefaultRetryEligibilityStrategy());
  }

  /**
   * Constructs an `ExponentialBackoffRetryStrategy` with the given initial delay and max backoff
   * times, and the given eligibility strategy.
   *
   * @param initialDelayMillis The lower bound for the first retry delay. The initial delay range is
   *     between this value, and this value * 3.
   * @param maxBackoffMillis The cap on the base delay growth. Once capped, the delay range is
   *     between this value, and (this value / 2) * 3.
   * @param retryEligibilityStrategy Determines if a call is eligible to be retried based on the
   *     method being called and the gRPC status code of the previous failure. Must not be null.
   * @throws NullPointerException if {@code retryEligibilityStrategy} is null.
   */
  public ExponentialBackoffRetryStrategy(
      int initialDelayMillis,
      int maxBackoffMillis,
      RetryEligibilityStrategy retryEligibilityStrategy) {
    this.initialDelayMillis = initialDelayMillis;
    this.maxBackoffMillis = maxBackoffMillis;
    // Fail at construction rather than on the first failed call that consults the strategy.
    this.retryEligibilityStrategy =
        Objects.requireNonNull(retryEligibilityStrategy, "retryEligibilityStrategy");
  }

  /**
   * Decides whether, and after how long, a failed call should be retried.
   *
   * @param status The gRPC status of the failed attempt.
   * @param methodDescriptor Descriptor of the method that was called; only its full method name is
   *     used.
   * @param currentAttempt Zero-based attempt counter; values {@code <= 0} are treated as the first
   *     attempt.
   * @return An empty optional when the eligibility strategy rejects the retry, otherwise the
   *     jittered delay to wait before the next attempt.
   */
  @Override
  public Optional<Duration> determineWhenToRetry(
      Status status,
      @SuppressWarnings("rawtypes") MethodDescriptor methodDescriptor,
      int currentAttempt) {
    if (!retryEligibilityStrategy.isEligibileForRetry(
        status, methodDescriptor.getFullMethodName())) {
      return Optional.empty();
    }

    final long baseDelay = computeBaseDelay(currentAttempt);
    final long previousBaseDelay = computePreviousBaseDelay(baseDelay);
    final long maxDelay = previousBaseDelay * JITTER_MULTIPLIER;
    final long jitteredDelay = randomInRange(baseDelay, maxDelay);

    return Optional.of(Duration.ofMillis(jitteredDelay));
  }

  /**
   * Returns {@code initialDelayMillis * GROWTH_FACTOR^attemptNumber}, capped at the max backoff.
   */
  private long computeBaseDelay(int attemptNumber) {
    if (attemptNumber <= 0) {
      return this.initialDelayMillis;
    }

    try {
      // The double-to-long cast saturates at Long.MAX_VALUE for very large exponents, so the
      // multiplication below either fits or throws; it never wraps around silently.
      final long multiplier = (long) Math.pow(GROWTH_FACTOR, attemptNumber);
      final long baseDelay = Math.multiplyExact(this.initialDelayMillis, multiplier);
      return Math.min(baseDelay, maxBackoffMillis);
    } catch (ArithmeticException e) {
      // Overflow means the uncapped delay is far beyond any cap, so the cap applies.
      return maxBackoffMillis;
    }
  }

  /**
   * Returns the base delay of the preceding attempt: the initial delay when the current base delay
   * is the initial delay, otherwise the current base delay divided by the growth factor.
   */
  private long computePreviousBaseDelay(long currentBaseDelay) {
    if (currentBaseDelay == initialDelayMillis) {
      return initialDelayMillis;
    }

    return currentBaseDelay / GROWTH_FACTOR;
  }

  /**
   * Returns a uniformly distributed value in {@code [min, max)}, or {@code min} when the range is
   * empty.
   */
  private long randomInRange(long min, long max) {
    if (min >= max) {
      return min;
    }
    return ThreadLocalRandom.current().nextLong(min, max);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package momento.sdk.retry;

import static org.assertj.core.api.Assertions.*;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.lenient;

import io.grpc.MethodDescriptor;
import io.grpc.Status;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

/**
 * Unit tests for {@link ExponentialBackoffRetryStrategy}.
 *
 * <p>The strategy draws each delay from {@code [baseDelay, previousBaseDelay * 3)}; because the
 * result is random, every range assertion is repeated over several samples and pins the exact
 * half-open window the strategy can produce for the given attempt.
 */
@ExtendWith(MockitoExtension.class)
public class ExponentialBackoffRetryStrategyTest {

  /** Number of random draws checked per range assertion. */
  private static final int JITTER_SAMPLES = 100;

  @SuppressWarnings("rawtypes")
  @Mock
  private MethodDescriptor methodDescriptor;

  @Mock private Status status;
  @Mock private RetryEligibilityStrategy eligibilityStrategy;

  @BeforeEach
  public void setup() {
    // Lenient: not every test reaches both stubs.
    lenient().when(methodDescriptor.getFullMethodName()).thenReturn("methodName");
    lenient()
        .when(eligibilityStrategy.isEligibileForRetry(eq(status), anyString()))
        .thenReturn(true);
  }

  /**
   * Asserts, over {@link #JITTER_SAMPLES} draws, that the delay for the given attempt is present
   * and falls in {@code [minInclusive, maxExclusive)}.
   */
  private void assertDelayWithin(
      ExponentialBackoffRetryStrategy strategy,
      int attempt,
      long minInclusive,
      long maxExclusive) {
    for (int i = 0; i < JITTER_SAMPLES; i++) {
      assertThat(strategy.determineWhenToRetry(status, methodDescriptor, attempt))
          .hasValueSatisfying(
              delay ->
                  assertThat(delay.toMillis())
                      .isGreaterThanOrEqualTo(minInclusive)
                      .isLessThan(maxExclusive));
    }
  }

  @Test
  void whenFirstAttempt_shouldReturnInitialDelayWithJitter() {
    final ExponentialBackoffRetryStrategy strategy =
        new ExponentialBackoffRetryStrategy(100, 1000, eligibilityStrategy);

    // base = 100, previous base = 100 -> [100, 300)
    assertDelayWithin(strategy, 0, 100L, 300L);
  }

  @Test
  void whenSecondAttempt_shouldDoubleBaseDelayWithJitter() {
    final ExponentialBackoffRetryStrategy strategy =
        new ExponentialBackoffRetryStrategy(100, 1000, eligibilityStrategy);

    // base = 200, previous base = 100 -> [200, 300)
    assertDelayWithin(strategy, 1, 200L, 300L);
  }

  @Test
  void whenMaxBackoffReached_shouldNotExceedLimit() {
    final ExponentialBackoffRetryStrategy strategy =
        new ExponentialBackoffRetryStrategy(100, 500, eligibilityStrategy);

    // base capped at 500, previous base = 250 -> [500, 750)
    assertDelayWithin(strategy, 100, 500L, 750L);
  }

  @Test
  void whenRetryNotEligible_shouldReturnEmpty() {
    final ExponentialBackoffRetryStrategy strategy =
        new ExponentialBackoffRetryStrategy(100, 1000, eligibilityStrategy);

    lenient()
        .when(eligibilityStrategy.isEligibileForRetry(eq(status), anyString()))
        .thenReturn(false);

    assertThat(strategy.determineWhenToRetry(status, methodDescriptor, 0)).isEmpty();
  }

  @Test
  void whenArithmeticOverflow_shouldUseMaxBackoff() {
    final ExponentialBackoffRetryStrategy strategy =
        new ExponentialBackoffRetryStrategy(1000, 10000, eligibilityStrategy);

    // 1000 * 2^Integer.MAX_VALUE overflows, so base = 10000, previous base = 5000
    // -> [10000, 15000)
    assertDelayWithin(strategy, Integer.MAX_VALUE, 10000L, 15000L);
  }
}
Loading