Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: let consumeRun throw to let message broker handle failures #91

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Copyright (c) 2024, RTE (http://www.rte-france.com)
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package com.powsybl.ws.commons.computation;

/**
* @author Joris Mancini <joris.mancini_externe at rte-france.com>
*/
public class ComputationException extends RuntimeException {
public ComputationException(String message) {
super(message);
}

public ComputationException(String message, Throwable cause) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably better to add the constructor without cause right now

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.powsybl.iidm.network.VariantManagerConstants;
import com.powsybl.network.store.client.NetworkStoreService;
import com.powsybl.network.store.client.PreloadingStrategy;
import com.powsybl.ws.commons.computation.ComputationException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -23,10 +24,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -143,15 +141,12 @@ public Consumer<Message<String>> consumeRun() {
sendResultMessage(resultContext, result);
LOGGER.info("{} complete (resultUuid='{}')", getComputationType(), resultContext.getResultUuid());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (CancellationException e) {
LOGGER.info("Computation was interrupted");
Copy link
Collaborator

@jonenst jonenst Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to add context (resultUuid) to this log just like above it ?

} catch (Exception e) {
if (!(e instanceof CancellationException)) {
LOGGER.error(NotificationService.getFailedMessage(getComputationType()), e);
publishFail(resultContext, e.getMessage());
resultService.delete(resultContext.getResultUuid());
this.handleNonCancellationException(resultContext, e, rootReporter);
}
resultService.delete(resultContext.getResultUuid());
this.handleNonCancellationException(resultContext, e, rootReporter);
throw new ComputationException(String.format("%s: %s", NotificationService.getFailedMessage(getComputationType()), e.getMessage()), e.getCause());
} finally {
clean(resultContext);
}
Expand Down Expand Up @@ -192,11 +187,6 @@ protected void sendResultMessage(AbstractResultContext<C> resultContext, R ignor
resultContext.getRunContext().getUserId(), null);
}

protected void publishFail(AbstractResultContext<C> resultContext, String message) {
notificationService.publishFail(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver(),
message, resultContext.getRunContext().getUserId(), getComputationType(), null);
}

/**
* Do some extra task before running the computation, e.g. print log or init extra data for the run context
* @param ignoredRunContext This context may be used for further computation in overriding classes
Expand All @@ -205,7 +195,7 @@ protected void preRun(C ignoredRunContext) {
LOGGER.info("Run {} computation...", getComputationType());
}

protected R run(C runContext, UUID resultUuid, AtomicReference<ReportNode> rootReporter) throws Exception {
protected R run(C runContext, UUID resultUuid, AtomicReference<ReportNode> rootReporter) {
String provider = runContext.getProvider();
ReportNode reportNode = ReportNode.NO_OP;

Expand All @@ -223,7 +213,7 @@ protected R run(C runContext, UUID resultUuid, AtomicReference<ReportNode> rootR

preRun(runContext);
CompletableFuture<R> future = runAsync(runContext, provider, resultUuid);
R result = future == null ? null : observer.observeRun("run", runContext, future::get);
R result = future == null ? null : observer.observeRun("run", runContext, future::join);
postRun(runContext, rootReporter, result);
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import java.util.Map;
import java.util.UUID;

import static com.powsybl.ws.commons.computation.utils.MessageUtils.shortenMessage;

/**
* @author Etienne Homer <etienne.homer at rte-france.com
*/
Expand Down Expand Up @@ -94,21 +92,6 @@ public void publishStop(UUID resultUuid, String receiver, String computationLabe
publisher.send(publishPrefix + "Stopped-out-0", message);
}

@PostCompletion
public void publishFail(UUID resultUuid, String receiver, String causeMessage, String userId, String computationLabel, Map<String, Object> additionalHeaders) {
MessageBuilder<String> builder = MessageBuilder
.withPayload("")
.setHeader(HEADER_RESULT_UUID, resultUuid.toString())
.setHeader(HEADER_RECEIVER, receiver)
.setHeader(HEADER_MESSAGE, shortenMessage(
getFailedMessage(computationLabel) + " : " + causeMessage))
.setHeader(HEADER_USER_ID, userId)
.copyHeaders(additionalHeaders);
Message<String> message = builder.build();
FAILED_MESSAGE_LOGGER.debug(SENDING_MESSAGE, message);
publisher.send(publishPrefix + "Failed-out-0", message);
}

@PostCompletion
public void publishCancelFailed(UUID resultUuid, String receiver, String computationLabel, String userId) {
Message<String> message = MessageBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.powsybl.ws.commons.computation.service.AbstractComputationService;
import com.powsybl.ws.commons.computation.service.AbstractResultContext;
import com.powsybl.ws.commons.computation.service.AbstractWorkerService;
import com.powsybl.ws.commons.computation.service.CancelContext;
import com.powsybl.ws.commons.computation.service.ExecutionService;
import com.powsybl.ws.commons.computation.service.NotificationService;
import com.powsybl.ws.commons.computation.service.ReportService;
Expand All @@ -29,6 +28,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
Expand All @@ -43,7 +43,7 @@
import static com.powsybl.ws.commons.computation.service.NotificationService.HEADER_RECEIVER;
import static com.powsybl.ws.commons.computation.service.NotificationService.HEADER_RESULT_UUID;
import static com.powsybl.ws.commons.computation.service.NotificationService.HEADER_USER_ID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
Expand Down Expand Up @@ -189,6 +189,10 @@ protected CompletableFuture<Object> getCompletableFuture(MockComputationRunConte
}
return completableFuture;
}

public void addFuture(UUID id, CompletableFuture<Object> future) {
this.futures.put(id, future);
}
}

private MockComputationWorkerService workerService;
Expand All @@ -203,10 +207,11 @@ protected CompletableFuture<Object> getCompletableFuture(MockComputationRunConte
final String provider = "MockComputation_Provider";
Message<String> message;
MockComputationRunContext runContext;
MockComputationResultService resultService;

@BeforeEach
void init() {
MockComputationResultService resultService = new MockComputationResultService();
resultService = new MockComputationResultService();
notificationService = new NotificationService(publisher);
workerService = new MockComputationWorkerService(
networkStoreService,
Expand Down Expand Up @@ -257,54 +262,58 @@ void testComputationFailed() {
runContext.setComputationResWanted(ComputationResultWanted.FAIL);

// execution / cleaning
workerService.consumeRun().accept(message);
assertThrows(ComputationException.class, () -> workerService.consumeRun().accept(message));
}

// test the course
verify(notificationService.getPublisher(), times(1)).send(eq("publishFailed-out-0"), isA(Message.class));
@Test
void testStopComputationSendsCancelMessage() {
computationService.stop(RESULT_UUID, receiver);
verify(notificationService.getPublisher(), times(1)).send(eq("publishCancel-out-0"), isA(Message.class));
}

@Test
void testComputationCancelled() {
MockComputationStatus baseStatus = MockComputationStatus.NOT_DONE;
MockComputationStatus baseStatus = MockComputationStatus.RUNNING;
computationService.setStatus(List.of(RESULT_UUID), baseStatus);
assertEquals(baseStatus, computationService.getStatus(RESULT_UUID));

computationService.stop(RESULT_UUID, receiver);

// test the course
verify(notificationService.getPublisher(), times(1)).send(eq("publishCancel-out-0"), isA(Message.class));
CompletableFuture<Object> futureThatCouldBeCancelled = Mockito.mock(CompletableFuture.class);
when(futureThatCouldBeCancelled.cancel(true)).thenReturn(true);
workerService.addFuture(RESULT_UUID, futureThatCouldBeCancelled);

Message<String> cancelMessage = MessageBuilder.withPayload("")
.setHeader(HEADER_RESULT_UUID, RESULT_UUID.toString())
.setHeader(HEADER_RECEIVER, receiver)
.build();
CancelContext cancelContext = CancelContext.fromMessage(cancelMessage);
assertEquals(RESULT_UUID, cancelContext.resultUuid());
assertEquals(receiver, cancelContext.receiver());
workerService.consumeCancel().accept(message);
assertNull(resultService.findStatus(RESULT_UUID));
verify(notificationService.getPublisher(), times(1)).send(eq("publishStopped-out-0"), isA(Message.class));
}

@Test
void testComputationCancelFailed() {
MockComputationStatus baseStatus = MockComputationStatus.COMPLETED;
MockComputationStatus baseStatus = MockComputationStatus.RUNNING;
computationService.setStatus(List.of(RESULT_UUID), baseStatus);
assertEquals(baseStatus, computationService.getStatus(RESULT_UUID));

computationService.stop(RESULT_UUID, receiver, userId);
CompletableFuture<Object> futureThatCouldNotBeCancelled = Mockito.mock(CompletableFuture.class);
when(futureThatCouldNotBeCancelled.cancel(true)).thenReturn(false);
workerService.addFuture(RESULT_UUID, futureThatCouldNotBeCancelled);

// test the course
verify(notificationService.getPublisher(), times(1)).send(eq("publishCancel-out-0"), isA(Message.class));

Message<String> cancelMessage = MessageBuilder.withPayload("")
.setHeader(HEADER_RESULT_UUID, RESULT_UUID.toString())
.setHeader(HEADER_RECEIVER, receiver)
.setHeader(HEADER_USER_ID, userId)
.build();
CancelContext cancelContext = CancelContext.fromMessage(cancelMessage);
assertEquals(RESULT_UUID, cancelContext.resultUuid());
assertEquals(receiver, cancelContext.receiver());
assertEquals(userId, cancelContext.userId());
workerService.consumeCancel().accept(message);
assertNotNull(resultService.findStatus(RESULT_UUID));
verify(notificationService.getPublisher(), times(1)).send(eq("publishCancelFailed-out-0"), isA(Message.class));
}

@Test
void testComputationCancelFailsIfNoMatchingFuture() {
workerService.consumeCancel().accept(message);
assertNull(resultService.findStatus(RESULT_UUID));
verify(notificationService.getPublisher(), times(1)).send(eq("publishCancelFailed-out-0"), isA(Message.class));
}

@Test
void testComputationCancelledBeforeRunReturnsNoResult() {
workerService.consumeCancel().accept(message);

initComputationExecution();
workerService.consumeRun().accept(message);
verify(notificationService.getPublisher(), times(0)).send(eq("publishResult-out-0"), isA(Message.class));
}
}
Loading