diff --git a/foundation/src/main/resources/public b/foundation/src/main/resources/public index 3f0729de1..ad94b61b5 160000 --- a/foundation/src/main/resources/public +++ b/foundation/src/main/resources/public @@ -1 +1 @@ -Subproject commit 3f0729de1901d1b756c01793d17e6839db29428d +Subproject commit ad94b61b5e4ea0eddfea14b1ff9bce6d761da5bf diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt index 11e51027a..bbcb2088f 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt @@ -24,6 +24,7 @@ import io.micrometer.core.instrument.Metrics import org.slf4j.LoggerFactory import reactor.core.Disposable import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import reactor.core.publisher.SignalType import reactor.core.publisher.Sinks import reactor.core.publisher.Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER @@ -95,6 +96,20 @@ abstract class AbstractHead @JvmOverloads constructor( log.warn("Received signal $upstreamId $it, continue emit heads") } } + .flatMap { + if (isSuspiciousBlock(it)) { + log.warn( + "Got a suspicious head of upstream {} with a height {} " + + "that is very different from the previous head with a height {}, validating this upstream", + upstreamId, + it.height, + getCurrentHeight(), + ) + checkSuspiciousBlock(it) + } else { + Mono.just(it) + } + } .subscribeOn(headScheduler) .subscribe { block -> val valid = runCatching { @@ -165,7 +180,7 @@ abstract class AbstractHead @JvmOverloads constructor( metrics.forEach { Metrics.globalRegistry.remove(it) } } - open fun onNoHeadUpdates() { + protected open fun onNoHeadUpdates() { // NOOP } @@ -196,4 +211,12 @@ abstract class AbstractHead @JvmOverloads constructor( ) } } + + protected open fun isSuspiciousBlock(block: BlockContainer): Boolean { + return false + } + + protected open fun checkSuspiciousBlock(block: BlockContainer): Mono { + return Mono.just(block) + } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt index 2292f7010..cbcceac1b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt @@ -166,7 +166,7 @@ object EthereumChainSpecific : AbstractPollChainSpecific() { override fun chainSettingsValidator( chain: Chain, upstream: Upstream, - reader: ChainReader, + reader: ChainReader?, ): SingleValidator? { if (upstream.getOptions().disableUpstreamValidation) { return null diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt index 440f96b67..3aabc99b6 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt @@ -21,6 +21,9 @@ import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.upstream.BlockValidator import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.Lifecycle +import io.emeraldpay.dshackle.upstream.SingleValidator +import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult +import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult.UPSTREAM_VALID import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice @@ -69,7 +72,6 @@ class GenericWsHead( private var subscription: Disposable? = null private var headResubSubscription: Disposable? = null private val noHeadUpdatesSink = Sinks.many().multicast().directBestEffort() - private val headLivenessSink = Sinks.many().multicast().directBestEffort() private var subscriptionId = AtomicReference("") @@ -130,7 +132,7 @@ class GenericWsHead( subscribed = false Mono.empty() } - else -> { + UPSTREAM_FATAL_SETTINGS_ERROR -> { log.error("Chain settings check hasn't been passed via ws connection, upstream {} will be removed", upstreamId) headLivenessSink.emitNext(HeadLivenessState.FATAL_ERROR) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } Mono.empty() @@ -146,7 +148,9 @@ class GenericWsHead( headResubSubscription = null } - override fun headLiveness(): Flux = headLivenessSink.asFlux() + override fun chainIdValidator(): SingleValidator? { + return chainIdValidator + } private fun unsubscribe(): Mono { subscribed = false diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt index 6ee480611..315ade7fd 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt @@ -63,7 +63,7 @@ abstract class AbstractChainSpecific : ChainSpecific { override fun chainSettingsValidator( chain: Chain, upstream: Upstream, - reader: ChainReader, + reader: ChainReader?, ): SingleValidator? { return null } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt index b51386f2d..db1844756 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt @@ -84,7 +84,7 @@ interface ChainSpecific { fun upstreamSettingsDetector(chain: Chain, upstream: Upstream): UpstreamSettingsDetector? - fun chainSettingsValidator(chain: Chain, upstream: Upstream, reader: ChainReader): SingleValidator? + fun chainSettingsValidator(chain: Chain, upstream: Upstream, reader: ChainReader?): SingleValidator? fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector? diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericHead.kt index 22b5f902e..9a66934ed 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericHead.kt @@ -21,9 +21,16 @@ import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.upstream.AbstractHead import io.emeraldpay.dshackle.upstream.BlockValidator import io.emeraldpay.dshackle.upstream.Head +import io.emeraldpay.dshackle.upstream.SingleValidator +import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult +import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessState import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice +import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import reactor.core.publisher.Sinks import reactor.core.scheduler.Scheduler +import reactor.kotlin.core.publisher.switchIfEmpty +import kotlin.math.abs open class GenericHead( protected val upstreamId: String, @@ -32,6 +39,7 @@ open class GenericHead( private val headScheduler: Scheduler, private val chainSpecific: ChainSpecific, ) : Head, AbstractHead(forkChoice, headScheduler, blockValidator, 60_000, upstreamId) { + protected val headLivenessSink: Sinks.Many = Sinks.many().multicast().directBestEffort() fun getLatestBlock(api: ChainReader): Mono { return chainSpecific.getLatestBlock(api, upstreamId) @@ -42,4 +50,44 @@ open class GenericHead( Mono.empty() } } + + override fun isSuspiciousBlock(block: BlockContainer): Boolean { + return getCurrentHeight() + ?.let { + abs(block.height - it) > 10000 + } ?: false + } + + override fun checkSuspiciousBlock(block: BlockContainer): Mono { + return Mono.justOrEmpty(chainIdValidator()) + .flatMap { + it!!.validate(ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR) + } + .switchIfEmpty { + Mono.just(ValidateUpstreamSettingsResult.UPSTREAM_VALID) + } + .flatMap { validationResult -> + when (validationResult) { + ValidateUpstreamSettingsResult.UPSTREAM_VALID -> { + log.info("Block {} of upstream {} has been received from the same chain, validation is passed", block.height, upstreamId) + Mono.just(block) + } + ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR -> { + log.warn("Block {} of upstream {} is filtered and can not be emitted due to upstream settings error check", block.height, upstreamId) + Mono.empty() + } + ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR -> { + log.error("Block {} of upstream {} can not be emitted due to chain inconsistency, upstream will be removed", block.height, upstreamId) + headLivenessSink.emitNext(HeadLivenessState.FATAL_ERROR) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } + Mono.empty() + } + } + } + } + + override fun headLiveness(): Flux = headLivenessSink.asFlux() + + protected open fun chainIdValidator(): SingleValidator? { + return null + } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericRpcHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericRpcHead.kt index 5bf8cc77c..49e44743d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericRpcHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericRpcHead.kt @@ -18,7 +18,10 @@ package io.emeraldpay.dshackle.upstream.generic import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.upstream.BlockValidator +import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.Lifecycle +import io.emeraldpay.dshackle.upstream.SingleValidator +import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice import reactor.core.Disposable import reactor.core.publisher.Flux @@ -30,10 +33,12 @@ class GenericRpcHead( forkChoice: ForkChoice, upstreamId: String, blockValidator: BlockValidator, + upstream: DefaultUpstream, private val headScheduler: Scheduler, private val chainSpecific: ChainSpecific, private val interval: Duration = Duration.ofSeconds(10), ) : GenericHead(upstreamId, forkChoice, blockValidator, headScheduler, chainSpecific), Lifecycle { + private val chainIdValidator = chainSpecific.chainSettingsValidator(upstream.getChain(), upstream, null) private var refreshSubscription: Disposable? = null private var isSyncing = false @@ -63,4 +68,8 @@ class GenericRpcHead( refreshSubscription?.dispose() refreshSubscription = null } + + override fun chainIdValidator(): SingleValidator? { + return chainIdValidator + } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt index 2e10daf4e..d4c639dcc 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt @@ -81,6 +81,7 @@ class GenericRpcConnector( forkChoice, id, blockValidator, + upstream, headScheduler, chainSpecific, expectedBlockTime.coerceAtLeast(Duration.ofSeconds(1)), @@ -112,6 +113,7 @@ class GenericRpcConnector( AlwaysForkChoice(), id, blockValidator, + upstream, headScheduler, chainSpecific, Duration.ofSeconds(30), diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadTest.kt index b68ce8336..49ddb9877 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadTest.kt @@ -36,7 +36,7 @@ class GenericWsHeadTest { @Test fun `validate chain settings and then head sub`() { - val block = block() + val block = block(10000, BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200")) val reader = mock { on { read(ChainRequest("eth_getBlockByNumber", ListParams("latest", false))) } doReturn Mono.empty() } @@ -133,14 +133,15 @@ class GenericWsHeadTest { @Test fun `no validate chain settings if it's disabled`() { - val block = block() + val block1 = block(10000, BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200")) + val block2 = block(25000, BlockHash.from("0x2ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200")) val reader = mock { on { read(ChainRequest("eth_getBlockByNumber", ListParams("latest", false))) } doReturn Mono.empty() } val wsSub = mock { on { connectionInfoFlux() } doReturn Flux.empty() on { subscribe(ChainRequest("eth_subscribe", ListParams("newHeads"))) } doReturn - WsSubscriptions.SubscribeData(Flux.just(Global.objectMapper.writeValueAsBytes(block)), "id", AtomicReference("subId")) + WsSubscriptions.SubscribeData(Flux.just(Global.objectMapper.writeValueAsBytes(block1), Global.objectMapper.writeValueAsBytes(block2)), "id", AtomicReference("subId")) } val connection = mock() val wsPool = mock { @@ -168,7 +169,8 @@ class GenericWsHeadTest { StepVerifier.create(wsHead.getFlux()) .then { wsHead.start() } - .expectNext(BlockContainer.from(block)) + .expectNext(BlockContainer.from(block1)) + .expectNext(BlockContainer.from(block2)) .thenCancel() .verify(Duration.ofSeconds(3)) @@ -179,7 +181,7 @@ class GenericWsHeadTest { @Test fun `validate chain settings, getting an error and then head sub`() { - val block = block() + val block = block(10000, BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200")) val reader = mock { on { read(ChainRequest("eth_getBlockByNumber", ListParams("latest", false))) } doReturn Mono.empty() } @@ -224,7 +226,7 @@ class GenericWsHeadTest { } .expectNoEvent(Duration.ofMillis(1500)) .then { - wsHead.onNoHeadUpdates() + wsHead.start() } .expectNext(BlockContainer.from(block)) .thenCancel() @@ -235,14 +237,234 @@ class GenericWsHeadTest { verify(wsSub).subscribe(ChainRequest("eth_subscribe", ListParams("newHeads"))) } - private fun block() = + @Test + fun `emit chain heads and no suspicious head`() { + val block1 = block(10000, BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200")) + val block2 = block(15000, BlockHash.from("0x2ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200")) + val reader = mock { + on { read(ChainRequest("eth_getBlockByNumber", ListParams("latest", false))) } doReturn Mono.empty() + } + val wsSub = mock { + on { connectionInfoFlux() } doReturn Flux.empty() + on { subscribe(ChainRequest("eth_subscribe", ListParams("newHeads"))) } doReturn + WsSubscriptions.SubscribeData(Flux.just(Global.objectMapper.writeValueAsBytes(block1), Global.objectMapper.writeValueAsBytes(block2)), "id", AtomicReference("subId")) + } + val connection = mock { + on { isConnected } doReturn true + on { callRpc(ChainRequest("eth_chainId", ListParams())) } doReturn Mono.just(ChainResponse("\"0x1\"".toByteArray(), null)) + on { callRpc(ChainRequest("net_version", ListParams())) } doReturn Mono.just(ChainResponse("\"1\"".toByteArray(), null)) + } + val wsPool = mock { + on { getConnection() } doReturn connection + } + val wsClient = JsonRpcWsClient(wsPool) + val upstream = mock { + on { getId() } doReturn "id" + on { getChain() } doReturn Chain.ETHEREUM__MAINNET + on { getOptions() } doReturn ChainOptions.PartialOptions().buildOptions() + } + + val wsHead = GenericWsHead( + AlwaysForkChoice(), + BlockValidator.ALWAYS_VALID, + reader, + wsSub, + Schedulers.boundedElastic(), + Schedulers.boundedElastic(), + upstream, + EthereumChainSpecific, + wsClient, + Duration.ofSeconds(60), + ) + + StepVerifier.create(wsHead.getFlux()) + .then { wsHead.start() } + .expectNext(BlockContainer.from(block1)) + .expectNext(BlockContainer.from(block2)) + .thenCancel() + .verify(Duration.ofSeconds(1)) + + verify(connection).callRpc(ChainRequest("eth_chainId", ListParams())) + verify(connection).callRpc(ChainRequest("net_version", ListParams())) + verify(wsSub).subscribe(ChainRequest("eth_subscribe", ListParams("newHeads"))) + } + + @Test + fun `emit chain heads and validate a suspicious one`() { + val block1 = block(10000, BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200")) + val block2 = block(25000, BlockHash.from("0x2ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200")) + val reader = mock { + on { read(ChainRequest("eth_getBlockByNumber", ListParams("latest", false))) } doReturn Mono.empty() + } + val wsSub = mock { + on { connectionInfoFlux() } doReturn Flux.empty() + on { subscribe(ChainRequest("eth_subscribe", ListParams("newHeads"))) } doReturn + WsSubscriptions.SubscribeData(Flux.just(Global.objectMapper.writeValueAsBytes(block1), Global.objectMapper.writeValueAsBytes(block2)), "id", AtomicReference("subId")) + } + val connection = mock { + on { isConnected } doReturn true + on { callRpc(ChainRequest("eth_chainId", ListParams())) } doReturn Mono.just(ChainResponse("\"0x1\"".toByteArray(), null)) + on { callRpc(ChainRequest("net_version", ListParams())) } doReturn Mono.just(ChainResponse("\"1\"".toByteArray(), null)) + } + val wsPool = mock { + on { getConnection() } doReturn connection + } + val wsClient = JsonRpcWsClient(wsPool) + val upstream = mock { + on { getId() } doReturn "id" + on { getChain() } doReturn Chain.ETHEREUM__MAINNET + on { getOptions() } doReturn ChainOptions.PartialOptions().buildOptions() + } + + val wsHead = GenericWsHead( + AlwaysForkChoice(), + BlockValidator.ALWAYS_VALID, + reader, + wsSub, + Schedulers.boundedElastic(), + Schedulers.boundedElastic(), + upstream, + EthereumChainSpecific, + wsClient, + Duration.ofSeconds(60), + ) + + StepVerifier.create(wsHead.getFlux()) + .then { wsHead.start() } + .expectNext(BlockContainer.from(block1)) + .expectNext(BlockContainer.from(block2)) + .thenCancel() + .verify(Duration.ofSeconds(1)) + + verify(connection, times(2)).callRpc(ChainRequest("eth_chainId", ListParams())) + verify(connection, times(2)).callRpc(ChainRequest("net_version", ListParams())) + verify(wsSub).subscribe(ChainRequest("eth_subscribe", ListParams("newHeads"))) + } + + @Test + fun `emit chain heads, validate a suspicious one and skip it`() { + val block1 = block(10000, BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200")) + val block2 = block(25000, BlockHash.from("0x2ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200")) + val reader = mock { + on { read(ChainRequest("eth_getBlockByNumber", ListParams("latest", false))) } doReturn Mono.empty() + } + val wsSub = mock { + on { connectionInfoFlux() } doReturn Flux.empty() + on { subscribe(ChainRequest("eth_subscribe", ListParams("newHeads"))) } doReturn + WsSubscriptions.SubscribeData(Flux.just(Global.objectMapper.writeValueAsBytes(block1), Global.objectMapper.writeValueAsBytes(block2)), "id", AtomicReference("subId")) + } + val connection = mock { + on { isConnected } doReturn true + on { callRpc(ChainRequest("eth_chainId", ListParams())) } doReturn Mono.just(ChainResponse("\"0x1\"".toByteArray(), null)) + on { callRpc(ChainRequest("net_version", ListParams())) } doReturn + Mono.just(ChainResponse("\"1\"".toByteArray(), null)) doReturn + Mono.just(ChainResponse("\"dsff\"".toByteArray(), null)) + } + val wsPool = mock { + on { getConnection() } doReturn connection + } + val wsClient = JsonRpcWsClient(wsPool) + val upstream = mock { + on { getId() } doReturn "id" + on { getChain() } doReturn Chain.ETHEREUM__MAINNET + on { getOptions() } doReturn ChainOptions.PartialOptions().buildOptions() + } + + val wsHead = GenericWsHead( + AlwaysForkChoice(), + BlockValidator.ALWAYS_VALID, + reader, + wsSub, + Schedulers.boundedElastic(), + Schedulers.boundedElastic(), + upstream, + EthereumChainSpecific, + wsClient, + Duration.ofSeconds(60), + ) + + StepVerifier.create(wsHead.getFlux()) + .then { wsHead.start() } + .expectNext(BlockContainer.from(block1)) + .expectNoEvent(Duration.ofMillis(500)) + .thenCancel() + .verify(Duration.ofSeconds(1)) + + verify(connection, times(2)).callRpc(ChainRequest("eth_chainId", ListParams())) + verify(connection, times(2)).callRpc(ChainRequest("net_version", ListParams())) + verify(wsSub).subscribe(ChainRequest("eth_subscribe", ListParams("newHeads"))) + } + + @Test + fun `emit chain heads, validate a suspicious one and then fatal error`() { + val block1 = block(10000, BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200")) + val block2 = block(25000, BlockHash.from("0x2ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200")) + val reader = mock { + on { read(ChainRequest("eth_getBlockByNumber", ListParams("latest", false))) } doReturn Mono.empty() + } + val wsSub = mock { + on { connectionInfoFlux() } doReturn Flux.empty() + on { subscribe(ChainRequest("eth_subscribe", ListParams("newHeads"))) } doReturn + WsSubscriptions.SubscribeData(Flux.just(Global.objectMapper.writeValueAsBytes(block1), Global.objectMapper.writeValueAsBytes(block2)), "id", AtomicReference("subId")) + } + val connection = mock { + on { isConnected } doReturn true + on { callRpc(ChainRequest("eth_chainId", ListParams())) } doReturn Mono.just(ChainResponse("\"0x1\"".toByteArray(), null)) + on { callRpc(ChainRequest("net_version", ListParams())) } doReturn + Mono.just(ChainResponse("\"1\"".toByteArray(), null)) doReturn + Mono.just(ChainResponse("\"5\"".toByteArray(), null)) + } + val wsPool = mock { + on { getConnection() } doReturn connection + } + val wsClient = JsonRpcWsClient(wsPool) + val upstream = mock { + on { getId() } doReturn "id" + on { getChain() } doReturn Chain.ETHEREUM__MAINNET + on { getOptions() } doReturn ChainOptions.PartialOptions().buildOptions() + } + + val wsHead = GenericWsHead( + AlwaysForkChoice(), + BlockValidator.ALWAYS_VALID, + reader, + wsSub, + Schedulers.boundedElastic(), + Schedulers.boundedElastic(), + upstream, + EthereumChainSpecific, + wsClient, + Duration.ofSeconds(60), + ) + + StepVerifier.create(wsHead.getFlux()) + .then { wsHead.start() } + .expectNext(BlockContainer.from(block1)) + .then { + StepVerifier.create(wsHead.headLiveness()) + .expectNext(HeadLivenessState.FATAL_ERROR) + .thenCancel() + .verify(Duration.ofSeconds(1)) + } + .thenCancel() + .verify(Duration.ofSeconds(1)) + + verify(connection, times(2)).callRpc(ChainRequest("eth_chainId", ListParams())) + verify(connection, times(2)).callRpc(ChainRequest("net_version", ListParams())) + verify(wsSub).subscribe(ChainRequest("eth_subscribe", ListParams("newHeads"))) + } + + private fun block( + height: Long, + blockHash: BlockHash, + ) = BlockJson() .apply { - number = 1500000L + number = height uncles = emptyList() totalDifficulty = BigInteger.ONE - parentHash = BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200") + parentHash = blockHash timestamp = Instant.now().truncatedTo(ChronoUnit.SECONDS) - hash = BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200") + hash = blockHash } } diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericRpcHeadTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericRpcHeadTest.kt new file mode 100644 index 000000000..9ea174fad --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericRpcHeadTest.kt @@ -0,0 +1,257 @@ +package io.emeraldpay.dshackle.upstream.generic + +import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.Global +import io.emeraldpay.dshackle.data.BlockContainer +import io.emeraldpay.dshackle.foundation.ChainOptions +import io.emeraldpay.dshackle.reader.ChainReader +import io.emeraldpay.dshackle.upstream.BlockValidator +import io.emeraldpay.dshackle.upstream.ChainRequest +import io.emeraldpay.dshackle.upstream.ChainResponse +import io.emeraldpay.dshackle.upstream.DefaultUpstream +import io.emeraldpay.dshackle.upstream.ethereum.EthereumChainSpecific +import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessState +import io.emeraldpay.dshackle.upstream.ethereum.domain.BlockHash +import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson +import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionRefJson +import io.emeraldpay.dshackle.upstream.forkchoice.AlwaysForkChoice +import io.emeraldpay.dshackle.upstream.rpcclient.ListParams +import org.junit.jupiter.api.Test +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.mock +import org.mockito.kotlin.never +import org.mockito.kotlin.times +import org.mockito.kotlin.verify +import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers +import reactor.test.StepVerifier +import java.math.BigInteger +import java.time.Duration +import java.time.Instant +import java.time.temporal.ChronoUnit + +class GenericRpcHeadTest { + + @Test + fun `emit chain heads and no suspicious head`() { + val block1 = block(10000, BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200")) + val block2 = block(10001, BlockHash.from("0x3ec1ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200")) + val upstream = mock { + on { getChain() } doReturn Chain.ETHEREUM__MAINNET + on { getOptions() } doReturn ChainOptions.PartialOptions().buildOptions() + } + val reader = mock { + on { read(ChainRequest("eth_getBlockByNumber", ListParams("latest", false))) } doReturn + Mono.just(ChainResponse(Global.objectMapper.writeValueAsBytes(block1), null)) doReturn + Mono.just(ChainResponse(Global.objectMapper.writeValueAsBytes(block2), null)) + } + val head = GenericRpcHead( + reader, + AlwaysForkChoice(), + "id", + BlockValidator.ALWAYS_VALID, + upstream, + Schedulers.boundedElastic(), + EthereumChainSpecific, + Duration.ofSeconds(5), + ) + + StepVerifier.withVirtualTime { head.getFlux() } + .expectSubscription() + .then { head.start() } + .expectNoEvent(Duration.ofSeconds(5)) + .expectNext(BlockContainer.from(block1)) + .expectNoEvent(Duration.ofSeconds(5)) + .expectNext(BlockContainer.from(block2)) + .thenCancel() + .verify(Duration.ofSeconds(3)) + + verify(reader, times(2)).read(ChainRequest("eth_getBlockByNumber", ListParams("latest", false))) + verify(reader, never()).read(ChainRequest("eth_chainId", ListParams())) + verify(reader, never()).read(ChainRequest("net_version", ListParams())) + } + + @Test + fun `emit chain heads and no validation if it's disabled`() { + val block1 = block(10000, BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200")) + val block2 = block(50000, BlockHash.from("0x3ec1ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200")) + val upstream = mock { + on { getChain() } doReturn Chain.ETHEREUM__MAINNET + on { getOptions() } doReturn ChainOptions.PartialOptions(disableUpstreamValidation = true).buildOptions() + } + val reader = mock { + on { read(ChainRequest("eth_getBlockByNumber", ListParams("latest", false))) } doReturn + Mono.just(ChainResponse(Global.objectMapper.writeValueAsBytes(block1), null)) doReturn + Mono.just(ChainResponse(Global.objectMapper.writeValueAsBytes(block2), null)) + } + val head = GenericRpcHead( + reader, + AlwaysForkChoice(), + "id", + BlockValidator.ALWAYS_VALID, + upstream, + Schedulers.boundedElastic(), + EthereumChainSpecific, + Duration.ofSeconds(5), + ) + + StepVerifier.withVirtualTime { head.getFlux() } + .expectSubscription() + .then { head.start() } + .expectNoEvent(Duration.ofSeconds(5)) + .expectNext(BlockContainer.from(block1)) + .expectNoEvent(Duration.ofSeconds(5)) + .expectNext(BlockContainer.from(block2)) + .thenCancel() + .verify(Duration.ofSeconds(3)) + + verify(reader, times(2)).read(ChainRequest("eth_getBlockByNumber", ListParams("latest", false))) + verify(reader, never()).read(ChainRequest("eth_chainId", ListParams())) + verify(reader, never()).read(ChainRequest("net_version", ListParams())) + } + + @Test + fun `emit chain heads and validate chain settings`() { + val block1 = block(10000, BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200")) + val block2 = block(50000, BlockHash.from("0x3ec1ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200")) + val reader = mock { + on { read(ChainRequest("eth_chainId", ListParams())) } doReturn Mono.just(ChainResponse("\"0x1\"".toByteArray(), null)) + on { read(ChainRequest("net_version", ListParams())) } doReturn Mono.just(ChainResponse("\"1\"".toByteArray(), null)) + on { read(ChainRequest("eth_getBlockByNumber", ListParams("latest", false))) } doReturn + Mono.just(ChainResponse(Global.objectMapper.writeValueAsBytes(block1), null)) doReturn + Mono.just(ChainResponse(Global.objectMapper.writeValueAsBytes(block2), null)) + } + val upstream = mock { + on { getIngressReader() } doReturn reader + on { getChain() } doReturn Chain.ETHEREUM__MAINNET + on { getOptions() } doReturn ChainOptions.PartialOptions().buildOptions() + } + val head = GenericRpcHead( + reader, + AlwaysForkChoice(), + "id", + BlockValidator.ALWAYS_VALID, + upstream, + Schedulers.boundedElastic(), + EthereumChainSpecific, + Duration.ofSeconds(5), + ) + + StepVerifier.withVirtualTime { head.getFlux() } + .expectSubscription() + .then { head.start() } + .expectNoEvent(Duration.ofSeconds(5)) + .expectNext(BlockContainer.from(block1)) + .expectNoEvent(Duration.ofSeconds(5)) + .expectNext(BlockContainer.from(block2)) + .thenCancel() + .verify(Duration.ofSeconds(3)) + + verify(reader, times(2)).read(ChainRequest("eth_getBlockByNumber", ListParams("latest", false))) + verify(reader).read(ChainRequest("eth_chainId", ListParams())) + verify(reader).read(ChainRequest("net_version", ListParams())) + } + + @Test + fun `emit chain heads, validate chain settings and then fatal error`() { + val block1 = block(10000, BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200")) + val block2 = block(50000, BlockHash.from("0x3ec1ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200")) + val reader = mock { + on { read(ChainRequest("eth_chainId", ListParams())) } doReturn Mono.just(ChainResponse("\"0x1\"".toByteArray(), null)) + on { read(ChainRequest("net_version", ListParams())) } doReturn Mono.just(ChainResponse("\"5\"".toByteArray(), null)) + on { read(ChainRequest("eth_getBlockByNumber", ListParams("latest", false))) } doReturn + Mono.just(ChainResponse(Global.objectMapper.writeValueAsBytes(block1), null)) doReturn + Mono.just(ChainResponse(Global.objectMapper.writeValueAsBytes(block2), null)) + } + val upstream = mock { + on { getIngressReader() } doReturn reader + on { getChain() } doReturn Chain.ETHEREUM__MAINNET + on { getOptions() } doReturn ChainOptions.PartialOptions().buildOptions() + } + val head = GenericRpcHead( + reader, + AlwaysForkChoice(), + "id", + BlockValidator.ALWAYS_VALID, + upstream, + Schedulers.boundedElastic(), + EthereumChainSpecific, + Duration.ofSeconds(5), + ) + + StepVerifier.withVirtualTime { head.getFlux() } + .expectSubscription() + .then { head.start() } + .expectNoEvent(Duration.ofSeconds(5)) + .expectNext(BlockContainer.from(block1)) + .expectNoEvent(Duration.ofSeconds(5)) + .then { + StepVerifier.create(head.headLiveness()) + .expectNext(HeadLivenessState.FATAL_ERROR) + .thenCancel() + .verify(Duration.ofSeconds(3)) + } + .thenCancel() + .verify(Duration.ofSeconds(3)) + + verify(reader, times(2)).read(ChainRequest("eth_getBlockByNumber", ListParams("latest", false))) + verify(reader).read(ChainRequest("eth_chainId", ListParams())) + verify(reader).read(ChainRequest("net_version", ListParams())) + } + + @Test + fun `emit chain heads, validate chain settings and skip a received head due to error`() { + val block1 = block(10000, BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200")) + val block2 = block(50000, BlockHash.from("0x3ec1ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200")) + val reader = mock { + on { read(ChainRequest("eth_chainId", ListParams())) } doReturn Mono.just(ChainResponse("sds".toByteArray(), null)) + on { read(ChainRequest("net_version", ListParams())) } doReturn Mono.just(ChainResponse("\"sad\"".toByteArray(), null)) + on { read(ChainRequest("eth_getBlockByNumber", ListParams("latest", false))) } doReturn + Mono.just(ChainResponse(Global.objectMapper.writeValueAsBytes(block1), null)) doReturn + Mono.just(ChainResponse(Global.objectMapper.writeValueAsBytes(block2), null)) + } + val upstream = mock { + on { getIngressReader() } doReturn reader + on { getChain() } doReturn Chain.ETHEREUM__MAINNET + on { getOptions() } doReturn ChainOptions.PartialOptions().buildOptions() + } + val head = GenericRpcHead( + reader, + AlwaysForkChoice(), + "id", + BlockValidator.ALWAYS_VALID, + upstream, + Schedulers.boundedElastic(), + EthereumChainSpecific, + Duration.ofSeconds(5), + ) + + StepVerifier.withVirtualTime { head.getFlux() } + .expectSubscription() + .then { head.start() } + .expectNoEvent(Duration.ofSeconds(5)) + .expectNext(BlockContainer.from(block1)) + .expectNoEvent(Duration.ofSeconds(5)) + .thenCancel() + .verify(Duration.ofSeconds(3)) + + Thread.sleep(100) + verify(reader, times(2)).read(ChainRequest("eth_getBlockByNumber", ListParams("latest", false))) + verify(reader).read(ChainRequest("eth_chainId", ListParams())) + verify(reader).read(ChainRequest("net_version", ListParams())) + } + + private fun block( + height: Long, + blockHash: BlockHash, + ) = + BlockJson() + .apply { + number = height + uncles = emptyList() + totalDifficulty = BigInteger.ONE + parentHash = blockHash + timestamp = Instant.now().truncatedTo(ChronoUnit.SECONDS) + hash = blockHash + } +} diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnectorFactoryCreatorTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnectorFactoryCreatorTest.kt index 0d8f14320..3d9931eec 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnectorFactoryCreatorTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnectorFactoryCreatorTest.kt @@ -53,7 +53,7 @@ class GenericConnectorFactoryCreatorTest { config, )?.create(mock { on { getId() } doReturn "id" }, Chain.ETHEREUM__MAINNET) - assertEquals(expectedBlockTime, args?.get(6)) + assertEquals(expectedBlockTime, args?.get(7)) } }