Skip to content

Commit

Permalink
solution: metrics to monitor Websocket connection time and queue size
Browse files Browse the repository at this point in the history
  • Loading branch information
splix committed May 5, 2023
1 parent 9f9edf5 commit 7fd81f9
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ open class ConfiguredUpstreams(
Tag.of("chain", (Global.chainById(config.blockchain).chainCode))
)
val metrics = RpcMetrics(
metricsTags,
timer = Timer.builder("upstream.rpc.conn")
.description("Request time through a HTTP JSON RPC connection")
.tags(metricsTags)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Copyright (c) 2023 EmeraldPay, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.upstream.rpcclient.RpcMetrics
import java.time.Duration
import java.time.Instant
import java.util.concurrent.atomic.AtomicReference
import java.util.function.Consumer

/**
 * A consumer of connection events that records connection time metrics.
 *
 * On [WsConnection.ConnectionStatus.CONNECTED] it remembers the current instant; on
 * [WsConnection.ConnectionStatus.DISCONNECTED] it reports the time elapsed since that instant
 * to [RpcMetrics.recordConnectionTime] and forgets it. Any other status is ignored.
 *
 * @property metrics the metrics holder that receives the measured connection duration
 */
class ConnectionTimeMonitoring(
    val metrics: RpcMetrics,
) : Consumer<WsConnection.ConnectionStatus> {

    // Instant of the last CONNECTED event, or null when no connection is being tracked
    private val connectedAt = AtomicReference<Instant?>(null)

    /**
     * Time elapsed since the last CONNECTED event, never negative;
     * `null` if no connection is currently tracked.
     */
    val connectionTime: Duration?
        get() = connectedAt.get()?.let(::elapsedSince)

    override fun accept(t: WsConnection.ConnectionStatus) {
        if (t == WsConnection.ConnectionStatus.CONNECTED) {
            connectedAt.set(Instant.now())
        } else if (t == WsConnection.ConnectionStatus.DISCONNECTED) {
            // Read and clear in one atomic step, so a start instant is reported at most once
            // and a CONNECTED event arriving in between is not wiped by a late reset.
            connectedAt.getAndSet(null)?.let { startedAt ->
                metrics.recordConnectionTime(elapsedSince(startedAt))
            }
        }
    }

    // Duration from the given instant until now, clamped at zero in case the clock moved backwards
    private fun elapsedSince(start: Instant): Duration =
        Duration.between(start, Instant.now()).coerceAtLeast(Duration.ZERO)
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ open class EthereumWsFactory(
)

RpcMetrics(
metricsTags,
timer = Timer.builder("upstream.ws.conn")
.description("Request time through a WebSocket JSON RPC connection")
.tags(metricsTags)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ open class WsConnectionImpl(
private var keepConnection = true
private var connection: Disposable? = null
private val reconnecting = AtomicBoolean(false)
private var onConnectionChange: Consumer<WsConnection.ConnectionStatus>? = null
private var onConnectionChange: Consumer<WsConnection.ConnectionStatus> = defaultOnConnectionChange()

/**
* true when the connection is actively receiving messages
Expand All @@ -130,8 +130,18 @@ open class WsConnectionImpl(
/**
 * True only when all three hold: a connection handle is set, no reconnect is in progress,
 * and the connection is currently flagged as active.
 */
override val isConnected: Boolean
get() = connection != null && !reconnecting.get() && active

/**
 * Builds the built-in connection status handler.
 *
 * @return a new [ConnectionTimeMonitoring] bound to `rpcMetrics` when metrics are available,
 * otherwise a consumer that ignores every status
 */
private fun defaultOnConnectionChange(): Consumer<WsConnection.ConnectionStatus> {
    // without metrics there is nothing to record, so fall back to a no-op handler
    val metrics = rpcMetrics ?: return Consumer { }
    return ConnectionTimeMonitoring(metrics)
}

override fun onConnectionChange(handler: Consumer<WsConnection.ConnectionStatus>?) {
this.onConnectionChange = handler
this.onConnectionChange = if (handler != null) {
handler.andThen(defaultOnConnectionChange())
} else {
defaultOnConnectionChange()
}
}

fun setReconnectIntervalSeconds(value: Long) {
Expand Down Expand Up @@ -192,7 +202,7 @@ open class WsConnectionImpl(
.doOnDisconnected {
active = false
disconnects.tryEmitNext(Instant.now())
this.onConnectionChange?.accept(WsConnection.ConnectionStatus.DISCONNECTED)
this.onConnectionChange.accept(WsConnection.ConnectionStatus.DISCONNECTED)
log.info("Disconnected from $uri")
if (keepConnection) {
tryReconnectLater()
Expand Down Expand Up @@ -235,13 +245,13 @@ open class WsConnectionImpl(
)
.uri(uri)
.handle { inbound, outbound ->
this.onConnectionChange?.accept(WsConnection.ConnectionStatus.CONNECTED)
this.onConnectionChange.accept(WsConnection.ConnectionStatus.CONNECTED)
// mark as active once connected, because the actual message wouldn't come until a request is sent
active = true
handle(inbound, outbound)
}
.onErrorResume { t ->
log.debug("Dropping WS connection to $uri. Error: ${t.message}")
log.debug("Dropping WS connection to {}. Error: {}", uri, t.message)
Mono.empty<Void>()
}
.subscribe()
Expand Down Expand Up @@ -374,6 +384,7 @@ open class WsConnectionImpl(
val internalId = request.id.toLong()
val onResponse = Sinks.one<JsonRpcResponse>()
currentRequests[internalId.toInt()] = onResponse
rpcMetrics?.onMessageEnqueued()

// a default response when nothing is received back from WS after a timeout
val noResponse = JsonRpcException(
Expand Down Expand Up @@ -407,7 +418,10 @@ open class WsConnectionImpl(
.switchIfEmpty(
Mono.fromCallable { log.warn("No response for ${request.method} ${request.params}") }.then(Mono.error(noResponse))
)
.doFinally { currentRequests.remove(internalId.toInt()) }
.doFinally {
currentRequests.remove(internalId.toInt())
rpcMetrics?.onMessageFinished()
}
}

override fun close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ class GrpcUpstreams(
)

val metrics = RpcMetrics(
metricsTags,
timer = Timer.builder("upstream.grpc.conn")
.description("Request time through a Dshackle/gRPC connection")
.tags(metricsTags)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,15 @@ class JsonRpcHttpClient(
bytes.aggregate().asByteArray().map {
Tuples.of(statusCode, it)
}
}.doFinally {
metrics.onMessageFinished()
}.single()
}

override fun read(key: JsonRpcRequest): Mono<JsonRpcResponse> {
val startTime = StopWatch()
return Mono.just(key)
.doOnSubscribe { metrics.onMessageEnqueued() }
.map(JsonRpcRequest::toJson)
.doOnNext { startTime.start() }
.flatMap(this@JsonRpcHttpClient::execute)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,20 @@ package io.emeraldpay.dshackle.upstream.rpcclient

import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.DistributionSummary
import io.micrometer.core.instrument.Gauge
import io.micrometer.core.instrument.Metrics
import io.micrometer.core.instrument.Tag
import io.micrometer.core.instrument.Timer
import reactor.core.publisher.Mono
import reactor.netty.ChannelPipelineConfigurer
import reactor.netty.channel.ChannelMetricsRecorder
import reactor.netty.channel.ChannelOperations
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.Function

class RpcMetrics(
tags: Iterable<Tag>,
val timer: Timer,
val fails: Counter,
val responseSize: DistributionSummary,
Expand All @@ -33,6 +39,12 @@ class RpcMetrics(
val connectionMetrics: ChannelMetricsRecorder,
) {

// Timer that receives the durations passed to recordConnectionTime();
// registered in the global registry with the tags given to this instance.
private val connectionTime = Timer.builder("netty.client.connection_time")
.tags(tags)
.register(Metrics.globalRegistry)

// Number of requests enqueued but not yet finished:
// incremented by onMessageEnqueued() and decremented by onMessageFinished().
private val queueSize = AtomicInteger(0)

val onChannelInit: ChannelPipelineConfigurer
get() = ChannelPipelineConfigurer { connectionObserver, channel, remoteAddress ->
// See reactor.netty.transport.TransportConfig$TransportChannelInitializer
Expand All @@ -50,4 +62,32 @@ class RpcMetrics(
}
}
}

init {
    // Expose the live queue size as a gauge.
    // The counter object itself is handed to the gauge together with a value function, so the
    // current value is read every time the gauge is sampled. A bound reference such as
    // `queueSize.get()::toDouble` would capture the Int value present at construction time (0)
    // and the gauge would never change.
    // The gauge keeps only a weak reference to the counter; the `queueSize` property keeps it alive.
    Gauge.builder("upstream.rpc.queue_size", queueSize) { it.get().toDouble() }
        .tags(tags)
        .register(Metrics.globalRegistry)
}

/**
 * Adds one sample of total connection time to the connection time timer.
 * Not to be confused with #recordConnectTime.
 * NOTE(review): the original note ended at "applicable to long"; presumably it meant
 * long-lived connections, as opposed to the time spent establishing one. Confirm.
 *
 * @param time the total connection time to record
 */
fun recordConnectionTime(time: Duration) = connectionTime.record(time)

/**
 * Must be called whenever a new request enters the queue, i.e., right before it is sent to the upstream.
 * Increases the tracked queue size by one.
 */
fun onMessageEnqueued() {
    queueSize.getAndIncrement()
}

/**
 * Must be called once a request is processed, whether it ended as a success, an error, a timeout,
 * or any other status that means the request is no longer in the queue.
 * Decreases the tracked queue size by one.
 */
fun onMessageFinished() {
    queueSize.getAndDecrement()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class EthereumGrpcUpstreamSpec extends Specification {
MockGrpcServer mockServer = new MockGrpcServer()
ObjectMapper objectMapper = Global.objectMapper
RpcMetrics metrics = new RpcMetrics(
[],
Timer.builder("test1").register(TestingCommons.meterRegistry),
Counter.builder("test2").register(TestingCommons.meterRegistry),
DistributionSummary.builder("test3").register(TestingCommons.meterRegistry),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Copyright (c) 2023 EmeraldPay, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.upstream.rpcclient.RpcMetrics
import io.kotest.core.spec.style.ShouldSpec
import io.kotest.matchers.comparables.shouldBeGreaterThan
import io.kotest.matchers.shouldBe
import io.kotest.matchers.shouldNotBe
import io.mockk.mockk
import io.mockk.verify
import java.time.Duration

/**
 * Verifies that [ConnectionTimeMonitoring] starts tracking on CONNECTED
 * and reports the elapsed time on DISCONNECTED.
 */
class ConnectionTimeMonitoringTest : ShouldSpec({

    should("Update time on CONNECT") {
        val tracker = ConnectionTimeMonitoring(mockk())

        // nothing is tracked until the first CONNECTED event arrives
        tracker.connectionTime shouldBe null

        tracker.accept(WsConnection.ConnectionStatus.CONNECTED)
        Thread.sleep(25)

        tracker.connectionTime shouldNotBe null
        tracker.connectionTime!! shouldBeGreaterThan Duration.ofMillis(20)
    }

    should("Record time on DISCONNECT") {
        val rpcMetrics = mockk<RpcMetrics>(relaxed = true)
        val tracker = ConnectionTimeMonitoring(rpcMetrics)

        tracker.accept(WsConnection.ConnectionStatus.CONNECTED)
        Thread.sleep(25)
        tracker.accept(WsConnection.ConnectionStatus.DISCONNECTED)

        // the elapsed time must have been reported to the metrics
        verify {
            rpcMetrics.recordConnectionTime(more(Duration.ofMillis(20)))
        }

        // erases the time after recording it
        tracker.connectionTime shouldBe null
    }
})
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.kotest.matchers.shouldBe
import io.kotest.matchers.types.instanceOf
import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.DistributionSummary
import io.micrometer.core.instrument.Tag
import io.micrometer.core.instrument.Timer
import org.mockserver.integration.ClientAndServer
import org.mockserver.model.HttpRequest
Expand All @@ -23,6 +24,7 @@ class JsonRpcHttpClientTest : ShouldSpec({

val port = SocketUtils.findAvailableTcpPort(20000)
val metrics = RpcMetrics(
emptyList<Tag>(),
Timer.builder("test1").register(TestingCommonsKotlin.meterRegistry),
Counter.builder("test2").register(TestingCommonsKotlin.meterRegistry),
DistributionSummary.builder("test3").register(TestingCommonsKotlin.meterRegistry),
Expand Down

0 comments on commit 7fd81f9

Please sign in to comment.