Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Akka, pekko, and pekko-http to support scala 3 #1311

Merged
merged 18 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ lazy val `kamon-akka` = (project in file("instrumentation/kamon-akka"))
.enablePlugins(JavaAgent)
.disablePlugins(AssemblyPlugin)
.settings(instrumentationSettings: _*)
.settings(crossScalaVersions += `scala_3_version`)
.dependsOn(
`kamon-scala-future` % "compile,common,akka-2.5,akka-2.6",
`kamon-testkit` % "test,test-common,test-akka-2.5,test-akka-2.6"
Expand Down Expand Up @@ -497,7 +498,10 @@ lazy val `kamon-pekko` = (project in file("instrumentation/kamon-pekko"))
.disablePlugins(AssemblyPlugin)
.settings(instrumentationSettings: _*)
.settings(Seq(
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`),
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version),
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-actor" % pekkoHttpVersion % "provided"
)
))
.dependsOn(
`kamon-scala-future` % "compile",
Expand All @@ -511,8 +515,7 @@ lazy val `kamon-pekko-http` = (project in file("instrumentation/kamon-pekko-http
.disablePlugins(AssemblyPlugin)
.settings(instrumentationSettings)
.settings(Seq(
javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.10" % "test",
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`),
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version),
libraryDependencies ++= Seq(
kanelaAgent % "provided",
"org.apache.pekko" %% "pekko-http" % pekkoHttpVersion % "provided",
Expand Down
55 changes: 40 additions & 15 deletions instrumentation/kamon-akka/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import Def.Initialize

val `Akka-2.4-version` = "2.4.20"
val `Akka-2.5-version` = "2.5.32"
val `Akka-2.6-version` = "2.6.20"
val `Akka-2.6-version` = "2.6.21"

/**
* Compile Configurations
Expand Down Expand Up @@ -31,7 +31,7 @@ configs(
// The Common configuration should always depend on the latest version of Akka. All code in the Common configuration
// should be source compatible with all Akka versions.
inConfig(Common)(Defaults.compileSettings ++ Seq(
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`)
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version)
))

libraryDependencies ++= { if(scalaBinaryVersion.value == "2.11") Seq.empty else Seq(
Expand All @@ -50,7 +50,7 @@ libraryDependencies ++= { if(scalaBinaryVersion.value == "2.11") Seq.empty else


inConfig(`Compile-Akka-2.6`)(Defaults.compileSettings ++ Seq(
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`),
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version),
sources := joinSources(Common, `Compile-Akka-2.6`).value
))

Expand All @@ -73,7 +73,7 @@ inConfig(`Compile-Akka-2.5`)(Defaults.compileSettings ++ Seq(
sources := joinSources(Common, `Compile-Akka-2.5`).value
))

libraryDependencies ++= Seq(
libraryDependencies ++= {if (scalaVersion.value startsWith "3") Seq.empty else Seq(
kanelaAgent % `Compile-Akka-2.5`,
scalatest % `Test-Akka-2.5`,
logbackClassic % `Test-Akka-2.5`,
Expand All @@ -85,21 +85,28 @@ libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-cluster-sharding" % `Akka-2.5-version` % `Compile-Akka-2.5`,
"com.typesafe.akka" %% "akka-protobuf" % `Akka-2.5-version` % `Compile-Akka-2.5`,
"com.typesafe.akka" %% "akka-testkit" % `Akka-2.5-version` % `Test-Akka-2.5`
)
)}

// Ensure that the packaged artifact contains the instrumentation for all Akka versions.
Compile / packageBin / mappings := Def.taskDyn {
if(scalaBinaryVersion.value == "2.11")
if(scalaBinaryVersion.value == "2.11") {
Def.task {
joinProducts((`Compile-Akka-2.5` / products).value) ++
joinProducts((Common / unmanagedResourceDirectories).value)
}
else
} else if (scalaVersion.value startsWith "3") {
Def.task {
joinProducts((`Compile-Akka-2.6` / products).value) ++
joinProducts((Common / unmanagedResourceDirectories).value)
}
} else {
Def.task {
joinProducts(
(`Compile-Akka-2.5` / products).value ++
(`Compile-Akka-2.6` / products).value
) ++ joinProducts((Common / unmanagedResourceDirectories).value)}
) ++ joinProducts((Common / unmanagedResourceDirectories).value)
}
}
}.value

// Ensure that the packaged sources contains the instrumentation for all Akka versions.
Expand All @@ -108,26 +115,38 @@ Compile / packageSrc / mappings := Def.taskDyn {
Def.task {
(`Compile-Akka-2.5` / packageSrc / mappings).value ++
(Common / packageSrc / mappings).value
}
} else if (scalaVersion.value startsWith "3") {
Def.task {
(`Compile-Akka-2.6` / packageSrc / mappings).value ++
(Common / packageSrc / mappings).value
}
} else
} else {
Def.task {
(`Compile-Akka-2.5` / packageSrc / mappings).value ++
(`Compile-Akka-2.6` / packageSrc / mappings).value ++
(Common / packageSrc / mappings).value
}
}
}.value

// Compile will return the compile analysis for the Common configuration but will run on all Akka configurations.
Compile / compile := Def.taskDyn {
if(scalaBinaryVersion.value == "2.11")
if(scalaBinaryVersion.value == "2.11") {
Def.task {
(`Compile-Akka-2.5` / compile).value
}
else
} else if (scalaVersion.value startsWith "3"){

Def.task {
(`Compile-Akka-2.6` / compile).value
}
} else {
Def.task {
(`Compile-Akka-2.5` / compile).value
(`Compile-Akka-2.6` / compile).value
}
}
}.value

exportJars := true
Expand All @@ -145,7 +164,7 @@ lazy val baseTestSettings = Seq(
)

inConfig(TestCommon)(Defaults.testSettings ++ instrumentationSettings ++ baseTestSettings ++ Seq(
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`)
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version)
))

inConfig(`Test-Akka-2.5`)(Defaults.testSettings ++ instrumentationSettings ++ baseTestSettings ++ Seq(
Expand All @@ -155,20 +174,26 @@ inConfig(`Test-Akka-2.5`)(Defaults.testSettings ++ instrumentationSettings ++ ba
))

inConfig(`Test-Akka-2.6`)(Defaults.testSettings ++ instrumentationSettings ++ baseTestSettings ++ Seq(
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`),
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version),
sources := joinSources(TestCommon, `Test-Akka-2.6`).value,
unmanagedResourceDirectories ++= (Common / unmanagedResourceDirectories).value,
unmanagedResourceDirectories ++= (TestCommon / unmanagedResourceDirectories).value
))

Test / test := Def.taskDyn {
if(scalaBinaryVersion.value == "2.11")
if(scalaBinaryVersion.value == "2.11") {
Def.task {
(`Test-Akka-2.5` / test).value
}
else
} else if (scalaVersion.value startsWith "3") {
Def.task {
(`Test-Akka-2.6` / test).value
}
}
else {
Def.task {
(`Test-Akka-2.5` / test).value
(`Test-Akka-2.6` / test).value
}
}
}.value
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import kanela.agent.libs.net.bytebuddy.asm.Advice
import org.slf4j.LoggerFactory

import java.nio.ByteBuffer
import scala.annotation.static
import scala.util.control.NonFatal

class KamonRemoteInstrument(system: ExtendedActorSystem) extends RemoteInstrument {
Expand Down Expand Up @@ -85,12 +86,12 @@ object CaptureCurrentInboundEnvelope {
}

@Advice.OnMethodEnter
def enter(@Advice.Argument(0) inboundEnvelope: InboundEnvelope): Unit = {
@static def enter(@Advice.Argument(0) inboundEnvelope: InboundEnvelope): Unit = {
CurrentInboundEnvelope.set(inboundEnvelope)
}

@Advice.OnMethodExit
def exit(): Unit = {
@static def exit(): Unit = {
CurrentInboundEnvelope.remove()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import kanela.agent.api.instrumentation.InstrumentationBuilder
import kanela.agent.libs.net.bytebuddy.implementation.bind.annotation.Argument
import org.slf4j.LoggerFactory

import scala.annotation.static
import scala.util.control.NonFatal

class ActorMonitorInstrumentation extends InstrumentationBuilder with VersionFiltering {
Expand All @@ -19,15 +20,15 @@ class ActorMonitorInstrumentation extends InstrumentationBuilder with VersionFil
* so we're forced to extract the original message type.
*/
onSubTypesOf("kamon.instrumentation.akka.instrumentations.ActorMonitor")
.intercept(method("extractMessageClass"), MessageClassAdvice)
.intercept(method("extractMessageClass"), classOf[MessageClassAdvice])
}
}

class MessageClassAdvice
object MessageClassAdvice {
private val logger = LoggerFactory.getLogger(classOf[MessageClassAdvice])

def extractMessageClass(@Argument(0) envelope: Envelope): String = {
@static def extractMessageClass(@Argument(0) envelope: Envelope): String = {
try {
envelope.message match {
case message: WrappedMessage => ActorCellInfo.simpleClassName(message.message.getClass)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package kamon.instrumentation.akka.instrumentations.akka_26

import java.util.concurrent.{AbstractExecutorService, Callable, ExecutorService, ThreadFactory, TimeUnit}

import akka.dispatch.{DefaultExecutorServiceConfigurator, DispatcherPrerequisites, Dispatchers, ExecutorServiceFactory, ExecutorServiceFactoryProvider, ForkJoinExecutorConfigurator, PinnedDispatcherConfigurator, ThreadPoolExecutorConfigurator}
import kamon.instrumentation.akka.instrumentations.VersionFiltering
import kamon.Kamon
Expand All @@ -29,6 +28,8 @@ import kanela.agent.api.instrumentation.InstrumentationBuilder
import kanela.agent.libs.net.bytebuddy.asm.Advice
import kanela.agent.libs.net.bytebuddy.implementation.bind.annotation.{Argument, SuperCall, This}

import scala.annotation.static

class DispatcherInstrumentation extends InstrumentationBuilder with VersionFiltering {

onAkka("2.6", "2.7") {
Expand All @@ -41,7 +42,7 @@ class DispatcherInstrumentation extends InstrumentationBuilder with VersionFilte
onSubTypesOf("akka.dispatch.ExecutorServiceFactory")
.mixin(classOf[HasDispatcherPrerequisites.Mixin])
.mixin(classOf[HasDispatcherName.Mixin])
.intercept(method("createExecutorService"), InstrumentNewExecutorServiceOnAkka26)
.intercept(method("createExecutorService"), classOf[InstrumentNewExecutorServiceOnAkka26])

/**
* First step on getting the Actor System name is to read it from the prerequisites instance passed to the
Expand Down Expand Up @@ -77,10 +78,11 @@ class DispatcherInstrumentation extends InstrumentationBuilder with VersionFilte

}

class CaptureDispatcherPrerequisitesOnExecutorConfigurator
object CaptureDispatcherPrerequisitesOnExecutorConfigurator {

@Advice.OnMethodExit(suppress = classOf[Throwable])
def exit(@Advice.This configurator: Any, @Advice.Argument(1) prerequisites: DispatcherPrerequisites): Unit = {
@static def exit(@Advice.This configurator: Any, @Advice.Argument(1) prerequisites: DispatcherPrerequisites): Unit = {
configurator match {
case fjec: ForkJoinExecutorConfigurator => fjec.asInstanceOf[HasDispatcherPrerequisites].setDispatcherPrerequisites(prerequisites)
case tpec: ThreadPoolExecutorConfigurator => tpec.threadPoolConfig.asInstanceOf[HasDispatcherPrerequisites].setDispatcherPrerequisites(prerequisites)
Expand All @@ -91,19 +93,21 @@ object CaptureDispatcherPrerequisitesOnExecutorConfigurator {
}
}

class CopyDispatcherInfoToExecutorServiceFactory
object CopyDispatcherInfoToExecutorServiceFactory {

@Advice.OnMethodExit
def exit(@Advice.This poolConfig: HasDispatcherPrerequisites, @Advice.Argument(0) dispatcherName: String, @Advice.Return factory: Any): Unit = {
@static def exit(@Advice.This poolConfig: HasDispatcherPrerequisites, @Advice.Argument(0) dispatcherName: String, @Advice.Return factory: Any): Unit = {
val factoryWithMixins = factory.asInstanceOf[HasDispatcherName with HasDispatcherPrerequisites]
factoryWithMixins.setDispatcherPrerequisites(poolConfig.dispatcherPrerequisites)
factoryWithMixins.setDispatcherName(dispatcherName)
}
}

class InstrumentNewExecutorServiceOnAkka26
object InstrumentNewExecutorServiceOnAkka26 {

def around(@This factory: HasDispatcherPrerequisites with HasDispatcherName, @SuperCall callable: Callable[ExecutorService]): ExecutorService = {
@static def around(@This factory: HasDispatcherPrerequisites with HasDispatcherName, @SuperCall callable: Callable[ExecutorService]): ExecutorService = {
val executor = callable.call()
val actorSystemName = factory.dispatcherPrerequisites.settings.name
val dispatcherName = factory.dispatcherName
Expand All @@ -123,10 +127,11 @@ object InstrumentNewExecutorServiceOnAkka26 {
}
}

class ThreadPoolConfigCopyAdvice
object ThreadPoolConfigCopyAdvice {

@Advice.OnMethodExit
def exit(@Advice.This original: Any, @Advice.Return copy: Any): Unit = {
@static def exit(@Advice.This original: Any, @Advice.Return copy: Any): Unit = {
copy.asInstanceOf[HasDispatcherPrerequisites].setDispatcherPrerequisites(original.asInstanceOf[HasDispatcherPrerequisites].dispatcherPrerequisites)
copy.asInstanceOf[HasDispatcherName].setDispatcherName(original.asInstanceOf[HasDispatcherName].dispatcherName)
}
Expand Down
Loading