Skip to content

Commit

Permalink
initial commit for add-pekko-grpc-instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
inobu committed Oct 20, 2023
1 parent a96a353 commit 1377664
Show file tree
Hide file tree
Showing 8 changed files with 288 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package kamon.instrumentation.akka.grpc;

import akka.http.javadsl.model.HttpEntity;
import kamon.Kamon;
import kamon.context.Context;
import kanela.agent.libs.net.bytebuddy.asm.Advice;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

public class AkkaGRPCUnmarshallingContextPropagation {

@Advice.OnMethodExit()
public static void onExit(
@Advice.Return(readOnly = false) CompletionStage<?> returnValue,
@Advice.Argument(0) Object firstArgument) {

if(firstArgument instanceof HttpEntity && returnValue instanceof CompletableFuture) {
final Context currentContext = Kamon.currentContext();

// NOTES: The wrapper is only overriding thenCompose because it is the only function that gets called
// after GrpcMarshalling.unmarshall in the auto-generated HandlerFactory for gRPC services. In
// the future this might be removed if we instrument CompletionStage directly.
returnValue = new ContextPropagatingCompletionStage<>((CompletableFuture) returnValue, currentContext);
}
}


public static class ContextPropagatingCompletionStage<T> extends CompletableFuture<T> {
private final CompletableFuture<T> wrapped;
private final Context context;

public ContextPropagatingCompletionStage(CompletableFuture<T> wrapped, Context context) {
this.wrapped = wrapped;
this.context = context;
}

@Override
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
Function<? super T, ? extends CompletionStage<U>> wrapperFunction = (t) -> {
return Kamon.runWithContext(context, () -> fn.apply(t));
};

return wrapped.thenCompose(wrapperFunction);
}
}

}
20 changes: 20 additions & 0 deletions instrumentation/kamon-pekko-grpc/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# ======================================= #
# Kamon-Akka-gRPC Reference Configuration #
# ======================================= #

kanela.modules {
akka-grpc {
name = "Akka gRPC Instrumentation"
description = "Context propagation and tracing for Akka gRPC"
enabled = yes

instrumentations = [
"kamon.instrumentation.akka.grpc.AkkaGrpcServerInstrumentation"
]

within = [
"^akka.grpc.internal..*",
"^akka.grpc.javadsl.GrpcMarshalling$"
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2013-2021 The Kamon Project <https://kamon.io>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kamon.instrumentation.akka.grpc

import kamon.Kamon
import kanela.agent.api.instrumentation.InstrumentationBuilder
import kanela.agent.libs.net.bytebuddy.asm.Advice

class AkkaGrpcServerInstrumentation extends InstrumentationBuilder {

/**
* Support for Akka gRPC servers.
*
* gRPC requests get their spans started by the ServerFlowWrapper in the Akka HTTP instrumentation like any other
* requests, but they never go through any instrumentation that gives a good operation name to the Span and forces
* taking a sampling decision.
*
* This instrumentation gives a proper name and tags to the span when it matches one of the exposed services,
* otherwise the span remains unchanged. Assumes no actual implementation of `akka.grpc.internal.TelemetrySpi` is
* configured.
*/
onType("akka.grpc.internal.NoOpTelemetry$")
.advise(method("onRequest"), AkkaGRPCServerRequestHandler)


onType("akka.grpc.javadsl.GrpcMarshalling")
.advise(method("unmarshal"), classOf[AkkaGRPCUnmarshallingContextPropagation])
}

object AkkaGRPCServerRequestHandler {

@Advice.OnMethodEnter()
def enter(@Advice.Argument(0) serviceName: String, @Advice.Argument(1) method: String): Unit = {
val fullSpanName = serviceName + "/" + method
Kamon.currentSpan()
.name(fullSpanName)
.tagMetrics("component", "akka.grpc.server")
.tagMetrics("rpc.system", "grpc")
.tagMetrics("rpc.service", serviceName)
.tagMetrics("rpc.method", method)
.takeSamplingDecision()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
syntax = "proto3";

option java_multiple_files = true;
option java_package = "kamon.instrumentation.akka.grpc";
option java_outer_classname = "HelloWorldProto";

package helloworld;

////////////////////////////////////// The greeting service definition.
service GreeterService {
//////////////////////
// Sends a greeting //
////////*****/////////
// HELLO //
////////*****/////////
rpc SayHello (HelloRequest) returns (HelloReply) {}

// Comment spanning
// on several lines
rpc ItKeepsTalking (stream HelloRequest) returns (HelloReply) {}

/*
* C style comments
*/
rpc ItKeepsReplying (HelloRequest) returns (stream HelloReply) {}

/* C style comments
* on several lines
* with non-empty heading/trailing line */
rpc StreamHellos (stream HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
akka.http.server.preview.enable-http2 = on
12 changes: 12 additions & 0 deletions instrumentation/kamon-pekko-grpc/src/test/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<configuration>
<statusListener class="ch.qos.logback.core.status.NopStatusListener"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2013-2021 The Kamon Project <https://kamon.io>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kamon.instrumentation.akka.grpc

import scala.concurrent.Future
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source


class GreeterServiceImpl(implicit mat: Materializer) extends GreeterService {
import mat.executionContext

override def sayHello(in: HelloRequest): Future[HelloReply] = {
Future.successful(HelloReply(s"Hello, ${in.name}"))
}

override def itKeepsTalking(in: Source[HelloRequest, NotUsed]): Future[HelloReply] = {
in.runWith(Sink.seq).map(elements => HelloReply(s"Hello, ${elements.map(_.name).mkString(", ")}"))
}

override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {
Source(s"Hello, ${in.name}".toList).map(character => HelloReply(character.toString))
}

override def streamHellos(in: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] = {
in.map(request => HelloReply(s"Hello, ${request.name}"))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2013-2021 The Kamon Project <https://kamon.io>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kamon.instrumentation.pekko.grpc

import akka.actor.ActorSystem
import akka.grpc.GrpcClientSettings
import akka.http.scaladsl.Http
import kamon.tag.Lookups.plain
import kamon.testkit.{InitAndStopKamonAfterAll, TestSpanReporter}
import org.scalatest.OptionValues
import org.scalatest.concurrent.Eventually
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import scala.concurrent.duration._

class AkkaGrpcTracingSpec extends AnyWordSpec with InitAndStopKamonAfterAll with Matchers with Eventually
with TestSpanReporter with OptionValues {

implicit val system = ActorSystem("akka-grpc-instrumentation")
implicit val ec = system.dispatcher

val greeterService = GreeterServiceHandler(new GreeterServiceImpl())
val serverBinding = Http()
.newServerAt("127.0.0.1", 8598)
.bind(greeterService)


val client = GreeterServiceClient(GrpcClientSettings.connectToServiceAt("127.0.0.1", 8598).withTls(false))

"the Akka gRPC instrumentation" should {
"create spans for the server-side" in {
client.sayHello(HelloRequest("kamon"))

eventually(timeout(5 seconds)) {
val span = testSpanReporter().nextSpan().value
span.operationName shouldBe "helloworld.GreeterService/SayHello"
span.metricTags.get(plain("component")) shouldBe "akka.grpc.server"
span.metricTags.get(plain("rpc.system")) shouldBe "grpc"
span.metricTags.get(plain("rpc.service")) shouldBe "helloworld.GreeterService"
span.metricTags.get(plain("rpc.method")) shouldBe "SayHello"
}
}
}

override protected def beforeAll(): Unit = {
super.beforeAll()
enableFastSpanFlushing()
}
}

0 comments on commit 1377664

Please sign in to comment.