From 5927afbdfc5ba7ec9d062b82e4e6dbead58eed4e Mon Sep 17 00:00:00 2001 From: Katharina Schmid Date: Fri, 26 Jul 2024 16:47:15 +0200 Subject: [PATCH] Customize reporting for partial retries --- CHANGES.md | 3 +++ .../dbmdz/flusswerk/framework/engine/Worker.java | 9 +++++++-- .../reporting/DefaultProcessReport.java | 16 ++++++++++++++++ .../framework/reporting/ProcessReport.java | 11 +++++++++++ .../flusswerk/framework/engine/WorkerTest.java | 7 +++++-- 5 files changed, 42 insertions(+), 4 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 3a4f2ede..5c99ec3f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed * Flusswerk Logger can handle function calls with the format String as sole argument. +### Changed +* Reporting of partial retries can now be customized by overriding `ProcessReport.reportComplexRetry(Message message, RetryProcessingException e)` and `ProcessReport.reportComplexFailedAfterMaxRetries(Message message, RetryProcessingException e)`. + ## [7.0.0] - 2024-03-15 ### Fixed diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Worker.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Worker.java index 80dcb4f7..49b03a88 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Worker.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/engine/Worker.java @@ -135,18 +135,23 @@ private void retryOrFail(Message receivedMessage, RuntimeException e) { private void complexRetry(Message receivedMessage, RetryProcessingException e) { messageBroker.ack(receivedMessage); + boolean isRejected = false; for (Message retryMessage : e.getMessagesToRetry()) { Envelope envelope = retryMessage.getEnvelope(); envelope.setRetries(receivedMessage.getEnvelope().getRetries()); envelope.setSource(receivedMessage.getEnvelope().getSource()); tracing.ensureFor(retryMessage); - messageBroker.reject(retryMessage); + isRejected = messageBroker.reject(retryMessage); } // Send the messages that should be sent anyway tracing.ensureFor(e.getMessagesToSend()); messageBroker.send(e.getMessagesToSend()); - processReport.reportRetry(receivedMessage, e); + if (isRejected) { + processReport.reportComplexRetry(receivedMessage, e); + } else { + processReport.reportComplexFailedAfterMaxRetries(receivedMessage, e); + } } private void fail(Message message, StopProcessingException e) { diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/reporting/DefaultProcessReport.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/reporting/DefaultProcessReport.java index 584541ba..c41b10ef 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/reporting/DefaultProcessReport.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/reporting/DefaultProcessReport.java @@ -97,6 +97,22 @@ public void reportComplexRetry(Message message, RetryProcessingException e) { e); } + @Override + public void reportComplexFailedAfterMaxRetries(Message message, RetryProcessingException e) { + int messagesSent = e.getMessagesToSend().size(); + Envelope envelope = message.getEnvelope(); + getLogger() + .warn( + "{} failed after maximum number of retries with ({} sent) and : {}", + name, + messagesSent, + e.getMessage(), + keyValue("will_retry", false), + keyValue("incoming_queue", envelope.getSource()), + keyValue("retries", envelope.getRetries()), + e); + } + @Override public void reportSkip(Message message, Exception skip) { getLogger().info("Skipped: {}", skip.getMessage()); diff --git a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/reporting/ProcessReport.java b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/reporting/ProcessReport.java index b762da71..b07c9877 100644 --- a/framework/src/main/java/com/github/dbmdz/flusswerk/framework/reporting/ProcessReport.java +++ b/framework/src/main/java/com/github/dbmdz/flusswerk/framework/reporting/ProcessReport.java @@ -60,6 +60,17 @@ default void reportComplexRetry(Message message, RetryProcessingException e) { reportReject(message, e); } + /** + * Report that a message has failed after the maximum number of retries and potentially also has + * messages sent anyway. + * + * @param message The message which finally failed after the maximum number of retries + * @param e The exception why the message failed + */ + default void reportComplexFailedAfterMaxRetries(Message message, RetryProcessingException e) { + reportReject(message, e); + } + default void reportSkip(Message message, Exception skip) { reportSuccess(message); } diff --git a/framework/src/test/java/com/github/dbmdz/flusswerk/framework/engine/WorkerTest.java b/framework/src/test/java/com/github/dbmdz/flusswerk/framework/engine/WorkerTest.java index 1cbe5ead..08d6b74c 100644 --- a/framework/src/test/java/com/github/dbmdz/flusswerk/framework/engine/WorkerTest.java +++ b/framework/src/test/java/com/github/dbmdz/flusswerk/framework/engine/WorkerTest.java @@ -223,12 +223,15 @@ void shouldPerformComplexRetrySendingMessages() { void shouldPerformComplexRetryWithNewMessages() { Message incomingMessage = new TestMessage("incoming"); List messagesToRetry = List.of(new TestMessage("retry1"), new TestMessage("retry2")); - when(flow.process(incomingMessage)) - .thenThrow(new RetryProcessingException("Retry processing").retry(messagesToRetry)); + RetryProcessingException retryException = + new RetryProcessingException("Retry processing").retry(messagesToRetry); + when(flow.process(incomingMessage)).thenThrow(retryException); + when(messageBroker.reject(any())).thenReturn(true); worker.process(incomingMessage); verify(messageBroker).ack(incomingMessage); for (Message message : messagesToRetry) { verify(messageBroker).reject(message); } + verify(processReport).reportComplexRetry(incomingMessage, retryException); } }