Skip to content

Commit

Permalink
Fixes head liveness check: timeout logic and periodic false negative …
Browse files Browse the repository at this point in the history
…events (#280)
  • Loading branch information
Termina1 authored Aug 14, 2023
1 parent 6ff75f0 commit e0cce9a
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.upstream.Head
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.scheduler.Scheduler
import java.time.Duration

class HeadLivenessValidator(
val head: Head,
val expectedBlockTime: Duration,
val scheduler: Scheduler
val scheduler: Scheduler,
private val upstreamId: String
) {

companion object {
// Number of head blocks that must be checked before the head is reported as live.
// The same constant also scales the timeout applied in getFlux().
const val CHECKED_BLOCKS_UNTIL_LIVE = 3
// Logger used for the liveness diagnostics emitted by this class.
private val log = LoggerFactory.getLogger(HeadLivenessValidator::class.java)
}

/**
 * Builds the publisher that [getFlux] hands to its timeout operator.
 *
 * The publisher is wrapped in [Flux.defer], so nothing inside runs until it is
 * subscribed to. This matters because [getFlux] calls this function while it
 * is being assembled, and this function in turn calls [getFlux]; deferring the
 * inner call keeps that mutual reference from recursing at assembly time.
 *
 * @return a flux that logs the timeout, emits `false`, and then continues with
 * a newly built liveness flux.
 */
private fun fallback(): Flux<Boolean> = Flux.defer {
    log.info("head liveness check broken with timeout in $upstreamId")
    // Report the head as not live first, then start the liveness check over.
    val notLive = Flux.just(false)
    notLive.concatWith(getFlux())
}

fun getFlux(): Flux<Boolean> {
Expand All @@ -24,18 +33,21 @@ class HeadLivenessValidator(
if (value) {
Pair(acc.first + 1, true)
} else {
log.info("non consecutive blocks in head for $upstreamId")
Pair(0, false)
}
}.flatMap { (count, value) ->
// we emit when we have false or checked CHECKED_BLOCKS_UNTIL_LIVE blocks
// CHECKED_BLOCKS_UNTIL_LIVE blocks == (CHECKED_BLOCKS_UNTIL_LIVE - 1) consecutive true
when {
count == (CHECKED_BLOCKS_UNTIL_LIVE - 1) -> Flux.just(true)
count >= (CHECKED_BLOCKS_UNTIL_LIVE - 1) -> Flux.just(true)
!value -> Flux.just(false)
else -> Flux.empty()
}
// finally, we timeout after we waited for double the time we needed to emit those blocks
}.timeout(expectedBlockTime.multipliedBy(CHECKED_BLOCKS_UNTIL_LIVE.toLong() * 2), Flux.just(false))
.distinctUntilChanged().subscribeOn(scheduler)
}.timeout(
expectedBlockTime.multipliedBy(CHECKED_BLOCKS_UNTIL_LIVE.toLong() * 2),
fallback()
).subscribeOn(scheduler)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class EthereumRpcConnector(
)
}
}
liveness = HeadLivenessValidator(head, expectedBlockTime, headScheduler)
liveness = HeadLivenessValidator(head, expectedBlockTime, headScheduler, id)
}

override fun setCaches(caches: Caches) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class EthereumWsConnector(
wsConnectionResubscribeScheduler,
headScheduler
)
liveness = HeadLivenessValidator(head, expectedBlockTime, headScheduler)
liveness = HeadLivenessValidator(head, expectedBlockTime, headScheduler, upstream.getId())
subscriptions = EthereumWsIngressSubscription(wsSubscriptions)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import reactor.core.publisher.Sinks

class EthereumHeadMock implements Head {

private Sinks.Many<BlockContainer> bus = Sinks.many().multicast().onBackpressureBuffer()
private Sinks.Many<BlockContainer> bus = Sinks.many().multicast().onBackpressureBuffer(10, false)
private Publisher<BlockContainer> predefined = null
private BlockContainer latest
private List<Runnable> handlers = []
Expand Down Expand Up @@ -54,7 +54,7 @@ class EthereumHeadMock implements Head {
if (predefined != null) {
return Flux.concat(Mono.justOrEmpty(latest), Flux.from(predefined))
} else {
return Flux.concat(Mono.justOrEmpty(latest), bus.asFlux()).distinctUntilChanged()
return Flux.concat(Mono.justOrEmpty(latest), bus.asFlux())
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import reactor.test.StepVerifier
import spock.lang.Specification

import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger

class HeadLivenessValidatorSpec extends Specification{
def "emits true"() {
when:
def head = new EthereumHeadMock()
def checker = new HeadLivenessValidator(head, Duration.ofSeconds(10), Schedulers.boundedElastic())
def checker = new HeadLivenessValidator(head, Duration.ofSeconds(10), Schedulers.boundedElastic(), "test")
then:
StepVerifier.create(checker.flux)
.then {
Expand All @@ -25,7 +27,7 @@ class HeadLivenessValidatorSpec extends Specification{
def "starts accumulating trues but immediately emits after false"() {
when:
def head = new EthereumHeadMock()
def checker = new HeadLivenessValidator(head, Duration.ofSeconds(100), Schedulers.boundedElastic())
def checker = new HeadLivenessValidator(head, Duration.ofSeconds(100), Schedulers.boundedElastic(), "test")
then:
StepVerifier.create(checker.flux)
.then {
Expand All @@ -43,7 +45,7 @@ class HeadLivenessValidatorSpec extends Specification{
def "starts accumulating trues but timeouts because head staled"() {
when:
def head = new EthereumHeadMock()
def checker = new HeadLivenessValidator(head, Duration.ofMillis(100), Schedulers.boundedElastic())
def checker = new HeadLivenessValidator(head, Duration.ofMillis(100), Schedulers.boundedElastic(), "test")
then:
StepVerifier.create(checker.flux)
.then {
Expand All @@ -54,4 +56,25 @@ class HeadLivenessValidatorSpec extends Specification{
.expectNext(false)
.thenCancel().verify(Duration.ofSeconds(2))
}

// Checks that the validator first reports false when the head stops producing
// blocks, and then reports true again once blocks start arriving.
def "it recovers after timeout"() {
when:
def head = new EthereumHeadMock()
def checker = new HeadLivenessValidator(head, Duration.ofMillis(200), Schedulers.boundedElastic(), "test")
then:
StepVerifier.create(checker.flux)
.then {
// Publish two blocks and then go silent.
head.nextBlock(TestingCommons.blockForEthereum(1))
head.nextBlock(TestingCommons.blockForEthereum(2))
}
// Keep the head quiet so the validator's timeout can fire.
.thenAwait(Duration.ofSeconds(1))
// The stalled head is expected to be reported as not live.
.expectNext(false)
.then {
// Resume publishing blocks after the timeout was reported.
head.nextBlock(TestingCommons.blockForEthereum(3))
head.nextBlock(TestingCommons.blockForEthereum(4))
head.nextBlock(TestingCommons.blockForEthereum(5))
}
// The validator is expected to report the head as live again.
.expectNext(true)
// Cancel the subscription; the scenario must finish within 3 seconds.
.thenCancel().verify(Duration.ofSeconds(3))
}
}

0 comments on commit e0cce9a

Please sign in to comment.