diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetector.kt index f923c4bf5..afffb61d1 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetector.kt @@ -1,14 +1,12 @@ package io.emeraldpay.dshackle.upstream.ethereum +import com.fasterxml.jackson.module.kotlin.readValue import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.upstream.ChainRequest -import io.emeraldpay.dshackle.upstream.ChainResponse import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionRefJson -import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcException -import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcResponseError import io.emeraldpay.dshackle.upstream.finalization.FinalizationData import io.emeraldpay.dshackle.upstream.finalization.FinalizationDetector import io.emeraldpay.dshackle.upstream.finalization.FinalizationType @@ -74,35 +72,28 @@ class EthereumFinalizationDetector : FinalizationDetector { upstream .getIngressReader() .read(req) - .onErrorResume { - if (it.message != null && it.message!!.matches(Regex(errorRegex))) { - log.warn("Can't retrieve tagged block, finalization detector for upstream ${upstream.getId()} $chain tag $type disabled") - disableDetector[type] = true - } else { - throw it - } - Mono.empty() - } .flatMap { - it.requireResult().map { result -> - val block = Global.objectMapper - .readValue( - result, - BlockJson::class.java, - ) as BlockJson? + it.requireResult().flatMap { result -> + val block = Global.objectMapper.readValue>(result) if (block != null) { - FinalizationData(block.number, type) + Mono.just(FinalizationData(block.number, type)) } else { - throw RpcException(RpcResponseError.CODE_INVALID_JSON, "can't parse block data") + Mono.empty() } } } + .onErrorResume { + if (it.message != null && it.message!!.matches(Regex(errorRegex))) { + log.warn("Can't retrieve tagged block, finalization detector of upstream {} tag {} is disabled", upstream.getId(), type) + disableDetector[type] = true + } else { + log.error("Error in FinalizationDetector of upstream {}, reason - {}", upstream.getId(), it.message) + } + Mono.empty() + } } else { Flux.empty() } - }.onErrorResume { - log.error("Error in FinalizationDetector for upstream ${upstream.getId()} $chain — $it") - Flux.empty() } }.filter { it.height > (data[it.type]?.height ?: 0) diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetectorTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetectorTest.kt index f109645a5..c18284f2f 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetectorTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumFinalizationDetectorTest.kt @@ -3,6 +3,7 @@ package io.emeraldpay.dshackle.upstream.ethereum import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.reader.ChainReader +import io.emeraldpay.dshackle.upstream.ChainCallError import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.ChainResponse import io.emeraldpay.dshackle.upstream.Upstream @@ -14,6 +15,8 @@ import io.emeraldpay.dshackle.upstream.rpcclient.ListParams import org.junit.jupiter.api.Test import org.mockito.kotlin.doReturn import org.mockito.kotlin.mock +import org.mockito.kotlin.times +import org.mockito.kotlin.verify import reactor.core.publisher.Mono import reactor.test.StepVerifier import java.time.Duration @@ -54,6 +57,94 @@ class EthereumFinalizationDetectorTest { .verify(Duration.ofSeconds(1)) } + @Test + fun `safe and finalized blocks returns null`() { + val reader = mock { + on { + read(ChainRequest("eth_getBlockByNumber", ListParams("safe", false), 1)) + } doReturn Mono.just(ChainResponse(Global.nullValue, null)) doReturn Mono.just(ChainResponse(Global.nullValue, null)) + on { + read(ChainRequest("eth_getBlockByNumber", ListParams("finalized", false), 2)) + } doReturn Mono.just(ChainResponse(Global.nullValue, null)) doReturn Mono.just(ChainResponse(Global.nullValue, null)) + } + val upstream = mock { + on { getIngressReader() } doReturn reader + } + + val detector = EthereumFinalizationDetector() + + StepVerifier.withVirtualTime { detector.detectFinalization(upstream, Duration.ofSeconds(10), Chain.ZKSYNC__MAINNET) } + .expectSubscription() + .thenAwait(Duration.ofSeconds(0)) + .expectNoEvent(Duration.ofMillis(50)) + .thenAwait(Duration.ofSeconds(15)) + .expectNoEvent(Duration.ofMillis(50)) + .thenCancel() + .verify(Duration.ofSeconds(1)) + + verify(reader, times(2)).read(ChainRequest("eth_getBlockByNumber", ListParams("safe", false), 1)) + verify(reader, times(2)).read(ChainRequest("eth_getBlockByNumber", ListParams("finalized", false), 2)) + } + + @Test + fun `disable detector if receive specific errors`() { + val reader = mock { + on { + read(ChainRequest("eth_getBlockByNumber", ListParams("safe", false), 1)) + } doReturn Mono.just(ChainResponse(null, ChainCallError(1, "Got an invalid block number, check it"))) + on { + read(ChainRequest("eth_getBlockByNumber", ListParams("finalized", false), 2)) + } doReturn response(2) doReturn response(10) + } + + val upstream = mock { + on { getIngressReader() } doReturn reader + } + + val detector = EthereumFinalizationDetector() + + StepVerifier.withVirtualTime { detector.detectFinalization(upstream, Duration.ofSeconds(10), Chain.ZKSYNC__MAINNET) } + .expectSubscription() + .thenAwait(Duration.ofSeconds(0)) + .expectNext(FinalizationData(2L, FinalizationType.FINALIZED_BLOCK)) + .thenAwait(Duration.ofSeconds(15)) + .expectNext(FinalizationData(10L, FinalizationType.FINALIZED_BLOCK)) + .thenCancel() + .verify(Duration.ofSeconds(1)) + + verify(reader).read(ChainRequest("eth_getBlockByNumber", ListParams("safe", false), 1)) + verify(reader, times(2)).read(ChainRequest("eth_getBlockByNumber", ListParams("finalized", false), 2)) + } + + @Test + fun `ignore response if it cannot be parsed`() { + val reader = mock { + on { + read(ChainRequest("eth_getBlockByNumber", ListParams("safe", false), 1)) + } doReturn Mono.just(ChainResponse("""{"name": "value"}""".toByteArray(), null)) doReturn Mono.just(ChainResponse("""{"name": "value"}""".toByteArray(), null)) + on { + read(ChainRequest("eth_getBlockByNumber", ListParams("finalized", false), 2)) + } doReturn response(2) doReturn response(10) + } + val upstream = mock { + on { getIngressReader() } doReturn reader + } + + val detector = EthereumFinalizationDetector() + + StepVerifier.withVirtualTime { detector.detectFinalization(upstream, Duration.ofSeconds(10), Chain.ZKSYNC__MAINNET) } + .expectSubscription() + .thenAwait(Duration.ofSeconds(0)) + .expectNext(FinalizationData(2L, FinalizationType.FINALIZED_BLOCK)) + .thenAwait(Duration.ofSeconds(15)) + .expectNext(FinalizationData(10L, FinalizationType.FINALIZED_BLOCK)) + .thenCancel() + .verify(Duration.ofSeconds(1)) + + verify(reader, times(2)).read(ChainRequest("eth_getBlockByNumber", ListParams("safe", false), 1)) + verify(reader, times(2)).read(ChainRequest("eth_getBlockByNumber", ListParams("finalized", false), 2)) + } + private fun response(blockNumber: Long) = Mono.just( ChainResponse(