Skip to content
This repository has been archived by the owner on Sep 6, 2024. It is now read-only.

Commit

Permalink
retry change message visibility request on SdkClientException
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephen Tobayiwa authored and Sebastian Martinka committed Aug 16, 2023
1 parent f551476 commit 05823f9
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 13 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@
<artifactId>spring-cloud-aws-messaging</artifactId>
<version>2.4.4</version>
</dependency>
<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
<version>2.0.0</version>
</dependency>

<!-- test dependencies -->
<dependency>
<groupId>org.awaitility</groupId>
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/com/mercateo/sqs/utils/visibility/RetryStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.mercateo.sqs.utils.visibility;

import com.github.rholder.retry.StopStrategy;
import com.github.rholder.retry.WaitStrategy;

import lombok.NonNull;
import lombok.Value;

@Value
public class RetryStrategy {

@NonNull
WaitStrategy retryWaitStrategy;

@NonNull
StopStrategy retryStopStrategy;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest;
import com.amazonaws.services.sqs.model.ChangeMessageVisibilityResult;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.mercateo.sqs.utils.message.handling.ErrorHandlingStrategy;

import java.net.UnknownHostException;
import java.time.Duration;

import lombok.NonNull;
Expand All @@ -35,15 +39,24 @@ public class VisibilityTimeoutExtender implements Runnable {
private final ChangeMessageVisibilityRequest request;

private final Message<?> message;

private final ErrorHandlingStrategy<?> errorHandlingStrategy;

private final Retryer<ChangeMessageVisibilityResult> retryer;

VisibilityTimeoutExtender(@NonNull AmazonSQS sqsClient, @NonNull Duration newVisibilityTimeout,
@NonNull Message<?> message, @NonNull String queueUrl, @NonNull ErrorHandlingStrategy<?> errorHandlingStrategy) {
@NonNull Message<?> message, @NonNull String queueUrl,
@NonNull ErrorHandlingStrategy<?> errorHandlingStrategy,
@NonNull RetryStrategy retryStrategy) {
this.sqsClient = sqsClient;
this.message = message;
this.errorHandlingStrategy=errorHandlingStrategy;
this.errorHandlingStrategy = errorHandlingStrategy;
this.retryer = RetryerBuilder
.<ChangeMessageVisibilityResult> newBuilder()
.retryIfException(t -> (t.getCause() instanceof UnknownHostException))
.withWaitStrategy(retryStrategy.getRetryWaitStrategy())
.withStopStrategy(retryStrategy.getRetryStopStrategy())
.build();

request = new ChangeMessageVisibilityRequest().withQueueUrl(queueUrl).withReceiptHandle(
message.getHeaders().get("ReceiptHandle", String.class)).withVisibilityTimeout(
Expand All @@ -58,13 +71,13 @@ private Integer timeoutInSeconds(Duration timeout) {
public void run() {
try {
log.trace("changing message visibility: " + request);
sqsClient.changeMessageVisibility(request);
retryer.call(() -> sqsClient.changeMessageVisibility(request));
} catch (AmazonServiceException e) {
errorHandlingStrategy.handleExtendVisibilityTimeoutException(e, message);
} catch (Exception e) {
log.error("error while extending message visibility for " + message.getHeaders().get("MessageId",
String.class), e);
throw e;
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
package com.mercateo.sqs.utils.visibility;

import com.amazonaws.services.sqs.AmazonSQS;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.mercateo.sqs.utils.message.handling.ErrorHandlingStrategy;
import com.mercateo.sqs.utils.queue.Queue;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import javax.inject.Inject;
import javax.inject.Named;
Expand All @@ -44,6 +47,8 @@ public VisibilityTimeoutExtender get(@NonNull Message<?> message, @NonNull Queue
Duration defaultVisibilityTimeout = queue.getDefaultVisibilityTimeout();

return new VisibilityTimeoutExtender(sqsClient, defaultVisibilityTimeout, message, queue
.getUrl(), errorHandlingStrategy);
.getUrl(), errorHandlingStrategy,
new RetryStrategy(WaitStrategies.fixedWait(1000, TimeUnit.SECONDS),
StopStrategies.stopAfterAttempt(5)));
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
package com.mercateo.sqs.utils.visibility;

import static org.junit.Assert.assertEquals;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.amazonaws.SdkClientException;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.testing.NullPointerTester;
import com.mercateo.sqs.utils.message.handling.ErrorHandlingStrategy;

import java.net.UnknownHostException;
import java.time.Duration;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -21,11 +31,11 @@

class VisibilityTimeoutExtenderTest {

private VisibilityTimeoutExtender uut;

@Mock
private AmazonSQS sqsClient;

private VisibilityTimeoutExtender uut;

@Mock
private ErrorHandlingStrategy<?> errorHandlingStrategy;

Expand All @@ -36,7 +46,10 @@ public void setUp() throws Exception {
headerMap.put("ReceiptHandle", "rhd");
GenericMessage<Object> message = new GenericMessage<>(new Object(), new MessageHeaders(
headerMap));
uut = new VisibilityTimeoutExtender(sqsClient, Duration.ofMinutes(10), message, "queue", errorHandlingStrategy);
RetryStrategy retryStrategy = new RetryStrategy(WaitStrategies.fixedWait(1, TimeUnit.MICROSECONDS),
StopStrategies.stopAfterAttempt(5));
uut = new VisibilityTimeoutExtender(sqsClient, Duration.ofMinutes(10), message, "queue",
errorHandlingStrategy, retryStrategy);
}

@Test
Expand All @@ -62,9 +75,42 @@ void testRun() {
verify(sqsClient).changeMessageVisibility(captor.capture());
ChangeMessageVisibilityRequest request = captor.getValue();

assertEquals("rhd", request.getReceiptHandle());
assertEquals("queue", request.getQueueUrl());
assertEquals(600, request.getVisibilityTimeout().intValue());
assertThat(request.getReceiptHandle()).isEqualTo("rhd");
assertThat(request.getQueueUrl()).isEqualTo("queue");
assertThat(request.getVisibilityTimeout().intValue()).isEqualTo(600);

}

@Test
void retryForUnknownHostException() {

SdkClientException sdkClientException = new SdkClientException("foo", new UnknownHostException());

// given
when(sqsClient.changeMessageVisibility(any()))
.thenThrow(sdkClientException);
// when
Throwable result = catchThrowable(() -> uut.run());

// then
assertThat(result).isInstanceOf(RuntimeException.class);
assertThat(result.getCause()).isInstanceOf(RetryException.class);
verify(sqsClient, times(5)).changeMessageVisibility(any());
}

@Test
void dontRetryForSdkClientExceptionsInGeneral() {

SdkClientException sdkClientException = new SdkClientException("foo");

// given
when(sqsClient.changeMessageVisibility(any())).thenThrow(sdkClientException);
// when
Throwable result = catchThrowable(() -> uut.run());

// then
assertThat(result).isInstanceOf(RuntimeException.class);
verify(sqsClient, times(1)).changeMessageVisibility(any());
}

}

0 comments on commit 05823f9

Please sign in to comment.