Skip to content

Commit

Permalink
Finalization detector fixes (#554)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Aug 22, 2024
1 parent a8b91fa commit 32cb726
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package io.emeraldpay.dshackle.upstream.ethereum

import com.fasterxml.jackson.module.kotlin.readValue
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.ChainResponse
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson
import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionRefJson
import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcException
import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcResponseError
import io.emeraldpay.dshackle.upstream.finalization.FinalizationData
import io.emeraldpay.dshackle.upstream.finalization.FinalizationDetector
import io.emeraldpay.dshackle.upstream.finalization.FinalizationType
Expand Down Expand Up @@ -74,35 +72,28 @@ class EthereumFinalizationDetector : FinalizationDetector {
upstream
.getIngressReader()
.read(req)
.onErrorResume {
if (it.message != null && it.message!!.matches(Regex(errorRegex))) {
log.warn("Can't retrieve tagged block, finalization detector for upstream ${upstream.getId()} $chain tag $type disabled")
disableDetector[type] = true
} else {
throw it
}
Mono.empty<ChainResponse>()
}
.flatMap {
it.requireResult().map { result ->
val block = Global.objectMapper
.readValue(
result,
BlockJson::class.java,
) as BlockJson<TransactionRefJson>?
it.requireResult().flatMap { result ->
val block = Global.objectMapper.readValue<BlockJson<TransactionRefJson>>(result)
if (block != null) {
FinalizationData(block.number, type)
Mono.just(FinalizationData(block.number, type))
} else {
throw RpcException(RpcResponseError.CODE_INVALID_JSON, "can't parse block data")
Mono.empty()
}
}
}
.onErrorResume {
if (it.message != null && it.message!!.matches(Regex(errorRegex))) {
log.warn("Can't retrieve tagged block, finalization detector of upstream {} tag {} is disabled", upstream.getId(), type)
disableDetector[type] = true
} else {
log.error("Error in FinalizationDetector of upstream {}, reason - {}", upstream.getId(), it.message)
}
Mono.empty()
}
} else {
Flux.empty()
}
}.onErrorResume {
log.error("Error in FinalizationDetector for upstream ${upstream.getId()} $chain$it")
Flux.empty()
}
}.filter {
it.height > (data[it.type]?.height ?: 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.emeraldpay.dshackle.upstream.ethereum
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.reader.ChainReader
import io.emeraldpay.dshackle.upstream.ChainCallError
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.ChainResponse
import io.emeraldpay.dshackle.upstream.Upstream
Expand All @@ -14,6 +15,8 @@ import io.emeraldpay.dshackle.upstream.rpcclient.ListParams
import org.junit.jupiter.api.Test
import org.mockito.kotlin.doReturn
import org.mockito.kotlin.mock
import org.mockito.kotlin.times
import org.mockito.kotlin.verify
import reactor.core.publisher.Mono
import reactor.test.StepVerifier
import java.time.Duration
Expand Down Expand Up @@ -54,6 +57,94 @@ class EthereumFinalizationDetectorTest {
.verify(Duration.ofSeconds(1))
}

@Test
fun `safe and finalized blocks returns null`() {
val reader = mock<ChainReader> {
on {
read(ChainRequest("eth_getBlockByNumber", ListParams("safe", false), 1))
} doReturn Mono.just(ChainResponse(Global.nullValue, null)) doReturn Mono.just(ChainResponse(Global.nullValue, null))
on {
read(ChainRequest("eth_getBlockByNumber", ListParams("finalized", false), 2))
} doReturn Mono.just(ChainResponse(Global.nullValue, null)) doReturn Mono.just(ChainResponse(Global.nullValue, null))
}
val upstream = mock<Upstream> {
on { getIngressReader() } doReturn reader
}

val detector = EthereumFinalizationDetector()

StepVerifier.withVirtualTime { detector.detectFinalization(upstream, Duration.ofSeconds(10), Chain.ZKSYNC__MAINNET) }
.expectSubscription()
.thenAwait(Duration.ofSeconds(0))
.expectNoEvent(Duration.ofMillis(50))
.thenAwait(Duration.ofSeconds(15))
.expectNoEvent(Duration.ofMillis(50))
.thenCancel()
.verify(Duration.ofSeconds(1))

verify(reader, times(2)).read(ChainRequest("eth_getBlockByNumber", ListParams("safe", false), 1))
verify(reader, times(2)).read(ChainRequest("eth_getBlockByNumber", ListParams("finalized", false), 2))
}

@Test
fun `disable detector if receive specific errors`() {
val reader = mock<ChainReader> {
on {
read(ChainRequest("eth_getBlockByNumber", ListParams("safe", false), 1))
} doReturn Mono.just(ChainResponse(null, ChainCallError(1, "Got an invalid block number, check it")))
on {
read(ChainRequest("eth_getBlockByNumber", ListParams("finalized", false), 2))
} doReturn response(2) doReturn response(10)
}

val upstream = mock<Upstream> {
on { getIngressReader() } doReturn reader
}

val detector = EthereumFinalizationDetector()

StepVerifier.withVirtualTime { detector.detectFinalization(upstream, Duration.ofSeconds(10), Chain.ZKSYNC__MAINNET) }
.expectSubscription()
.thenAwait(Duration.ofSeconds(0))
.expectNext(FinalizationData(2L, FinalizationType.FINALIZED_BLOCK))
.thenAwait(Duration.ofSeconds(15))
.expectNext(FinalizationData(10L, FinalizationType.FINALIZED_BLOCK))
.thenCancel()
.verify(Duration.ofSeconds(1))

verify(reader).read(ChainRequest("eth_getBlockByNumber", ListParams("safe", false), 1))
verify(reader, times(2)).read(ChainRequest("eth_getBlockByNumber", ListParams("finalized", false), 2))
}

@Test
fun `ignore response if it cannot be parsed`() {
val reader = mock<ChainReader> {
on {
read(ChainRequest("eth_getBlockByNumber", ListParams("safe", false), 1))
} doReturn Mono.just(ChainResponse("""{"name": "value"}""".toByteArray(), null)) doReturn Mono.just(ChainResponse("""{"name": "value"}""".toByteArray(), null))
on {
read(ChainRequest("eth_getBlockByNumber", ListParams("finalized", false), 2))
} doReturn response(2) doReturn response(10)
}
val upstream = mock<Upstream> {
on { getIngressReader() } doReturn reader
}

val detector = EthereumFinalizationDetector()

StepVerifier.withVirtualTime { detector.detectFinalization(upstream, Duration.ofSeconds(10), Chain.ZKSYNC__MAINNET) }
.expectSubscription()
.thenAwait(Duration.ofSeconds(0))
.expectNext(FinalizationData(2L, FinalizationType.FINALIZED_BLOCK))
.thenAwait(Duration.ofSeconds(15))
.expectNext(FinalizationData(10L, FinalizationType.FINALIZED_BLOCK))
.thenCancel()
.verify(Duration.ofSeconds(1))

verify(reader, times(2)).read(ChainRequest("eth_getBlockByNumber", ListParams("safe", false), 1))
verify(reader, times(2)).read(ChainRequest("eth_getBlockByNumber", ListParams("finalized", false), 2))
}

private fun response(blockNumber: Long) =
Mono.just(
ChainResponse(
Expand Down

0 comments on commit 32cb726

Please sign in to comment.