From 96487b6c5e8516157205bdc31c2d6e9b6423e813 Mon Sep 17 00:00:00 2001 From: KirillPamPam Date: Mon, 12 Aug 2024 18:27:47 +0400 Subject: [PATCH] Check chain settings via ws (#544) --- .../dshackle/upstream/AbstractHead.kt | 5 +- .../emeraldpay/dshackle/upstream/EmptyHead.kt | 3 +- .../io/emeraldpay/dshackle/upstream/Head.kt | 3 +- .../ethereum/EthereumChainSpecific.kt | 11 + .../ethereum/EthereumUpstreamValidator.kt | 11 +- .../upstream/ethereum/GenericWsHead.kt | 49 +++- .../ethereum/HeadLivenessValidator.kt | 14 +- .../ethereum/HeadLivenessValidatorImpl.kt | 8 +- .../upstream/generic/AbstractChainSpecific.kt | 8 + .../upstream/generic/ChainSpecific.kt | 4 + .../upstream/generic/GenericUpstream.kt | 43 +-- .../generic/connectors/GenericConnector.kt | 3 +- .../generic/connectors/GenericRpcConnector.kt | 8 +- .../generic/connectors/GenericWsConnector.kt | 4 +- .../dshackle/test/EthereumHeadMock.groovy | 3 +- .../dshackle/test/GenericConnectorMock.groovy | 7 +- .../EthereumEgressSubscriptionSpec.groovy | 8 +- .../ethereum/GenericWsHeadSpec.groovy | 54 +++- .../ethereum/HeadLivenessValidatorSpec.groovy | 14 +- .../upstream/ethereum/GenericWsHeadTest.kt | 248 ++++++++++++++++++ 20 files changed, 449 insertions(+), 59 deletions(-) create mode 100644 src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadTest.kt diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt index 3804e1c77..11e51027a 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt @@ -16,6 +16,7 @@ package io.emeraldpay.dshackle.upstream import io.emeraldpay.dshackle.data.BlockContainer +import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessState import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice import io.micrometer.core.instrument.Gauge import io.micrometer.core.instrument.Meter @@ -164,7 +165,7 @@ abstract class AbstractHead @JvmOverloads constructor( metrics.forEach { Metrics.globalRegistry.remove(it) } } - protected open fun onNoHeadUpdates() { + open fun onNoHeadUpdates() { // NOOP } @@ -172,7 +173,7 @@ abstract class AbstractHead @JvmOverloads constructor( // NOOP } - override fun headLiveness(): Flux { + override fun headLiveness(): Flux { return Flux.empty() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/EmptyHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/EmptyHead.kt index fee871242..92296c891 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/EmptyHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/EmptyHead.kt @@ -17,6 +17,7 @@ package io.emeraldpay.dshackle.upstream import io.emeraldpay.dshackle.data.BlockContainer +import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessState import reactor.core.publisher.Flux class EmptyHead : Head { @@ -44,7 +45,7 @@ class EmptyHead : Head { override fun onSyncingNode(isSyncing: Boolean) { } - override fun headLiveness(): Flux = Flux.empty() + override fun headLiveness(): Flux = Flux.empty() override fun getCurrent(): BlockContainer? { return null diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Head.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Head.kt index 4d8c5f819..532f16abc 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Head.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Head.kt @@ -17,6 +17,7 @@ package io.emeraldpay.dshackle.upstream import io.emeraldpay.dshackle.data.BlockContainer +import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessState import reactor.core.publisher.Flux /** @@ -46,7 +47,7 @@ interface Head { fun onSyncingNode(isSyncing: Boolean) - fun headLiveness(): Flux + fun headLiveness(): Flux fun getCurrent(): BlockContainer? } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt index 58053f027..2292f7010 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt @@ -163,6 +163,17 @@ object EthereumChainSpecific : AbstractPollChainSpecific() { } } + override fun chainSettingsValidator( + chain: Chain, + upstream: Upstream, + reader: ChainReader, + ): SingleValidator? { + if (upstream.getOptions().disableUpstreamValidation) { + return null + } + return ChainIdValidator(upstream, chain, reader) + } + override fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector { return BasicEthUpstreamRpcModulesDetector(upstream) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidator.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidator.kt index 52fc07acd..f17e9d37d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidator.kt @@ -21,6 +21,7 @@ import io.emeraldpay.dshackle.Defaults import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig import io.emeraldpay.dshackle.foundation.ChainOptions +import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.ChainResponse import io.emeraldpay.dshackle.upstream.SingleValidator @@ -37,6 +38,8 @@ import reactor.kotlin.extra.retry.retryRandomBackoff import java.math.BigInteger import java.time.Duration import java.util.concurrent.TimeoutException +import java.util.function.Supplier + interface CallLimitValidator : SingleValidator { fun isEnabled(): Boolean } @@ -144,7 +147,11 @@ fun callLimitValidatorFactory( class ChainIdValidator( private val upstream: Upstream, private val chain: Chain, + private val customReader: ChainReader? = null, ) : SingleValidator { + private val validatorReader: Supplier = Supplier { + customReader ?: upstream.getIngressReader() + } companion object { @JvmStatic @@ -186,7 +193,7 @@ class ChainIdValidator( } private fun chainId(): Mono { - return upstream.getIngressReader() + return validatorReader.get() .read(ChainRequest("eth_chainId", ListParams())) .retryRandomBackoff(3, Duration.ofMillis(100), Duration.ofMillis(500)) { ctx -> log.warn( @@ -199,7 +206,7 @@ class ChainIdValidator( } private fun netVersion(): Mono { - return upstream.getIngressReader() + return validatorReader.get() .read(ChainRequest("net_version", ListParams())) .retryRandomBackoff(3, Duration.ofMillis(100), Duration.ofMillis(500)) { ctx -> log.warn( diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt index 0e1051439..440f96b67 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt @@ -21,14 +21,18 @@ import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.upstream.BlockValidator import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.Lifecycle +import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR +import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult.UPSTREAM_VALID import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice import io.emeraldpay.dshackle.upstream.generic.ChainSpecific import io.emeraldpay.dshackle.upstream.generic.GenericHead +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcWsClient import reactor.core.Disposable import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.publisher.Sinks import reactor.core.scheduler.Scheduler +import reactor.kotlin.core.publisher.switchIfEmpty import java.time.Duration import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicReference @@ -42,6 +46,7 @@ class GenericWsHead( headScheduler: Scheduler, upstream: DefaultUpstream, private val chainSpecific: ChainSpecific, + jsonRpcWsClient: JsonRpcWsClient, timeout: Duration, ) : GenericHead(upstream.getId(), forkChoice, blockValidator, headScheduler, chainSpecific), Lifecycle { private val wsHeadTimeout = run { @@ -54,6 +59,7 @@ class GenericWsHead( }.also { log.info("WS head timeout for ${upstream.getId()} is $it") } + private val chainIdValidator = chainSpecific.chainSettingsValidator(upstream.getChain(), upstream, jsonRpcWsClient) private var connectionId: String? = null private var subscribed = false @@ -63,7 +69,7 @@ class GenericWsHead( private var subscription: Disposable? = null private var headResubSubscription: Disposable? = null private val noHeadUpdatesSink = Sinks.many().multicast().directBestEffort() - private val headLivenessSink = Sinks.many().multicast().directBestEffort() + private val headLivenessSink = Sinks.many().multicast().directBestEffort() private var subscriptionId = AtomicReference("") @@ -99,15 +105,37 @@ class GenericWsHead( } private fun listenNewHeads(): Flux { - return subscribe() + return Mono.justOrEmpty(chainIdValidator) .flatMap { - chainSpecific.getFromHeader(it, "unknown", api) + it!!.validate(UPSTREAM_SETTINGS_ERROR) } - .timeout(wsHeadTimeout, Mono.error(RuntimeException("No response from subscribe to newHeads"))) - .onErrorResume { - log.error("Error getting heads for $upstreamId", it) - subscribed = false - unsubscribe() + .switchIfEmpty { + Mono.just(UPSTREAM_VALID) + } + .flatMapMany { + when (it) { + UPSTREAM_VALID -> { + subscribe() + .flatMap { data -> + chainSpecific.getFromHeader(data, "unknown", api) + } + .timeout(wsHeadTimeout, Mono.error(RuntimeException("No response from subscribe to newHeads"))) + .onErrorResume { err -> + log.error("Error getting heads for {}, message {}", upstreamId, err.message) + unsubscribe() + } + } + UPSTREAM_SETTINGS_ERROR -> { + log.warn("Couldn't check chain settings via ws connection for {}, ws sub will be recreated", upstreamId) + subscribed = false + Mono.empty() + } + else -> { + log.error("Chain settings check hasn't been passed via ws connection, upstream {} will be removed", upstreamId) + headLivenessSink.emitNext(HeadLivenessState.FATAL_ERROR) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } + Mono.empty() + } + } } } @@ -118,9 +146,10 @@ class GenericWsHead( headResubSubscription = null } - override fun headLiveness(): Flux = headLivenessSink.asFlux() + override fun headLiveness(): Flux = headLivenessSink.asFlux() private fun unsubscribe(): Mono { + subscribed = false return wsSubscriptions.unsubscribe(chainSpecific.unsubscribeNewHeadsRequest(subscriptionId.get()).copy(id = ids.getAndIncrement())) .flatMap { it.requireResult() } .doOnNext { log.warn("{} has just unsubscribed from newHeads", upstreamId) } @@ -152,7 +181,7 @@ class GenericWsHead( val connectionStates = wsSubscriptions.connectionInfoFlux() .map { if (it.connectionId == connectionId && it.connectionState == WsConnection.ConnectionState.DISCONNECTED) { - headLivenessSink.emitNext(false) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } + headLivenessSink.emitNext(HeadLivenessState.DISCONNECTED) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } subscribed = false connected = false connectionId = null diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt index 208df55d1..68187bc58 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt @@ -2,19 +2,23 @@ package io.emeraldpay.dshackle.upstream.ethereum import reactor.core.publisher.Flux +enum class HeadLivenessState { + OK, NON_CONSECUTIVE, DISCONNECTED, FATAL_ERROR +} + interface HeadLivenessValidator { - fun getFlux(): Flux + fun getFlux(): Flux } class NoHeadLivenessValidator : HeadLivenessValidator { - override fun getFlux(): Flux { - return Flux.just(false) + override fun getFlux(): Flux { + return Flux.just(HeadLivenessState.NON_CONSECUTIVE) } } class AlwaysHeadLivenessValidator : HeadLivenessValidator { - override fun getFlux(): Flux { - return Flux.just(true) + override fun getFlux(): Flux { + return Flux.just(HeadLivenessState.OK) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidatorImpl.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidatorImpl.kt index 17dc28c87..d05136284 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidatorImpl.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidatorImpl.kt @@ -18,7 +18,7 @@ class HeadLivenessValidatorImpl( private val log = LoggerFactory.getLogger(HeadLivenessValidatorImpl::class.java) } - override fun getFlux(): Flux { + override fun getFlux(): Flux { val headLiveness = head.headLiveness() // first we have moving window of 2 blocks and check that they are consecutive ones val headFlux = head.getFlux().map { it.height }.buffer(2, 1).map { @@ -39,13 +39,13 @@ class HeadLivenessValidatorImpl( // we emit when we have false or checked CHECKED_BLOCKS_UNTIL_LIVE blocks // CHECKED_BLOCKS_UNTIL_LIVE blocks == (CHECKED_BLOCKS_UNTIL_LIVE - 1) consecutive true when { - count >= (CHECKED_BLOCKS_UNTIL_LIVE - 1) -> Flux.just(true) - !value -> Flux.just(false) + count >= (CHECKED_BLOCKS_UNTIL_LIVE - 1) -> Flux.just(HeadLivenessState.OK) + !value -> Flux.just(HeadLivenessState.NON_CONSECUTIVE) else -> Flux.empty() } }.timeout( expectedBlockTime.multipliedBy(CHECKED_BLOCKS_UNTIL_LIVE.toLong() * 2), - Flux.just(false).doOnNext { + Flux.just(HeadLivenessState.NON_CONSECUTIVE).doOnNext { if (log.isDebugEnabled) { log.debug("head liveness check broken with timeout in $upstreamId") } else { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt index 693df7a68..6ee480611 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt @@ -60,6 +60,14 @@ abstract class AbstractChainSpecific : ChainSpecific { return null } + override fun chainSettingsValidator( + chain: Chain, + upstream: Upstream, + reader: ChainReader, + ): SingleValidator? { + return null + } + override fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector? { return null } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt index 7ea9b39b2..b51386f2d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt @@ -23,10 +23,12 @@ import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.LogsOracle import io.emeraldpay.dshackle.upstream.Multistream +import io.emeraldpay.dshackle.upstream.SingleValidator import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetector import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector import io.emeraldpay.dshackle.upstream.UpstreamValidator +import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult import io.emeraldpay.dshackle.upstream.beaconchain.BeaconChainSpecific import io.emeraldpay.dshackle.upstream.calls.CallMethods import io.emeraldpay.dshackle.upstream.calls.CallSelector @@ -82,6 +84,8 @@ interface ChainSpecific { fun upstreamSettingsDetector(chain: Chain, upstream: Upstream): UpstreamSettingsDetector? + fun chainSettingsValidator(chain: Chain, upstream: Upstream, reader: ChainReader): SingleValidator? + fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector? fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt index a5f7f795c..340d6a0db 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt @@ -23,7 +23,11 @@ import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetectorBuilder import io.emeraldpay.dshackle.upstream.UpstreamValidator import io.emeraldpay.dshackle.upstream.UpstreamValidatorBuilder import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult +import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR +import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR +import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult.UPSTREAM_VALID import io.emeraldpay.dshackle.upstream.calls.CallMethods +import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessState import io.emeraldpay.dshackle.upstream.finalization.FinalizationData import io.emeraldpay.dshackle.upstream.generic.connectors.ConnectorFactory import io.emeraldpay.dshackle.upstream.generic.connectors.GenericConnector @@ -33,6 +37,7 @@ import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType import org.springframework.context.Lifecycle import reactor.core.Disposable import reactor.core.publisher.Flux +import reactor.core.publisher.Sinks import java.time.Duration import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference @@ -95,6 +100,8 @@ open class GenericUpstream( private val finalizationDetector = finalizationDetectorBuilder() private var finalizationDetectorSubscription: Disposable? = null + private val headLivenessState = Sinks.many().multicast().directBestEffort() + override fun getHead(): Head { return connector.getHead() } @@ -152,16 +159,16 @@ open class GenericUpstream( if (validator != null) { val validSettingsResult = validator.validateUpstreamSettingsOnStartup() when (validSettingsResult) { - ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR -> { + UPSTREAM_FATAL_SETTINGS_ERROR -> { log.warn("Upstream ${getId()} couldn't start, invalid upstream settings") connector.stop() return } - ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR -> { + UPSTREAM_SETTINGS_ERROR -> { log.warn("Non fatal upstream settings error, continue validation...") connector.getHead().stop() } - ValidateUpstreamSettingsResult.UPSTREAM_VALID -> { + UPSTREAM_VALID -> { isUpstreamValid.set(true) upstreamStart() } @@ -177,15 +184,18 @@ open class GenericUpstream( private fun validateUpstreamSettings() { if (validator != null) { - validationSettingsSubscription = Flux.interval( - Duration.ofSeconds(20), - ).flatMap { - validator.validateUpstreamSettings() - } + validationSettingsSubscription = Flux.merge( + Flux.interval( + Duration.ofSeconds(20), + ).flatMap { + validator.validateUpstreamSettings() + }, + headLivenessState.asFlux(), + ) .distinctUntilChanged() .subscribe { when (it) { - ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR -> { + UPSTREAM_FATAL_SETTINGS_ERROR -> { if (isUpstreamValid.get()) { log.warn("There is a fatal error after upstream settings validation, removing ${getId()}...") partialStop() @@ -194,7 +204,7 @@ open class GenericUpstream( isUpstreamValid.set(false) } - ValidateUpstreamSettingsResult.UPSTREAM_VALID -> { + UPSTREAM_VALID -> { if (!isUpstreamValid.get()) { log.warn("Upstream ${getId()} is now valid, adding to the multistream...") upstreamStart() @@ -232,8 +242,6 @@ open class GenericUpstream( } private fun detectRpcModules(config: UpstreamsConfig.Upstream<*>, buildMethods: (UpstreamsConfig.Upstream<*>, Chain) -> CallMethods) { - rpcModulesDetector?.detectRpcModules() - val rpcDetector = rpcModulesDetector?.detectRpcModules()?.block() ?: HashMap() log.info("Upstream rpc detector for ${getId()} returned $rpcDetector ") if (rpcDetector.size != 0) { @@ -266,9 +274,14 @@ open class GenericUpstream( validatorSubscription = validator?.start() ?.subscribe(this::setStatus) } - livenessSubscription = connector.hasLiveSubscriptionHead().subscribe({ - hasLiveSubscriptionHead.set(it) - sendUpstreamStateEvent(UPDATED) + livenessSubscription = connector.headLivenessEvents().subscribe({ + val hasSub = it == HeadLivenessState.OK + hasLiveSubscriptionHead.set(hasSub) + if (it == HeadLivenessState.FATAL_ERROR) { + headLivenessState.emitNext(UPSTREAM_FATAL_SETTINGS_ERROR) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } + } else { + sendUpstreamStateEvent(UPDATED) + } }, { log.debug("Error while checking live subscription for ${getId()}", it) },) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnector.kt index 685019107..9d58c153b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnector.kt @@ -4,12 +4,13 @@ import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.Lifecycle +import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessState import reactor.core.publisher.Flux interface GenericConnector : Lifecycle { fun getHead(): Head - fun hasLiveSubscriptionHead(): Flux + fun headLivenessEvents(): Flux fun getIngressReader(): ChainReader diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt index e7ec0b598..2e10daf4e 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt @@ -14,6 +14,7 @@ import io.emeraldpay.dshackle.upstream.MergedHead import io.emeraldpay.dshackle.upstream.NoIngressSubscription import io.emeraldpay.dshackle.upstream.ethereum.AlwaysHeadLivenessValidator import io.emeraldpay.dshackle.upstream.ethereum.GenericWsHead +import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessState import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessValidator import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessValidatorImpl import io.emeraldpay.dshackle.upstream.ethereum.NoHeadLivenessValidator @@ -30,6 +31,7 @@ import io.emeraldpay.dshackle.upstream.generic.connectors.GenericConnectorFactor import io.emeraldpay.dshackle.upstream.generic.connectors.GenericConnectorFactory.ConnectorMode.RPC_REQUESTS_WITH_MIXED_HEAD import io.emeraldpay.dshackle.upstream.generic.connectors.GenericConnectorFactory.ConnectorMode.RPC_REQUESTS_WITH_WS_HEAD import io.emeraldpay.dshackle.upstream.generic.connectors.GenericConnectorFactory.ConnectorMode.WS_ONLY +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcWsClient import org.slf4j.LoggerFactory import reactor.core.publisher.Flux import reactor.core.scheduler.Scheduler @@ -55,18 +57,20 @@ class GenericRpcConnector( private val ingressSubscription: IngressSubscription? private val head: Head private val liveness: HeadLivenessValidator + private val jsonRpcWsClient: JsonRpcWsClient? companion object { private val log = LoggerFactory.getLogger(GenericRpcConnector::class.java) } - override fun hasLiveSubscriptionHead(): Flux { + override fun headLivenessEvents(): Flux { return liveness.getFlux().distinctUntilChanged() } init { pool = wsFactory?.create(upstream) wsSubs = pool?.let { WsSubscriptionsImpl(it) } + jsonRpcWsClient = pool?.let { JsonRpcWsClient(pool) } ingressSubscription = wsSubs?.let { chainSpecific.makeIngressSubscription(it) } head = when (connectorType) { @@ -98,6 +102,7 @@ class GenericRpcConnector( headScheduler, upstream, chainSpecific, + jsonRpcWsClient!!, expectedBlockTime, ) // receive all new blocks through WebSockets, but also periodically verify with RPC in case if WS failed @@ -124,6 +129,7 @@ class GenericRpcConnector( headScheduler, upstream, chainSpecific, + jsonRpcWsClient!!, expectedBlockTime, ) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt index b86ffacc7..2a847f7b5 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt @@ -6,6 +6,7 @@ import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.ethereum.GenericWsHead +import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessState import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessValidatorImpl import io.emeraldpay.dshackle.upstream.ethereum.WsConnectionPool import io.emeraldpay.dshackle.upstream.ethereum.WsConnectionPoolFactory @@ -46,13 +47,14 @@ class GenericWsConnector( headScheduler, upstream, chainSpecific, + reader, expectedBlockTime, ) liveness = HeadLivenessValidatorImpl(head, expectedBlockTime, headLivenessScheduler, upstream.getId()) subscriptions = chainSpecific.makeIngressSubscription(wsSubscriptions) } - override fun hasLiveSubscriptionHead(): Flux { + override fun headLivenessEvents(): Flux { return liveness.getFlux().distinctUntilChanged() } override fun start() { diff --git a/src/test/groovy/io/emeraldpay/dshackle/test/EthereumHeadMock.groovy b/src/test/groovy/io/emeraldpay/dshackle/test/EthereumHeadMock.groovy index 0b982bccf..2fd3b0ff7 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/test/EthereumHeadMock.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/test/EthereumHeadMock.groovy @@ -18,6 +18,7 @@ package io.emeraldpay.dshackle.test import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.upstream.Head +import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessState import org.jetbrains.annotations.NotNull import org.reactivestreams.Publisher import reactor.core.publisher.Flux @@ -89,7 +90,7 @@ class EthereumHeadMock implements Head { } @Override - Flux headLiveness() { + Flux headLiveness() { return Flux.empty() } diff --git a/src/test/groovy/io/emeraldpay/dshackle/test/GenericConnectorMock.groovy b/src/test/groovy/io/emeraldpay/dshackle/test/GenericConnectorMock.groovy index bcfb74192..f66ddf659 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/test/GenericConnectorMock.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/test/GenericConnectorMock.groovy @@ -3,6 +3,7 @@ package io.emeraldpay.dshackle.test import io.emeraldpay.dshackle.reader.Reader import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.IngressSubscription +import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessState import io.emeraldpay.dshackle.upstream.ethereum.NoEthereumIngressSubscription import io.emeraldpay.dshackle.upstream.generic.connectors.GenericConnector import io.emeraldpay.dshackle.upstream.ChainRequest @@ -12,16 +13,16 @@ import reactor.core.publisher.Flux class GenericConnectorMock implements GenericConnector { Reader api Head head - Flux liveness + Flux liveness GenericConnectorMock(Reader api, Head head) { this.api = api this.head = head - this.liveness = Flux.just(false) + this.liveness = Flux.just(HeadLivenessState.NON_CONSECUTIVE) } @Override - Flux hasLiveSubscriptionHead() { + Flux headLivenessEvents() { return liveness } diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumEgressSubscriptionSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumEgressSubscriptionSpec.groovy index 52e8bcb38..052e7a575 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumEgressSubscriptionSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumEgressSubscriptionSpec.groovy @@ -179,14 +179,14 @@ class EthereumEgressSubscriptionSpec extends Specification { def "get available subscriptions"() { when: def up1 = TestingCommons.upstream("test") - up1.getConnectorMock().setLiveness(Flux.just(false)) + up1.getConnectorMock().setLiveness(Flux.just(HeadLivenessState.NON_CONSECUTIVE)) def ethereumSubscribe1 = new EthereumEgressSubscription(TestingCommons.multistream(up1) as GenericMultistream, Schedulers.boundedElastic(), null) then: ethereumSubscribe1.getAvailableTopics() == [] when: def up2 = TestingCommons.upstream("test") - up2.getConnectorMock().setLiveness(Flux.just(true)) + up2.getConnectorMock().setLiveness(Flux.just(HeadLivenessState.OK)) up2.stop() up2.start() def ethereumSubscribe2 = new EthereumEgressSubscription(TestingCommons.multistream(up2) as GenericMultistream, Schedulers.boundedElastic(), null) @@ -194,7 +194,7 @@ class EthereumEgressSubscriptionSpec extends Specification { ethereumSubscribe2.getAvailableTopics().toSet() == [EthereumEgressSubscription.METHOD_LOGS, EthereumEgressSubscription.METHOD_NEW_HEADS].toSet() when: def up3 = TestingCommons.upstream("test") - up3.getConnectorMock().setLiveness(Flux.just(true)) + up3.getConnectorMock().setLiveness(Flux.just(HeadLivenessState.OK)) up3.stop() up3.start() def ethereumSubscribe3 = new EthereumEgressSubscription(TestingCommons.multistream(up3) as GenericMultistream, Schedulers.boundedElastic(), Stub(PendingTxesSource)) @@ -202,7 +202,7 @@ class EthereumEgressSubscriptionSpec extends Specification { ethereumSubscribe3.getAvailableTopics().toSet() == [EthereumEgressSubscription.METHOD_LOGS, EthereumEgressSubscription.METHOD_NEW_HEADS, EthereumEgressSubscription.METHOD_PENDING_TXES].toSet() when: def up4 = TestingCommons.upstream(TestingCommons.api(), "eth_getBlockByNumber") - up4.getConnectorMock().setLiveness(Flux.just(true)) + up4.getConnectorMock().setLiveness(Flux.just(HeadLivenessState.OK)) up4.stop() up4.start() def ethereumSubscribe4 = new EthereumEgressSubscription(TestingCommons.multistream(up4) as GenericMultistream, Schedulers.boundedElastic(), null) diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadSpec.groovy index dc0955a94..6913efb25 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadSpec.groovy @@ -27,6 +27,7 @@ import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson import io.emeraldpay.dshackle.upstream.forkchoice.AlwaysForkChoice import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.ChainResponse +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcWsClient import io.emeraldpay.dshackle.upstream.rpcclient.ListParams import io.emeraldpay.dshackle.upstream.ethereum.domain.BlockHash import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionRefJson @@ -69,6 +70,11 @@ class GenericWsHeadSpec extends Specification { 1 * it.connectionInfoFlux() >> Flux.empty() } + def pool = Mock(WsConnectionPool) { + getConnection() >> Mock(WsConnection) + } + def client = new JsonRpcWsClient(pool) + 1 * ws.subscribe(_) >> new WsSubscriptions.SubscribeData( Flux.fromIterable([headBlock]), "id", new AtomicReference("") ) @@ -82,6 +88,7 @@ class GenericWsHeadSpec extends Specification { Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE, + client, Duration.ofSeconds(60), ) @@ -113,6 +120,11 @@ class GenericWsHeadSpec extends Specification { def apiMock = TestingCommons.api() + def pool = Mock(WsConnectionPool) { + getConnection() >> Mock(WsConnection) + } + def client = new JsonRpcWsClient(pool) + def connectionInfoSink = Sinks.many().multicast().directBestEffort() def ws = Mock(WsSubscriptions) { 1 * it.connectionInfoFlux() >> connectionInfoSink.asFlux() @@ -120,6 +132,8 @@ class GenericWsHeadSpec extends Specification { new WsSubscriptions.SubscribeData(Flux.error(new RuntimeException()), "id", new AtomicReference("")), new WsSubscriptions.SubscribeData(Flux.fromIterable([secondHeadBlock]), "id", new AtomicReference("")) ] + 1 * it.unsubscribe(new ChainRequest("eth_unsubscribe", new ListParams(""), 2, null, null, false)) >> + Mono.just(new ChainResponse("".bytes, null)) } def head = new GenericWsHead( @@ -131,6 +145,7 @@ class GenericWsHeadSpec extends Specification { Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE, + client, Duration.ofSeconds(60), ) @@ -171,6 +186,11 @@ class GenericWsHeadSpec extends Specification { Global.objectMapper.writeValueAsBytes(it) } + def pool = Mock(WsConnectionPool) { + getConnection() >> Mock(WsConnection) + } + def client = new JsonRpcWsClient(pool) + def apiMock = TestingCommons.api() def connectionInfoSink = Sinks.many().multicast().directBestEffort() apiMock.answerOnce("eth_getBlockByHash", ["0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200", false], null) @@ -195,6 +215,7 @@ class GenericWsHeadSpec extends Specification { Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE, + client, Duration.ofSeconds(60), ) @@ -229,6 +250,11 @@ class GenericWsHeadSpec extends Specification { apiMock.answerOnce("eth_getBlockByHash", ["0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200", false], null) apiMock.answerOnce("eth_blockNumber", [], Mono.empty()) + def pool = Mock(WsConnectionPool) { + getConnection() >> Mock(WsConnection) + } + def client = new JsonRpcWsClient(pool) + def ws = Mock(WsSubscriptions) { 1 * it.connectionInfoFlux() >> connectionInfoSink.asFlux() 1 * subscribe(_) >>> [ @@ -245,6 +271,7 @@ class GenericWsHeadSpec extends Specification { Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE, + client, Duration.ofSeconds(60), ) @@ -278,6 +305,11 @@ class GenericWsHeadSpec extends Specification { apiMock.answerOnce("eth_getBlockByHash", ["0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200", false], null) apiMock.answerOnce("eth_blockNumber", [], Mono.empty()) + def pool = Mock(WsConnectionPool) { + getConnection() >> Mock(WsConnection) + } + def client = new JsonRpcWsClient(pool) + def ws = Mock(WsSubscriptions) { 1 * it.connectionInfoFlux() >> connectionInfoSink.asFlux() 1 * subscribe(_) >>> [ @@ -294,6 +326,7 @@ class GenericWsHeadSpec extends Specification { Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE, + client, Duration.ofSeconds(60), ) @@ -340,6 +373,11 @@ class GenericWsHeadSpec extends Specification { apiMock.answerOnce("eth_getBlockByHash", ["0x29229361dc5aa1ec66c323dc7a299e2b61a8c8dd2a3522d41255ec10eca25dd8", false], null) apiMock.answerOnce("eth_blockNumber", [], Mono.empty()) + def pool = Mock(WsConnectionPool) { + getConnection() >> Mock(WsConnection) + } + def client = new JsonRpcWsClient(pool) + def ws = Mock(WsSubscriptions) { 1 * it.connectionInfoFlux() >> connectionInfoSink.asFlux() 2 * subscribe(_) >>> [ @@ -357,6 +395,7 @@ class GenericWsHeadSpec extends Specification { Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE, + client, Duration.ofSeconds(60), ) @@ -401,6 +440,12 @@ class GenericWsHeadSpec extends Specification { def reader = Mock(Reader) { 1 * it.read(new ChainRequest("eth_getBlockByNumber", new ListParams("latest", false))) >> Mono.empty() } + + def pool = Mock(WsConnectionPool) { + getConnection() >> Mock(WsConnection) + } + def client = new JsonRpcWsClient(pool) + def subId = "subId" def ws = Mock(WsSubscriptions) { 1 * it.connectionInfoFlux() >> Flux.empty() @@ -420,6 +465,7 @@ class GenericWsHeadSpec extends Specification { Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE, + client, Duration.ofSeconds(60), ) @@ -450,6 +496,11 @@ class GenericWsHeadSpec extends Specification { def apiMock = TestingCommons.api() + def pool = Mock(WsConnectionPool) { + getConnection() >> Mock(WsConnection) + } + def client = new JsonRpcWsClient(pool) + def connectionInfoSink = Sinks.many().multicast().directBestEffort() def ws = Mock(WsSubscriptions) { 1 * it.connectionInfoFlux() >> connectionInfoSink.asFlux() @@ -467,6 +518,7 @@ class GenericWsHeadSpec extends Specification { Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE, + client, Duration.ofSeconds(60), ) @@ -479,7 +531,7 @@ class GenericWsHeadSpec extends Specification { .then { connectionInfoSink.tryEmitNext(new WsConnection.ConnectionInfo("id", WsConnection.ConnectionState.DISCONNECTED)) } - .expectNext(false) + .expectNext(HeadLivenessState.DISCONNECTED) .thenCancel() .verify(Duration.ofSeconds(1)) } diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidatorSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidatorSpec.groovy index acd2efcf7..517ef72db 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidatorSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidatorSpec.groovy @@ -21,19 +21,19 @@ class HeadLivenessValidatorSpec extends Specification{ head.nextBlock(TestingCommons.blockForEthereum(1)) head.nextBlock(TestingCommons.blockForEthereum(2)) head.nextBlock(TestingCommons.blockForEthereum(3)) - }.expectNext(true).thenCancel().verify(Duration.ofSeconds(1)) + }.expectNext(HeadLivenessState.OK).thenCancel().verify(Duration.ofSeconds(1)) } def "emits false if head liveness emits false"() { when: def head = Mock(Head) { - 1 * it.headLiveness() >> Flux.just(false) + 1 * it.headLiveness() >> Flux.just(HeadLivenessState.NON_CONSECUTIVE) 1 * it.getFlux() >> Flux.just(TestingCommons.blockForEthereum(1)) } def checker = new HeadLivenessValidatorImpl(head, Duration.ofSeconds(10), Schedulers.boundedElastic(), "test") then: StepVerifier.create(checker.flux) - .expectNext(false) + .expectNext(HeadLivenessState.NON_CONSECUTIVE) .thenCancel() .verify(Duration.ofSeconds(1)) } @@ -52,7 +52,7 @@ class HeadLivenessValidatorSpec extends Specification{ .then { head.nextBlock(TestingCommons.blockForEthereum(5)) } - .expectNext(false) + .expectNext(HeadLivenessState.NON_CONSECUTIVE) .thenCancel().verify(Duration.ofSeconds(1)) } @@ -67,7 +67,7 @@ class HeadLivenessValidatorSpec extends Specification{ head.nextBlock(TestingCommons.blockForEthereum(2)) } .thenAwait(Duration.ofSeconds(1)) - .expectNext(false) + .expectNext(HeadLivenessState.NON_CONSECUTIVE) .thenCancel().verify(Duration.ofSeconds(2)) } @@ -82,13 +82,13 @@ class HeadLivenessValidatorSpec extends Specification{ head.nextBlock(TestingCommons.blockForEthereum(2)) } .thenAwait(Duration.ofSeconds(1)) - .expectNext(false) + .expectNext(HeadLivenessState.NON_CONSECUTIVE) .then { head.nextBlock(TestingCommons.blockForEthereum(3)) head.nextBlock(TestingCommons.blockForEthereum(4)) head.nextBlock(TestingCommons.blockForEthereum(5)) } - .expectNext(true) + .expectNext(HeadLivenessState.OK) .thenCancel().verify(Duration.ofSeconds(3)) } } diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadTest.kt new file mode 100644 index 000000000..b68ce8336 --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadTest.kt @@ -0,0 +1,248 @@ +package io.emeraldpay.dshackle.upstream.ethereum + +import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.Global +import io.emeraldpay.dshackle.data.BlockContainer +import io.emeraldpay.dshackle.foundation.ChainOptions +import io.emeraldpay.dshackle.reader.ChainReader +import io.emeraldpay.dshackle.upstream.BlockValidator +import io.emeraldpay.dshackle.upstream.ChainRequest +import io.emeraldpay.dshackle.upstream.ChainResponse +import io.emeraldpay.dshackle.upstream.DefaultUpstream +import io.emeraldpay.dshackle.upstream.ethereum.domain.BlockHash +import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson +import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionRefJson +import io.emeraldpay.dshackle.upstream.forkchoice.AlwaysForkChoice +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcWsClient +import io.emeraldpay.dshackle.upstream.rpcclient.ListParams +import org.junit.jupiter.api.Test +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.mock +import org.mockito.kotlin.never +import org.mockito.kotlin.times +import org.mockito.kotlin.verify +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.core.publisher.Sinks +import reactor.core.scheduler.Schedulers +import reactor.test.StepVerifier +import java.math.BigInteger +import java.time.Duration +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.concurrent.atomic.AtomicReference + +class GenericWsHeadTest { + + @Test + fun `validate chain settings and then head sub`() { + val block = block() + val reader = mock { + on { read(ChainRequest("eth_getBlockByNumber", ListParams("latest", false))) } doReturn Mono.empty() + } + val wsSub = mock { + on { connectionInfoFlux() } doReturn Flux.empty() + on { subscribe(ChainRequest("eth_subscribe", ListParams("newHeads"))) } doReturn + WsSubscriptions.SubscribeData(Flux.just(Global.objectMapper.writeValueAsBytes(block)), "id", AtomicReference("subId")) + } + val connection = mock { + on { isConnected } doReturn true + on { callRpc(ChainRequest("eth_chainId", ListParams())) } doReturn Mono.just(ChainResponse("\"0x1\"".toByteArray(), null)) + on { callRpc(ChainRequest("net_version", ListParams())) } doReturn Mono.just(ChainResponse("\"1\"".toByteArray(), null)) + } + val wsPool = mock { + on { getConnection() } doReturn connection + } + val wsClient = JsonRpcWsClient(wsPool) + val upstream = mock { + on { getId() } doReturn "id" + on { getChain() } doReturn Chain.ETHEREUM__MAINNET + on { getOptions() } doReturn ChainOptions.PartialOptions().buildOptions() + } + + val wsHead = GenericWsHead( + AlwaysForkChoice(), + BlockValidator.ALWAYS_VALID, + reader, + wsSub, + Schedulers.boundedElastic(), + Schedulers.boundedElastic(), + upstream, + EthereumChainSpecific, + wsClient, + Duration.ofSeconds(60), + ) + + StepVerifier.create(wsHead.getFlux()) + .then { wsHead.start() } + .expectNext(BlockContainer.from(block)) + .thenCancel() + .verify(Duration.ofSeconds(1)) + + verify(connection).callRpc(ChainRequest("eth_chainId", ListParams())) + verify(connection).callRpc(ChainRequest("net_version", ListParams())) + verify(wsSub).subscribe(ChainRequest("eth_subscribe", ListParams("newHeads"))) + } + + @Test + fun `validate chain settings and then fatal error`() { + val reader = mock { + on { read(ChainRequest("eth_getBlockByNumber", ListParams("latest", false))) } doReturn Mono.empty() + } + val wsSub = mock { + on { connectionInfoFlux() } doReturn Flux.empty() + } + val connection = mock { + on { isConnected } doReturn true + on { callRpc(ChainRequest("eth_chainId", ListParams())) } doReturn Mono.just(ChainResponse("\"0x1\"".toByteArray(), null)) + on { callRpc(ChainRequest("net_version", ListParams())) } doReturn Mono.just(ChainResponse("\"155\"".toByteArray(), null)) + } + val wsPool = mock { + on { getConnection() } doReturn connection + } + val wsClient = JsonRpcWsClient(wsPool) + val upstream = mock { + on { getId() } doReturn "id" + on { getChain() } doReturn Chain.ETHEREUM__MAINNET + on { getOptions() } doReturn ChainOptions.PartialOptions().buildOptions() + } + + val wsHead = GenericWsHead( + AlwaysForkChoice(), + BlockValidator.ALWAYS_VALID, + reader, + wsSub, + Schedulers.boundedElastic(), + Schedulers.boundedElastic(), + upstream, + EthereumChainSpecific, + wsClient, + Duration.ofSeconds(60), + ) + + StepVerifier.create(wsHead.headLiveness()) + .then { wsHead.start() } + .expectNext(HeadLivenessState.FATAL_ERROR) + .thenCancel() + .verify(Duration.ofSeconds(3)) + + verify(connection).callRpc(ChainRequest("eth_chainId", ListParams())) + verify(connection).callRpc(ChainRequest("net_version", ListParams())) + verify(wsSub, never()).subscribe(ChainRequest("eth_subscribe", ListParams("newHeads"))) + } + + @Test + fun `no validate chain settings if it's disabled`() { + val block = block() + val reader = mock { + on { read(ChainRequest("eth_getBlockByNumber", ListParams("latest", false))) } doReturn Mono.empty() + } + val wsSub = mock { + on { connectionInfoFlux() } doReturn Flux.empty() + on { subscribe(ChainRequest("eth_subscribe", ListParams("newHeads"))) } doReturn + WsSubscriptions.SubscribeData(Flux.just(Global.objectMapper.writeValueAsBytes(block)), "id", AtomicReference("subId")) + } + val connection = mock() + val wsPool = mock { + on { getConnection() } doReturn connection + } + val wsClient = JsonRpcWsClient(wsPool) + val upstream = mock { + on { getId() } doReturn "id" + on { getChain() } doReturn Chain.ETHEREUM__MAINNET + on { getOptions() } doReturn ChainOptions.PartialOptions(disableUpstreamValidation = true).buildOptions() + } + + val wsHead = GenericWsHead( + AlwaysForkChoice(), + BlockValidator.ALWAYS_VALID, + reader, + wsSub, + Schedulers.boundedElastic(), + Schedulers.boundedElastic(), + upstream, + EthereumChainSpecific, + wsClient, + Duration.ofSeconds(60), + ) + + StepVerifier.create(wsHead.getFlux()) + .then { wsHead.start() } + .expectNext(BlockContainer.from(block)) + .thenCancel() + .verify(Duration.ofSeconds(3)) + + verify(connection, never()).callRpc(ChainRequest("eth_chainId", ListParams())) + verify(connection, never()).callRpc(ChainRequest("net_version", ListParams())) + verify(wsSub).subscribe(ChainRequest("eth_subscribe", ListParams("newHeads"))) + } + + @Test + fun `validate chain settings, getting an error and then head sub`() { + val block = block() + val reader = mock { + on { read(ChainRequest("eth_getBlockByNumber", ListParams("latest", false))) } doReturn Mono.empty() + } + val connectionInfoSink = Sinks.many().multicast().directBestEffort() + val wsSub = mock { + on { connectionInfoFlux() } doReturn connectionInfoSink.asFlux() + on { subscribe(ChainRequest("eth_subscribe", ListParams("newHeads"))) } doReturn + WsSubscriptions.SubscribeData(Flux.just(Global.objectMapper.writeValueAsBytes(block)), "id", AtomicReference("subId")) + } + val connection = mock { + on { isConnected } doReturn true + on { callRpc(ChainRequest("eth_chainId", ListParams())) } doReturn Mono.error(RuntimeException("err")) doReturn Mono.just(ChainResponse("\"0x1\"".toByteArray(), null)) + on { callRpc(ChainRequest("net_version", ListParams())) } doReturn Mono.just(ChainResponse("\"1\"".toByteArray(), null)) + } + val wsPool = mock { + on { getConnection() } doReturn connection + } + val wsClient = JsonRpcWsClient(wsPool) + val upstream = mock { + on { getId() } doReturn "id" + on { getChain() } doReturn Chain.ETHEREUM__MAINNET + on { getOptions() } doReturn ChainOptions.PartialOptions().buildOptions() + } + + val wsHead = GenericWsHead( + AlwaysForkChoice(), + BlockValidator.ALWAYS_VALID, + reader, + wsSub, + Schedulers.boundedElastic(), + Schedulers.boundedElastic(), + upstream, + EthereumChainSpecific, + wsClient, + Duration.ofSeconds(60), + ) + + StepVerifier.create(wsHead.getFlux()) + .then { + wsHead.start() + connectionInfoSink.tryEmitNext(WsConnection.ConnectionInfo("id", WsConnection.ConnectionState.CONNECTED)) + } + .expectNoEvent(Duration.ofMillis(1500)) + .then { + wsHead.onNoHeadUpdates() + } + .expectNext(BlockContainer.from(block)) + .thenCancel() + .verify(Duration.ofSeconds(3)) + + verify(connection, times(2)).callRpc(ChainRequest("eth_chainId", ListParams())) + verify(connection, times(2)).callRpc(ChainRequest("net_version", ListParams())) + verify(wsSub).subscribe(ChainRequest("eth_subscribe", ListParams("newHeads"))) + } + + private fun block() = + BlockJson() + .apply { + number = 1500000L + uncles = emptyList() + totalDifficulty = BigInteger.ONE + parentHash = BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200") + timestamp = Instant.now().truncatedTo(ChronoUnit.SECONDS) + hash = BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200") + } +}