From 13776646475226de5805a4ac5dc4d9678fe4805d Mon Sep 17 00:00:00 2001 From: inobu Date: Fri, 20 Oct 2023 18:45:15 +0900 Subject: [PATCH] initial commit for add-pekko-grpc-instrumentation --- ...koGRPCUnmarshallingContextPropagation.java | 49 ++++++++++++++ .../src/main/resources/reference.conf | 20 ++++++ .../grpc/PekkoGrpcServerInstrumentation.scala | 57 +++++++++++++++++ .../src/test/protobuf/helloworld.proto | 41 ++++++++++++ .../src/test/resources/application.conf | 1 + .../src/test/resources/logback.xml | 12 ++++ .../pekko/grpc/GreeterServiceImpl.scala | 44 +++++++++++++ .../pekko/grpc/PekkoGrpcTracingSpec.scala | 64 +++++++++++++++++++ 8 files changed, 288 insertions(+) create mode 100644 instrumentation/kamon-pekko-grpc/src/main/java/kamon/instrumentation/pekko/grpc/PekkoGRPCUnmarshallingContextPropagation.java create mode 100644 instrumentation/kamon-pekko-grpc/src/main/resources/reference.conf create mode 100644 instrumentation/kamon-pekko-grpc/src/main/scala/kamon/instrumentation/pekko/grpc/PekkoGrpcServerInstrumentation.scala create mode 100644 instrumentation/kamon-pekko-grpc/src/test/protobuf/helloworld.proto create mode 100644 instrumentation/kamon-pekko-grpc/src/test/resources/application.conf create mode 100644 instrumentation/kamon-pekko-grpc/src/test/resources/logback.xml create mode 100644 instrumentation/kamon-pekko-grpc/src/test/scala/kamon/instrumentation/pekko/grpc/GreeterServiceImpl.scala create mode 100644 instrumentation/kamon-pekko-grpc/src/test/scala/kamon/instrumentation/pekko/grpc/PekkoGrpcTracingSpec.scala diff --git a/instrumentation/kamon-pekko-grpc/src/main/java/kamon/instrumentation/pekko/grpc/PekkoGRPCUnmarshallingContextPropagation.java b/instrumentation/kamon-pekko-grpc/src/main/java/kamon/instrumentation/pekko/grpc/PekkoGRPCUnmarshallingContextPropagation.java new file mode 100644 index 000000000..a776da65f --- /dev/null +++ b/instrumentation/kamon-pekko-grpc/src/main/java/kamon/instrumentation/pekko/grpc/PekkoGRPCUnmarshallingContextPropagation.java @@ -0,0 +1,49 @@ +package kamon.instrumentation.akka.grpc; + +import akka.http.javadsl.model.HttpEntity; +import kamon.Kamon; +import kamon.context.Context; +import kanela.agent.libs.net.bytebuddy.asm.Advice; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.function.Function; + +public class AkkaGRPCUnmarshallingContextPropagation { + + @Advice.OnMethodExit() + public static void onExit( + @Advice.Return(readOnly = false) CompletionStage returnValue, + @Advice.Argument(0) Object firstArgument) { + + if(firstArgument instanceof HttpEntity && returnValue instanceof CompletableFuture) { + final Context currentContext = Kamon.currentContext(); + + // NOTES: The wrapper is only overriding thenCompose because it is the only function that gets called + // after GrpcMarshalling.unmarshall in the auto-generated HandlerFactory for gRPC services. In + // the future this might be removed if we instrument CompletionStage directly. + returnValue = new ContextPropagatingCompletionStage<>((CompletableFuture) returnValue, currentContext); + } + } + + + public static class ContextPropagatingCompletionStage extends CompletableFuture { + private final CompletableFuture wrapped; + private final Context context; + + public ContextPropagatingCompletionStage(CompletableFuture wrapped, Context context) { + this.wrapped = wrapped; + this.context = context; + } + + @Override + public CompletableFuture thenCompose(Function> fn) { + Function> wrapperFunction = (t) -> { + return Kamon.runWithContext(context, () -> fn.apply(t)); + }; + + return wrapped.thenCompose(wrapperFunction); + } + } + +} diff --git a/instrumentation/kamon-pekko-grpc/src/main/resources/reference.conf b/instrumentation/kamon-pekko-grpc/src/main/resources/reference.conf new file mode 100644 index 000000000..9f5880d6e --- /dev/null +++ b/instrumentation/kamon-pekko-grpc/src/main/resources/reference.conf @@ -0,0 +1,20 @@ +# ======================================= # +# Kamon-Akka-gRPC Reference Configuration # +# ======================================= # + +kanela.modules { + akka-grpc { + name = "Akka gRPC Instrumentation" + description = "Context propagation and tracing for Akka gRPC" + enabled = yes + + instrumentations = [ + "kamon.instrumentation.akka.grpc.AkkaGrpcServerInstrumentation" + ] + + within = [ + "^akka.grpc.internal..*", + "^akka.grpc.javadsl.GrpcMarshalling$" + ] + } +} diff --git a/instrumentation/kamon-pekko-grpc/src/main/scala/kamon/instrumentation/pekko/grpc/PekkoGrpcServerInstrumentation.scala b/instrumentation/kamon-pekko-grpc/src/main/scala/kamon/instrumentation/pekko/grpc/PekkoGrpcServerInstrumentation.scala new file mode 100644 index 000000000..800b67b62 --- /dev/null +++ b/instrumentation/kamon-pekko-grpc/src/main/scala/kamon/instrumentation/pekko/grpc/PekkoGrpcServerInstrumentation.scala @@ -0,0 +1,57 @@ +/* + * Copyright 2013-2021 The Kamon Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kamon.instrumentation.akka.grpc + +import kamon.Kamon +import kanela.agent.api.instrumentation.InstrumentationBuilder +import kanela.agent.libs.net.bytebuddy.asm.Advice + +class AkkaGrpcServerInstrumentation extends InstrumentationBuilder { + + /** + * Support for Akka gRPC servers. + * + * gRPC requests get their spans started by the ServerFlowWrapper in the Akka HTTP instrumentation like any other + * requests, but they never go through any instrumentation that gives a good operation name to the Span and forces + * taking a sampling decision. + * + * This instrumentation gives a proper name and tags to the span when it matches one of the exposed services, + * otherwise the span remains unchanged. Assumes no actual implementation of `akka.grpc.internal.TelemetrySpi` is + * configured. + */ + onType("akka.grpc.internal.NoOpTelemetry$") + .advise(method("onRequest"), AkkaGRPCServerRequestHandler) + + + onType("akka.grpc.javadsl.GrpcMarshalling") + .advise(method("unmarshal"), classOf[AkkaGRPCUnmarshallingContextPropagation]) +} + +object AkkaGRPCServerRequestHandler { + + @Advice.OnMethodEnter() + def enter(@Advice.Argument(0) serviceName: String, @Advice.Argument(1) method: String): Unit = { + val fullSpanName = serviceName + "/" + method + Kamon.currentSpan() + .name(fullSpanName) + .tagMetrics("component", "akka.grpc.server") + .tagMetrics("rpc.system", "grpc") + .tagMetrics("rpc.service", serviceName) + .tagMetrics("rpc.method", method) + .takeSamplingDecision() + } +} diff --git a/instrumentation/kamon-pekko-grpc/src/test/protobuf/helloworld.proto b/instrumentation/kamon-pekko-grpc/src/test/protobuf/helloworld.proto new file mode 100644 index 000000000..e4d4cef79 --- /dev/null +++ b/instrumentation/kamon-pekko-grpc/src/test/protobuf/helloworld.proto @@ -0,0 +1,41 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "kamon.instrumentation.akka.grpc"; +option java_outer_classname = "HelloWorldProto"; + +package helloworld; + +////////////////////////////////////// The greeting service definition. +service GreeterService { + ////////////////////// + // Sends a greeting // + ////////*****///////// + // HELLO // + ////////*****///////// + rpc SayHello (HelloRequest) returns (HelloReply) {} + + // Comment spanning + // on several lines + rpc ItKeepsTalking (stream HelloRequest) returns (HelloReply) {} + + /* + * C style comments + */ + rpc ItKeepsReplying (HelloRequest) returns (stream HelloReply) {} + + /* C style comments + * on several lines + * with non-empty heading/trailing line */ + rpc StreamHellos (stream HelloRequest) returns (stream HelloReply) {} +} + +// The request message containing the user's name. +message HelloRequest { + string name = 1; +} + +// The response message containing the greetings +message HelloReply { + string message = 1; +} \ No newline at end of file diff --git a/instrumentation/kamon-pekko-grpc/src/test/resources/application.conf b/instrumentation/kamon-pekko-grpc/src/test/resources/application.conf new file mode 100644 index 000000000..3371d0c8c --- /dev/null +++ b/instrumentation/kamon-pekko-grpc/src/test/resources/application.conf @@ -0,0 +1 @@ +akka.http.server.preview.enable-http2 = on \ No newline at end of file diff --git a/instrumentation/kamon-pekko-grpc/src/test/resources/logback.xml b/instrumentation/kamon-pekko-grpc/src/test/resources/logback.xml new file mode 100644 index 000000000..742815603 --- /dev/null +++ b/instrumentation/kamon-pekko-grpc/src/test/resources/logback.xml @@ -0,0 +1,12 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + \ No newline at end of file diff --git a/instrumentation/kamon-pekko-grpc/src/test/scala/kamon/instrumentation/pekko/grpc/GreeterServiceImpl.scala b/instrumentation/kamon-pekko-grpc/src/test/scala/kamon/instrumentation/pekko/grpc/GreeterServiceImpl.scala new file mode 100644 index 000000000..c29d58d14 --- /dev/null +++ b/instrumentation/kamon-pekko-grpc/src/test/scala/kamon/instrumentation/pekko/grpc/GreeterServiceImpl.scala @@ -0,0 +1,44 @@ +/* + * Copyright 2013-2021 The Kamon Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kamon.instrumentation.akka.grpc + +import scala.concurrent.Future +import akka.NotUsed +import akka.stream.Materializer +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source + + +class GreeterServiceImpl(implicit mat: Materializer) extends GreeterService { + import mat.executionContext + + override def sayHello(in: HelloRequest): Future[HelloReply] = { + Future.successful(HelloReply(s"Hello, ${in.name}")) + } + + override def itKeepsTalking(in: Source[HelloRequest, NotUsed]): Future[HelloReply] = { + in.runWith(Sink.seq).map(elements => HelloReply(s"Hello, ${elements.map(_.name).mkString(", ")}")) + } + + override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = { + Source(s"Hello, ${in.name}".toList).map(character => HelloReply(character.toString)) + } + + override def streamHellos(in: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] = { + in.map(request => HelloReply(s"Hello, ${request.name}")) + } +} diff --git a/instrumentation/kamon-pekko-grpc/src/test/scala/kamon/instrumentation/pekko/grpc/PekkoGrpcTracingSpec.scala b/instrumentation/kamon-pekko-grpc/src/test/scala/kamon/instrumentation/pekko/grpc/PekkoGrpcTracingSpec.scala new file mode 100644 index 000000000..c18fc5100 --- /dev/null +++ b/instrumentation/kamon-pekko-grpc/src/test/scala/kamon/instrumentation/pekko/grpc/PekkoGrpcTracingSpec.scala @@ -0,0 +1,64 @@ +/* + * Copyright 2013-2021 The Kamon Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kamon.instrumentation.pekko.grpc + +import akka.actor.ActorSystem +import akka.grpc.GrpcClientSettings +import akka.http.scaladsl.Http +import kamon.tag.Lookups.plain +import kamon.testkit.{InitAndStopKamonAfterAll, TestSpanReporter} +import org.scalatest.OptionValues +import org.scalatest.concurrent.Eventually +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +import scala.concurrent.duration._ + +class AkkaGrpcTracingSpec extends AnyWordSpec with InitAndStopKamonAfterAll with Matchers with Eventually + with TestSpanReporter with OptionValues { + + implicit val system = ActorSystem("akka-grpc-instrumentation") + implicit val ec = system.dispatcher + + val greeterService = GreeterServiceHandler(new GreeterServiceImpl()) + val serverBinding = Http() + .newServerAt("127.0.0.1", 8598) + .bind(greeterService) + + + val client = GreeterServiceClient(GrpcClientSettings.connectToServiceAt("127.0.0.1", 8598).withTls(false)) + + "the Akka gRPC instrumentation" should { + "create spans for the server-side" in { + client.sayHello(HelloRequest("kamon")) + + eventually(timeout(5 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName shouldBe "helloworld.GreeterService/SayHello" + span.metricTags.get(plain("component")) shouldBe "akka.grpc.server" + span.metricTags.get(plain("rpc.system")) shouldBe "grpc" + span.metricTags.get(plain("rpc.service")) shouldBe "helloworld.GreeterService" + span.metricTags.get(plain("rpc.method")) shouldBe "SayHello" + } + } + } + + override protected def beforeAll(): Unit = { + super.beforeAll() + enableFastSpanFlushing() + } +}