From dfa0f28f2fdb49b1230c9c0c1a8af036e011753f Mon Sep 17 00:00:00 2001 From: Joris Mancini Date: Wed, 11 Dec 2024 17:45:51 +0100 Subject: [PATCH 1/4] feat: let consume run throw to let message broker handle failures Signed-off-by: Joris Mancini --- .../computation/ComputationException.java | 16 ++++++++++++++++ .../service/AbstractWorkerService.java | 14 +++----------- .../service/NotificationService.java | 17 ----------------- .../ws/commons/computation/ComputationTest.java | 6 ++---- 4 files changed, 21 insertions(+), 32 deletions(-) create mode 100644 src/main/java/com/powsybl/ws/commons/computation/ComputationException.java diff --git a/src/main/java/com/powsybl/ws/commons/computation/ComputationException.java b/src/main/java/com/powsybl/ws/commons/computation/ComputationException.java new file mode 100644 index 0000000..60b183e --- /dev/null +++ b/src/main/java/com/powsybl/ws/commons/computation/ComputationException.java @@ -0,0 +1,16 @@ +/** + * Copyright (c) 2024, RTE (http://www.rte-france.com) + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ +package com.powsybl.ws.commons.computation; + +/** + * @author Joris Mancini + */ +public class ComputationException extends RuntimeException { + public ComputationException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/com/powsybl/ws/commons/computation/service/AbstractWorkerService.java b/src/main/java/com/powsybl/ws/commons/computation/service/AbstractWorkerService.java index 4787ad4..867cd3c 100644 --- a/src/main/java/com/powsybl/ws/commons/computation/service/AbstractWorkerService.java +++ b/src/main/java/com/powsybl/ws/commons/computation/service/AbstractWorkerService.java @@ -13,6 +13,7 @@ import com.powsybl.iidm.network.VariantManagerConstants; import com.powsybl.network.store.client.NetworkStoreService; import com.powsybl.network.store.client.PreloadingStrategy; +import com.powsybl.ws.commons.computation.ComputationException; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,10 +24,7 @@ import java.util.Map; import java.util.Objects; import java.util.UUID; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -147,10 +145,9 @@ public Consumer> consumeRun() { Thread.currentThread().interrupt(); } catch (Exception e) { if (!(e instanceof CancellationException)) { - LOGGER.error(NotificationService.getFailedMessage(getComputationType()), e); - publishFail(resultContext, e.getMessage()); resultService.delete(resultContext.getResultUuid()); this.handleNonCancellationException(resultContext, e, rootReporter); + throw new ComputationException(NotificationService.getFailedMessage(getComputationType()), e); } } finally { clean(resultContext); @@ -192,11 +189,6 @@ protected void sendResultMessage(AbstractResultContext resultContext, R ignor resultContext.getRunContext().getUserId(), null); } - protected void publishFail(AbstractResultContext resultContext, String message) { - notificationService.publishFail(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver(), - message, resultContext.getRunContext().getUserId(), getComputationType(), null); - } - /** * Do some extra task before running the computation, e.g. print log or init extra data for the run context * @param ignoredRunContext This context may be used for further computation in overriding classes diff --git a/src/main/java/com/powsybl/ws/commons/computation/service/NotificationService.java b/src/main/java/com/powsybl/ws/commons/computation/service/NotificationService.java index 59d9a01..478b253 100644 --- a/src/main/java/com/powsybl/ws/commons/computation/service/NotificationService.java +++ b/src/main/java/com/powsybl/ws/commons/computation/service/NotificationService.java @@ -21,8 +21,6 @@ import java.util.Map; import java.util.UUID; -import static com.powsybl.ws.commons.computation.utils.MessageUtils.shortenMessage; - /** * @author Etienne Homer additionalHeaders) { - MessageBuilder builder = MessageBuilder - .withPayload("") - .setHeader(HEADER_RESULT_UUID, resultUuid.toString()) - .setHeader(HEADER_RECEIVER, receiver) - .setHeader(HEADER_MESSAGE, shortenMessage( - getFailedMessage(computationLabel) + " : " + causeMessage)) - .setHeader(HEADER_USER_ID, userId) - .copyHeaders(additionalHeaders); - Message message = builder.build(); - FAILED_MESSAGE_LOGGER.debug(SENDING_MESSAGE, message); - publisher.send(publishPrefix + "Failed-out-0", message); - } - @PostCompletion public void publishCancelFailed(UUID resultUuid, String receiver, String computationLabel, String userId) { Message message = MessageBuilder diff --git a/src/test/java/com/powsybl/ws/commons/computation/ComputationTest.java b/src/test/java/com/powsybl/ws/commons/computation/ComputationTest.java index 8dfd238..89ac554 100644 --- a/src/test/java/com/powsybl/ws/commons/computation/ComputationTest.java +++ b/src/test/java/com/powsybl/ws/commons/computation/ComputationTest.java @@ -44,6 +44,7 @@ import static com.powsybl.ws.commons.computation.service.NotificationService.HEADER_RESULT_UUID; import static com.powsybl.ws.commons.computation.service.NotificationService.HEADER_USER_ID; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; @@ -257,10 +258,7 @@ void testComputationFailed() { runContext.setComputationResWanted(ComputationResultWanted.FAIL); // execution / cleaning - workerService.consumeRun().accept(message); - - // test the course - verify(notificationService.getPublisher(), times(1)).send(eq("publishFailed-out-0"), isA(Message.class)); + assertThrows(ComputationException.class, () -> workerService.consumeRun().accept(message)); } @Test From 1538ab0e25b3a4f6e78d7263c689a97cf5277a60 Mon Sep 17 00:00:00 2001 From: Joris Mancini Date: Fri, 13 Dec 2024 14:01:56 +0100 Subject: [PATCH 2/4] feat: improve handling of cancellation Signed-off-by: Joris Mancini --- .../service/AbstractWorkerService.java | 35 +++++----- .../commons/computation/ComputationTest.java | 66 +++++++++++-------- 2 files changed, 56 insertions(+), 45 deletions(-) diff --git a/src/main/java/com/powsybl/ws/commons/computation/service/AbstractWorkerService.java b/src/main/java/com/powsybl/ws/commons/computation/service/AbstractWorkerService.java index 867cd3c..77f32ea 100644 --- a/src/main/java/com/powsybl/ws/commons/computation/service/AbstractWorkerService.java +++ b/src/main/java/com/powsybl/ws/commons/computation/service/AbstractWorkerService.java @@ -92,24 +92,26 @@ protected void cleanResultsAndPublishCancel(UUID resultUuid, String receiver) { } } - private boolean cancelAsync(CancelContext cancelContext) { + private void cancelAsync(CancelContext cancelContext) { lockRunAndCancel.lock(); - boolean isCanceled = false; try { cancelComputationRequests.put(cancelContext.resultUuid(), cancelContext); // find the completableFuture associated with result uuid CompletableFuture future = futures.get(cancelContext.resultUuid()); - if (future != null) { - isCanceled = future.cancel(true); // cancel computation in progress - if (isCanceled) { + if (future == null) { + cleanResultsAndPublishCancel(cancelContext.resultUuid(), cancelContext.receiver()); + } else { + boolean isCanceled = future.cancel(true); // cancel computation in progress + if (future.isDone() || isCanceled) { cleanResultsAndPublishCancel(cancelContext.resultUuid(), cancelContext.receiver()); + } else { + notificationService.publishCancelFailed(cancelContext.resultUuid(), cancelContext.receiver(), getComputationType(), cancelContext.userId()); } } } finally { lockRunAndCancel.unlock(); } - return isCanceled; } protected abstract AbstractResultContext fromMessage(Message message); @@ -141,14 +143,12 @@ public Consumer> consumeRun() { sendResultMessage(resultContext, result); LOGGER.info("{} complete (resultUuid='{}')", getComputationType(), resultContext.getResultUuid()); } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + } catch (CancellationException e) { + LOGGER.info("Computation was interrupted"); } catch (Exception e) { - if (!(e instanceof CancellationException)) { - resultService.delete(resultContext.getResultUuid()); - this.handleNonCancellationException(resultContext, e, rootReporter); - throw new ComputationException(NotificationService.getFailedMessage(getComputationType()), e); - } + resultService.delete(resultContext.getResultUuid()); + this.handleNonCancellationException(resultContext, e, rootReporter); + throw new ComputationException(String.format("%s: %s", NotificationService.getFailedMessage(getComputationType()), e.getMessage()), e.getCause()); } finally { clean(resultContext); } @@ -175,10 +175,7 @@ protected void handleNonCancellationException(AbstractResultContext resultCon public Consumer> consumeCancel() { return message -> { CancelContext cancelContext = CancelContext.fromMessage(message); - boolean isCancelled = cancelAsync(cancelContext); - if (!isCancelled) { - notificationService.publishCancelFailed(cancelContext.resultUuid(), cancelContext.receiver(), getComputationType(), cancelContext.userId()); - } + cancelAsync(cancelContext); }; } @@ -197,7 +194,7 @@ protected void preRun(C ignoredRunContext) { LOGGER.info("Run {} computation...", getComputationType()); } - protected R run(C runContext, UUID resultUuid, AtomicReference rootReporter) throws Exception { + protected R run(C runContext, UUID resultUuid, AtomicReference rootReporter) { String provider = runContext.getProvider(); ReportNode reportNode = ReportNode.NO_OP; @@ -215,7 +212,7 @@ protected R run(C runContext, UUID resultUuid, AtomicReference rootR preRun(runContext); CompletableFuture future = runAsync(runContext, provider, resultUuid); - R result = future == null ? null : observer.observeRun("run", runContext, future::get); + R result = future == null ? null : observer.observeRun("run", runContext, future::join); postRun(runContext, rootReporter, result); return result; } diff --git a/src/test/java/com/powsybl/ws/commons/computation/ComputationTest.java b/src/test/java/com/powsybl/ws/commons/computation/ComputationTest.java index 89ac554..b5654f3 100644 --- a/src/test/java/com/powsybl/ws/commons/computation/ComputationTest.java +++ b/src/test/java/com/powsybl/ws/commons/computation/ComputationTest.java @@ -12,7 +12,6 @@ import com.powsybl.ws.commons.computation.service.AbstractComputationService; import com.powsybl.ws.commons.computation.service.AbstractResultContext; import com.powsybl.ws.commons.computation.service.AbstractWorkerService; -import com.powsybl.ws.commons.computation.service.CancelContext; import com.powsybl.ws.commons.computation.service.ExecutionService; import com.powsybl.ws.commons.computation.service.NotificationService; import com.powsybl.ws.commons.computation.service.ReportService; @@ -29,6 +28,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.messaging.Message; @@ -43,8 +43,7 @@ import static com.powsybl.ws.commons.computation.service.NotificationService.HEADER_RECEIVER; import static com.powsybl.ws.commons.computation.service.NotificationService.HEADER_RESULT_UUID; import static com.powsybl.ws.commons.computation.service.NotificationService.HEADER_USER_ID; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; @@ -190,6 +189,10 @@ protected CompletableFuture getCompletableFuture(MockComputationRunConte } return completableFuture; } + + public void addFuture(UUID id, CompletableFuture future) { + this.futures.put(id, future); + } } private MockComputationWorkerService workerService; @@ -204,10 +207,11 @@ protected CompletableFuture getCompletableFuture(MockComputationRunConte final String provider = "MockComputation_Provider"; Message message; MockComputationRunContext runContext; + MockComputationResultService resultService; @BeforeEach void init() { - MockComputationResultService resultService = new MockComputationResultService(); + resultService = new MockComputationResultService(); notificationService = new NotificationService(publisher); workerService = new MockComputationWorkerService( networkStoreService, @@ -267,42 +271,52 @@ void testComputationCancelled() { computationService.setStatus(List.of(RESULT_UUID), baseStatus); assertEquals(baseStatus, computationService.getStatus(RESULT_UUID)); + // Test message is sent on stop call computationService.stop(RESULT_UUID, receiver); + verify(notificationService.getPublisher(), times(1)).send(eq("publishCancel-out-0"), isA(Message.class)); - // test the course + // Test data is cleaned and message is sent in stopped + CompletableFuture futureThatCouldNotBeCancelled = Mockito.mock(CompletableFuture.class); + when(futureThatCouldNotBeCancelled.cancel(true)).thenReturn(true); + workerService.addFuture(RESULT_UUID, futureThatCouldNotBeCancelled); + workerService.consumeCancel().accept(message); + assertNull(resultService.findStatus(RESULT_UUID)); + verify(notificationService.getPublisher(), times(1)).send(eq("publishStopped-out-0"), isA(Message.class)); + } + + @Test + void testComputationCancelledIfNoMatchingFuture() { + MockComputationStatus baseStatus = MockComputationStatus.NOT_DONE; + computationService.setStatus(List.of(RESULT_UUID), baseStatus); + assertEquals(baseStatus, computationService.getStatus(RESULT_UUID)); + + // Test message is sent on stop call + computationService.stop(RESULT_UUID, receiver); verify(notificationService.getPublisher(), times(1)).send(eq("publishCancel-out-0"), isA(Message.class)); - Message cancelMessage = MessageBuilder.withPayload("") - .setHeader(HEADER_RESULT_UUID, RESULT_UUID.toString()) - .setHeader(HEADER_RECEIVER, receiver) - .build(); - CancelContext cancelContext = CancelContext.fromMessage(cancelMessage); - assertEquals(RESULT_UUID, cancelContext.resultUuid()); - assertEquals(receiver, cancelContext.receiver()); + // Test data is cleaned and message is sent in stopped + workerService.addFuture(UUID.randomUUID(), Mockito.mock(CompletableFuture.class)); + workerService.consumeCancel().accept(message); + assertNull(resultService.findStatus(RESULT_UUID)); + verify(notificationService.getPublisher(), times(1)).send(eq("publishStopped-out-0"), isA(Message.class)); } @Test void testComputationCancelFailed() { - MockComputationStatus baseStatus = MockComputationStatus.COMPLETED; + MockComputationStatus baseStatus = MockComputationStatus.NOT_DONE; computationService.setStatus(List.of(RESULT_UUID), baseStatus); assertEquals(baseStatus, computationService.getStatus(RESULT_UUID)); - computationService.stop(RESULT_UUID, receiver, userId); - - // test the course + // Test message is sent on stop call + computationService.stop(RESULT_UUID, receiver); verify(notificationService.getPublisher(), times(1)).send(eq("publishCancel-out-0"), isA(Message.class)); - Message cancelMessage = MessageBuilder.withPayload("") - .setHeader(HEADER_RESULT_UUID, RESULT_UUID.toString()) - .setHeader(HEADER_RECEIVER, receiver) - .setHeader(HEADER_USER_ID, userId) - .build(); - CancelContext cancelContext = CancelContext.fromMessage(cancelMessage); - assertEquals(RESULT_UUID, cancelContext.resultUuid()); - assertEquals(receiver, cancelContext.receiver()); - assertEquals(userId, cancelContext.userId()); - + // Test data is not cleaned and message is sent in stopped + CompletableFuture futureThatCouldNotBeCancelled = Mockito.mock(CompletableFuture.class); + when(futureThatCouldNotBeCancelled.cancel(true)).thenReturn(false); + workerService.addFuture(RESULT_UUID, futureThatCouldNotBeCancelled); workerService.consumeCancel().accept(message); + assertNotNull(resultService.findStatus(RESULT_UUID)); verify(notificationService.getPublisher(), times(1)).send(eq("publishCancelFailed-out-0"), isA(Message.class)); } } From 61bf49ee9c625c2eae50d7b80c363c33db3da856 Mon Sep 17 00:00:00 2001 From: Joris Mancini Date: Mon, 16 Dec 2024 16:12:11 +0100 Subject: [PATCH 3/4] fix: corrections after review Signed-off-by: Joris Mancini --- .../computation/ComputationException.java | 4 ++++ .../service/AbstractWorkerService.java | 19 ++++++++++--------- .../commons/computation/ComputationTest.java | 10 +++++----- 3 files changed, 19 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/powsybl/ws/commons/computation/ComputationException.java b/src/main/java/com/powsybl/ws/commons/computation/ComputationException.java index 60b183e..ef7e3e1 100644 --- a/src/main/java/com/powsybl/ws/commons/computation/ComputationException.java +++ b/src/main/java/com/powsybl/ws/commons/computation/ComputationException.java @@ -10,6 +10,10 @@ * @author Joris Mancini */ public class ComputationException extends RuntimeException { + public ComputationException(String message) { + super(message); + } + public ComputationException(String message, Throwable cause) { super(message, cause); } diff --git a/src/main/java/com/powsybl/ws/commons/computation/service/AbstractWorkerService.java b/src/main/java/com/powsybl/ws/commons/computation/service/AbstractWorkerService.java index 77f32ea..d98bf96 100644 --- a/src/main/java/com/powsybl/ws/commons/computation/service/AbstractWorkerService.java +++ b/src/main/java/com/powsybl/ws/commons/computation/service/AbstractWorkerService.java @@ -92,26 +92,24 @@ protected void cleanResultsAndPublishCancel(UUID resultUuid, String receiver) { } } - private void cancelAsync(CancelContext cancelContext) { + private boolean cancelAsync(CancelContext cancelContext) { lockRunAndCancel.lock(); + boolean isCanceled = false; try { cancelComputationRequests.put(cancelContext.resultUuid(), cancelContext); // find the completableFuture associated with result uuid CompletableFuture future = futures.get(cancelContext.resultUuid()); - if (future == null) { - cleanResultsAndPublishCancel(cancelContext.resultUuid(), cancelContext.receiver()); - } else { - boolean isCanceled = future.cancel(true); // cancel computation in progress - if (future.isDone() || isCanceled) { + if (future != null) { + isCanceled = future.cancel(true); // cancel computation in progress + if (isCanceled) { cleanResultsAndPublishCancel(cancelContext.resultUuid(), cancelContext.receiver()); - } else { - notificationService.publishCancelFailed(cancelContext.resultUuid(), cancelContext.receiver(), getComputationType(), cancelContext.userId()); } } } finally { lockRunAndCancel.unlock(); } + return isCanceled; } protected abstract AbstractResultContext fromMessage(Message message); @@ -175,7 +173,10 @@ protected void handleNonCancellationException(AbstractResultContext resultCon public Consumer> consumeCancel() { return message -> { CancelContext cancelContext = CancelContext.fromMessage(message); - cancelAsync(cancelContext); + boolean isCancelled = cancelAsync(cancelContext); + if (!isCancelled) { + notificationService.publishCancelFailed(cancelContext.resultUuid(), cancelContext.receiver(), getComputationType(), cancelContext.userId()); + } }; } diff --git a/src/test/java/com/powsybl/ws/commons/computation/ComputationTest.java b/src/test/java/com/powsybl/ws/commons/computation/ComputationTest.java index b5654f3..ce2abe2 100644 --- a/src/test/java/com/powsybl/ws/commons/computation/ComputationTest.java +++ b/src/test/java/com/powsybl/ws/commons/computation/ComputationTest.java @@ -285,7 +285,7 @@ void testComputationCancelled() { } @Test - void testComputationCancelledIfNoMatchingFuture() { + void testComputationNotCancelledIfNoMatchingFuture() { MockComputationStatus baseStatus = MockComputationStatus.NOT_DONE; computationService.setStatus(List.of(RESULT_UUID), baseStatus); assertEquals(baseStatus, computationService.getStatus(RESULT_UUID)); @@ -294,11 +294,11 @@ void testComputationCancelledIfNoMatchingFuture() { computationService.stop(RESULT_UUID, receiver); verify(notificationService.getPublisher(), times(1)).send(eq("publishCancel-out-0"), isA(Message.class)); - // Test data is cleaned and message is sent in stopped + // Test data is not cleaned and message is sent in cancelfailed workerService.addFuture(UUID.randomUUID(), Mockito.mock(CompletableFuture.class)); workerService.consumeCancel().accept(message); - assertNull(resultService.findStatus(RESULT_UUID)); - verify(notificationService.getPublisher(), times(1)).send(eq("publishStopped-out-0"), isA(Message.class)); + assertNotNull(resultService.findStatus(RESULT_UUID)); + verify(notificationService.getPublisher(), times(1)).send(eq("publishCancelFailed-out-0"), isA(Message.class)); } @Test @@ -311,7 +311,7 @@ void testComputationCancelFailed() { computationService.stop(RESULT_UUID, receiver); verify(notificationService.getPublisher(), times(1)).send(eq("publishCancel-out-0"), isA(Message.class)); - // Test data is not cleaned and message is sent in stopped + // Test data is not cleaned and message is sent in cancelfailed CompletableFuture futureThatCouldNotBeCancelled = Mockito.mock(CompletableFuture.class); when(futureThatCouldNotBeCancelled.cancel(true)).thenReturn(false); workerService.addFuture(RESULT_UUID, futureThatCouldNotBeCancelled); From 3a7c58341be7a62821242f5a8e53765f3457fb28 Mon Sep 17 00:00:00 2001 From: Joris Mancini Date: Tue, 17 Dec 2024 15:32:52 +0100 Subject: [PATCH 4/4] fix: improve cancellation tests Signed-off-by: Joris Mancini --- .../commons/computation/ComputationTest.java | 55 +++++++++---------- 1 file changed, 26 insertions(+), 29 deletions(-) diff --git a/src/test/java/com/powsybl/ws/commons/computation/ComputationTest.java b/src/test/java/com/powsybl/ws/commons/computation/ComputationTest.java index ce2abe2..95de814 100644 --- a/src/test/java/com/powsybl/ws/commons/computation/ComputationTest.java +++ b/src/test/java/com/powsybl/ws/commons/computation/ComputationTest.java @@ -266,57 +266,54 @@ void testComputationFailed() { } @Test - void testComputationCancelled() { - MockComputationStatus baseStatus = MockComputationStatus.NOT_DONE; - computationService.setStatus(List.of(RESULT_UUID), baseStatus); - assertEquals(baseStatus, computationService.getStatus(RESULT_UUID)); - - // Test message is sent on stop call + void testStopComputationSendsCancelMessage() { computationService.stop(RESULT_UUID, receiver); verify(notificationService.getPublisher(), times(1)).send(eq("publishCancel-out-0"), isA(Message.class)); - - // Test data is cleaned and message is sent in stopped - CompletableFuture futureThatCouldNotBeCancelled = Mockito.mock(CompletableFuture.class); - when(futureThatCouldNotBeCancelled.cancel(true)).thenReturn(true); - workerService.addFuture(RESULT_UUID, futureThatCouldNotBeCancelled); - workerService.consumeCancel().accept(message); - assertNull(resultService.findStatus(RESULT_UUID)); - verify(notificationService.getPublisher(), times(1)).send(eq("publishStopped-out-0"), isA(Message.class)); } @Test - void testComputationNotCancelledIfNoMatchingFuture() { - MockComputationStatus baseStatus = MockComputationStatus.NOT_DONE; + void testComputationCancelled() { + MockComputationStatus baseStatus = MockComputationStatus.RUNNING; computationService.setStatus(List.of(RESULT_UUID), baseStatus); assertEquals(baseStatus, computationService.getStatus(RESULT_UUID)); - // Test message is sent on stop call - computationService.stop(RESULT_UUID, receiver); - verify(notificationService.getPublisher(), times(1)).send(eq("publishCancel-out-0"), isA(Message.class)); + CompletableFuture futureThatCouldBeCancelled = Mockito.mock(CompletableFuture.class); + when(futureThatCouldBeCancelled.cancel(true)).thenReturn(true); + workerService.addFuture(RESULT_UUID, futureThatCouldBeCancelled); - // Test data is not cleaned and message is sent in cancelfailed - workerService.addFuture(UUID.randomUUID(), Mockito.mock(CompletableFuture.class)); workerService.consumeCancel().accept(message); - assertNotNull(resultService.findStatus(RESULT_UUID)); - verify(notificationService.getPublisher(), times(1)).send(eq("publishCancelFailed-out-0"), isA(Message.class)); + assertNull(resultService.findStatus(RESULT_UUID)); + verify(notificationService.getPublisher(), times(1)).send(eq("publishStopped-out-0"), isA(Message.class)); } @Test void testComputationCancelFailed() { - MockComputationStatus baseStatus = MockComputationStatus.NOT_DONE; + MockComputationStatus baseStatus = MockComputationStatus.RUNNING; computationService.setStatus(List.of(RESULT_UUID), baseStatus); assertEquals(baseStatus, computationService.getStatus(RESULT_UUID)); - // Test message is sent on stop call - computationService.stop(RESULT_UUID, receiver); - verify(notificationService.getPublisher(), times(1)).send(eq("publishCancel-out-0"), isA(Message.class)); - - // Test data is not cleaned and message is sent in cancelfailed CompletableFuture futureThatCouldNotBeCancelled = Mockito.mock(CompletableFuture.class); when(futureThatCouldNotBeCancelled.cancel(true)).thenReturn(false); workerService.addFuture(RESULT_UUID, futureThatCouldNotBeCancelled); + workerService.consumeCancel().accept(message); assertNotNull(resultService.findStatus(RESULT_UUID)); verify(notificationService.getPublisher(), times(1)).send(eq("publishCancelFailed-out-0"), isA(Message.class)); } + + @Test + void testComputationCancelFailsIfNoMatchingFuture() { + workerService.consumeCancel().accept(message); + assertNull(resultService.findStatus(RESULT_UUID)); + verify(notificationService.getPublisher(), times(1)).send(eq("publishCancelFailed-out-0"), isA(Message.class)); + } + + @Test + void testComputationCancelledBeforeRunReturnsNoResult() { + workerService.consumeCancel().accept(message); + + initComputationExecution(); + workerService.consumeRun().accept(message); + verify(notificationService.getPublisher(), times(0)).send(eq("publishResult-out-0"), isA(Message.class)); + } }