From a9c3e62573d391af0f312243517b4f4f1a72575a Mon Sep 17 00:00:00 2001 From: Nathaniel Bauernfeind Date: Mon, 29 Jan 2024 16:38:31 -0700 Subject: [PATCH] Try out limited fixed thread pool --- .../io/deephaven/engine/context/QueryCompiler.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/engine/context/src/main/java/io/deephaven/engine/context/QueryCompiler.java b/engine/context/src/main/java/io/deephaven/engine/context/QueryCompiler.java index 6d409c81612..b688b53c8b4 100644 --- a/engine/context/src/main/java/io/deephaven/engine/context/QueryCompiler.java +++ b/engine/context/src/main/java/io/deephaven/engine/context/QueryCompiler.java @@ -39,7 +39,6 @@ import java.util.jar.Attributes; import java.util.jar.Manifest; import java.util.stream.Collectors; -import java.util.stream.IntStream; import java.util.stream.Stream; public class QueryCompiler { @@ -47,6 +46,7 @@ public class QueryCompiler { public static volatile int PARALLELISM_FACTOR = ForkJoinPool.getCommonPoolParallelism(); public static volatile int REQUESTS_PER_TASK = 0; public static volatile boolean DISABLE_SHARED_COMPILER = false; + private static final ExecutorService COMPILER_EXECUTOR = Executors.newFixedThreadPool(4); private static final Logger log = LoggerFactory.getLogger(QueryCompiler.class); /** @@ -848,16 +848,22 @@ private void maybeCreateClass( 0, requests.length); } else { int numTasks = (requests.length + requestsPerTask - 1) / requestsPerTask; - ForkJoinTask[] tasks = new ForkJoinTask[numTasks]; + final Future[] tasks = new Future[numTasks]; for (int jobId = 0; jobId < numTasks; ++jobId) { final int startInclusive = jobId * requestsPerTask; final int endExclusive = Math.min(requests.length, (jobId + 1) * requestsPerTask); - tasks[jobId] = ForkJoinPool.commonPool().submit(() -> { + tasks[jobId] = COMPILER_EXECUTOR.submit(() -> { maybeCreateClassHelper(compiler, fileManager, requests, rootPathAsString, tempDirAsString, startInclusive, endExclusive); }); } - Arrays.stream(tasks).forEach(ForkJoinTask::join); + for (int jobId = 0; jobId < numTasks; ++jobId) { + try { + tasks[jobId].get(); + } catch (Exception err) { + throw new UncheckedDeephavenException("Exception waiting for compilation task", err); + } + } } log.error().append("Compiled in ").append(Double.toString((System.nanoTime() - startTm) / 1e9)).append("s.").endl(); } catch (final Throwable t) {