diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt index 6f86a1408..17ec50c7e 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt @@ -1,6 +1,7 @@ package io.emeraldpay.dshackle.upstream.ethereum import io.emeraldpay.dshackle.upstream.Head +import org.slf4j.LoggerFactory import reactor.core.publisher.Flux import reactor.core.scheduler.Scheduler import java.time.Duration @@ -8,11 +9,19 @@ import java.time.Duration class HeadLivenessValidator( val head: Head, val expectedBlockTime: Duration, - val scheduler: Scheduler + val scheduler: Scheduler, + private val upstreamId: String ) { - companion object { const val CHECKED_BLOCKS_UNTIL_LIVE = 3 + private val log = LoggerFactory.getLogger(HeadLivenessValidator::class.java) + } + + private fun fallback(): Flux { + return Flux.defer { + log.info("head liveness check broken with timeout in $upstreamId") + Flux.just(false).concatWith(getFlux()) // emit false and then restart the Flux + } } fun getFlux(): Flux { @@ -24,18 +33,21 @@ class HeadLivenessValidator( if (value) { Pair(acc.first + 1, true) } else { + log.info("non consecutive blocks in head for $upstreamId") Pair(0, false) } }.flatMap { (count, value) -> // we emit when we have false or checked CHECKED_BLOCKS_UNTIL_LIVE blocks // CHECKED_BLOCKS_UNTIL_LIVE blocks == (CHECKED_BLOCKS_UNTIL_LIVE - 1) consecutive true when { - count == (CHECKED_BLOCKS_UNTIL_LIVE - 1) -> Flux.just(true) + count >= (CHECKED_BLOCKS_UNTIL_LIVE - 1) -> Flux.just(true) !value -> Flux.just(false) else -> Flux.empty() } // finally, we timeout after we waited for double the time we needed to emit those blocks - }.timeout(expectedBlockTime.multipliedBy(CHECKED_BLOCKS_UNTIL_LIVE.toLong() * 2), Flux.just(false)) - .distinctUntilChanged().subscribeOn(scheduler) + }.timeout( + expectedBlockTime.multipliedBy(CHECKED_BLOCKS_UNTIL_LIVE.toLong() * 2), + fallback() + ).subscribeOn(scheduler) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/connectors/EthereumRpcConnector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/connectors/EthereumRpcConnector.kt index f0c823905..5c1732db4 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/connectors/EthereumRpcConnector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/connectors/EthereumRpcConnector.kt @@ -99,7 +99,7 @@ class EthereumRpcConnector( ) } } - liveness = HeadLivenessValidator(head, expectedBlockTime, headScheduler) + liveness = HeadLivenessValidator(head, expectedBlockTime, headScheduler, id) } override fun setCaches(caches: Caches) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/connectors/EthereumWsConnector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/connectors/EthereumWsConnector.kt index ae79bd2ad..9755c578d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/connectors/EthereumWsConnector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/connectors/EthereumWsConnector.kt @@ -46,7 +46,7 @@ class EthereumWsConnector( wsConnectionResubscribeScheduler, headScheduler ) - liveness = HeadLivenessValidator(head, expectedBlockTime, headScheduler) + liveness = HeadLivenessValidator(head, expectedBlockTime, headScheduler, upstream.getId()) subscriptions = EthereumWsIngressSubscription(wsSubscriptions) } diff --git a/src/test/groovy/io/emeraldpay/dshackle/test/EthereumHeadMock.groovy b/src/test/groovy/io/emeraldpay/dshackle/test/EthereumHeadMock.groovy index da2eeffa6..ac02a4fbe 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/test/EthereumHeadMock.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/test/EthereumHeadMock.groovy @@ -26,7 +26,7 @@ import reactor.core.publisher.Sinks class EthereumHeadMock implements Head { - private Sinks.Many bus = Sinks.many().multicast().onBackpressureBuffer() + private Sinks.Many bus = Sinks.many().multicast().onBackpressureBuffer(10, false) private Publisher predefined = null private BlockContainer latest private List handlers = [] @@ -54,7 +54,7 @@ class EthereumHeadMock implements Head { if (predefined != null) { return Flux.concat(Mono.justOrEmpty(latest), Flux.from(predefined)) } else { - return Flux.concat(Mono.justOrEmpty(latest), bus.asFlux()).distinctUntilChanged() + return Flux.concat(Mono.justOrEmpty(latest), bus.asFlux()) } } diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidatorSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidatorSpec.groovy index 811c1f543..3c7ff7540 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidatorSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidatorSpec.groovy @@ -7,12 +7,14 @@ import reactor.test.StepVerifier import spock.lang.Specification import java.time.Duration +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger class HeadLivenessValidatorSpec extends Specification{ def "emits true"() { when: def head = new EthereumHeadMock() - def checker = new HeadLivenessValidator(head, Duration.ofSeconds(10), Schedulers.boundedElastic()) + def checker = new HeadLivenessValidator(head, Duration.ofSeconds(10), Schedulers.boundedElastic(), "test") then: StepVerifier.create(checker.flux) .then { @@ -25,7 +27,7 @@ class HeadLivenessValidatorSpec extends Specification{ def "starts accumulating trues but immediately emits after false"() { when: def head = new EthereumHeadMock() - def checker = new HeadLivenessValidator(head, Duration.ofSeconds(100), Schedulers.boundedElastic()) + def checker = new HeadLivenessValidator(head, Duration.ofSeconds(100), Schedulers.boundedElastic(), "test") then: StepVerifier.create(checker.flux) .then { @@ -43,7 +45,7 @@ class HeadLivenessValidatorSpec extends Specification{ def "starts accumulating trues but timeouts because head staled"() { when: def head = new EthereumHeadMock() - def checker = new HeadLivenessValidator(head, Duration.ofMillis(100), Schedulers.boundedElastic()) + def checker = new HeadLivenessValidator(head, Duration.ofMillis(100), Schedulers.boundedElastic(), "test") then: StepVerifier.create(checker.flux) .then { @@ -54,4 +56,25 @@ class HeadLivenessValidatorSpec extends Specification{ .expectNext(false) .thenCancel().verify(Duration.ofSeconds(2)) } + + def "it recovers after timeout"() { + when: + def head = new EthereumHeadMock() + def checker = new HeadLivenessValidator(head, Duration.ofMillis(200), Schedulers.boundedElastic(), "test") + then: + StepVerifier.create(checker.flux) + .then { + head.nextBlock(TestingCommons.blockForEthereum(1)) + head.nextBlock(TestingCommons.blockForEthereum(2)) + } + .thenAwait(Duration.ofSeconds(1)) + .expectNext(false) + .then { + head.nextBlock(TestingCommons.blockForEthereum(3)) + head.nextBlock(TestingCommons.blockForEthereum(4)) + head.nextBlock(TestingCommons.blockForEthereum(5)) + } + .expectNext(true) + .thenCancel().verify(Duration.ofSeconds(3)) + } }