PropagatedContextElement executed 2x on blocking ExecutorService #11355

Open
jefersonestevo opened this issue Nov 18, 2024 · 0 comments

Expected Behavior

When running a PropagatedContextElement on the blocking ExecutorService, updateThreadContext() should be executed exactly once per task (in addition to the initial call made by the propagate() method).

Actual Behaviour

With the blocking ExecutorService, updateThreadContext() is called twice per task. With the io and virtual ExecutorServices it is called correctly (once per Runnable).

After some investigation of the Micronaut code, I think the problem happens because:

  1. To create the blocking ExecutorService (in IOExecutorServiceConfig), Micronaut reuses the already created instance of the io OR the virtual ExecutorService (depending on whether the environment supports virtual threads)
  2. Meanwhile, the ExecutorServiceInstrumenter (from micronaut-context-propagation) wraps every bean of type ExecutorService in an internal InstrumentedExecutorService instance
  3. Given this, the timeline is:
    -> The io ExecutorService is created and wrapped in an InstrumentedExecutorService
    -> The virtual ExecutorService is created and wrapped in an InstrumentedExecutorService
    -> The blocking ExecutorService is mapped to the io OR virtual ExecutorService, which is already wrapped in an InstrumentedExecutorService, and it is wrapped again when it passes through the ExecutorServiceInstrumenter
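
To make the suspected mechanism concrete, here is a minimal plain-Java sketch of what happens when the same executor is wrapped twice. It does not use any Micronaut classes: `instrument`, `hookCallsFor` and the direct executor `Runnable::run` are hypothetical stand-ins for the instrumenter, the test harness and the io pool, and the hook plays the role of updateThreadContext().

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

class DoubleWrapDemo {

    // Hypothetical stand-in for the instrumenter: runs a hook before every task.
    static Executor instrument(Executor delegate, Runnable hook) {
        return task -> delegate.execute(() -> {
            hook.run(); // plays the role of updateThreadContext()
            task.run();
        });
    }

    // Wraps a direct (same-thread) executor wrapCount times, runs one task,
    // and returns how many times the hook fired for that single task.
    static int hookCallsFor(int wrapCount) {
        AtomicInteger calls = new AtomicInteger();
        Executor executor = Runnable::run; // stand-in for the "io" pool
        for (int i = 0; i < wrapCount; i++) {
            executor = instrument(executor, calls::incrementAndGet);
        }
        executor.execute(() -> { });
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("wrapped once:  " + hookCallsFor(1)); // like io / virtual
        System.out.println("wrapped twice: " + hookCallsFor(2)); // like blocking
    }
}
```

Each additional wrapper layer adds one more hook invocation per submitted task, which would match the extra updateThreadContext() call observed on the blocking ExecutorService.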

The simplest solution I can think of (I don't know the exact implications for the whole framework) would be to change the ExecutorServiceInstrumenter to **not** create an instrumenter if the ExecutorService is already instrumented. This would require a named class for the InstrumentedExecutorService, instead of a dynamic class, so that the current event.getBean() can be compared against it with instanceof.
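
A rough sketch of that idea in plain Java, under the same caveat that this is not Micronaut code: `InstrumentedExecutor` and `instrumentOnce` are hypothetical names used only for illustration. The point is that a named wrapper type lets the wrapping step recognize an executor that was already instrumented and return it unchanged.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

class IdempotentWrapDemo {

    // Hypothetical named wrapper type, so instrumented instances can be detected.
    static final class InstrumentedExecutor implements Executor {
        private final Executor delegate;
        private final Runnable hook;

        InstrumentedExecutor(Executor delegate, Runnable hook) {
            this.delegate = delegate;
            this.hook = hook;
        }

        @Override
        public void execute(Runnable task) {
            delegate.execute(() -> {
                hook.run(); // plays the role of updateThreadContext()
                task.run();
            });
        }
    }

    // Wraps the candidate only if it is not already an InstrumentedExecutor.
    static Executor instrumentOnce(Executor candidate, Runnable hook) {
        if (candidate instanceof InstrumentedExecutor) {
            return candidate;
        }
        return new InstrumentedExecutor(candidate, hook);
    }

    // Simulates "io" being instrumented and then reused as "blocking".
    static int hookCallsAfterWrappingTwice() {
        AtomicInteger calls = new AtomicInteger();
        Executor io = instrumentOnce(Runnable::run, calls::incrementAndGet);
        Executor blocking = instrumentOnce(io, calls::incrementAndGet);
        blocking.execute(() -> { });
        return calls.get();
    }
}
```

With this guard, the second pass returns the same instance, so the hook fires once per task. Whether an instanceof check is sufficient inside the framework (for example, if other wrappers sit between the layers) is an open question.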

Steps To Reproduce

I do not have an example application but here are the steps to easily reproduce this situation:

  1. Create a new app:
    mn create-app --build=gradle --jdk=21 --lang=java --test=spock com.example.micronaut_instrumenter_test

  2. Add the following dependency to the build.gradle:
    implementation("io.micronaut:micronaut-context-propagation")

  3. Use the following unit test to see that with the blocking ExecutorService the PropagatedContextElement is executed 3x (instead of the 2x seen with the other ExecutorServices):

package com.example

import io.micronaut.context.ApplicationContext
import io.micronaut.core.propagation.PropagatedContext
import io.micronaut.core.propagation.ThreadPropagatedContextElement
import io.micronaut.inject.qualifiers.Qualifiers
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import jakarta.inject.Inject
import spock.lang.Specification

import java.util.concurrent.ExecutorService
import java.util.concurrent.atomic.AtomicInteger

@MicronautTest
class PropagatedContextSpec extends Specification {

    @Inject
    ApplicationContext applicationContext

    void 'test PropagatedContext are correctly called for ExecutorServices io, virtual and blocking'() {
        given:
        ExecutorService io = applicationContext.getBean(ExecutorService, Qualifiers.byName("io"))
        ExecutorService virtual = applicationContext.getBean(ExecutorService, Qualifiers.byName("virtual"))
        ExecutorService blocking = applicationContext.getBean(ExecutorService, Qualifiers.byName("blocking"))

        and:
        TestPropagatedContext contextForIo = new TestPropagatedContext("io")
        TestPropagatedContext contextForVirtual = new TestPropagatedContext("virtual")
        TestPropagatedContext contextForBlocking = new TestPropagatedContext("blocking")

        when:
        println("---------")
        println("Running IO ExecutorService:")
        try (PropagatedContext.Scope ignored = PropagatedContext.getOrEmpty().plus(contextForIo).propagate()) {
            io.submit {
                println("Executing IO Thread Service")
            }.get()
        }

        println("---------")
        println("Running Virtual ExecutorService:")
        try (PropagatedContext.Scope ignored = PropagatedContext.getOrEmpty().plus(contextForVirtual).propagate()) {
            virtual.submit {
                println("Executing Virtual Thread Service")
            }.get()
        }

        println("---------")
        println("Running Blocking ExecutorService:")
        try (PropagatedContext.Scope ignored = PropagatedContext.getOrEmpty().plus(contextForBlocking).propagate()) {
            blocking.submit {
                println("Executing Blocking Thread Service")
            }.get()
        }

        then: "Should be called 1x on the propagate() method and 1x by the ExecutorServiceInstrumenter"
        contextForIo.state() == 2

        and: "Should be called 1x on the propagate() method and 1x by the ExecutorServiceInstrumenter"
        contextForVirtual.state() == 2

        and: "Should be called 1x on the propagate() method and 1x by the ExecutorServiceInstrumenter but it is called 2x by the instrumenter"
        contextForBlocking.state() == 2 // FAIL - Returns 3
    }

    class TestPropagatedContext implements ThreadPropagatedContextElement<Integer> {

        private final String name
        private AtomicInteger counter = new AtomicInteger(0)

        TestPropagatedContext(String name) {
            this.name = name
        }

        @Override
        Integer updateThreadContext() {
            // Count every invocation so the test can assert on the total
            int value = counter.incrementAndGet()
            println("Updating thread context for $name: $value")
            return value
        }

        void restoreThreadContext(Integer oldState) {
            println("Restoring thread context for $name: $oldState")
        }

        Integer state() {
            return this.counter.get()
        }
    }

}

Environment Information

  • Operating System: macOS
  • JDK: 21.0.4

Example Application

No response

Version

4.7.0
