From a2d5ca4f996a42ac0f2f32e0736efb5396f91ff3 Mon Sep 17 00:00:00 2001 From: Santiago Pericas-Geertsen Date: Thu, 5 Dec 2024 10:21:09 -0500 Subject: [PATCH] Creates FT executor using our ThreadPoolSupplier to ensure context propagation. Adds new test. (#9555) --- .../faulttolerance/FaultTolerance.java | 12 ++++++---- .../io/helidon/faulttolerance/AsyncTest.java | 23 ++++++++++++++++++- 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/fault-tolerance/fault-tolerance/src/main/java/io/helidon/faulttolerance/FaultTolerance.java b/fault-tolerance/fault-tolerance/src/main/java/io/helidon/faulttolerance/FaultTolerance.java index 6b32e8f6cb7..9ed7b017353 100644 --- a/fault-tolerance/fault-tolerance/src/main/java/io/helidon/faulttolerance/FaultTolerance.java +++ b/fault-tolerance/fault-tolerance/src/main/java/io/helidon/faulttolerance/FaultTolerance.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2023 Oracle and/or its affiliates. + * Copyright (c) 2020, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,11 +20,11 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import io.helidon.common.LazyValue; +import io.helidon.common.configurable.ThreadPoolSupplier; import io.helidon.config.Config; import static java.lang.System.Logger.Level.ERROR; @@ -54,9 +54,11 @@ public final class FaultTolerance { private static final AtomicReference CONFIG = new AtomicReference<>(Config.empty()); static { - EXECUTOR.set(LazyValue.create(() -> Executors.newThreadPerTaskExecutor(Thread.ofVirtual() - .name("helidon-ft-", 0) - .factory()))); + EXECUTOR.set(LazyValue.create(() -> ThreadPoolSupplier.builder() + .threadNamePrefix("helidon-ft-") + .virtualThreads(true) + .build() + .get())); } private FaultTolerance() { diff --git a/fault-tolerance/fault-tolerance/src/test/java/io/helidon/faulttolerance/AsyncTest.java b/fault-tolerance/fault-tolerance/src/test/java/io/helidon/faulttolerance/AsyncTest.java index f9d40053bfb..1b77023fea7 100644 --- a/fault-tolerance/fault-tolerance/src/test/java/io/helidon/faulttolerance/AsyncTest.java +++ b/fault-tolerance/fault-tolerance/src/test/java/io/helidon/faulttolerance/AsyncTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, 2023 Oracle and/or its affiliates. + * Copyright (c) 2022, 2024 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import io.helidon.common.context.Context; +import io.helidon.common.context.Contexts; + import org.junit.jupiter.api.Test; import static org.hamcrest.CoreMatchers.endsWith; @@ -61,6 +64,24 @@ void testThreadName() throws Exception { assertThat(threadName, endsWith(": async")); } + @Test + void testContextPropagation() throws Exception { + Context context = Context.create(); + CompletableFuture cf = new CompletableFuture<>(); + Contexts.runInContext(context, () -> { + try { + Async async = Async.create(); + async.invoke(() -> { + cf.complete(Contexts.context().orElse(null)); + return null; + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + assertThat(cf.get(WAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS), is(context)); + } + private Thread testAsync(Async async) { try { CompletableFuture cf = new CompletableFuture<>();