diff --git a/instrumentation/kamon-executors/src/main/scala/kamon/instrumentation/executor/ExecutorInstrumentation.scala b/instrumentation/kamon-executors/src/main/scala/kamon/instrumentation/executor/ExecutorInstrumentation.scala index 85dea9717..99ce6e908 100644 --- a/instrumentation/kamon-executors/src/main/scala/kamon/instrumentation/executor/ExecutorInstrumentation.scala +++ b/instrumentation/kamon-executors/src/main/scala/kamon/instrumentation/executor/ExecutorInstrumentation.scala @@ -148,7 +148,6 @@ object ExecutorInstrumentation { */ def instrument(executor: ExecutorService, name: String, extraTags: TagSet, settings: Settings): ExecutorService = { executor match { - case es: ExecutorService if isWrapper(es) => instrument(unwrap(es), name, extraTags, settings) case tpe: ThreadPoolExecutor => new InstrumentedThreadPool(tpe, name, extraTags, settings) case jfjp: JavaForkJoinPool => new InstrumentedForkJoinPool(jfjp, ForkJoinPoolTelemetryReader.forJava(jfjp), name, extraTags, settings) case sfjp: ScalaForkJoinPool => new InstrumentedForkJoinPool(sfjp, ForkJoinPoolTelemetryReader.forScala(sfjp), name, extraTags, settings) @@ -172,8 +171,6 @@ object ExecutorInstrumentation { */ def instrumentScheduledExecutor(executor: ScheduledExecutorService, name: String, extraTags: TagSet): ScheduledExecutorService = { executor match { - case es: ScheduledExecutorService if isWrapper(es) => - instrumentScheduledExecutor(unwrap(es).asInstanceOf[ScheduledExecutorService], name, extraTags) case stpe: ScheduledThreadPoolExecutor => new InstrumentedScheduledThreadPoolExecutor(stpe, name, extraTags.withTag("scheduled", true)) @@ -251,34 +248,12 @@ object ExecutorInstrumentation { new Settings(shouldTrackTimeInQueue, false) } - - private val _delegatedExecutorClass = Class.forName("java.util.concurrent.Executors$DelegatedExecutorService") - private val _finalizableDelegatedClass = Class.forName("java.util.concurrent.Executors$FinalizableDelegatedExecutorService") - private val _delegateScheduledClass = Class.forName("java.util.concurrent.Executors$DelegatedScheduledExecutorService") - private val _delegatedExecutorField = { - val field = _delegatedExecutorClass.getDeclaredField("e") - field.setAccessible(true) - field - } - private val _executionContextExecutorField = { val field = Class.forName("scala.concurrent.impl.ExecutionContextImpl").getDeclaredField("executor") field.setAccessible(true) field } - private def isAssignableTo(executor: ExecutorService, expectedClass: Class[_]): Boolean = - expectedClass.isAssignableFrom(executor.getClass) - - private def isWrapper(executor: ExecutorService): Boolean = { - isAssignableTo(executor, _delegatedExecutorClass) || - isAssignableTo(executor, _finalizableDelegatedClass) || - isAssignableTo(executor, _delegateScheduledClass) - } - - private def unwrap(delegatedExecutor: ExecutorService): ExecutorService = - _delegatedExecutorField.get(delegatedExecutor).asInstanceOf[ExecutorService] - private def unwrapExecutionContext(executionContext: ExecutionContext): Option[ExecutorService] = try { @@ -286,7 +261,7 @@ object ExecutorInstrumentation { // or ExecutionContext.fromExecutorService. Some(_executionContextExecutorField.get(executionContext).asInstanceOf[ExecutorService]) } catch { - case anyError => + case _: Throwable => _logger.warn("Cannot unwrap unsupported ExecutionContext [{}]", executionContext) None } diff --git a/instrumentation/kamon-executors/src/test/scala/kamon/instrumentation/executor/ExecutorMetricsSpec.scala b/instrumentation/kamon-executors/src/test/scala/kamon/instrumentation/executor/ExecutorMetricsSpec.scala index d2af589b6..b82d6a6b0 100644 --- a/instrumentation/kamon-executors/src/test/scala/kamon/instrumentation/executor/ExecutorMetricsSpec.scala +++ b/instrumentation/kamon-executors/src/test/scala/kamon/instrumentation/executor/ExecutorMetricsSpec.scala @@ -38,16 +38,6 @@ class ExecutorMetricsSpec extends AnyWordSpec with Matchers with InstrumentInspe "the ExecutorServiceMetrics" should { - "register a SingleThreadPool, collect their metrics and remove it" in { - val singleThreadPoolExecutor = JavaExecutors.newSingleThreadExecutor() - val registeredPool = ExecutorInstrumentation.instrument(singleThreadPoolExecutor, "single-thread-pool-metrics") - - ExecutorMetrics.ThreadsActive.tagValues("name") should contain ("single-thread-pool-metrics") - ExecutorMetrics.ThreadsActive.tagValues("type") should contain ("ThreadPoolExecutor") - - registeredPool.shutdown() - } - "register a ThreadPoolExecutor, collect their metrics and remove it" in { val threadPoolExecutor = JavaExecutors.newCachedThreadPool() val registeredPool = ExecutorInstrumentation.instrument(threadPoolExecutor, "thread-pool-executor-metrics") @@ -59,7 +49,7 @@ class ExecutorMetricsSpec extends AnyWordSpec with Matchers with InstrumentInspe } "register a ScheduledThreadPoolExecutor, collect their metrics and remove it" in { - val scheduledThreadPoolExecutor = JavaExecutors.newSingleThreadScheduledExecutor() + val scheduledThreadPoolExecutor = JavaExecutors.newScheduledThreadPool(1) val registeredPool = ExecutorInstrumentation.instrument(scheduledThreadPoolExecutor, "scheduled-thread-pool-executor-metrics") ExecutorMetrics.ThreadsActive.tagValues("name") should contain ("scheduled-thread-pool-executor-metrics") diff --git a/instrumentation/kamon-executors/src/test/scala/kamon/instrumentation/executor/ExecutorsRegistrationSpec.scala b/instrumentation/kamon-executors/src/test/scala/kamon/instrumentation/executor/ExecutorsRegistrationSpec.scala index d532a5f93..c34a032b0 100644 --- a/instrumentation/kamon-executors/src/test/scala/kamon/instrumentation/executor/ExecutorsRegistrationSpec.scala +++ b/instrumentation/kamon-executors/src/test/scala/kamon/instrumentation/executor/ExecutorsRegistrationSpec.scala @@ -34,31 +34,15 @@ class ExecutorsRegistrationSpec extends AnyWordSpec with Matchers with MetricIns val registeredForkJoin = ExecutorInstrumentation.instrument(new JavaForkJoinPool(1), "fjp") val registeredThreadPool = ExecutorInstrumentation.instrument(JavaExecutors.newFixedThreadPool(1), "thread-pool") val registeredScheduled = ExecutorInstrumentation.instrument(JavaExecutors.newScheduledThreadPool(1), "scheduled-thread-pool") - val registeredSingle = ExecutorInstrumentation.instrument(JavaExecutors.newSingleThreadExecutor(), "single-thread-pool") - val registeredSingleScheduled = ExecutorInstrumentation.instrument(JavaExecutors.newSingleThreadScheduledExecutor(), "single-scheduled-thread-pool") - val registeredSingleAsScheduled = ExecutorInstrumentation.instrumentScheduledExecutor(JavaExecutors.newSingleThreadScheduledExecutor(), "single-scheduled-thread-pool-as-scheduled") - val registeredUThreadPool = ExecutorInstrumentation.instrument(JavaExecutors.unconfigurableExecutorService(JavaExecutors.newFixedThreadPool(1)), "unconfigurable-thread-pool") - val registeredUScheduled = ExecutorInstrumentation.instrument(JavaExecutors.unconfigurableScheduledExecutorService(JavaExecutors.newScheduledThreadPool(1)), "unconfigurable-scheduled-thread-pool") val registeredExecContext = ExecutorInstrumentation.instrumentExecutionContext(ExecutionContext.fromExecutorService(JavaExecutors.newFixedThreadPool(1)), "execution-context") assertContainsAllExecutorNames(ThreadsActive.tagValues("name")) assertContainsAllExecutorNames(TasksSubmitted.tagValues("name")) assertContainsAllExecutorNames(QueueSize.tagValues("name")) - val (scheduledPoolOne, _) = ThreadsActive.instruments(TagSet.of("name", "single-scheduled-thread-pool")).head - val (scheduledPoolTwo, _) = ThreadsActive.instruments(TagSet.of("name", "single-scheduled-thread-pool-as-scheduled")).head - - scheduledPoolOne.get(coerce("type")) shouldBe "ThreadPoolExecutor" - scheduledPoolTwo.get(coerce("type")) shouldBe "ScheduledThreadPoolExecutor" - registeredForkJoin.shutdown() registeredThreadPool.shutdown() registeredScheduled.shutdown() - registeredSingle.shutdown() - registeredSingleScheduled.shutdown() - registeredSingleAsScheduled.shutdown() - registeredUThreadPool.shutdown() - registeredUScheduled.shutdown() registeredExecContext.shutdown() assertDoesNotContainAllExecutorNames(ThreadsActive.tagValues("name")) @@ -79,11 +63,6 @@ class ExecutorsRegistrationSpec extends AnyWordSpec with Matchers with MetricIns "fjp", "thread-pool", "scheduled-thread-pool", - "single-thread-pool", - "single-scheduled-thread-pool", - "single-scheduled-thread-pool-as-scheduled", - "unconfigurable-thread-pool", - "unconfigurable-scheduled-thread-pool", "execution-context" ) } @@ -93,11 +72,6 @@ class ExecutorsRegistrationSpec extends AnyWordSpec with Matchers with MetricIns "fjp", "thread-pool", "scheduled-thread-pool", - "single-thread-pool", - "single-scheduled-thread-pool", - "single-scheduled-thread-pool-as-scheduled", - "unconfigurable-thread-pool", - "unconfigurable-scheduled-thread-pool", "execution-context" ) } diff --git a/instrumentation/kamon-executors/src/test/scala/kamon/instrumentation/executor/OnSubmitContextPropagationSpec.scala b/instrumentation/kamon-executors/src/test/scala/kamon/instrumentation/executor/OnSubmitContextPropagationSpec.scala index f7c869c63..bf3bcb454 100644 --- a/instrumentation/kamon-executors/src/test/scala/kamon/instrumentation/executor/OnSubmitContextPropagationSpec.scala +++ b/instrumentation/kamon-executors/src/test/scala/kamon/instrumentation/executor/OnSubmitContextPropagationSpec.scala @@ -45,7 +45,7 @@ class OnSubmitContextPropagationSpec extends AnyWordSpec with Matchers with Cont } "capture the context when call execute(Runnable) in ThreadPool" in { - val executor = instrument(JavaExecutors.newSingleThreadExecutor()) + val executor = instrument(JavaExecutors.newFixedThreadPool(1)) val ctx = Kamon.runWithContext(testContext("in-runnable-body")) { val runnable = new SimpleRunnable executor.execute(runnable) @@ -57,7 +57,7 @@ class OnSubmitContextPropagationSpec extends AnyWordSpec with Matchers with Cont } "capture the context when call submit(Runnable) in ThreadPool" in { - val executor = instrument(JavaExecutors.newSingleThreadExecutor()) + val executor = instrument(JavaExecutors.newFixedThreadPool(1)) val ctx = Kamon.runWithContext(testContext("in-runnable-body")) { val runnable = new SimpleRunnable executor.submit(runnable) @@ -121,7 +121,7 @@ class OnSubmitContextPropagationSpec extends AnyWordSpec with Matchers with Cont } "capture the context when call submit(Callable) in ThreadPool" in { - val executor = instrument(JavaExecutors.newSingleThreadExecutor()) + val executor = instrument(JavaExecutors.newFixedThreadPool(1)) val ctx = Kamon.runWithContext(testContext("in-callable-body")) { val callable = new SimpleCallable executor.submit(callable)