Skip to content

Commit

Permalink
Merge pull request #565 from dbmdz/complex_retry_failed
Browse files Browse the repository at this point in the history
Customize reporting for partial retries
  • Loading branch information
schmika authored Jul 26, 2024
2 parents 12422c0 + 5927afb commit a1df784
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 4 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
* Flusswerk Logger can handle function calls with the format String as sole argument.

### Changed
* Reporting of partial retries can now be customized by overriding `ProcessReport.reportComplexRetry(Message message, RetryProcessingException e)` and `ProcessReport.reportComplexFailedAfterMaxRetries(Message message, RetryProcessingException e)`.

## [7.0.0] - 2024-03-15

### Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,18 +135,23 @@ private void retryOrFail(Message receivedMessage, RuntimeException e) {

private void complexRetry(Message receivedMessage, RetryProcessingException e) {
messageBroker.ack(receivedMessage);
boolean isRejected = false;
for (Message retryMessage : e.getMessagesToRetry()) {
Envelope envelope = retryMessage.getEnvelope();
envelope.setRetries(receivedMessage.getEnvelope().getRetries());
envelope.setSource(receivedMessage.getEnvelope().getSource());
tracing.ensureFor(retryMessage);
messageBroker.reject(retryMessage);
isRejected = messageBroker.reject(retryMessage);
}
// Send the messages that should be sent anyway
tracing.ensureFor(e.getMessagesToSend());
messageBroker.send(e.getMessagesToSend());

processReport.reportRetry(receivedMessage, e);
if (isRejected) {
processReport.reportComplexRetry(receivedMessage, e);
} else {
processReport.reportComplexFailedAfterMaxRetries(receivedMessage, e);
}
}

private void fail(Message message, StopProcessingException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,22 @@ public void reportComplexRetry(Message message, RetryProcessingException e) {
e);
}

@Override
public void reportComplexFailedAfterMaxRetries(Message message, RetryProcessingException e) {
int messagesSent = e.getMessagesToSend().size();
Envelope envelope = message.getEnvelope();
getLogger()
.warn(
"{} failed after maximum number of retries with ({} sent) and : {}",
name,
messagesSent,
e.getMessage(),
keyValue("will_retry", false),
keyValue("incoming_queue", envelope.getSource()),
keyValue("retries", envelope.getRetries()),
e);
}

@Override
public void reportSkip(Message message, Exception skip) {
getLogger().info("Skipped: {}", skip.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ default void reportComplexRetry(Message message, RetryProcessingException e) {
reportReject(message, e);
}

/**
* Report that a message has failed after the maximum number of retries and potentially also has
* messages sent anyway.
*
* @param message The message which finally failed after the maximum number of retries
* @param e The exception why the message failed
*/
default void reportComplexFailedAfterMaxRetries(Message message, RetryProcessingException e) {
reportReject(message, e);
}

default void reportSkip(Message message, Exception skip) {
reportSuccess(message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,15 @@ void shouldPerformComplexRetrySendingMessages() {
void shouldPerformComplexRetryWithNewMessages() {
Message incomingMessage = new TestMessage("incoming");
List<Message> messagesToRetry = List.of(new TestMessage("retry1"), new TestMessage("retry2"));
when(flow.process(incomingMessage))
.thenThrow(new RetryProcessingException("Retry processing").retry(messagesToRetry));
RetryProcessingException retryException =
new RetryProcessingException("Retry processing").retry(messagesToRetry);
when(flow.process(incomingMessage)).thenThrow(retryException);
when(messageBroker.reject(any())).thenReturn(true);
worker.process(incomingMessage);
verify(messageBroker).ack(incomingMessage);
for (Message message : messagesToRetry) {
verify(messageBroker).reject(message);
}
verify(processReport).reportComplexRetry(incomingMessage, retryException);
}
}

0 comments on commit a1df784

Please sign in to comment.