Skip to content

Commit

Permalink
Check chain settings via ws (#544)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Aug 12, 2024
1 parent e8410d7 commit 96487b6
Show file tree
Hide file tree
Showing 20 changed files with 449 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.emeraldpay.dshackle.upstream

import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessState
import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice
import io.micrometer.core.instrument.Gauge
import io.micrometer.core.instrument.Meter
Expand Down Expand Up @@ -164,15 +165,15 @@ abstract class AbstractHead @JvmOverloads constructor(
metrics.forEach { Metrics.globalRegistry.remove(it) }
}

protected open fun onNoHeadUpdates() {
open fun onNoHeadUpdates() {
// NOOP
}

override fun onSyncingNode(isSyncing: Boolean) {
// NOOP
}

override fun headLiveness(): Flux<Boolean> {
override fun headLiveness(): Flux<HeadLivenessState> {
return Flux.empty()
}

Expand Down
3 changes: 2 additions & 1 deletion src/main/kotlin/io/emeraldpay/dshackle/upstream/EmptyHead.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.emeraldpay.dshackle.upstream

import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessState
import reactor.core.publisher.Flux

class EmptyHead : Head {
Expand Down Expand Up @@ -44,7 +45,7 @@ class EmptyHead : Head {
override fun onSyncingNode(isSyncing: Boolean) {
}

override fun headLiveness(): Flux<Boolean> = Flux.empty()
override fun headLiveness(): Flux<HeadLivenessState> = Flux.empty()

override fun getCurrent(): BlockContainer? {
return null
Expand Down
3 changes: 2 additions & 1 deletion src/main/kotlin/io/emeraldpay/dshackle/upstream/Head.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.emeraldpay.dshackle.upstream

import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessState
import reactor.core.publisher.Flux

/**
Expand Down Expand Up @@ -46,7 +47,7 @@ interface Head {

fun onSyncingNode(isSyncing: Boolean)

fun headLiveness(): Flux<Boolean>
fun headLiveness(): Flux<HeadLivenessState>

fun getCurrent(): BlockContainer?
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,17 @@ object EthereumChainSpecific : AbstractPollChainSpecific() {
}
}

override fun chainSettingsValidator(
chain: Chain,
upstream: Upstream,
reader: ChainReader,
): SingleValidator<ValidateUpstreamSettingsResult>? {
if (upstream.getOptions().disableUpstreamValidation) {
return null
}
return ChainIdValidator(upstream, chain, reader)
}

override fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector {
return BasicEthUpstreamRpcModulesDetector(upstream)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import io.emeraldpay.dshackle.Defaults
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig
import io.emeraldpay.dshackle.foundation.ChainOptions
import io.emeraldpay.dshackle.reader.ChainReader
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.ChainResponse
import io.emeraldpay.dshackle.upstream.SingleValidator
Expand All @@ -37,6 +38,8 @@ import reactor.kotlin.extra.retry.retryRandomBackoff
import java.math.BigInteger
import java.time.Duration
import java.util.concurrent.TimeoutException
import java.util.function.Supplier

interface CallLimitValidator : SingleValidator<ValidateUpstreamSettingsResult> {
fun isEnabled(): Boolean
}
Expand Down Expand Up @@ -144,7 +147,11 @@ fun callLimitValidatorFactory(
class ChainIdValidator(
private val upstream: Upstream,
private val chain: Chain,
private val customReader: ChainReader? = null,
) : SingleValidator<ValidateUpstreamSettingsResult> {
private val validatorReader: Supplier<ChainReader> = Supplier {
customReader ?: upstream.getIngressReader()
}

companion object {
@JvmStatic
Expand Down Expand Up @@ -186,7 +193,7 @@ class ChainIdValidator(
}

private fun chainId(): Mono<String> {
return upstream.getIngressReader()
return validatorReader.get()
.read(ChainRequest("eth_chainId", ListParams()))
.retryRandomBackoff(3, Duration.ofMillis(100), Duration.ofMillis(500)) { ctx ->
log.warn(
Expand All @@ -199,7 +206,7 @@ class ChainIdValidator(
}

private fun netVersion(): Mono<String> {
return upstream.getIngressReader()
return validatorReader.get()
.read(ChainRequest("net_version", ListParams()))
.retryRandomBackoff(3, Duration.ofMillis(100), Duration.ofMillis(500)) { ctx ->
log.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@ import io.emeraldpay.dshackle.reader.ChainReader
import io.emeraldpay.dshackle.upstream.BlockValidator
import io.emeraldpay.dshackle.upstream.DefaultUpstream
import io.emeraldpay.dshackle.upstream.Lifecycle
import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult.UPSTREAM_SETTINGS_ERROR
import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult.UPSTREAM_VALID
import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice
import io.emeraldpay.dshackle.upstream.generic.ChainSpecific
import io.emeraldpay.dshackle.upstream.generic.GenericHead
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcWsClient
import reactor.core.Disposable
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.Sinks
import reactor.core.scheduler.Scheduler
import reactor.kotlin.core.publisher.switchIfEmpty
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
Expand All @@ -42,6 +46,7 @@ class GenericWsHead(
headScheduler: Scheduler,
upstream: DefaultUpstream,
private val chainSpecific: ChainSpecific,
jsonRpcWsClient: JsonRpcWsClient,
timeout: Duration,
) : GenericHead(upstream.getId(), forkChoice, blockValidator, headScheduler, chainSpecific), Lifecycle {
private val wsHeadTimeout = run {
Expand All @@ -54,6 +59,7 @@ class GenericWsHead(
}.also {
log.info("WS head timeout for ${upstream.getId()} is $it")
}
private val chainIdValidator = chainSpecific.chainSettingsValidator(upstream.getChain(), upstream, jsonRpcWsClient)

private var connectionId: String? = null
private var subscribed = false
Expand All @@ -63,7 +69,7 @@ class GenericWsHead(
private var subscription: Disposable? = null
private var headResubSubscription: Disposable? = null
private val noHeadUpdatesSink = Sinks.many().multicast().directBestEffort<Boolean>()
private val headLivenessSink = Sinks.many().multicast().directBestEffort<Boolean>()
private val headLivenessSink = Sinks.many().multicast().directBestEffort<HeadLivenessState>()

private var subscriptionId = AtomicReference("")

Expand Down Expand Up @@ -99,15 +105,37 @@ class GenericWsHead(
}

private fun listenNewHeads(): Flux<BlockContainer> {
return subscribe()
return Mono.justOrEmpty(chainIdValidator)
.flatMap {
chainSpecific.getFromHeader(it, "unknown", api)
it!!.validate(UPSTREAM_SETTINGS_ERROR)
}
.timeout(wsHeadTimeout, Mono.error(RuntimeException("No response from subscribe to newHeads")))
.onErrorResume {
log.error("Error getting heads for $upstreamId", it)
subscribed = false
unsubscribe()
.switchIfEmpty {
Mono.just(UPSTREAM_VALID)
}
.flatMapMany {
when (it) {
UPSTREAM_VALID -> {
subscribe()
.flatMap { data ->
chainSpecific.getFromHeader(data, "unknown", api)
}
.timeout(wsHeadTimeout, Mono.error(RuntimeException("No response from subscribe to newHeads")))
.onErrorResume { err ->
log.error("Error getting heads for {}, message {}", upstreamId, err.message)
unsubscribe()
}
}
UPSTREAM_SETTINGS_ERROR -> {
log.warn("Couldn't check chain settings via ws connection for {}, ws sub will be recreated", upstreamId)
subscribed = false
Mono.empty()
}
else -> {
log.error("Chain settings check hasn't been passed via ws connection, upstream {} will be removed", upstreamId)
headLivenessSink.emitNext(HeadLivenessState.FATAL_ERROR) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED }
Mono.empty()
}
}
}
}

Expand All @@ -118,9 +146,10 @@ class GenericWsHead(
headResubSubscription = null
}

override fun headLiveness(): Flux<Boolean> = headLivenessSink.asFlux()
override fun headLiveness(): Flux<HeadLivenessState> = headLivenessSink.asFlux()

private fun unsubscribe(): Mono<BlockContainer> {
subscribed = false
return wsSubscriptions.unsubscribe(chainSpecific.unsubscribeNewHeadsRequest(subscriptionId.get()).copy(id = ids.getAndIncrement()))
.flatMap { it.requireResult() }
.doOnNext { log.warn("{} has just unsubscribed from newHeads", upstreamId) }
Expand Down Expand Up @@ -152,7 +181,7 @@ class GenericWsHead(
val connectionStates = wsSubscriptions.connectionInfoFlux()
.map {
if (it.connectionId == connectionId && it.connectionState == WsConnection.ConnectionState.DISCONNECTED) {
headLivenessSink.emitNext(false) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED }
headLivenessSink.emitNext(HeadLivenessState.DISCONNECTED) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED }
subscribed = false
connected = false
connectionId = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,23 @@ package io.emeraldpay.dshackle.upstream.ethereum

import reactor.core.publisher.Flux

enum class HeadLivenessState {
OK, NON_CONSECUTIVE, DISCONNECTED, FATAL_ERROR
}

interface HeadLivenessValidator {
fun getFlux(): Flux<Boolean>
fun getFlux(): Flux<HeadLivenessState>
}

class NoHeadLivenessValidator : HeadLivenessValidator {

override fun getFlux(): Flux<Boolean> {
return Flux.just(false)
override fun getFlux(): Flux<HeadLivenessState> {
return Flux.just(HeadLivenessState.NON_CONSECUTIVE)
}
}

class AlwaysHeadLivenessValidator : HeadLivenessValidator {
override fun getFlux(): Flux<Boolean> {
return Flux.just(true)
override fun getFlux(): Flux<HeadLivenessState> {
return Flux.just(HeadLivenessState.OK)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class HeadLivenessValidatorImpl(
private val log = LoggerFactory.getLogger(HeadLivenessValidatorImpl::class.java)
}

override fun getFlux(): Flux<Boolean> {
override fun getFlux(): Flux<HeadLivenessState> {
val headLiveness = head.headLiveness()
// first we have moving window of 2 blocks and check that they are consecutive ones
val headFlux = head.getFlux().map { it.height }.buffer(2, 1).map {
Expand All @@ -39,13 +39,13 @@ class HeadLivenessValidatorImpl(
// we emit when we have false or checked CHECKED_BLOCKS_UNTIL_LIVE blocks
// CHECKED_BLOCKS_UNTIL_LIVE blocks == (CHECKED_BLOCKS_UNTIL_LIVE - 1) consecutive true
when {
count >= (CHECKED_BLOCKS_UNTIL_LIVE - 1) -> Flux.just(true)
!value -> Flux.just(false)
count >= (CHECKED_BLOCKS_UNTIL_LIVE - 1) -> Flux.just(HeadLivenessState.OK)
!value -> Flux.just(HeadLivenessState.NON_CONSECUTIVE)
else -> Flux.empty()
}
}.timeout(
expectedBlockTime.multipliedBy(CHECKED_BLOCKS_UNTIL_LIVE.toLong() * 2),
Flux.just(false).doOnNext {
Flux.just(HeadLivenessState.NON_CONSECUTIVE).doOnNext {
if (log.isDebugEnabled) {
log.debug("head liveness check broken with timeout in $upstreamId")
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ abstract class AbstractChainSpecific : ChainSpecific {
return null
}

override fun chainSettingsValidator(
chain: Chain,
upstream: Upstream,
reader: ChainReader,
): SingleValidator<ValidateUpstreamSettingsResult>? {
return null
}

override fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector? {
return null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.IngressSubscription
import io.emeraldpay.dshackle.upstream.LogsOracle
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.SingleValidator
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamRpcModulesDetector
import io.emeraldpay.dshackle.upstream.UpstreamSettingsDetector
import io.emeraldpay.dshackle.upstream.UpstreamValidator
import io.emeraldpay.dshackle.upstream.ValidateUpstreamSettingsResult
import io.emeraldpay.dshackle.upstream.beaconchain.BeaconChainSpecific
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.calls.CallSelector
Expand Down Expand Up @@ -82,6 +84,8 @@ interface ChainSpecific {

fun upstreamSettingsDetector(chain: Chain, upstream: Upstream): UpstreamSettingsDetector?

fun chainSettingsValidator(chain: Chain, upstream: Upstream, reader: ChainReader): SingleValidator<ValidateUpstreamSettingsResult>?

fun upstreamRpcModulesDetector(upstream: Upstream): UpstreamRpcModulesDetector?

fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription
Expand Down
Loading

0 comments on commit 96487b6

Please sign in to comment.