Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: let consumeRun throw to let message broker handle failures #91

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Copyright (c) 2024, RTE (http://www.rte-france.com)
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package com.powsybl.ws.commons.computation;

/**
* @author Joris Mancini <joris.mancini_externe at rte-france.com>
*/
public class ComputationException extends RuntimeException {
public ComputationException(String message) {
super(message);
}

public ComputationException(String message, Throwable cause) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably better to add the constructor without cause right now

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.powsybl.iidm.network.VariantManagerConstants;
import com.powsybl.network.store.client.NetworkStoreService;
import com.powsybl.network.store.client.PreloadingStrategy;
import com.powsybl.ws.commons.computation.ComputationException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -23,10 +24,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -143,15 +141,12 @@ public Consumer<Message<String>> consumeRun() {
sendResultMessage(resultContext, result);
LOGGER.info("{} complete (resultUuid='{}')", getComputationType(), resultContext.getResultUuid());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (CancellationException e) {
LOGGER.info("Computation was interrupted");
Copy link
Collaborator

@jonenst jonenst Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to add context (resultUuid) to this log just like above it ?

} catch (Exception e) {
if (!(e instanceof CancellationException)) {
LOGGER.error(NotificationService.getFailedMessage(getComputationType()), e);
publishFail(resultContext, e.getMessage());
resultService.delete(resultContext.getResultUuid());
this.handleNonCancellationException(resultContext, e, rootReporter);
}
resultService.delete(resultContext.getResultUuid());
this.handleNonCancellationException(resultContext, e, rootReporter);
throw new ComputationException(String.format("%s: %s", NotificationService.getFailedMessage(getComputationType()), e.getMessage()), e.getCause());
} finally {
clean(resultContext);
}
Expand Down Expand Up @@ -192,11 +187,6 @@ protected void sendResultMessage(AbstractResultContext<C> resultContext, R ignor
resultContext.getRunContext().getUserId(), null);
}

protected void publishFail(AbstractResultContext<C> resultContext, String message) {
notificationService.publishFail(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver(),
message, resultContext.getRunContext().getUserId(), getComputationType(), null);
}

/**
* Do some extra task before running the computation, e.g. print log or init extra data for the run context
* @param ignoredRunContext This context may be used for further computation in overriding classes
Expand All @@ -205,7 +195,7 @@ protected void preRun(C ignoredRunContext) {
LOGGER.info("Run {} computation...", getComputationType());
}

protected R run(C runContext, UUID resultUuid, AtomicReference<ReportNode> rootReporter) throws Exception {
protected R run(C runContext, UUID resultUuid, AtomicReference<ReportNode> rootReporter) {
String provider = runContext.getProvider();
ReportNode reportNode = ReportNode.NO_OP;

Expand All @@ -223,7 +213,7 @@ protected R run(C runContext, UUID resultUuid, AtomicReference<ReportNode> rootR

preRun(runContext);
CompletableFuture<R> future = runAsync(runContext, provider, resultUuid);
R result = future == null ? null : observer.observeRun("run", runContext, future::get);
R result = future == null ? null : observer.observeRun("run", runContext, future::join);
postRun(runContext, rootReporter, result);
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import java.util.Map;
import java.util.UUID;

import static com.powsybl.ws.commons.computation.utils.MessageUtils.shortenMessage;

/**
* @author Etienne Homer <etienne.homer at rte-france.com
*/
Expand Down Expand Up @@ -94,21 +92,6 @@ public void publishStop(UUID resultUuid, String receiver, String computationLabe
publisher.send(publishPrefix + "Stopped-out-0", message);
}

@PostCompletion
public void publishFail(UUID resultUuid, String receiver, String causeMessage, String userId, String computationLabel, Map<String, Object> additionalHeaders) {
MessageBuilder<String> builder = MessageBuilder
.withPayload("")
.setHeader(HEADER_RESULT_UUID, resultUuid.toString())
.setHeader(HEADER_RECEIVER, receiver)
.setHeader(HEADER_MESSAGE, shortenMessage(
getFailedMessage(computationLabel) + " : " + causeMessage))
.setHeader(HEADER_USER_ID, userId)
.copyHeaders(additionalHeaders);
Message<String> message = builder.build();
FAILED_MESSAGE_LOGGER.debug(SENDING_MESSAGE, message);
publisher.send(publishPrefix + "Failed-out-0", message);
}

@PostCompletion
public void publishCancelFailed(UUID resultUuid, String receiver, String computationLabel, String userId) {
Message<String> message = MessageBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.powsybl.ws.commons.computation.service.AbstractComputationService;
import com.powsybl.ws.commons.computation.service.AbstractResultContext;
import com.powsybl.ws.commons.computation.service.AbstractWorkerService;
import com.powsybl.ws.commons.computation.service.CancelContext;
import com.powsybl.ws.commons.computation.service.ExecutionService;
import com.powsybl.ws.commons.computation.service.NotificationService;
import com.powsybl.ws.commons.computation.service.ReportService;
Expand All @@ -29,6 +28,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
Expand All @@ -43,7 +43,7 @@
import static com.powsybl.ws.commons.computation.service.NotificationService.HEADER_RECEIVER;
import static com.powsybl.ws.commons.computation.service.NotificationService.HEADER_RESULT_UUID;
import static com.powsybl.ws.commons.computation.service.NotificationService.HEADER_USER_ID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
Expand Down Expand Up @@ -189,6 +189,10 @@ protected CompletableFuture<Object> getCompletableFuture(MockComputationRunConte
}
return completableFuture;
}

public void addFuture(UUID id, CompletableFuture<Object> future) {
this.futures.put(id, future);
}
}

private MockComputationWorkerService workerService;
Expand All @@ -203,10 +207,11 @@ protected CompletableFuture<Object> getCompletableFuture(MockComputationRunConte
final String provider = "MockComputation_Provider";
Message<String> message;
MockComputationRunContext runContext;
MockComputationResultService resultService;

@BeforeEach
void init() {
MockComputationResultService resultService = new MockComputationResultService();
resultService = new MockComputationResultService();
notificationService = new NotificationService(publisher);
workerService = new MockComputationWorkerService(
networkStoreService,
Expand Down Expand Up @@ -257,10 +262,7 @@ void testComputationFailed() {
runContext.setComputationResWanted(ComputationResultWanted.FAIL);

// execution / cleaning
workerService.consumeRun().accept(message);

// test the course
verify(notificationService.getPublisher(), times(1)).send(eq("publishFailed-out-0"), isA(Message.class));
assertThrows(ComputationException.class, () -> workerService.consumeRun().accept(message));
}

@Test
Expand All @@ -269,42 +271,52 @@ void testComputationCancelled() {
computationService.setStatus(List.of(RESULT_UUID), baseStatus);
assertEquals(baseStatus, computationService.getStatus(RESULT_UUID));

// Test message is sent on stop call
computationService.stop(RESULT_UUID, receiver);
verify(notificationService.getPublisher(), times(1)).send(eq("publishCancel-out-0"), isA(Message.class));

// test the course
// Test data is cleaned and message is sent in stopped
CompletableFuture<Object> futureThatCouldNotBeCancelled = Mockito.mock(CompletableFuture.class);
when(futureThatCouldNotBeCancelled.cancel(true)).thenReturn(true);
workerService.addFuture(RESULT_UUID, futureThatCouldNotBeCancelled);
workerService.consumeCancel().accept(message);
assertNull(resultService.findStatus(RESULT_UUID));
verify(notificationService.getPublisher(), times(1)).send(eq("publishStopped-out-0"), isA(Message.class));
}

@Test
void testComputationNotCancelledIfNoMatchingFuture() {
MockComputationStatus baseStatus = MockComputationStatus.NOT_DONE;
computationService.setStatus(List.of(RESULT_UUID), baseStatus);
assertEquals(baseStatus, computationService.getStatus(RESULT_UUID));

// Test message is sent on stop call
computationService.stop(RESULT_UUID, receiver);
verify(notificationService.getPublisher(), times(1)).send(eq("publishCancel-out-0"), isA(Message.class));

Message<String> cancelMessage = MessageBuilder.withPayload("")
.setHeader(HEADER_RESULT_UUID, RESULT_UUID.toString())
.setHeader(HEADER_RECEIVER, receiver)
.build();
CancelContext cancelContext = CancelContext.fromMessage(cancelMessage);
assertEquals(RESULT_UUID, cancelContext.resultUuid());
assertEquals(receiver, cancelContext.receiver());
// Test data is not cleaned and message is sent in cancelfailed
workerService.addFuture(UUID.randomUUID(), Mockito.mock(CompletableFuture.class));
workerService.consumeCancel().accept(message);
assertNotNull(resultService.findStatus(RESULT_UUID));
verify(notificationService.getPublisher(), times(1)).send(eq("publishCancelFailed-out-0"), isA(Message.class));
}

@Test
void testComputationCancelFailed() {
MockComputationStatus baseStatus = MockComputationStatus.COMPLETED;
MockComputationStatus baseStatus = MockComputationStatus.NOT_DONE;
computationService.setStatus(List.of(RESULT_UUID), baseStatus);
assertEquals(baseStatus, computationService.getStatus(RESULT_UUID));

computationService.stop(RESULT_UUID, receiver, userId);

// test the course
// Test message is sent on stop call
computationService.stop(RESULT_UUID, receiver);
verify(notificationService.getPublisher(), times(1)).send(eq("publishCancel-out-0"), isA(Message.class));

Message<String> cancelMessage = MessageBuilder.withPayload("")
.setHeader(HEADER_RESULT_UUID, RESULT_UUID.toString())
.setHeader(HEADER_RECEIVER, receiver)
.setHeader(HEADER_USER_ID, userId)
.build();
CancelContext cancelContext = CancelContext.fromMessage(cancelMessage);
assertEquals(RESULT_UUID, cancelContext.resultUuid());
assertEquals(receiver, cancelContext.receiver());
assertEquals(userId, cancelContext.userId());

// Test data is not cleaned and message is sent in cancelfailed
CompletableFuture<Object> futureThatCouldNotBeCancelled = Mockito.mock(CompletableFuture.class);
when(futureThatCouldNotBeCancelled.cancel(true)).thenReturn(false);
workerService.addFuture(RESULT_UUID, futureThatCouldNotBeCancelled);
workerService.consumeCancel().accept(message);
assertNotNull(resultService.findStatus(RESULT_UUID));
verify(notificationService.getPublisher(), times(1)).send(eq("publishCancelFailed-out-0"), isA(Message.class));
}
}
Loading