Skip to content

Commit

Permalink
Check chain settings if the next height is very different (#548)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Aug 15, 2024
1 parent 96487b6 commit 021add4
Show file tree
Hide file tree
Showing 12 changed files with 584 additions and 19 deletions.
2 changes: 1 addition & 1 deletion foundation/src/main/resources/public
Submodule public updated 1 files
+1 −1 chains.yaml
25 changes: 24 additions & 1 deletion src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import io.micrometer.core.instrument.Metrics
import org.slf4j.LoggerFactory
import reactor.core.Disposable
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.SignalType
import reactor.core.publisher.Sinks
import reactor.core.publisher.Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER
Expand Down Expand Up @@ -95,6 +96,20 @@ abstract class AbstractHead @JvmOverloads constructor(
log.warn("Received signal $upstreamId $it, continue emit heads")
}
}
.flatMap {
if (isSuspiciousBlock(it)) {
log.warn(
"Got a suspicious head of upstream {} with a height {} " +
"that is very different from the previous head with a height {}, validating this upstream",
upstreamId,
it.height,
getCurrentHeight(),
)
checkSuspiciousBlock(it)
} else {
Mono.just(it)
}
}
.subscribeOn(headScheduler)
.subscribe { block ->
val valid = runCatching {
Expand Down Expand Up @@ -165,7 +180,7 @@ abstract class AbstractHead @JvmOverloads constructor(
metrics.forEach { Metrics.globalRegistry.remove(it) }
}

open fun onNoHeadUpdates() {
protected open fun onNoHeadUpdates() {
// NOOP
}

Expand Down Expand Up @@ -196,4 +211,12 @@ abstract class AbstractHead @JvmOverloads constructor(
)
}
}

protected open fun isSuspiciousBlock(block: BlockContainer): Boolean {
return false
}

protected open fun checkSuspiciousBlock(block: BlockContainer): Mono<BlockContainer> {
return Mono.just(block)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ object EthereumChainSpecific : AbstractPollChainSpecific() {
override fun chainSettingsValidator(
chain: Chain,
upstream: Upstream,
reader: ChainReader,
reader: ChainReader?,
): SingleValidator<ValidateUpstreamSettingsResult>? {
if (upstream.getOptions().disableUpstreamValidation) {
return null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import io.emeraldpay.dshackle.reader.ChainReader
import io.emeraldpay.dshackle.upstream.BlockValidator
import io.emeraldpay.dshackle.upstream.DefaultUpstream
import io.emeraldpay.dshackle.upstream.Lifecycle
import io.emeraldpay.dshackle.upstream.SingleValidator
import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult
import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR
import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR
import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult.UPSTREAM_VALID
import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice
Expand Down Expand Up @@ -69,7 +72,6 @@ class GenericWsHead(
private var subscription: Disposable? = null
private var headResubSubscription: Disposable? = null
private val noHeadUpdatesSink = Sinks.many().multicast().directBestEffort<Boolean>()
private val headLivenessSink = Sinks.many().multicast().directBestEffort<HeadLivenessState>()

private var subscriptionId = AtomicReference("")

Expand Down Expand Up @@ -130,7 +132,7 @@ class GenericWsHead(
subscribed = false
Mono.empty()
}
else -> {
UPSTREAM_FATAL_SETTINGS_ERROR -> {
log.error("Chain settings check hasn't been passed via ws connection, upstream {} will be removed", upstreamId)
headLivenessSink.emitNext(HeadLivenessState.FATAL_ERROR) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED }
Mono.empty()
Expand All @@ -146,7 +148,9 @@ class GenericWsHead(
headResubSubscription = null
}

override fun headLiveness(): Flux<HeadLivenessState> = headLivenessSink.asFlux()
override fun chainIdValidator(): SingleValidator<ValidateUpstreamSettingsResult>? {
return chainIdValidator
}

private fun unsubscribe(): Mono<BlockContainer> {
subscribed = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ abstract class AbstractChainSpecific : ChainSpecific {
override fun chainSettingsValidator(
chain: Chain,
upstream: Upstream,
reader: ChainReader,
reader: ChainReader?,
): SingleValidator<ValidateUpstreamSettingsResult>? {
return null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ interface ChainSpecific {

fun upstreamSettingsDetector(chain: Chain, upstream: Upstream): UpstreamSettingsDetector?

fun chainSettingsValidator(chain: Chain, upstream: Upstream, reader: ChainReader): SingleValidator<ValidateUpstreamSettingsResult>?
fun chainSettingsValidator(chain: Chain, upstream: Upstream, reader: ChainReader?): SingleValidator<ValidateUpstreamSettingsResult>?

fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector?

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,16 @@ import io.emeraldpay.dshackle.reader.ChainReader
import io.emeraldpay.dshackle.upstream.AbstractHead
import io.emeraldpay.dshackle.upstream.BlockValidator
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.SingleValidator
import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult
import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessState
import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.Sinks
import reactor.core.scheduler.Scheduler
import reactor.kotlin.core.publisher.switchIfEmpty
import kotlin.math.abs

open class GenericHead(
protected val upstreamId: String,
Expand All @@ -32,6 +39,7 @@ open class GenericHead(
private val headScheduler: Scheduler,
private val chainSpecific: ChainSpecific,
) : Head, AbstractHead(forkChoice, headScheduler, blockValidator, 60_000, upstreamId) {
protected val headLivenessSink: Sinks.Many<HeadLivenessState> = Sinks.many().multicast().directBestEffort()

fun getLatestBlock(api: ChainReader): Mono<BlockContainer> {
return chainSpecific.getLatestBlock(api, upstreamId)
Expand All @@ -42,4 +50,44 @@ open class GenericHead(
Mono.empty()
}
}

override fun isSuspiciousBlock(block: BlockContainer): Boolean {
return getCurrentHeight()
?.let {
abs(block.height - it) > 10000
} ?: false
}

override fun checkSuspiciousBlock(block: BlockContainer): Mono<BlockContainer> {
return Mono.justOrEmpty(chainIdValidator())
.flatMap {
it!!.validate(ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR)
}
.switchIfEmpty {
Mono.just(ValidateUpstreamSettingsResult.UPSTREAM_VALID)
}
.flatMap { validationResult ->
when (validationResult) {
ValidateUpstreamSettingsResult.UPSTREAM_VALID -> {
log.info("Block {} of upstream {} has been received from the same chain, validation is passed", block.height, upstreamId)
Mono.just(block)
}
ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR -> {
log.warn("Block {} of upstream {} is filtered and can not be emitted due to upstream settings error check", block.height, upstreamId)
Mono.empty()
}
ValidateUpstreamSettingsResult.UPSTREAM_FATAL_SETTINGS_ERROR -> {
log.error("Block {} of upstream {} can not be emitted due to chain inconsistency, upstream will be removed", block.height, upstreamId)
headLivenessSink.emitNext(HeadLivenessState.FATAL_ERROR) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED }
Mono.empty()
}
}
}
}

override fun headLiveness(): Flux<HeadLivenessState> = headLivenessSink.asFlux()

protected open fun chainIdValidator(): SingleValidator<ValidateUpstreamSettingsResult>? {
return null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ package io.emeraldpay.dshackle.upstream.generic

import io.emeraldpay.dshackle.reader.ChainReader
import io.emeraldpay.dshackle.upstream.BlockValidator
import io.emeraldpay.dshackle.upstream.DefaultUpstream
import io.emeraldpay.dshackle.upstream.Lifecycle
import io.emeraldpay.dshackle.upstream.SingleValidator
import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult
import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice
import reactor.core.Disposable
import reactor.core.publisher.Flux
Expand All @@ -30,10 +33,12 @@ class GenericRpcHead(
forkChoice: ForkChoice,
upstreamId: String,
blockValidator: BlockValidator,
upstream: DefaultUpstream,
private val headScheduler: Scheduler,
private val chainSpecific: ChainSpecific,
private val interval: Duration = Duration.ofSeconds(10),
) : GenericHead(upstreamId, forkChoice, blockValidator, headScheduler, chainSpecific), Lifecycle {
private val chainIdValidator = chainSpecific.chainSettingsValidator(upstream.getChain(), upstream, null)

private var refreshSubscription: Disposable? = null
private var isSyncing = false
Expand Down Expand Up @@ -63,4 +68,8 @@ class GenericRpcHead(
refreshSubscription?.dispose()
refreshSubscription = null
}

override fun chainIdValidator(): SingleValidator<ValidateUpstreamSettingsResult>? {
return chainIdValidator
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class GenericRpcConnector(
forkChoice,
id,
blockValidator,
upstream,
headScheduler,
chainSpecific,
expectedBlockTime.coerceAtLeast(Duration.ofSeconds(1)),
Expand Down Expand Up @@ -112,6 +113,7 @@ class GenericRpcConnector(
AlwaysForkChoice(),
id,
blockValidator,
upstream,
headScheduler,
chainSpecific,
Duration.ofSeconds(30),
Expand Down
Loading

0 comments on commit 021add4

Please sign in to comment.