diff --git a/common/configurable/src/main/java/io/helidon/common/configurable/ThreadPool.java b/common/configurable/src/main/java/io/helidon/common/configurable/ThreadPool.java index b180e8b74d3..8b5797e6e17 100644 --- a/common/configurable/src/main/java/io/helidon/common/configurable/ThreadPool.java +++ b/common/configurable/src/main/java/io/helidon/common/configurable/ThreadPool.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019, 2022 Oracle and/or its affiliates. + * Copyright (c) 2019, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -693,6 +693,8 @@ static final class DynamicPoolWorkQueue extends WorkQueue { // We can't make pool final because it is a circular dependency, but we set it during the construction of // the pool itself and therefore don't have to worry about concurrent access. private ThreadPool pool; + private final AtomicInteger underOffer = new AtomicInteger(0); + /** * Constructor. @@ -721,36 +723,42 @@ public boolean offer(Runnable task) { if (currentSize >= maxPoolSize) { // Yes, so enqueue if we can - Event.add(Event.Type.MAX, pool, this); return super.offer(task); - } else if (pool.getActiveThreads() < currentSize) { - - // No, but we've got idle threads so enqueue if we can - - Event.add(Event.Type.IDLE, pool, this); - return super.offer(task); - } else { + int underOfferSize = underOffer.incrementAndGet(); + try { + // Include count of tasks active, offered and queued to ensure proper growth + int activeSize = pool.getActiveCount() + underOfferSize + size(); + if (activeSize <= currentSize) { + + // No, but we've got idle threads so enqueue if we can + Event.add(Event.Type.IDLE, pool, this); + return super.offer(task); + + } else { + + // Ok, we might want to add a thread so ask our policy + if (growthPolicy.test(pool)) { + + // Add a thread. Note that this can still result in a rejection due to a race + // condition in which the pool has not yet grown from a previous false return (and so + // our maxPoolSize check above is not accurate); in this case, the rejection handler + // will just add it to the queue. + if (LOGGER.isLoggable(Level.FINEST)) { + LOGGER.finest(() -> "Adding a thread, pool size = " + pool.getPoolSize() + + " queue size = " + size()); + } + return false; - // Ok, we might want to add a thread so ask our policy - - if (growthPolicy.test(pool)) { - - // Add a thread. Note that this can still result in a rejection due to a race condition - // in which the pool has not yet grown from a previous false return (and so our maxPoolSize - // check above is not accurate); in this case, the rejection handler will just add it to - // the queue. - - if (LOGGER.isLoggable(Level.FINE)) { - LOGGER.fine("Adding a thread, pool size = " + pool.getPoolSize() + ", queue size = " + size()); + } else { + // Enqueue if we can + return super.offer(task); + } } - return false; - - } else { - // Enqueue if we can - return super.offer(task); + } finally { + underOffer.decrementAndGet(); } } } diff --git a/common/configurable/src/test/java/io/helidon/common/configurable/ThreadPoolTest.java b/common/configurable/src/test/java/io/helidon/common/configurable/ThreadPoolTest.java index f1402be5bd3..1c9d170beb0 100644 --- a/common/configurable/src/test/java/io/helidon/common/configurable/ThreadPoolTest.java +++ b/common/configurable/src/test/java/io/helidon/common/configurable/ThreadPoolTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019, 2022 Oracle and/or its affiliates. + * Copyright (c) 2019, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; @@ -35,6 +36,7 @@ import org.awaitility.core.ConditionTimeoutException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.function.Executable; import static java.util.concurrent.TimeUnit.SECONDS; @@ -44,6 +46,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.fail; @@ -567,6 +570,34 @@ public void run() { assertThat(pool.getTotalTasks(), is(pool.getCompletedTasks() + pool.getFailedTasks())); } + @Test + @Timeout(10) + public void testGrowWithHighThroughput() { + // Wait for 4 tasks to be executing concurrently + int concurrency = 4; + Phaser concurrencyPhaser = new Phaser(concurrency); + + // Wait for all tasks, +1 for the test thread + int numTasks = concurrency * 10; + Phaser globalPhaser = new Phaser(numTasks + 1); + + Runnable r = () -> { + globalPhaser.arrive(); + concurrencyPhaser.arriveAndAwaitAdvance(); + }; + ThreadPool threadPool = newPool(1, concurrency, 0, 100); + + // Launch all tasks, "concurrency" tasks should get a dedicated worker (by + // growing), the other ones should be queued. + // Once "concurrency" tasks arrive at concurrencyPhaser, they are unlocked, and + // queued ones can execute. Until all tasks are executed. + for (int i = 0; i < numTasks; i++) { + threadPool.submit(r); + } + globalPhaser.arriveAndAwaitAdvance(); + assertEquals(concurrency, threadPool.getLargestPoolSize()); + } + private CountDownLatch addTasks(int count) { final CountDownLatch awaitRunning = new CountDownLatch(count); IntStream.range(0, count).forEach(n -> {