diff --git a/pom.xml b/pom.xml index 9ce8fbe8..e82a2335 100644 --- a/pom.xml +++ b/pom.xml @@ -110,6 +110,12 @@ spring-cloud-aws-messaging 2.4.4 + + com.github.rholder + guava-retrying + 2.0.0 + + org.awaitility diff --git a/src/main/java/com/mercateo/sqs/utils/visibility/RetryStrategy.java b/src/main/java/com/mercateo/sqs/utils/visibility/RetryStrategy.java new file mode 100644 index 00000000..671460a9 --- /dev/null +++ b/src/main/java/com/mercateo/sqs/utils/visibility/RetryStrategy.java @@ -0,0 +1,17 @@ +package com.mercateo.sqs.utils.visibility; + +import com.github.rholder.retry.StopStrategy; +import com.github.rholder.retry.WaitStrategy; + +import lombok.NonNull; +import lombok.Value; + +@Value +public class RetryStrategy { + + @NonNull + WaitStrategy retryWaitStrategy; + + @NonNull + StopStrategy retryStopStrategy; +} diff --git a/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtender.java b/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtender.java index dc36036f..066fbdcf 100644 --- a/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtender.java +++ b/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtender.java @@ -18,8 +18,12 @@ import com.amazonaws.AmazonServiceException; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest; +import com.amazonaws.services.sqs.model.ChangeMessageVisibilityResult; +import com.github.rholder.retry.Retryer; +import com.github.rholder.retry.RetryerBuilder; import com.mercateo.sqs.utils.message.handling.ErrorHandlingStrategy; +import java.net.UnknownHostException; import java.time.Duration; import lombok.NonNull; @@ -35,15 +39,24 @@ public class VisibilityTimeoutExtender implements Runnable { private final ChangeMessageVisibilityRequest request; private final Message message; - + private final ErrorHandlingStrategy errorHandlingStrategy; + private final Retryer retryer; VisibilityTimeoutExtender(@NonNull AmazonSQS sqsClient, @NonNull Duration newVisibilityTimeout, - @NonNull Message message, @NonNull String queueUrl, @NonNull ErrorHandlingStrategy errorHandlingStrategy) { + @NonNull Message message, @NonNull String queueUrl, + @NonNull ErrorHandlingStrategy errorHandlingStrategy, + @NonNull RetryStrategy retryStrategy) { this.sqsClient = sqsClient; this.message = message; - this.errorHandlingStrategy=errorHandlingStrategy; + this.errorHandlingStrategy = errorHandlingStrategy; + this.retryer = RetryerBuilder + . newBuilder() + .retryIfException(t -> (t.getCause() instanceof UnknownHostException)) + .withWaitStrategy(retryStrategy.getRetryWaitStrategy()) + .withStopStrategy(retryStrategy.getRetryStopStrategy()) + .build(); request = new ChangeMessageVisibilityRequest().withQueueUrl(queueUrl).withReceiptHandle( message.getHeaders().get("ReceiptHandle", String.class)).withVisibilityTimeout( @@ -58,13 +71,13 @@ private Integer timeoutInSeconds(Duration timeout) { public void run() { try { log.trace("changing message visibility: " + request); - sqsClient.changeMessageVisibility(request); + retryer.call(() -> sqsClient.changeMessageVisibility(request)); } catch (AmazonServiceException e) { errorHandlingStrategy.handleExtendVisibilityTimeoutException(e, message); } catch (Exception e) { log.error("error while extending message visibility for " + message.getHeaders().get("MessageId", String.class), e); - throw e; + throw new RuntimeException(e); } } } \ No newline at end of file diff --git a/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderFactory.java b/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderFactory.java index d917ed2c..20423228 100644 --- a/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderFactory.java +++ b/src/main/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderFactory.java @@ -16,10 +16,13 @@ package com.mercateo.sqs.utils.visibility; import com.amazonaws.services.sqs.AmazonSQS; +import com.github.rholder.retry.StopStrategies; +import com.github.rholder.retry.WaitStrategies; import com.mercateo.sqs.utils.message.handling.ErrorHandlingStrategy; import com.mercateo.sqs.utils.queue.Queue; import java.time.Duration; +import java.util.concurrent.TimeUnit; import javax.inject.Inject; import javax.inject.Named; @@ -44,6 +47,8 @@ public VisibilityTimeoutExtender get(@NonNull Message message, @NonNull Queue Duration defaultVisibilityTimeout = queue.getDefaultVisibilityTimeout(); return new VisibilityTimeoutExtender(sqsClient, defaultVisibilityTimeout, message, queue - .getUrl(), errorHandlingStrategy); + .getUrl(), errorHandlingStrategy, + new RetryStrategy(WaitStrategies.fixedWait(1000, TimeUnit.SECONDS), + StopStrategies.stopAfterAttempt(5))); } } \ No newline at end of file diff --git a/src/test/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderTest.java b/src/test/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderTest.java index a6bd4827..6afd6b0a 100644 --- a/src/test/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderTest.java +++ b/src/test/java/com/mercateo/sqs/utils/visibility/VisibilityTimeoutExtenderTest.java @@ -1,15 +1,25 @@ package com.mercateo.sqs.utils.visibility; -import static org.junit.Assert.assertEquals; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowable; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import com.amazonaws.SdkClientException; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest; +import com.github.rholder.retry.RetryException; +import com.github.rholder.retry.StopStrategies; +import com.github.rholder.retry.WaitStrategies; import com.google.common.testing.NullPointerTester; import com.mercateo.sqs.utils.message.handling.ErrorHandlingStrategy; +import java.net.UnknownHostException; import java.time.Duration; import java.util.HashMap; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -21,11 +31,11 @@ class VisibilityTimeoutExtenderTest { + private VisibilityTimeoutExtender uut; + @Mock private AmazonSQS sqsClient; - private VisibilityTimeoutExtender uut; - @Mock private ErrorHandlingStrategy errorHandlingStrategy; @@ -36,7 +46,10 @@ public void setUp() throws Exception { headerMap.put("ReceiptHandle", "rhd"); GenericMessage message = new GenericMessage<>(new Object(), new MessageHeaders( headerMap)); - uut = new VisibilityTimeoutExtender(sqsClient, Duration.ofMinutes(10), message, "queue", errorHandlingStrategy); + RetryStrategy retryStrategy = new RetryStrategy(WaitStrategies.fixedWait(1, TimeUnit.MICROSECONDS), + StopStrategies.stopAfterAttempt(5)); + uut = new VisibilityTimeoutExtender(sqsClient, Duration.ofMinutes(10), message, "queue", + errorHandlingStrategy, retryStrategy); } @Test @@ -62,9 +75,42 @@ void testRun() { verify(sqsClient).changeMessageVisibility(captor.capture()); ChangeMessageVisibilityRequest request = captor.getValue(); - assertEquals("rhd", request.getReceiptHandle()); - assertEquals("queue", request.getQueueUrl()); - assertEquals(600, request.getVisibilityTimeout().intValue()); + assertThat(request.getReceiptHandle()).isEqualTo("rhd"); + assertThat(request.getQueueUrl()).isEqualTo("queue"); + assertThat(request.getVisibilityTimeout().intValue()).isEqualTo(600); + + } + + @Test + void retryForUnknownHostException() { + + SdkClientException sdkClientException = new SdkClientException("foo", new UnknownHostException()); + + // given + when(sqsClient.changeMessageVisibility(any())) + .thenThrow(sdkClientException); + // when + Throwable result = catchThrowable(() -> uut.run()); + + // then + assertThat(result).isInstanceOf(RuntimeException.class); + assertThat(result.getCause()).isInstanceOf(RetryException.class); + verify(sqsClient, times(5)).changeMessageVisibility(any()); + } + + @Test + void dontRetryForSdkClientExceptionsInGeneral() { + + SdkClientException sdkClientException = new SdkClientException("foo"); + + // given + when(sqsClient.changeMessageVisibility(any())).thenThrow(sdkClientException); + // when + Throwable result = catchThrowable(() -> uut.run()); + + // then + assertThat(result).isInstanceOf(RuntimeException.class); + verify(sqsClient, times(1)).changeMessageVisibility(any()); } } \ No newline at end of file