From 34ca886d027eff8a2f4081fe45f6fb1b26201ca7 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 7 Mar 2022 08:46:39 +0100 Subject: [PATCH] prepend the actor system name in dispatcher-related scheduled actions, fixes #1132 (#1138) --- .../akka_25/DispatcherInstrumentation.scala | 19 +++-- .../akka_26/DispatcherInstrumentation.scala | 8 +- .../akka/DispatcherMetricsSpec.scala | 24 ++++-- .../executor/ExecutorInstrumentation.scala | 76 +++++++++++++------ 4 files changed, 88 insertions(+), 39 deletions(-) diff --git a/instrumentation/kamon-akka/src/akka-2.5/scala/kamon/instrumentation/akka/instrumentations/akka_25/DispatcherInstrumentation.scala b/instrumentation/kamon-akka/src/akka-2.5/scala/kamon/instrumentation/akka/instrumentations/akka_25/DispatcherInstrumentation.scala index bf1e7b157..ac73a7364 100644 --- a/instrumentation/kamon-akka/src/akka-2.5/scala/kamon/instrumentation/akka/instrumentations/akka_25/DispatcherInstrumentation.scala +++ b/instrumentation/kamon-akka/src/akka-2.5/scala/kamon/instrumentation/akka/instrumentations/akka_25/DispatcherInstrumentation.scala @@ -117,17 +117,20 @@ object InstrumentNewExecutorServiceOnAkka24 { def around(@This factory: HasDispatcherPrerequisites with HasDispatcherName, @SuperCall callable: Callable[ExecutorService]): ExecutorService = { val executor = callable.call() + val actorSystemName = factory.dispatcherPrerequisites.settings.name val dispatcherName = factory.dispatcherName - val systemTags = TagSet.of("akka.system", factory.dispatcherPrerequisites.settings.name) + val scheduledActionName = actorSystemName + "/" + dispatcherName + val systemTags = TagSet.of("akka.system", actorSystemName) + if(Kamon.filter(AkkaInstrumentation.TrackDispatcherFilterName).accept(dispatcherName)) { val defaultEcOption = factory.dispatcherPrerequisites.defaultExecutionContext if(dispatcherName == Dispatchers.DefaultDispatcherId && defaultEcOption.isDefined) { - ExecutorInstrumentation.instrumentExecutionContext(defaultEcOption.get, dispatcherName, systemTags) + ExecutorInstrumentation.instrumentExecutionContext(defaultEcOption.get, dispatcherName, systemTags, scheduledActionName, ExecutorInstrumentation.DefaultSettings) .underlyingExecutor.getOrElse(executor) } else { - ExecutorInstrumentation.instrument(executor, dispatcherName, systemTags) + ExecutorInstrumentation.instrument(executor, dispatcherName, systemTags, scheduledActionName, ExecutorInstrumentation.DefaultSettings) } } else executor @@ -139,22 +142,24 @@ object InstrumentNewExecutorServiceOnAkka25 { def around(@This factory: HasDispatcherPrerequisites with HasDispatcherName, @SuperCall callable: Callable[ExecutorService]): ExecutorService = { val executor = callable.call() + val actorSystemName = factory.dispatcherPrerequisites.settings.name val dispatcherName = factory.dispatcherName - val systemTags = TagSet.of("akka.system", factory.dispatcherPrerequisites.settings.name) + val scheduledActionName = actorSystemName + "/" + dispatcherName + val systemTags = TagSet.of("akka.system", actorSystemName) if(Kamon.filter(AkkaInstrumentation.TrackDispatcherFilterName).accept(dispatcherName)) { val defaultEcOption = factory.dispatcherPrerequisites.defaultExecutionContext if(dispatcherName == Dispatchers.DefaultDispatcherId && defaultEcOption.isDefined) { - ExecutorInstrumentation.instrumentExecutionContext(defaultEcOption.get, dispatcherName, systemTags) + ExecutorInstrumentation.instrumentExecutionContext(defaultEcOption.get, dispatcherName, systemTags, scheduledActionName, ExecutorInstrumentation.DefaultSettings) .underlyingExecutor.getOrElse(executor) } else { executor match { case afjp: ForkJoinPool => - ExecutorInstrumentation.instrument(executor, telemetryReader(afjp), dispatcherName, systemTags, ExecutorInstrumentation.DefaultSettings) + ExecutorInstrumentation.instrument(executor, telemetryReader(afjp), dispatcherName, systemTags, scheduledActionName, ExecutorInstrumentation.DefaultSettings) case otherExecutor => - ExecutorInstrumentation.instrument(otherExecutor, dispatcherName, systemTags) + ExecutorInstrumentation.instrument(otherExecutor, dispatcherName, systemTags, scheduledActionName, ExecutorInstrumentation.DefaultSettings) } } } else executor diff --git a/instrumentation/kamon-akka/src/akka-2.6/scala/kamon/instrumentation/akka/instrumentations/akka_26/DispatcherInstrumentation.scala b/instrumentation/kamon-akka/src/akka-2.6/scala/kamon/instrumentation/akka/instrumentations/akka_26/DispatcherInstrumentation.scala index eee000727..f7e5f19ad 100644 --- a/instrumentation/kamon-akka/src/akka-2.6/scala/kamon/instrumentation/akka/instrumentations/akka_26/DispatcherInstrumentation.scala +++ b/instrumentation/kamon-akka/src/akka-2.6/scala/kamon/instrumentation/akka/instrumentations/akka_26/DispatcherInstrumentation.scala @@ -105,17 +105,19 @@ object InstrumentNewExecutorServiceOnAkka26 { def around(@This factory: HasDispatcherPrerequisites with HasDispatcherName, @SuperCall callable: Callable[ExecutorService]): ExecutorService = { val executor = callable.call() + val actorSystemName = factory.dispatcherPrerequisites.settings.name val dispatcherName = factory.dispatcherName - val systemTags = TagSet.of("akka.system", factory.dispatcherPrerequisites.settings.name) + val scheduledActionName = actorSystemName + "/" + dispatcherName + val systemTags = TagSet.of("akka.system", actorSystemName) if(Kamon.filter(AkkaInstrumentation.TrackDispatcherFilterName).accept(dispatcherName)) { val defaultEcOption = factory.dispatcherPrerequisites.defaultExecutionContext if(dispatcherName == Dispatchers.DefaultDispatcherId && defaultEcOption.isDefined) { - ExecutorInstrumentation.instrumentExecutionContext(defaultEcOption.get, dispatcherName, systemTags) + ExecutorInstrumentation.instrumentExecutionContext(defaultEcOption.get, dispatcherName, systemTags, scheduledActionName, ExecutorInstrumentation.DefaultSettings) .underlyingExecutor.getOrElse(executor) } else { - ExecutorInstrumentation.instrument(executor, dispatcherName, systemTags) + ExecutorInstrumentation.instrument(executor, dispatcherName, systemTags, scheduledActionName, ExecutorInstrumentation.DefaultSettings) } } else executor } diff --git a/instrumentation/kamon-akka/src/test-common/scala/kamon/instrumentation/akka/DispatcherMetricsSpec.scala b/instrumentation/kamon-akka/src/test-common/scala/kamon/instrumentation/akka/DispatcherMetricsSpec.scala index 1a49ff8be..e363b7659 100644 --- a/instrumentation/kamon-akka/src/test-common/scala/kamon/instrumentation/akka/DispatcherMetricsSpec.scala +++ b/instrumentation/kamon-akka/src/test-common/scala/kamon/instrumentation/akka/DispatcherMetricsSpec.scala @@ -30,7 +30,8 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike import java.util.concurrent.Executors -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.duration._ class DispatcherMetricsSpec extends TestKit(ActorSystem("DispatcherMetricsSpec")) with AnyWordSpecLike with Matchers with MetricInspection.Syntax with BeforeAndAfterAll with ImplicitSender with Eventually { @@ -46,7 +47,9 @@ class DispatcherMetricsSpec extends TestKit(ActorSystem("DispatcherMetricsSpec") val excluded = "explicitly-excluded" val allDispatchers = trackedDispatchers :+ excluded - val builtInDispatchers = Seq("akka.actor.default-dispatcher")++ (if(Version.current.startsWith("2.6")) Seq("akka.actor.internal-dispatcher") else Seq.empty) + val builtInDispatchers = Seq("akka.actor.default-dispatcher") ++ { + if(Version.current.startsWith("2.6")) Seq("akka.actor.internal-dispatcher") else Seq.empty + } "track dispatchers configured in the akka.dispatcher filter" in { @@ -56,10 +59,10 @@ class DispatcherMetricsSpec extends TestKit(ActorSystem("DispatcherMetricsSpec") val queues = ExecutorMetrics.QueueSize.tagValues("name") val tasks = ExecutorMetrics.TasksCompleted.tagValues("name") - trackedDispatchers.forall { dispatcher => - threads.contains(dispatcher) && - queues.contains(dispatcher) && - tasks.contains(dispatcher) + trackedDispatchers.forall { dispatcherName => + threads.contains(dispatcherName) && + queues.contains(dispatcherName) && + tasks.contains(dispatcherName) } should be (true) Seq(threads, queues, tasks).flatten should not contain excluded @@ -98,6 +101,7 @@ class DispatcherMetricsSpec extends TestKit(ActorSystem("DispatcherMetricsSpec") .map(_.get(plain("name"))) instrumentExecutorsWithSystem should contain only(builtInDispatchers: _*) + Await.result(system.terminate(), 5 seconds) } "pick up default execution contexts provided when creating an actor system when the type is unknown" in { @@ -108,7 +112,13 @@ class DispatcherMetricsSpec extends TestKit(ActorSystem("DispatcherMetricsSpec") .filter(_.get(plain("akka.system")) == system.name) .map(_.get(plain("name"))) - instrumentExecutorsWithSystem should contain only(builtInDispatchers: _*) + val builtInWithoutDefaultDispatcher = builtInDispatchers.filterNot(_.endsWith("default-dispatcher")) + if(builtInWithoutDefaultDispatcher.isEmpty) + instrumentExecutorsWithSystem shouldBe empty + else + instrumentExecutorsWithSystem should contain only(builtInWithoutDefaultDispatcher: _*) + + Await.result(system.terminate(), 5 seconds) } } diff --git a/instrumentation/kamon-executors/src/main/scala/kamon/instrumentation/executor/ExecutorInstrumentation.scala b/instrumentation/kamon-executors/src/main/scala/kamon/instrumentation/executor/ExecutorInstrumentation.scala index 99ce6e908..374049e20 100644 --- a/instrumentation/kamon-executors/src/main/scala/kamon/instrumentation/executor/ExecutorInstrumentation.scala +++ b/instrumentation/kamon-executors/src/main/scala/kamon/instrumentation/executor/ExecutorInstrumentation.scala @@ -27,7 +27,7 @@ import kamon.module.ScheduledAction import kamon.tag.TagSet import org.slf4j.LoggerFactory -import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService} +import scala.concurrent.ExecutionContext import scala.util.Try object ExecutorInstrumentation { @@ -60,7 +60,7 @@ object ExecutorInstrumentation { * Once the returned executor is shutdown, all related metric instruments will be removed. */ def instrumentExecutionContext(executionContext: ExecutionContext, name: String): InstrumentedExecutionContext = - instrumentExecutionContext(executionContext, name, TagSet.Empty, DefaultSettings) + instrumentExecutionContext(executionContext, name, TagSet.Empty, name, DefaultSettings) /** * Creates a new instrumented ScheduledExecutorService that wraps the provided one. The instrumented executor will @@ -74,7 +74,7 @@ object ExecutorInstrumentation { * Once the returned executor is shutdown, all related metric instruments will be removed. */ def instrumentScheduledExecutor(executor: ScheduledExecutorService, name: String): ScheduledExecutorService = - instrumentScheduledExecutor(executor, name, TagSet.Empty) + instrumentScheduledExecutor(executor, name, TagSet.Empty, name) /** * Creates a new instrumented ExecutorService that wraps the provided one. The instrumented executor will track @@ -104,7 +104,7 @@ object ExecutorInstrumentation { * Once the returned executor is shutdown, all related metric instruments will be removed. */ def instrumentExecutionContext(executionContext: ExecutionContext, name: String, settings: Settings): InstrumentedExecutionContext = - instrumentExecutionContext(executionContext, name, TagSet.Empty, settings) + instrumentExecutionContext(executionContext, name, TagSet.Empty, name, settings) /** * Creates a new instrumented ExecutorService that wraps the provided one. The instrumented executor will track @@ -132,7 +132,7 @@ object ExecutorInstrumentation { * Once the returned executor is shutdown, all related metric instruments will be removed. */ def instrumentExecutionContext(executionContext: ExecutionContext, name: String, extraTags: TagSet): InstrumentedExecutionContext = - instrumentExecutionContext(executionContext, name, extraTags, DefaultSettings) + instrumentExecutionContext(executionContext, name, extraTags, name, DefaultSettings) /** * Creates a new instrumented ExecutorService that wraps the provided one. The instrumented executor will track @@ -146,11 +146,26 @@ object ExecutorInstrumentation { * * Once the returned executor is shutdown, all related metric instruments will be removed. */ - def instrument(executor: ExecutorService, name: String, extraTags: TagSet, settings: Settings): ExecutorService = { + def instrument(executor: ExecutorService, name: String, extraTags: TagSet, settings: Settings): ExecutorService = + instrument(executor, name, extraTags, name, settings) + + /** + * Creates a new instrumented ExecutorService that wraps the provided one. The instrumented executor will track + * metrics for ThreadPoolExecutor and ForkJoinPool instances (both from Java and Scala) and optionally, track the + * time spent by each task on the wrapped executor's queue. + * + * All metrics related to the instrumented service will have the following tags: + * * all of the provided extraTags (take into account that any "name" or "type" tags will be overwritten. + * * name: set to the provided name parameter. + * * type: set to "ThreadPoolExecutor" executors or "ForkJoinPool". + * + * Once the returned executor is shutdown, all related metric instruments will be removed. + */ + def instrument(executor: ExecutorService, name: String, extraTags: TagSet, scheduledActionName: String, settings: Settings): ExecutorService = { executor match { - case tpe: ThreadPoolExecutor => new InstrumentedThreadPool(tpe, name, extraTags, settings) - case jfjp: JavaForkJoinPool => new InstrumentedForkJoinPool(jfjp, ForkJoinPoolTelemetryReader.forJava(jfjp), name, extraTags, settings) - case sfjp: ScalaForkJoinPool => new InstrumentedForkJoinPool(sfjp, ForkJoinPoolTelemetryReader.forScala(sfjp), name, extraTags, settings) + case tpe: ThreadPoolExecutor => new InstrumentedThreadPool(tpe, name, extraTags, scheduledActionName, settings) + case jfjp: JavaForkJoinPool => new InstrumentedForkJoinPool(jfjp, ForkJoinPoolTelemetryReader.forJava(jfjp), name, extraTags, scheduledActionName, settings) + case sfjp: ScalaForkJoinPool => new InstrumentedForkJoinPool(sfjp, ForkJoinPoolTelemetryReader.forScala(sfjp), name, extraTags, scheduledActionName, settings) case anyOther => _logger.warn("Cannot instrument unknown executor [{}]", anyOther) executor @@ -169,11 +184,28 @@ object ExecutorInstrumentation { * * Once the returned executor is shutdown, all related metric instruments will be removed. */ - def instrumentScheduledExecutor(executor: ScheduledExecutorService, name: String, extraTags: TagSet): ScheduledExecutorService = { + def instrumentScheduledExecutor(executor: ScheduledExecutorService, name: String, extraTags: TagSet): ScheduledExecutorService = + instrumentScheduledExecutor(executor, name, extraTags, name) + + /** + * Creates a new instrumented ScheduledExecutorService that wraps the provided one. The instrumented executor will + * track metrics for a ScheduledThreadPoolExecutor, but will not perform any context propagation nor track the time + * in queue metric for submitted tasks. + * + * All metrics related to the instrumented service will have the following tags: + * * all of the provided extraTags (take into account that any "name" or "type" tags will be overwritten. + * * name: set to the provided name parameter. + * * type: set to "ScheduledThreadPoolExecutor". + * + * Once the returned executor is shutdown, all related metric instruments will be removed. + */ + def instrumentScheduledExecutor(executor: ScheduledExecutorService, name: String, extraTags: TagSet, + scheduledActionName: String): ScheduledExecutorService = { + executor match { case stpe: ScheduledThreadPoolExecutor => - new InstrumentedScheduledThreadPoolExecutor(stpe, name, extraTags.withTag("scheduled", true)) + new InstrumentedScheduledThreadPoolExecutor(stpe, name, extraTags.withTag("scheduled", true), scheduledActionName) case anyOther => _logger.warn("Cannot instrument unknown executor [{}]", anyOther) @@ -193,11 +225,11 @@ object ExecutorInstrumentation { * * Once the returned executor is shutdown, all related metric instruments will be removed. */ - def instrumentExecutionContext(executionContext: ExecutionContext, name: String, extraTags: TagSet, + def instrumentExecutionContext(executionContext: ExecutionContext, name: String, extraTags: TagSet, scheduledActionName: String, settings: Settings): InstrumentedExecutionContext = { val underlyingExecutor = unwrapExecutionContext(executionContext) - .map(executor => instrument(executor, name, extraTags, settings)) + .map(executor => instrument(executor, name, extraTags, scheduledActionName, settings)) new InstrumentedExecutionContext(executionContext, underlyingExecutor) } @@ -215,8 +247,8 @@ object ExecutorInstrumentation { * Once the returned executor is shutdown, all related metric instruments will be removed. */ def instrument(executor: ExecutorService, telemetryReader: ForkJoinPoolTelemetryReader, name: String, extraTags: TagSet, - settings: Settings): ExecutorService = { - new InstrumentedForkJoinPool(executor, telemetryReader, name, extraTags, settings) + scheduledActionName: String, settings: Settings): ExecutorService = { + new InstrumentedForkJoinPool(executor, telemetryReader, name, extraTags, scheduledActionName, settings) } /** @@ -310,15 +342,15 @@ object ExecutorInstrumentation { * * The instruments used to track the pool's behavior are removed once the pool is shut down. */ - class InstrumentedThreadPool(wrapped: ThreadPoolExecutor, name: String, extraTags: TagSet, settings: Settings) - extends ExecutorService { + class InstrumentedThreadPool(wrapped: ThreadPoolExecutor, name: String, extraTags: TagSet, scheduledActionName: String, + settings: Settings) extends ExecutorService { private val _runnableWrapper = buildRunnableWrapper() private val _callableWrapper = buildCallableWrapper() private val _instruments = new ExecutorMetrics.ThreadPoolInstruments(name, extraTags, executorType) private val _timeInQueueTimer = _instruments.timeInQueue private val _collectorRegistration = Kamon.addScheduledAction( - name, + scheduledActionName, Some(s"Updates health metrics for the ${name} thread pool every ${_sampleInterval.getSeconds} seconds"), new ScheduledAction { val submittedTasksSource = Counter.delta(() => wrapped.getTaskCount) @@ -501,8 +533,8 @@ object ExecutorInstrumentation { * * The instruments used to track the pool's behavior are removed once the pool is shut down. */ - class InstrumentedScheduledThreadPoolExecutor(wrapped: ScheduledThreadPoolExecutor, name: String, extraTags: TagSet) - extends InstrumentedThreadPool(wrapped, name, extraTags, NoExtraSettings) with ScheduledExecutorService { + class InstrumentedScheduledThreadPoolExecutor(wrapped: ScheduledThreadPoolExecutor, name: String, extraTags: TagSet, scheduledActionName: String) + extends InstrumentedThreadPool(wrapped, name, extraTags, scheduledActionName, NoExtraSettings) with ScheduledExecutorService { override protected def executorType: String = "ScheduledThreadPoolExecutor" @@ -530,7 +562,7 @@ object ExecutorInstrumentation { * The instruments used to track the pool's behavior are removed once the pool is shut down. */ class InstrumentedForkJoinPool(wrapped: ExecutorService, telemetryReader: ForkJoinPoolTelemetryReader, name: String, - extraTags: TagSet, settings: Settings) extends ExecutorService { + extraTags: TagSet, scheduledActionName: String, settings: Settings) extends ExecutorService { private val _runnableWrapper = buildRunnableWrapper() private val _callableWrapper = buildCallableWrapper() @@ -539,7 +571,7 @@ object ExecutorInstrumentation { private val _submittedTasksCounter: LongAdder = new LongAdder private val _completedTasksCounter: LongAdder = new LongAdder private val _collectorRegistration = Kamon.addScheduledAction( - name, + scheduledActionName, Some(s"Updates health metrics for the ${name} thread pool every ${_sampleInterval.getSeconds} seconds"), new ScheduledAction { val submittedTasksSource = Counter.delta(() => _submittedTasksCounter.longValue())