-
Notifications
You must be signed in to change notification settings - Fork 328
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add: instrumentation for apache cxf client (#1335)
- Loading branch information
Showing
14 changed files
with
715 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
99 changes: 99 additions & 0 deletions
99
instrumentation/kamon-apache-cxf/src/main/resources/reference.conf
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
# ================================================== # | ||
# kamon Apache CXF client reference configuration # | ||
# ================================================== # | ||
|
||
# Settings to control the CXF Client instrumentation | ||
# | ||
# IMPORTANT: The entire configuration of the CXF Client Instrumentation is based on the constructs provided by the | ||
# Kamon Instrumentation Common library which will always fallback to the settings found under the | ||
# "kamon.instrumentation.http-client.default" path. The default settings have been included here to make them easy to | ||
# find and understand in the context of this project and commented out so that any changes to the default settings | ||
# will actually have effect. | ||
# | ||
kamon.instrumentation.apache.cxf { | ||
|
||
# | ||
# Configuration for CXF context propagation. | ||
# | ||
propagation { | ||
|
||
# Enables or disables HTTP context propagation on this HTTP client instrumentation. Please note that if | ||
# propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can | ||
# be created and reported but will not be linked across boundaries nor take trace identifiers from tags). | ||
#enabled = yes | ||
|
||
# HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default | ||
# configuration for more details on how to configure the detault HTTP context propagation. | ||
#channel = "default" | ||
} | ||
|
||
tracing { | ||
|
||
# Enables HTTP request tracing. When enabled the instrumentation will create Spans for outgoing requests | ||
# and finish them when the response is received from the server. | ||
#enabled = yes | ||
|
||
# Enables collection of span metrics using the `span.processing-time` metric. | ||
#span-metrics = on | ||
|
||
# Select which tags should be included as span and span metric tags. The possible options are: | ||
# - span: the tag is added as a Span tag (i.e. using span.tag(...)) | ||
# - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...)) | ||
# - off: the tag is not used. | ||
# | ||
tags { | ||
|
||
# Use the http.url tag. | ||
#url = span | ||
|
||
# Use the http.method tag. | ||
#method = metric | ||
|
||
# Use the http.status_code tag. | ||
#status-code = metric | ||
|
||
# Copy tags from the context into the Spans with the specified purpouse. For example, to copy a customer_type | ||
# tag from the context into the HTTP Client Span created by the instrumentation, the following configuration | ||
# should be added: | ||
# | ||
# from-context { | ||
# customer_type = span | ||
# } | ||
# | ||
from-context { | ||
|
||
} | ||
} | ||
|
||
operations { | ||
|
||
# The default operation name to be used when creating Spans to handle the HTTP client requests. The HTTP | ||
# Client instrumentation will always try to use the HTTP Operation Name Generator configured below to get | ||
# a name, but if it fails to generate it then this name will be used. | ||
default = "apache.cxf.client" | ||
|
||
# FQCN for a HttpOperationNameGenerator implementation, or ony of the following shorthand forms: | ||
# - hostname: Uses the request Host as the operation name. | ||
# - method: Uses the request HTTP method as the operation name. | ||
# | ||
#name-generator = "method" | ||
} | ||
} | ||
|
||
} | ||
|
||
kanela { | ||
modules { | ||
apache-cxf { | ||
name = "Apache CXF Client" | ||
description = "Provides tracing of client calls made with the official Apache CXF library." | ||
instrumentations = [ | ||
"kamon.instrumentation.apache.cxf.client.ClientProxyFactoryBeanInstrumentation" | ||
] | ||
enabled = true | ||
within = [ | ||
"org.apache.cxf..*", | ||
] | ||
} | ||
} | ||
} |
103 changes: 103 additions & 0 deletions
103
...he-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/ApacheCxfClientHelper.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
package kamon.instrumentation.apache.cxf.client | ||
|
||
import kamon.instrumentation.http.HttpMessage | ||
import org.apache.cxf.message.Message | ||
import org.apache.cxf.message.Message.{HTTP_REQUEST_METHOD, PROTOCOL_HEADERS, RESPONSE_CODE} | ||
import org.slf4j.LoggerFactory | ||
|
||
import java.net.{URI, URISyntaxException} | ||
import java.util.Collections.{emptyMap => jEmptyMap, singletonList => jList} | ||
import java.util.{List => JList, Map => JMap} | ||
import scala.collection.mutable | ||
import scala.jdk.CollectionConverters.{ListHasAsScala, MapHasAsJava, MapHasAsScala} | ||
|
||
class ApacheCxfClientHelper | ||
|
||
object ApacheCxfClientHelper { | ||
|
||
private val _logger = LoggerFactory.getLogger(classOf[ApacheCxfClientHelper]) | ||
|
||
def toRequestBuilder(request: Message): HttpMessage.RequestBuilder[Message] = | ||
new RequestReader with HttpMessage.RequestBuilder[Message] { | ||
|
||
val delegate: Message = request | ||
|
||
val uri: URI = getUri(request) | ||
|
||
override def write(header: String, value: String): Unit = { | ||
val headers: mutable.Map[String, String] = getAllHeaders(delegate).to(mutable.Map) | ||
headers.put(header, value) | ||
delegate.put(Message.PROTOCOL_HEADERS, headers.map(m => m._1 -> jList(m._2)).toMap.asJava) | ||
} | ||
|
||
override def build(): Message = { | ||
_logger.trace("Prepared request for instrumentation: {}", this) | ||
delegate | ||
} | ||
|
||
override def toString(): String = s"RequestReader(host=$host,port=$port,method=$method,path=$path)" | ||
} | ||
|
||
def toResponse(message: Message): HttpMessage.Response = new HttpMessage.Response { | ||
override def statusCode: Int = message.get(RESPONSE_CODE) match { | ||
case code: Integer => code | ||
case _ => | ||
_logger.debug("Not able to retrieve status code from response") | ||
-1 | ||
} | ||
} | ||
private def getUri(message: Message): URI = | ||
try { | ||
getUriAsString(message).map(s => new URI(s)).orNull | ||
} catch { | ||
case e: URISyntaxException => throw new RuntimeException(e.getMessage, e) | ||
} | ||
|
||
private def safeGet(message: Message, key: String): Option[String] = | ||
if (message.containsKey(key)) { | ||
message.get(key) match { | ||
case value: String => Option.apply(value) | ||
case _ => Option.empty | ||
} | ||
} else Option.empty | ||
|
||
private def getUriAsString(message: Message): Option[String] = { | ||
val requestUrl: Option[String] = safeGet(message, Message.REQUEST_URL).orElse { | ||
var address = safeGet(message, Message.ENDPOINT_ADDRESS) | ||
val requestUri = safeGet(message, Message.REQUEST_URI) | ||
if (requestUri.exists(r => r.startsWith("/"))) { | ||
if (!address.exists(a => a.startsWith(requestUri.get))) { | ||
if (address.exists(t => t.endsWith("/") && t.length > 1)) { | ||
address = address.map(a => a.substring(0, a.length)) | ||
} | ||
address.map(a => a + requestUri.getOrElse("")) | ||
} else requestUri | ||
} else address | ||
} | ||
safeGet(message, Message.QUERY_STRING).map(q => requestUrl.map(u => s"$u?$q")).getOrElse(requestUrl) | ||
} | ||
|
||
private def getAllHeaders(message: Message): Map[String, String] = (message.get(PROTOCOL_HEADERS) match { | ||
case hs: JMap[String, JList[String]] => hs | ||
case _ => jEmptyMap[String, JList[String]]() | ||
}).asScala.map { case (key, values) => key -> values.asScala.mkString(", ") }.toMap | ||
|
||
private trait RequestReader extends HttpMessage.Request { | ||
def uri: URI | ||
def delegate: Message | ||
|
||
override def host: String = if (uri != null) uri.getHost else null | ||
|
||
override def port: Int = if (uri != null) uri.getPort else 0 | ||
|
||
override def method: String = delegate.get(HTTP_REQUEST_METHOD).asInstanceOf[String] | ||
|
||
override def path: String = if (uri != null) uri.getPath else null | ||
|
||
override def read(header: String): Option[String] = getAllHeaders(delegate).get(header) | ||
|
||
override def readAll(): Map[String, String] = getAllHeaders(delegate) | ||
|
||
override def url: String = if (uri != null) uri.toString else null | ||
} | ||
} |
32 changes: 32 additions & 0 deletions
32
...scala/kamon/instrumentation/apache/cxf/client/ClientProxyFactoryBeanInstrumentation.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
package kamon.instrumentation.apache.cxf.client | ||
|
||
import kamon.Kamon | ||
import kamon.instrumentation.http.HttpClientInstrumentation | ||
import kamon.instrumentation.http.HttpClientInstrumentation.RequestHandler | ||
import kanela.agent.api.instrumentation.InstrumentationBuilder | ||
import org.apache.cxf.message.Message | ||
|
||
class ClientProxyFactoryBeanInstrumentation extends InstrumentationBuilder { | ||
|
||
onSubTypesOf("org.apache.cxf.frontend.ClientProxyFactoryBean") | ||
.advise(method("initFeatures"), classOf[TracingClientFeatureInitializer]) | ||
} | ||
|
||
object ClientProxyFactoryBeanInstrumentation { | ||
Kamon.onReconfigure(_ => | ||
ClientProxyFactoryBeanInstrumentation.rebuildHttpClientInstrumentation(): Unit | ||
) | ||
|
||
@volatile var cxfClientInstrumentation: HttpClientInstrumentation = rebuildHttpClientInstrumentation() | ||
|
||
private def rebuildHttpClientInstrumentation(): HttpClientInstrumentation = { | ||
val httpClientConfig = Kamon.config().getConfig("kamon.instrumentation.apache.cxf") | ||
cxfClientInstrumentation = HttpClientInstrumentation.from(httpClientConfig, "apache.cxf.client") | ||
cxfClientInstrumentation | ||
} | ||
|
||
def processResponse(handler: RequestHandler[_], message: Message, t: Throwable = null): Unit = { | ||
if (t != null) handler.span.fail(t).finish() | ||
else handler.processResponse(ApacheCxfClientHelper.toResponse(message)) | ||
} | ||
} |
17 changes: 17 additions & 0 deletions
17
...-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TraceScopeHolder.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
package kamon.instrumentation.apache.cxf.client | ||
|
||
import kamon.context.Storage.Scope | ||
import kamon.instrumentation.http.HttpClientInstrumentation.RequestHandler | ||
import org.apache.cxf.message.Message | ||
|
||
import java.io.Closeable | ||
|
||
private class TraceScopeHolder(val traceScope: Option[TraceScope], val detached: Boolean = false) extends Serializable | ||
|
||
private case class TraceScope(handler: RequestHandler[Message], scope: Option[Scope]) extends Closeable { | ||
|
||
override def close(): Unit = { | ||
if (handler != null && handler.span != null) handler.span.finish() | ||
if (scope.nonEmpty) scope.get.close() | ||
} | ||
} |
22 changes: 22 additions & 0 deletions
22
...che-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientFeature.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
package kamon.instrumentation.apache.cxf.client | ||
|
||
import org.apache.cxf.Bus | ||
import org.apache.cxf.feature.AbstractFeature | ||
import org.apache.cxf.interceptor.InterceptorProvider | ||
|
||
private class TracingClientFeature extends AbstractFeature { | ||
|
||
private val setup = new TracingClientSetupInterceptor() | ||
private val receive = new TracingClientReceiveInterceptor() | ||
private val postInvoke = new TracingClientPostInvokeInterceptor() | ||
|
||
override def initializeProvider(provider: InterceptorProvider, bus: Bus): Unit = { | ||
provider.getOutInterceptors.add(0, setup) | ||
provider.getOutFaultInterceptors.add(0, setup) | ||
provider.getInInterceptors.add(0, receive) | ||
provider.getInInterceptors.add(postInvoke) | ||
provider.getInFaultInterceptors.add(0, receive) | ||
provider.getInFaultInterceptors.add(postInvoke) | ||
super.initializeProvider(provider, bus) | ||
} | ||
} |
16 changes: 16 additions & 0 deletions
16
.../main/scala/kamon/instrumentation/apache/cxf/client/TracingClientFeatureInitializer.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
package kamon.instrumentation.apache.cxf.client | ||
|
||
import kanela.agent.libs.net.bytebuddy.asm.Advice | ||
import kanela.agent.libs.net.bytebuddy.asm.Advice.This | ||
import org.apache.cxf.frontend.ClientProxyFactoryBean | ||
|
||
import scala.annotation.static | ||
|
||
class TracingClientFeatureInitializer | ||
object TracingClientFeatureInitializer { | ||
|
||
@Advice.OnMethodEnter | ||
@static def onEnter(@This clientProxyFactoryBean: Any) = clientProxyFactoryBean match { | ||
case c: ClientProxyFactoryBean => c.getFeatures.add(new TracingClientFeature) | ||
} | ||
} |
Oops, something went wrong.