Skip to content

Commit

Permalink
Properly count completed and failed thread pool tasks (#4244)
Browse files Browse the repository at this point in the history
* Properly count completed and failed thread pool tasks. New test.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Also check for throwable being not null.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>
  • Loading branch information
spericas authored May 20, 2022
1 parent 09be6c3 commit 1f6c10e
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, 2021 Oracle and/or its affiliates.
* Copyright (c) 2019, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.Semaphore;
Expand Down Expand Up @@ -376,7 +377,22 @@ protected void beforeExecute(Thread t, Runnable r) {

@Override
protected void afterExecute(Runnable r, Throwable t) {
completedTasks.incrementAndGet();
boolean failed = (t != null);
if (!failed && r instanceof Future<?>) { // extract exception
Future<?> f = (Future<?>) r;
if (f.isDone()) {
try {
f.get();
} catch (Exception e) {
failed = true;
}
}
}
if (failed) {
failedTasks.incrementAndGet();
} else {
completedTasks.incrementAndGet();
}
totalActiveThreads.add(activeThreads.getAndDecrement());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, 2020 Oracle and/or its affiliates.
* Copyright (c) 2019, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -500,6 +500,40 @@ protected void throwException(ThreadPoolExecutor executor) {
}
}

@Test
void testFailedAndCompletedCounters() throws InterruptedException {
pool = newPool( 4, 4, 10, 20);

CountDownLatch running1 = new CountDownLatch(1);
CountDownLatch running2 = new CountDownLatch(1);
Task task1 = new Task(running1);
Task task2 = new Task(running2);
pool.submit(task1);
pool.submit(task2);
running1.await();
running2.await();
task1.finish();
task2.finish();
Task task3 = new Task() {
@Override
public void run() {
throw new RuntimeException("Oops");
}
};
pool.submit(task3);
Task task4 = new Task() {
@Override
public void run() {
throw new RuntimeException("Oops");
}
};
pool.submit(task4);
pool.shutdown();
assertThat(pool.getCompletedTasks(), is(2));
assertThat(pool.getFailedTasks(), is(2));
assertThat(pool.getTotalTasks(), is(pool.getCompletedTasks() + pool.getFailedTasks()));
}

private CountDownLatch addTasks(int count) {
final CountDownLatch awaitRunning = new CountDownLatch(count);
IntStream.range(0, count).forEach(n -> {
Expand Down

0 comments on commit 1f6c10e

Please sign in to comment.