diff --git a/instrumentation/kamon-akka-http/src/main/java/kamon/instrumentation/akka/http/FlowOpsMapAsyncAdvice.java b/instrumentation/kamon-akka-http/src/main/java/kamon/instrumentation/akka/http/FlowOpsMapAsyncAdvice.java new file mode 100644 index 000000000..030f708be --- /dev/null +++ b/instrumentation/kamon-akka-http/src/main/java/kamon/instrumentation/akka/http/FlowOpsMapAsyncAdvice.java @@ -0,0 +1,35 @@ +package kamon.instrumentation.akka.http; + +import akka.NotUsed; +import akka.http.scaladsl.model.HttpRequest; +import akka.http.scaladsl.model.HttpResponse; +import akka.stream.scaladsl.Flow; +import kanela.agent.libs.net.bytebuddy.asm.Advice; + +public class FlowOpsMapAsyncAdvice { + + public static class EndpointInfo { + public final String listenInterface; + public final int listenPort; + + public EndpointInfo(String listenInterface, int listenPort) { + this.listenInterface = listenInterface; + this.listenPort = listenPort; + } + } + + public static ThreadLocal currentEndpoint = new ThreadLocal<>(); + + @Advice.OnMethodExit + public static void onExit(@Advice.Return(readOnly = false) akka.stream.scaladsl.FlowOps returnedFlow) { + EndpointInfo bindAndHandlerEndpoint = currentEndpoint.get(); + + if(bindAndHandlerEndpoint != null) { + returnedFlow = ServerFlowWrapper.apply( + (Flow) returnedFlow, + bindAndHandlerEndpoint.listenInterface, + bindAndHandlerEndpoint.listenPort + ); + } + } +} diff --git a/instrumentation/kamon-akka-http/src/main/java/kamon/instrumentation/akka/http/Http2ExtBindAndHandleAdvice.java b/instrumentation/kamon-akka-http/src/main/java/kamon/instrumentation/akka/http/Http2ExtBindAndHandleAdvice.java index 11222a77d..d63769368 100644 --- a/instrumentation/kamon-akka-http/src/main/java/kamon/instrumentation/akka/http/Http2ExtBindAndHandleAdvice.java +++ b/instrumentation/kamon-akka-http/src/main/java/kamon/instrumentation/akka/http/Http2ExtBindAndHandleAdvice.java @@ -29,6 +29,12 @@ public static void onEnter(@Advice.Argument(value = 0, readOnly = false) Functio @Advice.Argument(1) String iface, @Advice.Argument(2) Integer port) { + FlowOpsMapAsyncAdvice.currentEndpoint.set(new FlowOpsMapAsyncAdvice.EndpointInfo(iface, port)); handler = new Http2BlueprintInterceptor.HandlerWithEndpoint(iface, port, handler); } + + @Advice.OnMethodExit + public static void onExit() { + FlowOpsMapAsyncAdvice.currentEndpoint.remove(); + } } diff --git a/instrumentation/kamon-akka-http/src/main/resources/reference.conf b/instrumentation/kamon-akka-http/src/main/resources/reference.conf index db52d2eea..f4c6c54c9 100644 --- a/instrumentation/kamon-akka-http/src/main/resources/reference.conf +++ b/instrumentation/kamon-akka-http/src/main/resources/reference.conf @@ -245,7 +245,9 @@ kanela.modules { within = [ "akka.http.*", - "akka.grpc.internal.*" + "akka.grpc.internal.*", + "akka.stream.scaladsl.Flow", + "akka.stream.scaladsl.FlowOps" ] } } diff --git a/instrumentation/kamon-akka-http/src/main/scala-2.11/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala b/instrumentation/kamon-akka-http/src/main/scala-2.11/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala index 32ff1fd6f..ce8fffb17 100644 --- a/instrumentation/kamon-akka-http/src/main/scala-2.11/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala +++ b/instrumentation/kamon-akka-http/src/main/scala-2.11/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala @@ -77,6 +77,13 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder { .intercept(method("redirect"), classOf[ResolveOperationNameOnRouteInterceptor]) .intercept(method("failWith"), classOf[ResolveOperationNameOnRouteInterceptor]) + /** + * Support for HTTP/1 and HTTP/2 at the same time. + */ + + onType("akka.stream.scaladsl.Flow") + .advise(method("mapAsync"), classOf[FlowOpsMapAsyncAdvice]) + } trait HasMatchingContext { diff --git a/instrumentation/kamon-akka-http/src/main/scala-2.12/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala b/instrumentation/kamon-akka-http/src/main/scala-2.12/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala index 7c33c3f68..2afa0b770 100644 --- a/instrumentation/kamon-akka-http/src/main/scala-2.12/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala +++ b/instrumentation/kamon-akka-http/src/main/scala-2.12/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala @@ -41,7 +41,6 @@ import akka.NotUsed import akka.http.scaladsl.server.RouteResult.Rejected import akka.stream.scaladsl.Flow import kamon.context.Context -import kanela.agent.libs.net.bytebuddy.asm.Advice import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers.isPublic import scala.collection.immutable @@ -103,6 +102,14 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder { onType("akka.http.scaladsl.Http2Ext") .advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice]) + + /** + * Support for HTTP/1 and HTTP/2 at the same time. + * + */ + + onType("akka.stream.scaladsl.FlowOps") + .advise(method("mapAsync"), classOf[FlowOpsMapAsyncAdvice]) } trait HasMatchingContext { diff --git a/instrumentation/kamon-akka-http/src/main/scala-2.13/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala b/instrumentation/kamon-akka-http/src/main/scala-2.13/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala index 052f5fdf9..563f687c3 100644 --- a/instrumentation/kamon-akka-http/src/main/scala-2.13/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala +++ b/instrumentation/kamon-akka-http/src/main/scala-2.13/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala @@ -25,7 +25,6 @@ import akka.NotUsed import akka.http.scaladsl.server.RouteResult.Rejected import akka.stream.scaladsl.Flow import kamon.context.Context -import kanela.agent.libs.net.bytebuddy.asm.Advice import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers.isPublic import scala.collection.immutable @@ -86,6 +85,13 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder { onType("akka.http.scaladsl.Http2Ext") .advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice]) + /** + * Support for HTTP/1 and HTTP/2 at the same time. + * + */ + + onType("akka.stream.scaladsl.FlowOps") + .advise(method("mapAsync"), classOf[FlowOpsMapAsyncAdvice]) } trait HasMatchingContext { diff --git a/instrumentation/kamon-akka-http/src/test/scala/kamon/akka/http/AkkaHttpServerTracingSpec.scala b/instrumentation/kamon-akka-http/src/test/scala/kamon/akka/http/AkkaHttpServerTracingSpec.scala index 92831dcd9..00a5cefc8 100644 --- a/instrumentation/kamon-akka-http/src/test/scala/kamon/akka/http/AkkaHttpServerTracingSpec.scala +++ b/instrumentation/kamon-akka-http/src/test/scala/kamon/akka/http/AkkaHttpServerTracingSpec.scala @@ -43,6 +43,11 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala val (sslSocketFactory, trustManager) = clientSSL() val okHttp = new OkHttpClient.Builder() + .sslSocketFactory(sslSocketFactory, trustManager) + .hostnameVerifier(new HostnameVerifier { override def verify(s: String, sslSession: SSLSession): Boolean = true }) + .build() + + val okHttp1ONly = new OkHttpClient.Builder() .sslSocketFactory(sslSocketFactory, trustManager) .protocols(List(Protocol.HTTP_1_1).asJava) .hostnameVerifier(new HostnameVerifier { override def verify(s: String, sslSession: SSLSession): Boolean = true }) @@ -53,10 +58,11 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala val httpWebServer = startServer(interface, 8081, https = false) val httpsWebServer = startServer(interface, 8082, https = true) - testSuite("HTTP", httpWebServer) - testSuite("HTTPS", httpsWebServer) + testSuite("HTTP", httpWebServer, okHttp) + testSuite("HTTPS", httpsWebServer, okHttp) + testSuite("HTTPS with HTTP/1 only clients", httpsWebServer, okHttp1ONly) - def testSuite(httpVersion: String, server: WebServer) = { + def testSuite(httpVersion: String, server: WebServer, client: OkHttpClient) = { val interface = server.interface val port = server.port val protocol = server.protocol @@ -64,11 +70,12 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala s"the Akka HTTP server instrumentation with ${httpVersion}" should { "create a server Span when receiving requests" in { val target = s"$protocol://$interface:$port/$dummyPathOk" - okHttp.newCall(new Request.Builder().url(target).build()).execute() + client.newCall(new Request.Builder().url(target).build()).execute() + eventually(timeout(10 seconds)) { val span = testSpanReporter().nextSpan().value - span.tags.get(plain("http.url")) shouldBe target + span.tags.get(plain("http.url")) should endWith(s"$interface:$port/$dummyPathOk") span.metricTags.get(plain("component")) shouldBe "akka.http.server" span.metricTags.get(plain("http.method")) shouldBe "GET" span.metricTags.get(plainLong("http.status_code")) shouldBe 200L @@ -78,7 +85,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala "return the correct operation name with overloaded route" in { val target = s"$protocol://$interface:$port/some_endpoint" - okHttp.newCall(new Request.Builder() + client.newCall(new Request.Builder() .get() .url(target).build()) .execute() @@ -91,7 +98,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala val path = s"extraction/nested/42/fixed/anchor/32/${UUID.randomUUID().toString}/fixed/44/CafE" val expected = "/extraction/nested/{}/fixed/anchor/{}/{}/fixed/{}/{}" val target = s"$protocol://$interface:$port/$path" - okHttp.newCall(new Request.Builder().url(target).build()).execute() + client.newCall(new Request.Builder().url(target).build()).execute() eventually(timeout(10 seconds)) { val span = testSpanReporter().nextSpan().value @@ -103,7 +110,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala val path = "extraction/segment/special**" val expected = "/extraction/segment/{}" val target = s"$protocol://$interface:$port/$path" - val response = okHttp.newCall(new Request.Builder().url(target).build()).execute() + val response = client.newCall(new Request.Builder().url(target).build()).execute() response.code() shouldBe 200 response.body().string() shouldBe "special**" @@ -118,7 +125,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala val path = "extraction/on-complete/42/more-path" val expected = "/extraction/on-complete/{}/more-path" val target = s"$protocol://$interface:$port/$path" - okHttp.newCall(new Request.Builder().url(target).build()).execute() + client.newCall(new Request.Builder().url(target).build()).execute() eventually(timeout(10 seconds)) { val span = testSpanReporter().nextSpan().value @@ -130,7 +137,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala val path = "extraction/on-success/42/after" val expected = "/extraction/on-success/{}/after" val target = s"$protocol://$interface:$port/$path" - okHttp.newCall(new Request.Builder().url(target).build()).execute() + client.newCall(new Request.Builder().url(target).build()).execute() eventually(timeout(10 seconds)) { val span = testSpanReporter().nextSpan().value @@ -142,7 +149,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala val path = "extraction/complete-or-recover-with/42/after" val expected = "/extraction/complete-or-recover-with/{}/after" val target = s"$protocol://$interface:$port/$path" - okHttp.newCall(new Request.Builder().url(target).build()).execute() + client.newCall(new Request.Builder().url(target).build()).execute() eventually(timeout(10 seconds)) { val span = testSpanReporter().nextSpan().value @@ -154,7 +161,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala val path = "extraction/complete-or-recover-with-success/42/after" val expected = "/extraction/complete-or-recover-with-success/{}" val target = s"$protocol://$interface:$port/$path" - okHttp.newCall(new Request.Builder().url(target).build()).execute() + client.newCall(new Request.Builder().url(target).build()).execute() eventually(timeout(10 seconds)) { val span = testSpanReporter().nextSpan().value @@ -166,7 +173,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala val path = s"v3/user/3/post/3" val expected = "/v3/user/{}/post/{}" val target = s"$protocol://$interface:$port/$path" - okHttp.newCall(new Request.Builder().url(target).build()).execute() + client.newCall(new Request.Builder().url(target).build()).execute() eventually(timeout(10 seconds)) { val span = testSpanReporter().nextSpan().value @@ -177,12 +184,12 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala "change the Span operation name when using the operationName directive" in { val target = s"$protocol://$interface:$port/$traceOk" - okHttp.newCall(new Request.Builder().url(target).build()).execute() + client.newCall(new Request.Builder().url(target).build()).execute() eventually(timeout(10 seconds)) { val span = testSpanReporter().nextSpan().value span.operationName shouldBe "user-supplied-operation" - span.tags.get(plain("http.url")) shouldBe target + span.tags.get(plain("http.url")) should endWith(s"$interface:$port/$traceOk") span.metricTags.get(plain("component")) shouldBe "akka.http.server" span.metricTags.get(plain("http.method")) shouldBe "GET" span.metricTags.get(plainLong("http.status_code")) shouldBe 200L @@ -191,12 +198,12 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala "mark spans as failed when request fails" in { val target = s"$protocol://$interface:$port/$dummyPathError" - okHttp.newCall(new Request.Builder().url(target).build()).execute() + client.newCall(new Request.Builder().url(target).build()).execute() eventually(timeout(10 seconds)) { val span = testSpanReporter().nextSpan().value span.operationName shouldBe s"/$dummyPathError" - span.tags.get(plain("http.url")) shouldBe target + span.tags.get(plain("http.url")) should endWith(s"$interface:$port/$dummyPathError") span.metricTags.get(plain("component")) shouldBe "akka.http.server" span.metricTags.get(plain("http.method")) shouldBe "GET" span.metricTags.get(plainBoolean("error")) shouldBe true @@ -206,12 +213,12 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala "change the operation name to 'unhandled' when the response status code is 404" in { val target = s"$protocol://$interface:$port/unknown-path" - okHttp.newCall(new Request.Builder().url(target).build()).execute() + client.newCall(new Request.Builder().url(target).build()).execute() eventually(timeout(10 seconds)) { val span = testSpanReporter().nextSpan().value span.operationName shouldBe "unhandled" - span.tags.get(plain("http.url")) shouldBe target + span.tags.get(plain("http.url")) should endWith(s"$interface:$port/unknown-path") span.metricTags.get(plain("component")) shouldBe "akka.http.server" span.metricTags.get(plain("http.method")) shouldBe "GET" span.metricTags.get(plainBoolean("error")) shouldBe false @@ -221,7 +228,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala "correctly time entity transfer timings" in { val target = s"$protocol://$interface:$port/$stream" - okHttp.newCall(new Request.Builder().url(target).build()).execute() + client.newCall(new Request.Builder().url(target).build()).execute() val span = eventually(timeout(10 seconds)) { val span = testSpanReporter().nextSpan().value @@ -233,14 +240,14 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala case List(_ @ Mark(_, "http.response.ready")) => } - span.tags.get(plain("http.url")) shouldBe target + span.tags.get(plain("http.url")) should endWith(s"$interface:$port/$stream") span.metricTags.get(plain("component")) shouldBe "akka.http.server" span.metricTags.get(plain("http.method")) shouldBe "GET" } "include the trace-id and keep all user-provided headers in the responses" in { val target = s"$protocol://$interface:$port/extra-header" - val response = okHttp.newCall(new Request.Builder().url(target).build()).execute() + val response = client.newCall(new Request.Builder().url(target).build()).execute() response.headers().names() should contain allOf ( "trace-id", @@ -250,7 +257,7 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala "keep operation names provided by the HTTP Server instrumentation" in { val target = s"$protocol://$interface:$port/name-will-be-changed" - okHttp.newCall(new Request.Builder().url(target).build()).execute() + client.newCall(new Request.Builder().url(target).build()).execute() eventually(timeout(10 seconds)) { val span = testSpanReporter().nextSpan().value