Skip to content

Commit

Permalink
Manually count number of offered tasks instead of relying solely on t…
Browse files Browse the repository at this point in the history
…he inaccurate value returned by pool.getActiveCount(). This ensures proper thread pool growth when several tasks are submitted at once (and before they start running).
  • Loading branch information
spericas committed Jan 18, 2024
1 parent 75f8f1b commit 10c787b
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, 2022 Oracle and/or its affiliates.
* Copyright (c) 2019, 2024 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -693,6 +693,8 @@ static final class DynamicPoolWorkQueue extends WorkQueue {
// We can't make pool final because it is a circular dependency, but we set it during the construction of
// the pool itself and therefore don't have to worry about concurrent access.
private ThreadPool pool;
private final AtomicInteger underOffer = new AtomicInteger(0);


/**
* Constructor.
Expand Down Expand Up @@ -721,36 +723,41 @@ public boolean offer(Runnable task) {
if (currentSize >= maxPoolSize) {

// Yes, so enqueue if we can

Event.add(Event.Type.MAX, pool, this);
return super.offer(task);

} else if (pool.getActiveThreads() < currentSize) {

// No, but we've got idle threads so enqueue if we can

Event.add(Event.Type.IDLE, pool, this);
return super.offer(task);

} else {
int underOfferValue = underOffer.incrementAndGet();
try {
int virtualSize = underOfferValue + size();
if (pool.getActiveCount() + virtualSize <= currentSize) {

// Ok, we might want to add a thread so ask our policy
// No, but we've got idle threads so enqueue if we can
Event.add(Event.Type.IDLE, pool, this);
return super.offer(task);

if (growthPolicy.test(pool)) {
} else {

// Add a thread. Note that this can still result in a rejection due to a race condition
// in which the pool has not yet grown from a previous false return (and so our maxPoolSize
// check above is not accurate); in this case, the rejection handler will just add it to
// the queue.
// Ok, we might want to add a thread so ask our policy
if (growthPolicy.test(pool)) {

if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Adding a thread, pool size = " + pool.getPoolSize() + ", queue size = " + size());
}
return false;
// Add a thread. Note that this can still result in a rejection due to a race
// condition in which the pool has not yet grown from a previous false return (and so
// our maxPoolSize check above is not accurate); in this case, the rejection handler
// will just add it to the queue.
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest(() -> "Adding a thread, pool size = " + pool.getPoolSize()
+ " queue size = " + size());
}
return false;

} else {
// Enqueue if we can
return super.offer(task);
} else {
// Enqueue if we can
return super.offer(task);
}
}
} finally {
underOffer.decrementAndGet();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, 2022 Oracle and/or its affiliates.
* Copyright (c) 2019, 2024 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -35,6 +36,7 @@
import org.awaitility.core.ConditionTimeoutException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.function.Executable;

import static java.util.concurrent.TimeUnit.SECONDS;
Expand All @@ -44,6 +46,7 @@
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;

Expand Down Expand Up @@ -567,6 +570,34 @@ public void run() {
assertThat(pool.getTotalTasks(), is(pool.getCompletedTasks() + pool.getFailedTasks()));
}

@Test
@Timeout(10)
public void testGrowWithHighThroughput() {
// Wait for 4 tasks to be executing concurrently
int concurrency = 4;
Phaser concurrencyPhaser = new Phaser(concurrency);

// Wait for all tasks, +1 for the test thread
int numTasks = concurrency * 10;
Phaser globalPhaser = new Phaser(numTasks + 1);

Runnable r = () -> {
globalPhaser.arrive();
concurrencyPhaser.arriveAndAwaitAdvance();
};
ThreadPool threadPool = newPool(1, concurrency, 0, 100);

// Launch all tasks, "concurrency" tasks should get a dedicated worker (by
// growing), the other ones should be queued.
// Once "concurrency" tasks arrive at concurrencyPhaser, they are unlocked, and
// queued ones can execute. Until all tasks are executed.
for (int i = 0; i < numTasks; i++) {
threadPool.submit(r);
}
globalPhaser.arriveAndAwaitAdvance();
assertEquals(concurrency, threadPool.getLargestPoolSize());
}

private CountDownLatch addTasks(int count) {
final CountDownLatch awaitRunning = new CountDownLatch(count);
IntStream.range(0, count).forEach(n -> {
Expand Down

0 comments on commit 10c787b

Please sign in to comment.