diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt index bceda217b..426ce9e46 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt @@ -363,6 +363,7 @@ open class ConfiguredUpstreams( Tag.of("chain", (Global.chainById(config.blockchain).chainCode)) ) val metrics = RpcMetrics( + metricsTags, timer = Timer.builder("upstream.rpc.conn") .description("Request time through a HTTP JSON RPC connection") .tags(metricsTags) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/ConnectionTimeMonitoring.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/ConnectionTimeMonitoring.kt new file mode 100644 index 000000000..0d9491a26 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/ConnectionTimeMonitoring.kt @@ -0,0 +1,46 @@ +/** + * Copyright (c) 2023 EmeraldPay, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.emeraldpay.dshackle.upstream.ethereum + +import io.emeraldpay.dshackle.upstream.rpcclient.RpcMetrics +import java.time.Duration +import java.time.Instant +import java.util.concurrent.atomic.AtomicReference +import java.util.function.Consumer + +/** + * A consumer for a connection events that records the connection time metrics + */ +class ConnectionTimeMonitoring( + val metrics: RpcMetrics, +) : Consumer { + + private val connectedAt = AtomicReference(null) + + val connectionTime: Duration? + get() = connectedAt.get()?.let { + Duration.between(it, Instant.now()).coerceAtLeast(Duration.ofMillis(0)) + } + + override fun accept(t: WsConnection.ConnectionStatus) { + if (t == WsConnection.ConnectionStatus.CONNECTED) { + connectedAt.set(Instant.now()) + } else if (t == WsConnection.ConnectionStatus.DISCONNECTED) { + connectionTime?.let(metrics::recordConnectionTime) + connectedAt.set(null) + } + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsFactory.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsFactory.kt index 8a1479794..f7926c50c 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsFactory.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsFactory.kt @@ -46,6 +46,7 @@ open class EthereumWsFactory( ) RpcMetrics( + metricsTags, timer = Timer.builder("upstream.ws.conn") .description("Request time through a WebSocket JSON RPC connection") .tags(metricsTags) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImpl.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImpl.kt index 09ae34f8a..e6ea45935 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImpl.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImpl.kt @@ -120,7 +120,7 @@ open class WsConnectionImpl( private var keepConnection = true private var connection: Disposable? = null private val reconnecting = AtomicBoolean(false) - private var onConnectionChange: Consumer? = null + private var onConnectionChange: Consumer = defaultOnConnectionChange() /** * true when the connection is actively receiving messages @@ -130,8 +130,18 @@ open class WsConnectionImpl( override val isConnected: Boolean get() = connection != null && !reconnecting.get() && active + private fun defaultOnConnectionChange(): Consumer { + return rpcMetrics?.let { + ConnectionTimeMonitoring(it) + } ?: Consumer { } + } + override fun onConnectionChange(handler: Consumer?) { - this.onConnectionChange = handler + this.onConnectionChange = if (handler != null) { + handler.andThen(defaultOnConnectionChange()) + } else { + defaultOnConnectionChange() + } } fun setReconnectIntervalSeconds(value: Long) { @@ -192,7 +202,7 @@ open class WsConnectionImpl( .doOnDisconnected { active = false disconnects.tryEmitNext(Instant.now()) - this.onConnectionChange?.accept(WsConnection.ConnectionStatus.DISCONNECTED) + this.onConnectionChange.accept(WsConnection.ConnectionStatus.DISCONNECTED) log.info("Disconnected from $uri") if (keepConnection) { tryReconnectLater() @@ -235,13 +245,13 @@ open class WsConnectionImpl( ) .uri(uri) .handle { inbound, outbound -> - this.onConnectionChange?.accept(WsConnection.ConnectionStatus.CONNECTED) + this.onConnectionChange.accept(WsConnection.ConnectionStatus.CONNECTED) // mark as active once connected, because the actual message wouldn't come until a request is sent active = true handle(inbound, outbound) } .onErrorResume { t -> - log.debug("Dropping WS connection to $uri. Error: ${t.message}") + log.debug("Dropping WS connection to {}. Error: {}", uri, t.message) Mono.empty() } .subscribe() @@ -374,6 +384,7 @@ open class WsConnectionImpl( val internalId = request.id.toLong() val onResponse = Sinks.one() currentRequests[internalId.toInt()] = onResponse + rpcMetrics?.onMessageEnqueued() // a default response when nothing is received back from WS after a timeout val noResponse = JsonRpcException( @@ -407,7 +418,10 @@ open class WsConnectionImpl( .switchIfEmpty( Mono.fromCallable { log.warn("No response for ${request.method} ${request.params}") }.then(Mono.error(noResponse)) ) - .doFinally { currentRequests.remove(internalId.toInt()) } + .doFinally { + currentRequests.remove(internalId.toInt()) + rpcMetrics?.onMessageFinished() + } } override fun close() { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt index 8e22df126..53a4c625c 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt @@ -186,6 +186,7 @@ class GrpcUpstreams( ) val metrics = RpcMetrics( + metricsTags, timer = Timer.builder("upstream.grpc.conn") .description("Request time through a Dshackle/gRPC connection") .tags(metricsTags) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClient.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClient.kt index 82ff47df1..ef5596751 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClient.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClient.kt @@ -100,12 +100,15 @@ class JsonRpcHttpClient( bytes.aggregate().asByteArray().map { Tuples.of(statusCode, it) } + }.doFinally { + metrics.onMessageFinished() }.single() } override fun read(key: JsonRpcRequest): Mono { val startTime = StopWatch() return Mono.just(key) + .doOnSubscribe { metrics.onMessageEnqueued() } .map(JsonRpcRequest::toJson) .doOnNext { startTime.start() } .flatMap(this@JsonRpcHttpClient::execute) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/RpcMetrics.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/RpcMetrics.kt index c62387a1d..1aa4de54f 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/RpcMetrics.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/RpcMetrics.kt @@ -17,14 +17,20 @@ package io.emeraldpay.dshackle.upstream.rpcclient import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.DistributionSummary +import io.micrometer.core.instrument.Gauge +import io.micrometer.core.instrument.Metrics +import io.micrometer.core.instrument.Tag import io.micrometer.core.instrument.Timer import reactor.core.publisher.Mono import reactor.netty.ChannelPipelineConfigurer import reactor.netty.channel.ChannelMetricsRecorder import reactor.netty.channel.ChannelOperations +import java.time.Duration +import java.util.concurrent.atomic.AtomicInteger import java.util.function.Function class RpcMetrics( + tags: Iterable, val timer: Timer, val fails: Counter, val responseSize: DistributionSummary, @@ -33,6 +39,12 @@ class RpcMetrics( val connectionMetrics: ChannelMetricsRecorder, ) { + private val connectionTime = Timer.builder("netty.client.connection_time") + .tags(tags) + .register(Metrics.globalRegistry) + + private val queueSize = AtomicInteger(0) + val onChannelInit: ChannelPipelineConfigurer get() = ChannelPipelineConfigurer { connectionObserver, channel, remoteAddress -> // See reactor.netty.transport.TransportConfig$TransportChannelInitializer @@ -50,4 +62,32 @@ class RpcMetrics( } } } + + init { + Gauge.builder("upstream.rpc.queue_size", queueSize.get()::toDouble) + .tags(tags) + .register(Metrics.globalRegistry) + } + + /** + * Record the total connection time. + * The difference from #recordConnectTime is that this one is applicable to long + */ + fun recordConnectionTime(time: Duration) { + connectionTime.record(time) + } + + /** + * Call when a new request is added to the queue, i.e., is about to be sent to the upstream + */ + fun onMessageEnqueued() { + queueSize.incrementAndGet() + } + + /** + * Call when a request it processed, as success, error, timeout or any other status indicating that the request is no longer in queue. + */ + fun onMessageFinished() { + queueSize.decrementAndGet() + } } diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstreamSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstreamSpec.groovy index 1d73f3f7a..2e2b52809 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstreamSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstreamSpec.groovy @@ -48,6 +48,7 @@ class EthereumGrpcUpstreamSpec extends Specification { MockGrpcServer mockServer = new MockGrpcServer() ObjectMapper objectMapper = Global.objectMapper RpcMetrics metrics = new RpcMetrics( + [], Timer.builder("test1").register(TestingCommons.meterRegistry), Counter.builder("test2").register(TestingCommons.meterRegistry), DistributionSummary.builder("test3").register(TestingCommons.meterRegistry), diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/ConnectionTimeMonitoringTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/ConnectionTimeMonitoringTest.kt new file mode 100644 index 000000000..0cc3f237a --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/ConnectionTimeMonitoringTest.kt @@ -0,0 +1,54 @@ +/** + * Copyright (c) 2023 EmeraldPay, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.emeraldpay.dshackle.upstream.ethereum + +import io.emeraldpay.dshackle.upstream.rpcclient.RpcMetrics +import io.kotest.core.spec.style.ShouldSpec +import io.kotest.matchers.comparables.shouldBeGreaterThan +import io.kotest.matchers.shouldBe +import io.kotest.matchers.shouldNotBe +import io.mockk.mockk +import io.mockk.verify +import java.time.Duration + +class ConnectionTimeMonitoringTest : ShouldSpec({ + + should("Update time on CONNECT") { + val monitoring = ConnectionTimeMonitoring(mockk()) + + monitoring.connectionTime shouldBe null + monitoring.accept(WsConnection.ConnectionStatus.CONNECTED) + Thread.sleep(25) + monitoring.connectionTime shouldNotBe null + monitoring.connectionTime!! shouldBeGreaterThan Duration.ofMillis(20) + } + + should("Record time on DISCONNECT") { + val rpc = mockk(relaxed = true) + val monitoring = ConnectionTimeMonitoring(rpc) + + monitoring.accept(WsConnection.ConnectionStatus.CONNECTED) + Thread.sleep(25) + monitoring.accept(WsConnection.ConnectionStatus.DISCONNECTED) + + // erases the time after recording it + monitoring.connectionTime shouldBe null + + verify { + rpc.recordConnectionTime(more(Duration.ofMillis(20))) + } + } +}) diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClientTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClientTest.kt index 5ce4d0b37..6ff45e31c 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClientTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClientTest.kt @@ -10,6 +10,7 @@ import io.kotest.matchers.shouldBe import io.kotest.matchers.types.instanceOf import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.DistributionSummary +import io.micrometer.core.instrument.Tag import io.micrometer.core.instrument.Timer import org.mockserver.integration.ClientAndServer import org.mockserver.model.HttpRequest @@ -23,6 +24,7 @@ class JsonRpcHttpClientTest : ShouldSpec({ val port = SocketUtils.findAvailableTcpPort(20000) val metrics = RpcMetrics( + emptyList(), Timer.builder("test1").register(TestingCommonsKotlin.meterRegistry), Counter.builder("test2").register(TestingCommonsKotlin.meterRegistry), DistributionSummary.builder("test3").register(TestingCommonsKotlin.meterRegistry),