From c94141c5bf4bc15b881bf8778be96321bbe754ea Mon Sep 17 00:00:00 2001 From: Santiago Pericasgeertsen Date: Thu, 18 Jan 2024 16:35:23 -0500 Subject: [PATCH] Manually count number of offered tasks instead of relying solely on the inaccurate value returned by pool.getActiveCount(). This ensures proper thread pool growth when several tasks are submitted at once (and before they start running). --- .../common/configurable/ThreadPool.java | 58 +++++++++++-------- .../common/configurable/ThreadPoolTest.java | 33 ++++++++++- 2 files changed, 65 insertions(+), 26 deletions(-) diff --git a/common/configurable/src/main/java/io/helidon/common/configurable/ThreadPool.java b/common/configurable/src/main/java/io/helidon/common/configurable/ThreadPool.java index b180e8b74d3..8b5797e6e17 100644 --- a/common/configurable/src/main/java/io/helidon/common/configurable/ThreadPool.java +++ b/common/configurable/src/main/java/io/helidon/common/configurable/ThreadPool.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019, 2022 Oracle and/or its affiliates. + * Copyright (c) 2019, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -693,6 +693,8 @@ static final class DynamicPoolWorkQueue extends WorkQueue { // We can't make pool final because it is a circular dependency, but we set it during the construction of // the pool itself and therefore don't have to worry about concurrent access. private ThreadPool pool; + private final AtomicInteger underOffer = new AtomicInteger(0); + /** * Constructor. @@ -721,36 +723,42 @@ public boolean offer(Runnable task) { if (currentSize >= maxPoolSize) { // Yes, so enqueue if we can - Event.add(Event.Type.MAX, pool, this); return super.offer(task); - } else if (pool.getActiveThreads() < currentSize) { - - // No, but we've got idle threads so enqueue if we can - - Event.add(Event.Type.IDLE, pool, this); - return super.offer(task); - } else { + int underOfferSize = underOffer.incrementAndGet(); + try { + // Include count of tasks active, offered and queued to ensure proper growth + int activeSize = pool.getActiveCount() + underOfferSize + size(); + if (activeSize <= currentSize) { + + // No, but we've got idle threads so enqueue if we can + Event.add(Event.Type.IDLE, pool, this); + return super.offer(task); + + } else { + + // Ok, we might want to add a thread so ask our policy + if (growthPolicy.test(pool)) { + + // Add a thread. Note that this can still result in a rejection due to a race + // condition in which the pool has not yet grown from a previous false return (and so + // our maxPoolSize check above is not accurate); in this case, the rejection handler + // will just add it to the queue. + if (LOGGER.isLoggable(Level.FINEST)) { + LOGGER.finest(() -> "Adding a thread, pool size = " + pool.getPoolSize() + + " queue size = " + size()); + } + return false; - // Ok, we might want to add a thread so ask our policy - - if (growthPolicy.test(pool)) { - - // Add a thread. Note that this can still result in a rejection due to a race condition - // in which the pool has not yet grown from a previous false return (and so our maxPoolSize - // check above is not accurate); in this case, the rejection handler will just add it to - // the queue. - - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Adding a thread, pool size = " + pool.getPoolSize() + ", queue size = " + size()); + } else { + // Enqueue if we can + return super.offer(task); + } } - return false; - - } else { - // Enqueue if we can - return super.offer(task); + } finally { + underOffer.decrementAndGet(); } } } diff --git a/common/configurable/src/test/java/io/helidon/common/configurable/ThreadPoolTest.java b/common/configurable/src/test/java/io/helidon/common/configurable/ThreadPoolTest.java index f1402be5bd3..1c9d170beb0 100644 --- a/common/configurable/src/test/java/io/helidon/common/configurable/ThreadPoolTest.java +++ b/common/configurable/src/test/java/io/helidon/common/configurable/ThreadPoolTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019, 2022 Oracle and/or its affiliates. + * Copyright (c) 2019, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; @@ -35,6 +36,7 @@ import org.awaitility.core.ConditionTimeoutException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.function.Executable; import static java.util.concurrent.TimeUnit.SECONDS; @@ -44,6 +46,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.fail; @@ -567,6 +570,34 @@ public void run() { assertThat(pool.getTotalTasks(), is(pool.getCompletedTasks() + pool.getFailedTasks())); } + @Test + @Timeout(10) + public void testGrowWithHighThroughput() { + // Wait for 4 tasks to be executing concurrently + int concurrency = 4; + Phaser concurrencyPhaser = new Phaser(concurrency); + + // Wait for all tasks, +1 for the test thread + int numTasks = concurrency * 10; + Phaser globalPhaser = new Phaser(numTasks + 1); + + Runnable r = () -> { + globalPhaser.arrive(); + concurrencyPhaser.arriveAndAwaitAdvance(); + }; + ThreadPool threadPool = newPool(1, concurrency, 0, 100); + + // Launch all tasks, "concurrency" tasks should get a dedicated worker (by + // growing), the other ones should be queued. + // Once "concurrency" tasks arrive at concurrencyPhaser, they are unlocked, and + // queued ones can execute. Until all tasks are executed. + for (int i = 0; i < numTasks; i++) { + threadPool.submit(r); + } + globalPhaser.arriveAndAwaitAdvance(); + assertEquals(concurrency, threadPool.getLargestPoolSize()); + } + private CountDownLatch addTasks(int count) { final CountDownLatch awaitRunning = new CountDownLatch(count); IntStream.range(0, count).forEach(n -> {