Skip to content

Commit

Permalink
remove reflective access from the executors instrumentation, fixes #641
Browse files Browse the repository at this point in the history
… (#1136)
  • Loading branch information
ivantopo authored Mar 7, 2022
1 parent f44dca0 commit 2a916af
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ object ExecutorInstrumentation {
*/
def instrument(executor: ExecutorService, name: String, extraTags: TagSet, settings: Settings): ExecutorService = {
executor match {
case es: ExecutorService if isWrapper(es) => instrument(unwrap(es), name, extraTags, settings)
case tpe: ThreadPoolExecutor => new InstrumentedThreadPool(tpe, name, extraTags, settings)
case jfjp: JavaForkJoinPool => new InstrumentedForkJoinPool(jfjp, ForkJoinPoolTelemetryReader.forJava(jfjp), name, extraTags, settings)
case sfjp: ScalaForkJoinPool => new InstrumentedForkJoinPool(sfjp, ForkJoinPoolTelemetryReader.forScala(sfjp), name, extraTags, settings)
Expand All @@ -172,8 +171,6 @@ object ExecutorInstrumentation {
*/
def instrumentScheduledExecutor(executor: ScheduledExecutorService, name: String, extraTags: TagSet): ScheduledExecutorService = {
executor match {
case es: ScheduledExecutorService if isWrapper(es) =>
instrumentScheduledExecutor(unwrap(es).asInstanceOf[ScheduledExecutorService], name, extraTags)

case stpe: ScheduledThreadPoolExecutor =>
new InstrumentedScheduledThreadPoolExecutor(stpe, name, extraTags.withTag("scheduled", true))
Expand Down Expand Up @@ -251,42 +248,20 @@ object ExecutorInstrumentation {
new Settings(shouldTrackTimeInQueue, false)
}


private val _delegatedExecutorClass = Class.forName("java.util.concurrent.Executors$DelegatedExecutorService")
private val _finalizableDelegatedClass = Class.forName("java.util.concurrent.Executors$FinalizableDelegatedExecutorService")
private val _delegateScheduledClass = Class.forName("java.util.concurrent.Executors$DelegatedScheduledExecutorService")
private val _delegatedExecutorField = {
val field = _delegatedExecutorClass.getDeclaredField("e")
field.setAccessible(true)
field
}

private val _executionContextExecutorField = {
val field = Class.forName("scala.concurrent.impl.ExecutionContextImpl").getDeclaredField("executor")
field.setAccessible(true)
field
}

private def isAssignableTo(executor: ExecutorService, expectedClass: Class[_]): Boolean =
expectedClass.isAssignableFrom(executor.getClass)

private def isWrapper(executor: ExecutorService): Boolean = {
isAssignableTo(executor, _delegatedExecutorClass) ||
isAssignableTo(executor, _finalizableDelegatedClass) ||
isAssignableTo(executor, _delegateScheduledClass)
}

private def unwrap(delegatedExecutor: ExecutorService): ExecutorService =
_delegatedExecutorField.get(delegatedExecutor).asInstanceOf[ExecutorService]

private def unwrapExecutionContext(executionContext: ExecutionContext): Option[ExecutorService] =
try {

// This function can only unwrap ExecutionContext instances created via ExecutionContext.fromExecutor
// or ExecutionContext.fromExecutorService.
Some(_executionContextExecutorField.get(executionContext).asInstanceOf[ExecutorService])
} catch {
case anyError =>
case _: Throwable =>
_logger.warn("Cannot unwrap unsupported ExecutionContext [{}]", executionContext)
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,6 @@ class ExecutorMetricsSpec extends AnyWordSpec with Matchers with InstrumentInspe


"the ExecutorServiceMetrics" should {
"register a SingleThreadPool, collect their metrics and remove it" in {
val singleThreadPoolExecutor = JavaExecutors.newSingleThreadExecutor()
val registeredPool = ExecutorInstrumentation.instrument(singleThreadPoolExecutor, "single-thread-pool-metrics")

ExecutorMetrics.ThreadsActive.tagValues("name") should contain ("single-thread-pool-metrics")
ExecutorMetrics.ThreadsActive.tagValues("type") should contain ("ThreadPoolExecutor")

registeredPool.shutdown()
}

"register a ThreadPoolExecutor, collect their metrics and remove it" in {
val threadPoolExecutor = JavaExecutors.newCachedThreadPool()
val registeredPool = ExecutorInstrumentation.instrument(threadPoolExecutor, "thread-pool-executor-metrics")
Expand All @@ -59,7 +49,7 @@ class ExecutorMetricsSpec extends AnyWordSpec with Matchers with InstrumentInspe
}

"register a ScheduledThreadPoolExecutor, collect their metrics and remove it" in {
val scheduledThreadPoolExecutor = JavaExecutors.newSingleThreadScheduledExecutor()
val scheduledThreadPoolExecutor = JavaExecutors.newScheduledThreadPool(1)
val registeredPool = ExecutorInstrumentation.instrument(scheduledThreadPoolExecutor, "scheduled-thread-pool-executor-metrics")

ExecutorMetrics.ThreadsActive.tagValues("name") should contain ("scheduled-thread-pool-executor-metrics")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,31 +34,15 @@ class ExecutorsRegistrationSpec extends AnyWordSpec with Matchers with MetricIns
val registeredForkJoin = ExecutorInstrumentation.instrument(new JavaForkJoinPool(1), "fjp")
val registeredThreadPool = ExecutorInstrumentation.instrument(JavaExecutors.newFixedThreadPool(1), "thread-pool")
val registeredScheduled = ExecutorInstrumentation.instrument(JavaExecutors.newScheduledThreadPool(1), "scheduled-thread-pool")
val registeredSingle = ExecutorInstrumentation.instrument(JavaExecutors.newSingleThreadExecutor(), "single-thread-pool")
val registeredSingleScheduled = ExecutorInstrumentation.instrument(JavaExecutors.newSingleThreadScheduledExecutor(), "single-scheduled-thread-pool")
val registeredSingleAsScheduled = ExecutorInstrumentation.instrumentScheduledExecutor(JavaExecutors.newSingleThreadScheduledExecutor(), "single-scheduled-thread-pool-as-scheduled")
val registeredUThreadPool = ExecutorInstrumentation.instrument(JavaExecutors.unconfigurableExecutorService(JavaExecutors.newFixedThreadPool(1)), "unconfigurable-thread-pool")
val registeredUScheduled = ExecutorInstrumentation.instrument(JavaExecutors.unconfigurableScheduledExecutorService(JavaExecutors.newScheduledThreadPool(1)), "unconfigurable-scheduled-thread-pool")
val registeredExecContext = ExecutorInstrumentation.instrumentExecutionContext(ExecutionContext.fromExecutorService(JavaExecutors.newFixedThreadPool(1)), "execution-context")

assertContainsAllExecutorNames(ThreadsActive.tagValues("name"))
assertContainsAllExecutorNames(TasksSubmitted.tagValues("name"))
assertContainsAllExecutorNames(QueueSize.tagValues("name"))

val (scheduledPoolOne, _) = ThreadsActive.instruments(TagSet.of("name", "single-scheduled-thread-pool")).head
val (scheduledPoolTwo, _) = ThreadsActive.instruments(TagSet.of("name", "single-scheduled-thread-pool-as-scheduled")).head

scheduledPoolOne.get(coerce("type")) shouldBe "ThreadPoolExecutor"
scheduledPoolTwo.get(coerce("type")) shouldBe "ScheduledThreadPoolExecutor"

registeredForkJoin.shutdown()
registeredThreadPool.shutdown()
registeredScheduled.shutdown()
registeredSingle.shutdown()
registeredSingleScheduled.shutdown()
registeredSingleAsScheduled.shutdown()
registeredUThreadPool.shutdown()
registeredUScheduled.shutdown()
registeredExecContext.shutdown()

assertDoesNotContainAllExecutorNames(ThreadsActive.tagValues("name"))
Expand All @@ -79,11 +63,6 @@ class ExecutorsRegistrationSpec extends AnyWordSpec with Matchers with MetricIns
"fjp",
"thread-pool",
"scheduled-thread-pool",
"single-thread-pool",
"single-scheduled-thread-pool",
"single-scheduled-thread-pool-as-scheduled",
"unconfigurable-thread-pool",
"unconfigurable-scheduled-thread-pool",
"execution-context"
)
}
Expand All @@ -93,11 +72,6 @@ class ExecutorsRegistrationSpec extends AnyWordSpec with Matchers with MetricIns
"fjp",
"thread-pool",
"scheduled-thread-pool",
"single-thread-pool",
"single-scheduled-thread-pool",
"single-scheduled-thread-pool-as-scheduled",
"unconfigurable-thread-pool",
"unconfigurable-scheduled-thread-pool",
"execution-context"
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class OnSubmitContextPropagationSpec extends AnyWordSpec with Matchers with Cont
}

"capture the context when call execute(Runnable) in ThreadPool" in {
val executor = instrument(JavaExecutors.newSingleThreadExecutor())
val executor = instrument(JavaExecutors.newFixedThreadPool(1))
val ctx = Kamon.runWithContext(testContext("in-runnable-body")) {
val runnable = new SimpleRunnable
executor.execute(runnable)
Expand All @@ -57,7 +57,7 @@ class OnSubmitContextPropagationSpec extends AnyWordSpec with Matchers with Cont
}

"capture the context when call submit(Runnable) in ThreadPool" in {
val executor = instrument(JavaExecutors.newSingleThreadExecutor())
val executor = instrument(JavaExecutors.newFixedThreadPool(1))
val ctx = Kamon.runWithContext(testContext("in-runnable-body")) {
val runnable = new SimpleRunnable
executor.submit(runnable)
Expand Down Expand Up @@ -121,7 +121,7 @@ class OnSubmitContextPropagationSpec extends AnyWordSpec with Matchers with Cont
}

"capture the context when call submit(Callable) in ThreadPool" in {
val executor = instrument(JavaExecutors.newSingleThreadExecutor())
val executor = instrument(JavaExecutors.newFixedThreadPool(1))
val ctx = Kamon.runWithContext(testContext("in-callable-body")) {
val callable = new SimpleCallable
executor.submit(callable)
Expand Down

0 comments on commit 2a916af

Please sign in to comment.