diff --git a/common/configurable/src/main/java/io/helidon/common/configurable/ThreadPool.java b/common/configurable/src/main/java/io/helidon/common/configurable/ThreadPool.java index bc017ad9b4a..ad42a00d72f 100644 --- a/common/configurable/src/main/java/io/helidon/common/configurable/ThreadPool.java +++ b/common/configurable/src/main/java/io/helidon/common/configurable/ThreadPool.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019, 2021 Oracle and/or its affiliates. + * Copyright (c) 2019, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,6 +36,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.Semaphore; @@ -376,7 +377,22 @@ protected void beforeExecute(Thread t, Runnable r) { @Override protected void afterExecute(Runnable r, Throwable t) { - completedTasks.incrementAndGet(); + boolean failed = (t != null); + if (!failed && r instanceof Future) { // extract exception + Future f = (Future) r; + if (f.isDone()) { + try { + f.get(); + } catch (Exception e) { + failed = true; + } + } + } + if (failed) { + failedTasks.incrementAndGet(); + } else { + completedTasks.incrementAndGet(); + } totalActiveThreads.add(activeThreads.getAndDecrement()); } diff --git a/common/configurable/src/test/java/io/helidon/common/configurable/ThreadPoolTest.java b/common/configurable/src/test/java/io/helidon/common/configurable/ThreadPoolTest.java index 5f918c4b18c..8d7a35b87d0 100644 --- a/common/configurable/src/test/java/io/helidon/common/configurable/ThreadPoolTest.java +++ b/common/configurable/src/test/java/io/helidon/common/configurable/ThreadPoolTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019, 2020 Oracle and/or its affiliates. + * Copyright (c) 2019, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -500,6 +500,40 @@ protected void throwException(ThreadPoolExecutor executor) { } } + @Test + void testFailedAndCompletedCounters() throws InterruptedException { + pool = newPool( 4, 4, 10, 20); + + CountDownLatch running1 = new CountDownLatch(1); + CountDownLatch running2 = new CountDownLatch(1); + Task task1 = new Task(running1); + Task task2 = new Task(running2); + pool.submit(task1); + pool.submit(task2); + running1.await(); + running2.await(); + task1.finish(); + task2.finish(); + Task task3 = new Task() { + @Override + public void run() { + throw new RuntimeException("Oops"); + } + }; + pool.submit(task3); + Task task4 = new Task() { + @Override + public void run() { + throw new RuntimeException("Oops"); + } + }; + pool.submit(task4); + pool.shutdown(); + assertThat(pool.getCompletedTasks(), is(2)); + assertThat(pool.getFailedTasks(), is(2)); + assertThat(pool.getTotalTasks(), is(pool.getCompletedTasks() + pool.getFailedTasks())); + } + private CountDownLatch addTasks(int count) { final CountDownLatch awaitRunning = new CountDownLatch(count); IntStream.range(0, count).forEach(n -> {