Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release scala-3 versions of kamon-akka, kamon-pekko and kamon-pekko-http #1295

Closed
wants to merge 10 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
some tests passing
hughsimpson authored and TjarkoG committed Nov 8, 2023
commit a5c4581b5532ac33c1ff330888da3a2f7a554221
Original file line number Diff line number Diff line change
@@ -28,6 +28,9 @@ import kanela.agent.libs.net.bytebuddy.asm.Advice.{Argument, OnMethodEnter, OnMe

class ActorInstrumentation extends InstrumentationBuilder {

onType("akka.actor.dungeon.Dispatch")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I've caused a regression with Akka 2.5 with this one. It looks like this advice will need to be split between Akka versions

.advise(method("sendMessage").and(takesArguments(1)), SendMessageAdvice)

/**
* This is where most of the Actor processing magic happens. Handling of messages, errors and system messages.
*/
@@ -36,7 +39,6 @@ class ActorInstrumentation extends InstrumentationBuilder {
.advise(isConstructor, ActorCellConstructorAdvice)
.advise(method("invoke"), classOf[ActorCellInvokeAdvice])
.advise(method("handleInvokeFailure"), HandleInvokeFailureMethodAdvice)
.advise(method("sendMessage").and(takesArguments(1)), SendMessageAdvice)
.advise(method("terminate"), TerminateMethodAdvice)
.advise(method("swapMailbox"), ActorCellSwapMailboxAdvice)
.advise(method("invokeAll$1"), InvokeAllMethodInterceptor)
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@ import org.apache.pekko.http.scaladsl.server.RouteResult.Rejected
import org.apache.pekko.http.scaladsl.server._
import org.apache.pekko.http.scaladsl.server.directives.RouteDirectives.reject
import org.apache.pekko.http.scaladsl.server.directives.{BasicDirectives, CompleteOrRecoverWithMagnet, OnSuccessMagnet}
import org.apache.pekko.http.scaladsl.server.util.Tupler
import org.apache.pekko.http.scaladsl.server.util.{Tuple, Tupler}
import org.apache.pekko.stream.scaladsl.Flow

import java.util.concurrent.Callable
@@ -267,7 +267,7 @@ object PathDirectivesRawPathPrefixInterceptor {
import BasicDirectives._

def rawPathPrefix[T](@Argument(0) matcher: PathMatcher[T]): Directive[T] = {
implicit val LIsTuple = matcher.ev
implicit val LIsTuple: Tuple[T] = matcher.ev

extract { ctx =>
val fullPath = ctx.unmatchedPath.toString()
Original file line number Diff line number Diff line change
@@ -153,7 +153,6 @@ kanela.modules {
"kamon.instrumentation.pekko.instrumentations.EventStreamInstrumentation",
"kamon.instrumentation.pekko.instrumentations.ActorRefInstrumentation",
"kamon.instrumentation.pekko.instrumentations.DispatcherInstrumentation",
"kamon.instrumentation.pekko.instrumentations.ActorMonitorInstrumentation",
"kamon.instrumentation.pekko.instrumentations.SchedulerInstrumentation",
"kamon.instrumentation.pekko.instrumentations.ClusterInstrumentation"
]
Original file line number Diff line number Diff line change
@@ -16,44 +16,50 @@

package kamon.instrumentation.pekko.instrumentations

import org.apache.pekko.actor.{ActorRef, ActorSystem}
import kamon.Kamon
import kamon.context.Storage.Scope
import kamon.instrumentation.pekko.instrumentations.HasActorMonitor.actorMonitor
import kamon.instrumentation.context.{HasContext, HasTimestamp}
import kamon.instrumentation.pekko.instrumentations.HasActorMonitor.actorMonitor
import kanela.agent.api.instrumentation.InstrumentationBuilder
import kanela.agent.libs.net.bytebuddy.asm.Advice
import kanela.agent.libs.net.bytebuddy.asm.Advice.{Argument, OnMethodEnter, OnMethodExit, This}
import org.apache.pekko.actor.instrumentation.ReplaceWithAdvice
import org.apache.pekko.actor.{ActorRef, ActorSystem}

import scala.annotation.static

class ActorInstrumentation extends InstrumentationBuilder {

onType("org.apache.pekko.actor.dungeon.Dispatch")
.advise(method("sendMessage").and(takesArguments(1)), classOf[SendMessageAdvice])

/**
* This is where most of the Actor processing magic happens. Handling of messages, errors and system messages.
*/
* This is where most of the Actor processing magic happens. Handling of messages, errors and system messages.
*/
onType("org.apache.pekko.actor.ActorCell")
.mixin(classOf[HasActorMonitor.Mixin])
.advise(isConstructor, ActorCellConstructorAdvice)
.advise(isConstructor, classOf[ActorCellConstructorAdvice])
.advise(method("invoke"), classOf[ActorCellInvokeAdvice])
.advise(method("handleInvokeFailure"), HandleInvokeFailureMethodAdvice)
.advise(method("sendMessage").and(takesArguments(1)), SendMessageAdvice)
.advise(method("terminate"), TerminateMethodAdvice)
.advise(method("swapMailbox"), ActorCellSwapMailboxAdvice)
.advise(method("invokeAll$1"), InvokeAllMethodInterceptor)
.advise(method("handleInvokeFailure"), classOf[HandleInvokeFailureMethodAdvice])
.advise(method("sendMessage").and(takesArguments(1)), classOf[SendMessageAdvice])
.advise(method("terminate"), classOf[TerminateMethodAdvice])
.advise(method("swapMailbox"), classOf[ActorCellSwapMailboxAdvice])
.advise(method("invokeAll$1"), classOf[InvokeAllMethodInterceptor])

/**
* Ensures that the Context is properly propagated when messages are temporarily stored on an UnstartedCell.
*/
* Ensures that the Context is properly propagated when messages are temporarily stored on an UnstartedCell.
*/
onType("org.apache.pekko.actor.UnstartedCell")
.mixin(classOf[HasActorMonitor.Mixin])
.advise(isConstructor, RepointableActorCellConstructorAdvice)
.advise(method("sendMessage").and(takesArguments(1)), SendMessageAdvice)
.advise(isConstructor, classOf[RepointableActorCellConstructorAdvice])
.advise(method("sendMessage").and(takesArguments(1)), classOf[SendMessageAdvice])
.advise(method("replaceWith"), classOf[ReplaceWithAdvice])

}

trait HasActorMonitor {
def actorMonitor: ActorMonitor

def setActorMonitor(actorMonitor: ActorMonitor): Unit
}

@@ -68,76 +74,90 @@ object HasActorMonitor {
cell.asInstanceOf[HasActorMonitor].actorMonitor
}

class ActorCellSwapMailboxAdvice

object ActorCellSwapMailboxAdvice {

@Advice.OnMethodEnter
def enter(@Advice.This cell: Any, @Advice.Argument(0) newMailbox: Any): Boolean = {
@static def enter(@Advice.This cell: Any, @Advice.Argument(0) newMailbox: Any): Boolean = {
val isShuttingDown = PekkoPrivateAccess.isDeadLettersMailbox(cell, newMailbox)
if(isShuttingDown)
if (isShuttingDown)
actorMonitor(cell).onTerminationStart()

isShuttingDown
}

@Advice.OnMethodExit
def exit(@Advice.This cell: Any, @Advice.Return oldMailbox: Any, @Advice.Enter isShuttingDown: Boolean): Unit = {
if(oldMailbox != null && isShuttingDown) {
@static def exit(@Advice.This cell: Any, @Advice.Return oldMailbox: Any, @Advice.Enter isShuttingDown: Boolean): Unit = {
if (oldMailbox != null && isShuttingDown) {
actorMonitor(cell).onDroppedMessages(PekkoPrivateAccess.mailboxMessageCount(oldMailbox))
}
}
}

class InvokeAllMethodInterceptor

object InvokeAllMethodInterceptor {

@Advice.OnMethodEnter
def enter(@Advice.Argument(0) message: Any): Option[Scope] =
@static def enter(@Advice.Argument(0) message: Any): Option[Scope] =
message match {
case m: HasContext => Some(Kamon.storeContext(m.context))
case _ => None
}

@Advice.OnMethodExit
def exit(@Advice.Enter scope: Option[Scope]): Unit =
@static def exit(@Advice.Enter scope: Option[Scope]): Unit =
scope.foreach(_.close())
}

class SendMessageAdvice

object SendMessageAdvice {

@OnMethodEnter(suppress = classOf[Throwable])
def onEnter(@This cell: Any, @Argument(0) envelope: Object): Unit = {
@static def onEnter(@This cell: Any, @Argument(0) envelope: Object): Unit = {

val instrumentation = actorMonitor(cell)
envelope.asInstanceOf[HasContext].setContext(instrumentation.captureEnvelopeContext())
envelope.asInstanceOf[HasTimestamp].setTimestamp(instrumentation.captureEnvelopeTimestamp())
}
}

class RepointableActorCellConstructorAdvice

object RepointableActorCellConstructorAdvice {

@Advice.OnMethodExit(suppress = classOf[Throwable])
def onExit(@This cell: Any, @Argument(0) system: ActorSystem, @Argument(1) ref: ActorRef, @Argument(3) parent: ActorRef): Unit =
@static def onExit(@This cell: Any, @Argument(0) system: ActorSystem, @Argument(1) ref: ActorRef, @Argument(3) parent: ActorRef): Unit =
cell.asInstanceOf[HasActorMonitor].setActorMonitor(ActorMonitor.from(cell, ref, parent, system))
}

class ActorCellConstructorAdvice

object ActorCellConstructorAdvice {

@OnMethodExit(suppress = classOf[Throwable])
def onExit(@This cell: Any, @Argument(0) system: ActorSystem, @Argument(1) ref: ActorRef, @Argument(4) parent: ActorRef): Unit =
@static def onExit(@This cell: Any, @Argument(0) system: ActorSystem, @Argument(1) ref: ActorRef, @Argument(4) parent: ActorRef): Unit =
cell.asInstanceOf[HasActorMonitor].setActorMonitor(ActorMonitor.from(cell, ref, parent, system))
}

class HandleInvokeFailureMethodAdvice

object HandleInvokeFailureMethodAdvice {

@OnMethodEnter(suppress = classOf[Throwable])
def onEnter(@This cell: Any, @Argument(1) failure: Throwable): Unit =
@static def onEnter(@This cell: Any, @Argument(1) failure: Throwable): Unit =
actorMonitor(cell).onFailure(failure)

}

class TerminateMethodAdvice

object TerminateMethodAdvice {

@OnMethodEnter(suppress = classOf[Throwable])
def onEnter(@This cell: Any): Unit = {
@static def onEnter(@This cell: Any): Unit = {
actorMonitor(cell).cleanup()

if (PekkoPrivateAccess.isRoutedActorCell(cell)) {
Original file line number Diff line number Diff line change
@@ -23,6 +23,8 @@ import kamon.instrumentation.context.HasContext
import kanela.agent.api.instrumentation.InstrumentationBuilder
import kanela.agent.libs.net.bytebuddy.asm.Advice.{Argument, Enter, OnMethodEnter, OnMethodExit}

import scala.annotation.static

class ActorLoggingInstrumentation extends InstrumentationBuilder {

/**
@@ -33,16 +35,17 @@ class ActorLoggingInstrumentation extends InstrumentationBuilder {
.mixin(classOf[HasContext.MixinWithInitializer])

onType("org.apache.pekko.event.slf4j.Slf4jLogger")
.advise(method("withMdc"), WithMdcMethodAdvice)
.advise(method("withMdc"), classOf[WithMdcMethodAdvice])
}

class WithMdcMethodAdvice
object WithMdcMethodAdvice {

@OnMethodEnter
def enter(@Argument(1) logEvent: LogEvent): Scope =
@static def enter(@Argument(1) logEvent: LogEvent): Scope =
Kamon.storeContext(logEvent.asInstanceOf[HasContext].context)

@OnMethodExit
def exit(@Enter scope: Scope): Unit =
@static def exit(@Enter scope: Scope): Unit =
scope.close()
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -7,6 +7,8 @@ import kamon.instrumentation.context.HasContext
import kanela.agent.api.instrumentation.InstrumentationBuilder
import kanela.agent.libs.net.bytebuddy.asm.Advice

import scala.annotation.static

class ActorRefInstrumentation extends InstrumentationBuilder {

/**
@@ -22,7 +24,7 @@ class ActorRefInstrumentation extends InstrumentationBuilder {
*/
onType("org.apache.pekko.actor.RepointableActorRef")
.mixin(classOf[HasContext.MixinWithInitializer])
.advise(method("point"), RepointableActorRefPointAdvice)
.advise(method("point"), classOf[RepointableActorRefPointAdvice])
}

trait HasGroupPath {
@@ -38,14 +40,15 @@ object HasGroupPath {
}
}

class RepointableActorRefPointAdvice
object RepointableActorRefPointAdvice {

@Advice.OnMethodEnter
def enter(@Advice.This repointableActorRef: Object): Scope =
@static def enter(@Advice.This repointableActorRef: Object): Scope =
Kamon.storeContext(repointableActorRef.asInstanceOf[HasContext].context)

@Advice.OnMethodExit
def exit(@Advice.Enter scope: Scope, @Advice.This repointableActorRef: Object): Unit = {
@static def exit(@Advice.Enter scope: Scope, @Advice.This repointableActorRef: Object): Unit = {
scope.close()

repointableActorRef
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@ import kanela.agent.api.instrumentation.InstrumentationBuilder
import kanela.agent.libs.net.bytebuddy.asm.Advice.{Argument, OnMethodExit, Origin, Return}
import org.slf4j.LoggerFactory

import scala.annotation.static
import scala.compat.Platform.EOL
import scala.concurrent.Future

@@ -52,7 +53,7 @@ object AskPatternInstrumentation {
)

@OnMethodExit(suppress = classOf[Throwable])
def onExit(@Origin origin: String, @Return future: Future[AnyRef], @Argument(0) actor: ActorRef, @Argument(2) timeout: Timeout) = {
@static def onExit(@Origin origin: String, @Return future: Future[AnyRef], @Argument(0) actor: ActorRef, @Argument(2) timeout: Timeout) = {

if(PekkoPrivateAccess.isInternalAndActiveActorRef(actor) && Kamon.currentContext().nonEmpty()) {
PekkoInstrumentation.settings().askPatternWarning match {
Original file line number Diff line number Diff line change
@@ -10,18 +10,20 @@ import kamon.tag.TagSet
import kanela.agent.api.instrumentation.InstrumentationBuilder
import kanela.agent.libs.net.bytebuddy.asm.Advice

import scala.annotation.static
import scala.collection.mutable

class ClusterInstrumentation extends InstrumentationBuilder {

onType("org.apache.pekko.cluster.Cluster$")
.advise(method("createExtension").and(takesArguments(1)), AfterClusterInitializationAdvice)
.advise(method("createExtension").and(takesArguments(1)), classOf[AfterClusterInitializationAdvice])
}

class AfterClusterInitializationAdvice
object AfterClusterInitializationAdvice {

@Advice.OnMethodExit
def onClusterExtensionCreated(@Advice.Argument(0) system: ExtendedActorSystem, @Advice.Return clusterExtension: Cluster): Unit = {
@static def onClusterExtensionCreated(@Advice.Argument(0) system: ExtendedActorSystem, @Advice.Return clusterExtension: Cluster): Unit = {
val settings = PekkoInstrumentation.settings()
if(settings.exposeClusterMetrics) {
val stateExporter = system.systemActorOf(Props[ClusterInstrumentation.ClusterStateExporter](), "kamon-cluster-state-exporter")
Loading