Skip to content

Commit

Permalink
prepend the actor system name in dispatcher-related scheduled actions,
Browse files Browse the repository at this point in the history
…fixes #1132 (#1138)
  • Loading branch information
ivantopo authored Mar 7, 2022
1 parent 2a916af commit 34ca886
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,20 @@ object InstrumentNewExecutorServiceOnAkka24 {

def around(@This factory: HasDispatcherPrerequisites with HasDispatcherName, @SuperCall callable: Callable[ExecutorService]): ExecutorService = {
val executor = callable.call()
val actorSystemName = factory.dispatcherPrerequisites.settings.name
val dispatcherName = factory.dispatcherName
val systemTags = TagSet.of("akka.system", factory.dispatcherPrerequisites.settings.name)
val scheduledActionName = actorSystemName + "/" + dispatcherName
val systemTags = TagSet.of("akka.system", actorSystemName)


if(Kamon.filter(AkkaInstrumentation.TrackDispatcherFilterName).accept(dispatcherName)) {
val defaultEcOption = factory.dispatcherPrerequisites.defaultExecutionContext

if(dispatcherName == Dispatchers.DefaultDispatcherId && defaultEcOption.isDefined) {
ExecutorInstrumentation.instrumentExecutionContext(defaultEcOption.get, dispatcherName, systemTags)
ExecutorInstrumentation.instrumentExecutionContext(defaultEcOption.get, dispatcherName, systemTags, scheduledActionName, ExecutorInstrumentation.DefaultSettings)
.underlyingExecutor.getOrElse(executor)
} else {
ExecutorInstrumentation.instrument(executor, dispatcherName, systemTags)
ExecutorInstrumentation.instrument(executor, dispatcherName, systemTags, scheduledActionName, ExecutorInstrumentation.DefaultSettings)
}
} else
executor
Expand All @@ -139,22 +142,24 @@ object InstrumentNewExecutorServiceOnAkka25 {

def around(@This factory: HasDispatcherPrerequisites with HasDispatcherName, @SuperCall callable: Callable[ExecutorService]): ExecutorService = {
val executor = callable.call()
val actorSystemName = factory.dispatcherPrerequisites.settings.name
val dispatcherName = factory.dispatcherName
val systemTags = TagSet.of("akka.system", factory.dispatcherPrerequisites.settings.name)
val scheduledActionName = actorSystemName + "/" + dispatcherName
val systemTags = TagSet.of("akka.system", actorSystemName)

if(Kamon.filter(AkkaInstrumentation.TrackDispatcherFilterName).accept(dispatcherName)) {
val defaultEcOption = factory.dispatcherPrerequisites.defaultExecutionContext

if(dispatcherName == Dispatchers.DefaultDispatcherId && defaultEcOption.isDefined) {
ExecutorInstrumentation.instrumentExecutionContext(defaultEcOption.get, dispatcherName, systemTags)
ExecutorInstrumentation.instrumentExecutionContext(defaultEcOption.get, dispatcherName, systemTags, scheduledActionName, ExecutorInstrumentation.DefaultSettings)
.underlyingExecutor.getOrElse(executor)
} else {
executor match {
case afjp: ForkJoinPool =>
ExecutorInstrumentation.instrument(executor, telemetryReader(afjp), dispatcherName, systemTags, ExecutorInstrumentation.DefaultSettings)
ExecutorInstrumentation.instrument(executor, telemetryReader(afjp), dispatcherName, systemTags, scheduledActionName, ExecutorInstrumentation.DefaultSettings)

case otherExecutor =>
ExecutorInstrumentation.instrument(otherExecutor, dispatcherName, systemTags)
ExecutorInstrumentation.instrument(otherExecutor, dispatcherName, systemTags, scheduledActionName, ExecutorInstrumentation.DefaultSettings)
}
}
} else executor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,19 @@ object InstrumentNewExecutorServiceOnAkka26 {

def around(@This factory: HasDispatcherPrerequisites with HasDispatcherName, @SuperCall callable: Callable[ExecutorService]): ExecutorService = {
val executor = callable.call()
val actorSystemName = factory.dispatcherPrerequisites.settings.name
val dispatcherName = factory.dispatcherName
val systemTags = TagSet.of("akka.system", factory.dispatcherPrerequisites.settings.name)
val scheduledActionName = actorSystemName + "/" + dispatcherName
val systemTags = TagSet.of("akka.system", actorSystemName)

if(Kamon.filter(AkkaInstrumentation.TrackDispatcherFilterName).accept(dispatcherName)) {
val defaultEcOption = factory.dispatcherPrerequisites.defaultExecutionContext

if(dispatcherName == Dispatchers.DefaultDispatcherId && defaultEcOption.isDefined) {
ExecutorInstrumentation.instrumentExecutionContext(defaultEcOption.get, dispatcherName, systemTags)
ExecutorInstrumentation.instrumentExecutionContext(defaultEcOption.get, dispatcherName, systemTags, scheduledActionName, ExecutorInstrumentation.DefaultSettings)
.underlyingExecutor.getOrElse(executor)
} else {
ExecutorInstrumentation.instrument(executor, dispatcherName, systemTags)
ExecutorInstrumentation.instrument(executor, dispatcherName, systemTags, scheduledActionName, ExecutorInstrumentation.DefaultSettings)
}
} else executor
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

class DispatcherMetricsSpec extends TestKit(ActorSystem("DispatcherMetricsSpec")) with AnyWordSpecLike with Matchers with MetricInspection.Syntax
with BeforeAndAfterAll with ImplicitSender with Eventually {
Expand All @@ -46,7 +47,9 @@ class DispatcherMetricsSpec extends TestKit(ActorSystem("DispatcherMetricsSpec")

val excluded = "explicitly-excluded"
val allDispatchers = trackedDispatchers :+ excluded
val builtInDispatchers = Seq("akka.actor.default-dispatcher")++ (if(Version.current.startsWith("2.6")) Seq("akka.actor.internal-dispatcher") else Seq.empty)
val builtInDispatchers = Seq("akka.actor.default-dispatcher") ++ {
if(Version.current.startsWith("2.6")) Seq("akka.actor.internal-dispatcher") else Seq.empty
}


"track dispatchers configured in the akka.dispatcher filter" in {
Expand All @@ -56,10 +59,10 @@ class DispatcherMetricsSpec extends TestKit(ActorSystem("DispatcherMetricsSpec")
val queues = ExecutorMetrics.QueueSize.tagValues("name")
val tasks = ExecutorMetrics.TasksCompleted.tagValues("name")

trackedDispatchers.forall { dispatcher =>
threads.contains(dispatcher) &&
queues.contains(dispatcher) &&
tasks.contains(dispatcher)
trackedDispatchers.forall { dispatcherName =>
threads.contains(dispatcherName) &&
queues.contains(dispatcherName) &&
tasks.contains(dispatcherName)
} should be (true)

Seq(threads, queues, tasks).flatten should not contain excluded
Expand Down Expand Up @@ -98,6 +101,7 @@ class DispatcherMetricsSpec extends TestKit(ActorSystem("DispatcherMetricsSpec")
.map(_.get(plain("name")))

instrumentExecutorsWithSystem should contain only(builtInDispatchers: _*)
Await.result(system.terminate(), 5 seconds)
}

"pick up default execution contexts provided when creating an actor system when the type is unknown" in {
Expand All @@ -108,7 +112,13 @@ class DispatcherMetricsSpec extends TestKit(ActorSystem("DispatcherMetricsSpec")
.filter(_.get(plain("akka.system")) == system.name)
.map(_.get(plain("name")))

instrumentExecutorsWithSystem should contain only(builtInDispatchers: _*)
val builtInWithoutDefaultDispatcher = builtInDispatchers.filterNot(_.endsWith("default-dispatcher"))
if(builtInWithoutDefaultDispatcher.isEmpty)
instrumentExecutorsWithSystem shouldBe empty
else
instrumentExecutorsWithSystem should contain only(builtInWithoutDefaultDispatcher: _*)

Await.result(system.terminate(), 5 seconds)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import kamon.module.ScheduledAction
import kamon.tag.TagSet
import org.slf4j.LoggerFactory

import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
import scala.concurrent.ExecutionContext
import scala.util.Try

object ExecutorInstrumentation {
Expand Down Expand Up @@ -60,7 +60,7 @@ object ExecutorInstrumentation {
* Once the returned executor is shutdown, all related metric instruments will be removed.
*/
def instrumentExecutionContext(executionContext: ExecutionContext, name: String): InstrumentedExecutionContext =
instrumentExecutionContext(executionContext, name, TagSet.Empty, DefaultSettings)
instrumentExecutionContext(executionContext, name, TagSet.Empty, name, DefaultSettings)

/**
* Creates a new instrumented ScheduledExecutorService that wraps the provided one. The instrumented executor will
Expand All @@ -74,7 +74,7 @@ object ExecutorInstrumentation {
* Once the returned executor is shutdown, all related metric instruments will be removed.
*/
def instrumentScheduledExecutor(executor: ScheduledExecutorService, name: String): ScheduledExecutorService =
instrumentScheduledExecutor(executor, name, TagSet.Empty)
instrumentScheduledExecutor(executor, name, TagSet.Empty, name)

/**
* Creates a new instrumented ExecutorService that wraps the provided one. The instrumented executor will track
Expand Down Expand Up @@ -104,7 +104,7 @@ object ExecutorInstrumentation {
* Once the returned executor is shutdown, all related metric instruments will be removed.
*/
def instrumentExecutionContext(executionContext: ExecutionContext, name: String, settings: Settings): InstrumentedExecutionContext =
instrumentExecutionContext(executionContext, name, TagSet.Empty, settings)
instrumentExecutionContext(executionContext, name, TagSet.Empty, name, settings)

/**
* Creates a new instrumented ExecutorService that wraps the provided one. The instrumented executor will track
Expand Down Expand Up @@ -132,7 +132,7 @@ object ExecutorInstrumentation {
* Once the returned executor is shutdown, all related metric instruments will be removed.
*/
def instrumentExecutionContext(executionContext: ExecutionContext, name: String, extraTags: TagSet): InstrumentedExecutionContext =
instrumentExecutionContext(executionContext, name, extraTags, DefaultSettings)
instrumentExecutionContext(executionContext, name, extraTags, name, DefaultSettings)

/**
* Creates a new instrumented ExecutorService that wraps the provided one. The instrumented executor will track
Expand All @@ -146,11 +146,26 @@ object ExecutorInstrumentation {
*
* Once the returned executor is shutdown, all related metric instruments will be removed.
*/
def instrument(executor: ExecutorService, name: String, extraTags: TagSet, settings: Settings): ExecutorService = {
def instrument(executor: ExecutorService, name: String, extraTags: TagSet, settings: Settings): ExecutorService =
instrument(executor, name, extraTags, name, settings)

/**
* Creates a new instrumented ExecutorService that wraps the provided one. The instrumented executor will track
* metrics for ThreadPoolExecutor and ForkJoinPool instances (both from Java and Scala) and optionally, track the
* time spent by each task on the wrapped executor's queue.
*
* All metrics related to the instrumented service will have the following tags:
* * all of the provided extraTags (take into account that any "name" or "type" tags will be overwritten.
* * name: set to the provided name parameter.
* * type: set to "ThreadPoolExecutor" executors or "ForkJoinPool".
*
* Once the returned executor is shutdown, all related metric instruments will be removed.
*/
def instrument(executor: ExecutorService, name: String, extraTags: TagSet, scheduledActionName: String, settings: Settings): ExecutorService = {
executor match {
case tpe: ThreadPoolExecutor => new InstrumentedThreadPool(tpe, name, extraTags, settings)
case jfjp: JavaForkJoinPool => new InstrumentedForkJoinPool(jfjp, ForkJoinPoolTelemetryReader.forJava(jfjp), name, extraTags, settings)
case sfjp: ScalaForkJoinPool => new InstrumentedForkJoinPool(sfjp, ForkJoinPoolTelemetryReader.forScala(sfjp), name, extraTags, settings)
case tpe: ThreadPoolExecutor => new InstrumentedThreadPool(tpe, name, extraTags, scheduledActionName, settings)
case jfjp: JavaForkJoinPool => new InstrumentedForkJoinPool(jfjp, ForkJoinPoolTelemetryReader.forJava(jfjp), name, extraTags, scheduledActionName, settings)
case sfjp: ScalaForkJoinPool => new InstrumentedForkJoinPool(sfjp, ForkJoinPoolTelemetryReader.forScala(sfjp), name, extraTags, scheduledActionName, settings)
case anyOther =>
_logger.warn("Cannot instrument unknown executor [{}]", anyOther)
executor
Expand All @@ -169,11 +184,28 @@ object ExecutorInstrumentation {
*
* Once the returned executor is shutdown, all related metric instruments will be removed.
*/
def instrumentScheduledExecutor(executor: ScheduledExecutorService, name: String, extraTags: TagSet): ScheduledExecutorService = {
def instrumentScheduledExecutor(executor: ScheduledExecutorService, name: String, extraTags: TagSet): ScheduledExecutorService =
instrumentScheduledExecutor(executor, name, extraTags, name)

/**
* Creates a new instrumented ScheduledExecutorService that wraps the provided one. The instrumented executor will
* track metrics for a ScheduledThreadPoolExecutor, but will not perform any context propagation nor track the time
* in queue metric for submitted tasks.
*
* All metrics related to the instrumented service will have the following tags:
* * all of the provided extraTags (take into account that any "name" or "type" tags will be overwritten.
* * name: set to the provided name parameter.
* * type: set to "ScheduledThreadPoolExecutor".
*
* Once the returned executor is shutdown, all related metric instruments will be removed.
*/
def instrumentScheduledExecutor(executor: ScheduledExecutorService, name: String, extraTags: TagSet,
scheduledActionName: String): ScheduledExecutorService = {

executor match {

case stpe: ScheduledThreadPoolExecutor =>
new InstrumentedScheduledThreadPoolExecutor(stpe, name, extraTags.withTag("scheduled", true))
new InstrumentedScheduledThreadPoolExecutor(stpe, name, extraTags.withTag("scheduled", true), scheduledActionName)

case anyOther =>
_logger.warn("Cannot instrument unknown executor [{}]", anyOther)
Expand All @@ -193,11 +225,11 @@ object ExecutorInstrumentation {
*
* Once the returned executor is shutdown, all related metric instruments will be removed.
*/
def instrumentExecutionContext(executionContext: ExecutionContext, name: String, extraTags: TagSet,
def instrumentExecutionContext(executionContext: ExecutionContext, name: String, extraTags: TagSet, scheduledActionName: String,
settings: Settings): InstrumentedExecutionContext = {

val underlyingExecutor = unwrapExecutionContext(executionContext)
.map(executor => instrument(executor, name, extraTags, settings))
.map(executor => instrument(executor, name, extraTags, scheduledActionName, settings))

new InstrumentedExecutionContext(executionContext, underlyingExecutor)
}
Expand All @@ -215,8 +247,8 @@ object ExecutorInstrumentation {
* Once the returned executor is shutdown, all related metric instruments will be removed.
*/
def instrument(executor: ExecutorService, telemetryReader: ForkJoinPoolTelemetryReader, name: String, extraTags: TagSet,
settings: Settings): ExecutorService = {
new InstrumentedForkJoinPool(executor, telemetryReader, name, extraTags, settings)
scheduledActionName: String, settings: Settings): ExecutorService = {
new InstrumentedForkJoinPool(executor, telemetryReader, name, extraTags, scheduledActionName, settings)
}

/**
Expand Down Expand Up @@ -310,15 +342,15 @@ object ExecutorInstrumentation {
*
* The instruments used to track the pool's behavior are removed once the pool is shut down.
*/
class InstrumentedThreadPool(wrapped: ThreadPoolExecutor, name: String, extraTags: TagSet, settings: Settings)
extends ExecutorService {
class InstrumentedThreadPool(wrapped: ThreadPoolExecutor, name: String, extraTags: TagSet, scheduledActionName: String,
settings: Settings) extends ExecutorService {

private val _runnableWrapper = buildRunnableWrapper()
private val _callableWrapper = buildCallableWrapper()
private val _instruments = new ExecutorMetrics.ThreadPoolInstruments(name, extraTags, executorType)
private val _timeInQueueTimer = _instruments.timeInQueue
private val _collectorRegistration = Kamon.addScheduledAction(
name,
scheduledActionName,
Some(s"Updates health metrics for the ${name} thread pool every ${_sampleInterval.getSeconds} seconds"),
new ScheduledAction {
val submittedTasksSource = Counter.delta(() => wrapped.getTaskCount)
Expand Down Expand Up @@ -501,8 +533,8 @@ object ExecutorInstrumentation {
*
* The instruments used to track the pool's behavior are removed once the pool is shut down.
*/
class InstrumentedScheduledThreadPoolExecutor(wrapped: ScheduledThreadPoolExecutor, name: String, extraTags: TagSet)
extends InstrumentedThreadPool(wrapped, name, extraTags, NoExtraSettings) with ScheduledExecutorService {
class InstrumentedScheduledThreadPoolExecutor(wrapped: ScheduledThreadPoolExecutor, name: String, extraTags: TagSet, scheduledActionName: String)
extends InstrumentedThreadPool(wrapped, name, extraTags, scheduledActionName, NoExtraSettings) with ScheduledExecutorService {

override protected def executorType: String =
"ScheduledThreadPoolExecutor"
Expand Down Expand Up @@ -530,7 +562,7 @@ object ExecutorInstrumentation {
* The instruments used to track the pool's behavior are removed once the pool is shut down.
*/
class InstrumentedForkJoinPool(wrapped: ExecutorService, telemetryReader: ForkJoinPoolTelemetryReader, name: String,
extraTags: TagSet, settings: Settings) extends ExecutorService {
extraTags: TagSet, scheduledActionName: String, settings: Settings) extends ExecutorService {

private val _runnableWrapper = buildRunnableWrapper()
private val _callableWrapper = buildCallableWrapper()
Expand All @@ -539,7 +571,7 @@ object ExecutorInstrumentation {
private val _submittedTasksCounter: LongAdder = new LongAdder
private val _completedTasksCounter: LongAdder = new LongAdder
private val _collectorRegistration = Kamon.addScheduledAction(
name,
scheduledActionName,
Some(s"Updates health metrics for the ${name} thread pool every ${_sampleInterval.getSeconds} seconds"),
new ScheduledAction {
val submittedTasksSource = Counter.delta(() => _submittedTasksCounter.longValue())
Expand Down

0 comments on commit 34ca886

Please sign in to comment.