diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/context/SchedulersConfig.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/context/SchedulersConfig.kt index 88b04832e..cf15afce9 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/context/SchedulersConfig.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/context/SchedulersConfig.kt @@ -15,17 +15,17 @@ import java.util.concurrent.Executors @Configuration open class SchedulersConfig { - private val log = LoggerFactory.getLogger(SchedulersConfig::class.java) - private val threadsMultiplier: Int - init { - val cores = Runtime.getRuntime().availableProcessors() - threadsMultiplier = if (cores < 3) { - 1 - } else { - cores / 2 - } - log.info("Creating schedulers with multiplier: {}...", threadsMultiplier) + companion object { + private val log = LoggerFactory.getLogger(SchedulersConfig::class.java) + val threadsMultiplier = run { + val cores = Runtime.getRuntime().availableProcessors() + if (cores < 3) { + 1 + } else { + cores / 2 + } + }.also { log.info("Creating schedulers with multiplier: {}...", it) } } @Bean diff --git a/src/main/kotlin/io/emeraldpay/dshackle/quorum/QuorumRequestReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/quorum/QuorumRequestReader.kt index a7f167df5..09d50429a 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/quorum/QuorumRequestReader.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/quorum/QuorumRequestReader.kt @@ -28,6 +28,7 @@ import io.emeraldpay.dshackle.upstream.ChainException import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.ChainResponse import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.error.UpstreamErrorHandler import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcException import io.emeraldpay.dshackle.upstream.signature.ResponseSigner import org.slf4j.LoggerFactory @@ -51,6 +52,7 @@ class QuorumRequestReader( signer: ResponseSigner?, private val tracer: Tracer, ) : RequestReader(signer) { + private val errorHandler = UpstreamErrorHandler companion object { private val log = LoggerFactory.getLogger(QuorumRequestReader::class.java) @@ -174,6 +176,8 @@ class QuorumRequestReader( private fun withErrorResume(api: Upstream, key: ChainRequest): Function, Mono> { return Function { src -> src.onErrorResume { err -> + errorHandler.handle(api, key, err.message) + val msgError = "Error during call upstream ${api.getId()} with method ${key.method}" if (err is ChainCallUpstreamException) { log.debug(msgError, err) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/reader/BroadcastReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/reader/BroadcastReader.kt index 6dd4551d0..5dea8a6b4 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/reader/BroadcastReader.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/reader/BroadcastReader.kt @@ -8,6 +8,7 @@ import io.emeraldpay.dshackle.upstream.ChainRequest import io.emeraldpay.dshackle.upstream.ChainResponse import io.emeraldpay.dshackle.upstream.Selector import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.error.UpstreamErrorHandler import io.emeraldpay.dshackle.upstream.signature.ResponseSigner import org.slf4j.LoggerFactory import org.springframework.cloud.sleuth.Tracer @@ -22,6 +23,7 @@ class BroadcastReader( private val quorum: CallQuorum, private val tracer: Tracer, ) : RequestReader(signer) { + private val errorHandler = UpstreamErrorHandler private val internalMatcher = Selector.MultiMatcher( listOf(Selector.AvailabilityMatcher(), matcher), ) @@ -81,6 +83,8 @@ class BroadcastReader( .read(key) .map { BroadcastResponse(it, upstream) } .onErrorResume { + errorHandler.handle(upstream, key, it.message) + log.warn("Error during execution ${key.method} from upstream ${upstream.getId()} with message - ${it.message}") Mono.just( BroadcastResponse(ChainResponse(null, getError(key, it).error), upstream), diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt index 44b273d2f..27ee20f2b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt @@ -23,6 +23,7 @@ import io.emeraldpay.dshackle.foundation.ChainOptions import io.emeraldpay.dshackle.startup.QuorumForLabels import io.emeraldpay.dshackle.startup.UpstreamChangeEvent import io.emeraldpay.dshackle.upstream.calls.CallMethods +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType import org.slf4j.LoggerFactory import reactor.core.publisher.Flux import reactor.core.publisher.Sinks @@ -154,5 +155,9 @@ abstract class DefaultUpstream( return id } + override fun updateLowerBound(lowerBound: Long, type: LowerBoundType) { + // NOOP + } + data class Status(val lag: Long?, val avail: UpstreamAvailability, val status: UpstreamAvailability) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt index 10359a0fd..fbf92100d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt @@ -420,6 +420,10 @@ abstract class Multistream( override fun nodeId(): Byte = 0 + override fun updateLowerBound(lowerBound: Long, type: LowerBoundType) { + // NOOP + } + fun printStatus() { var height: Long? = null try { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt index 986db4d0b..9276f796b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt @@ -22,6 +22,7 @@ import io.emeraldpay.dshackle.reader.ChainReader import io.emeraldpay.dshackle.startup.UpstreamChangeEvent import io.emeraldpay.dshackle.upstream.calls.CallMethods import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType import reactor.core.publisher.Flux interface Upstream : Lifecycle { @@ -47,6 +48,7 @@ interface Upstream : Lifecycle { fun isGrpc(): Boolean fun getLowerBounds(): Collection fun getUpstreamSettingsData(): UpstreamSettingsData? + fun updateLowerBound(lowerBound: Long, type: LowerBoundType) fun cast(selfType: Class): T diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainLowerBoundStateDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainLowerBoundStateDetector.kt index 2793c1b5e..4581b7993 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainLowerBoundStateDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainLowerBoundStateDetector.kt @@ -14,4 +14,8 @@ class BeaconChainLowerBoundStateDetector : LowerBoundDetector() { override fun internalDetectLowerBound(): Flux { return Flux.just(LowerBoundData(1, LowerBoundType.STATE)) } + + override fun types(): Set { + return setOf(LowerBoundType.STATE) + } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/error/EthereumStateLowerBoundErrorHandler.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/error/EthereumStateLowerBoundErrorHandler.kt new file mode 100644 index 000000000..064d03a7d --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/error/EthereumStateLowerBoundErrorHandler.kt @@ -0,0 +1,66 @@ +package io.emeraldpay.dshackle.upstream.error + +import io.emeraldpay.dshackle.upstream.ChainRequest +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.ethereum.EthereumLowerBoundStateDetector.Companion.stateErrors +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType +import io.emeraldpay.dshackle.upstream.rpcclient.ListParams +import org.slf4j.LoggerFactory + +object EthereumStateLowerBoundErrorHandler : ErrorHandler { + private val log = LoggerFactory.getLogger(this::class.java) + + private val firstTagIndexMethods = setOf( + "eth_call", + "debug_traceCall", + "eth_getBalance", + "eth_estimateGas", + "eth_getCode", + "eth_getTransactionCount", + ) + private val secondTagIndexMethods = setOf( + "eth_getProof", + "eth_getStorageAt", + ) + + private val applicableMethods = firstTagIndexMethods + secondTagIndexMethods + + override fun handle(upstream: Upstream, request: ChainRequest, errorMessage: String?) { + try { + if (canHandle(request, errorMessage)) { + parseTagParam(request, tagIndex(request.method))?.let { + upstream.updateLowerBound(it, LowerBoundType.STATE) + } + } + } catch (e: RuntimeException) { + log.warn("Couldn't update the {} lower bound of {}, reason - {}", LowerBoundType.STATE, upstream.getId(), e.message) + } + } + + override fun canHandle(request: ChainRequest, errorMessage: String?): Boolean { + return stateErrors.any { errorMessage?.contains(it) ?: false } && applicableMethods.contains(request.method) + } + + private fun parseTagParam(request: ChainRequest, tagIndex: Int): Long? { + if (tagIndex != -1 && request.params is ListParams) { + val params = request.params.list + if (params.size >= tagIndex) { + val tag = params[tagIndex] + if (tag is String && tag.startsWith("0x")) { + return tag.substring(2).toLong(16) + } + } + } + return null + } + + private fun tagIndex(method: String): Int { + return if (firstTagIndexMethods.contains(method)) { + 1 + } else if (secondTagIndexMethods.contains(method)) { + 2 + } else { + -1 + } + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/error/UpstreamErrorHandler.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/error/UpstreamErrorHandler.kt new file mode 100644 index 000000000..b1293036e --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/error/UpstreamErrorHandler.kt @@ -0,0 +1,35 @@ +package io.emeraldpay.dshackle.upstream.error + +import io.emeraldpay.dshackle.config.context.SchedulersConfig +import io.emeraldpay.dshackle.upstream.ChainRequest +import io.emeraldpay.dshackle.upstream.Upstream +import org.springframework.scheduling.concurrent.CustomizableThreadFactory +import java.util.concurrent.CompletableFuture +import java.util.concurrent.Executors + +interface ErrorHandler { + fun handle(upstream: Upstream, request: ChainRequest, errorMessage: String?) + + fun canHandle(request: ChainRequest, errorMessage: String?): Boolean +} + +object UpstreamErrorHandler { + private val errorHandlers = listOf( + EthereumStateLowerBoundErrorHandler, + ) + private val errorHandlerExecutor = Executors.newFixedThreadPool( + 2 * SchedulersConfig.threadsMultiplier, + CustomizableThreadFactory("error-handler-"), + ) + + fun handle(upstream: Upstream, request: ChainRequest, errorMessage: String?) { + CompletableFuture.runAsync( + { + errorHandlers + .filter { it.canHandle(request, errorMessage) } + .forEach { it.handle(upstream, request, errorMessage) } + }, + errorHandlerExecutor, + ) + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundBlockDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundBlockDetector.kt index 37f1a0f15..b00051ffe 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundBlockDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundBlockDetector.kt @@ -15,7 +15,7 @@ import reactor.core.publisher.Mono class EthereumLowerBoundBlockDetector( private val upstream: Upstream, ) : LowerBoundDetector() { - private val recursiveLowerBound = RecursiveLowerBound(upstream, LowerBoundType.BLOCK, setOf("No block data")) + private val recursiveLowerBound = RecursiveLowerBound(upstream, LowerBoundType.BLOCK, setOf("No block data"), lowerBounds) override fun period(): Long { return 3 @@ -41,4 +41,8 @@ class EthereumLowerBoundBlockDetector( } } } + + override fun types(): Set { + return setOf(LowerBoundType.BLOCK) + } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundStateDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundStateDetector.kt index 53a12de53..4c0282fe1 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundStateDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundStateDetector.kt @@ -15,10 +15,10 @@ import reactor.core.publisher.Mono class EthereumLowerBoundStateDetector( private val upstream: Upstream, ) : LowerBoundDetector() { - private val recursiveLowerBound = RecursiveLowerBound(upstream, LowerBoundType.STATE, nonRetryableErrors) + private val recursiveLowerBound = RecursiveLowerBound(upstream, LowerBoundType.STATE, stateErrors, lowerBounds) companion object { - private val nonRetryableErrors = setOf( + val stateErrors = setOf( "No state available for block", // nethermind "missing trie node", // geth "header not found", // optimism, bsc, avalanche @@ -62,4 +62,8 @@ class EthereumLowerBoundStateDetector( } } } + + override fun types(): Set { + return setOf(LowerBoundType.STATE) + } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt index 716fc18df..59dd137fa 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt @@ -25,6 +25,7 @@ import io.emeraldpay.dshackle.upstream.generic.connectors.ConnectorFactory import io.emeraldpay.dshackle.upstream.generic.connectors.GenericConnector import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundServiceBuilder +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType import org.springframework.context.Lifecycle import reactor.core.Disposable import reactor.core.publisher.Flux @@ -245,6 +246,10 @@ open class GenericUpstream( override fun isRunning() = connector.isRunning() || started.get() + override fun updateLowerBound(lowerBound: Long, type: LowerBoundType) { + lowerBoundService.updateLowerBound(lowerBound, type) + } + fun isValid(): Boolean = isUpstreamValid.get() private fun sendUpstreamStateEvent(eventType: UpstreamChangeEvent.ChangeType) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundDetector.kt index 81c217421..e7cad448d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundDetector.kt @@ -3,6 +3,7 @@ package io.emeraldpay.dshackle.upstream.lowerbound import org.slf4j.LoggerFactory import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import reactor.core.publisher.Sinks import java.time.Duration import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicBoolean @@ -12,23 +13,27 @@ fun Long.toHex() = "0x${this.toString(16)}" abstract class LowerBoundDetector { protected val log = LoggerFactory.getLogger(this::class.java) - private val lowerBounds = ConcurrentHashMap() + protected val lowerBounds = ConcurrentHashMap() + private val lowerBoundSink = Sinks.many().multicast().directBestEffort() fun detectLowerBound(): Flux { val notProcessing = AtomicBoolean(true) - return Flux.interval( - Duration.ofSeconds(15), - Duration.ofMinutes(period()), + return Flux.merge( + lowerBoundSink.asFlux(), + Flux.interval( + Duration.ofSeconds(15), + Duration.ofMinutes(period()), + ) + .filter { notProcessing.get() } + .flatMap { + notProcessing.set(false) + internalDetectLowerBound() + .onErrorResume { Mono.just(LowerBoundData.default()) } + .switchIfEmpty(Flux.just(LowerBoundData.default())) + .doFinally { notProcessing.set(true) } + }, ) - .filter { notProcessing.get() } - .flatMap { - notProcessing.set(false) - internalDetectLowerBound() - .onErrorResume { Mono.just(LowerBoundData.default()) } - .switchIfEmpty(Flux.just(LowerBoundData.default())) - .doFinally { notProcessing.set(true) } - } .filter { it.lowerBound >= (lowerBounds[it.type]?.lowerBound ?: 0) } @@ -42,4 +47,10 @@ abstract class LowerBoundDetector { protected abstract fun period(): Long protected abstract fun internalDetectLowerBound(): Flux + + abstract fun types(): Set + + fun updateLowerBound(lowerBound: Long, type: LowerBoundType) { + lowerBoundSink.emitNext(LowerBoundData(lowerBound, type)) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } + } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundService.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundService.kt index 13ca03be6..2b19653ca 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundService.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundService.kt @@ -15,10 +15,11 @@ abstract class LowerBoundService( private val log = LoggerFactory.getLogger(this::class.java) private val lowerBounds = ConcurrentHashMap() + private val detectors: List by lazy { detectors() } fun detectLowerBounds(): Flux { return Flux.merge( - detectors().map { it.detectLowerBound() }, + detectors.map { it.detectLowerBound() }, ) .doOnNext { log.info("Lower bound of type ${it.type} is ${it.lowerBound} for upstream ${upstream.getId()} of chain $chain") @@ -26,6 +27,12 @@ abstract class LowerBoundService( } } + fun updateLowerBound(lowerBound: Long, type: LowerBoundType) { + detectors + .filter { it.types().contains(type) } + .forEach { it.updateLowerBound(lowerBound, type) } + } + fun getLowerBounds(): Collection = lowerBounds.values protected abstract fun detectors(): List diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/detector/RecursiveLowerBound.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/detector/RecursiveLowerBound.kt index 89645d4cc..5622b0786 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/detector/RecursiveLowerBound.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/detector/RecursiveLowerBound.kt @@ -16,6 +16,7 @@ class RecursiveLowerBound( private val upstream: Upstream, private val type: LowerBoundType, private val nonRetryableErrors: Set, + private val lowerBounds: Map, ) { private val log = LoggerFactory.getLogger(this::class.java) @@ -25,8 +26,11 @@ class RecursiveLowerBound( val currentHeight = it.getCurrentHeight() if (currentHeight == null) { Mono.empty() - } else { + } else if (!lowerBounds.contains(type)) { Mono.just(LowerBoundBinarySearch(0, currentHeight)) + } else { + // next calculations will be carried out only within the last range + Mono.just(LowerBoundBinarySearch(lowerBounds[type]!!.lowerBound, currentHeight)) } } .expand { data -> diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearLowerBoundStateDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearLowerBoundStateDetector.kt index 3cd85770d..870c8b475 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearLowerBoundStateDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearLowerBoundStateDetector.kt @@ -24,4 +24,8 @@ class NearLowerBoundStateDetector( LowerBoundData(resp.syncInfo.earliestHeight, LowerBoundType.STATE) }.toFlux() } + + override fun types(): Set { + return setOf(LowerBoundType.STATE) + } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotLowerBoundStateDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotLowerBoundStateDetector.kt index 384b7560e..2b8c79f3a 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotLowerBoundStateDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotLowerBoundStateDetector.kt @@ -14,7 +14,7 @@ import reactor.core.publisher.Flux class PolkadotLowerBoundStateDetector( private val upstream: Upstream, ) : LowerBoundDetector() { - private val recursiveLowerBound = RecursiveLowerBound(upstream, LowerBoundType.STATE, nonRetryableErrors) + private val recursiveLowerBound = RecursiveLowerBound(upstream, LowerBoundType.STATE, nonRetryableErrors, lowerBounds) companion object { private val nonRetryableErrors = setOf( @@ -48,4 +48,8 @@ class PolkadotLowerBoundStateDetector( } } } + + override fun types(): Set { + return setOf(LowerBoundType.STATE) + } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaLowerBoundSlotDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaLowerBoundSlotDetector.kt index 2623a9d74..8597d9f3d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaLowerBoundSlotDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaLowerBoundSlotDetector.kt @@ -76,4 +76,8 @@ class SolanaLowerBoundSlotDetector( }, ) } + + override fun types(): Set { + return setOf(LowerBoundType.SLOT, LowerBoundType.STATE) + } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetLowerBoundStateDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetLowerBoundStateDetector.kt index 983870dba..2cd10ca94 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetLowerBoundStateDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetLowerBoundStateDetector.kt @@ -14,4 +14,8 @@ class StarknetLowerBoundStateDetector : LowerBoundDetector() { override fun internalDetectLowerBound(): Flux { return Flux.just(LowerBoundData(1, LowerBoundType.STATE)) } + + override fun types(): Set { + return setOf(LowerBoundType.STATE) + } } diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/error/EthereumStateLowerBoundErrorHandlerTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/error/EthereumStateLowerBoundErrorHandlerTest.kt new file mode 100644 index 000000000..12ea47f9c --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/error/EthereumStateLowerBoundErrorHandlerTest.kt @@ -0,0 +1,73 @@ +package io.emeraldpay.dshackle.upstream.error + +import io.emeraldpay.dshackle.upstream.ChainRequest +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType +import io.emeraldpay.dshackle.upstream.rpcclient.ListParams +import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.Arguments +import org.junit.jupiter.params.provider.Arguments.of +import org.junit.jupiter.params.provider.MethodSource +import org.mockito.Mockito.anyLong +import org.mockito.Mockito.mock +import org.mockito.kotlin.any +import org.mockito.kotlin.never +import org.mockito.kotlin.verify + +class EthereumStateLowerBoundErrorHandlerTest { + + @ParameterizedTest + @MethodSource("requests") + fun `update lower bound`(request: ChainRequest) { + val upstream = mock() + val handler = EthereumStateLowerBoundErrorHandler + + handler.handle(upstream, request, "missing trie node d5648cc9aef48154159d53800f2f") + + verify(upstream).updateLowerBound(213229736, LowerBoundType.STATE) + } + + @Test + fun `no update lower bound if error is not about state`() { + val upstream = mock() + val request = ChainRequest( + "eth_getTransactionCount", + ListParams("0x343", "0xCB5A0A8"), + ) + val handler = EthereumStateLowerBoundErrorHandler + + handler.handle(upstream, request, "strange error") + + verify(upstream, never()).updateLowerBound(anyLong(), any()) + } + + @Test + fun `no update lower bound if there is a non-state method`() { + val upstream = mock() + val request = ChainRequest( + "eth_getBlockByNumber", + ListParams("0xCB5A0A8", true), + ) + val handler = EthereumStateLowerBoundErrorHandler + + handler.handle(upstream, request, "missing trie node d5648cc9aef48154159d53800f2f") + + verify(upstream, never()).updateLowerBound(anyLong(), any()) + } + + companion object { + @JvmStatic + fun requests(): List = + listOf( + of(ChainRequest("eth_getTransactionCount", ListParams("0x343", "0xCB5A0A8"))), + of(ChainRequest("eth_getCode", ListParams("0x343", "0xCB5A0A8"))), + of(ChainRequest("eth_estimateGas", ListParams("0x343", "0xCB5A0A8"))), + of(ChainRequest("eth_getBalance", ListParams("0x343", "0xCB5A0A8"))), + of(ChainRequest("debug_traceCall", ListParams("0x343", "0xCB5A0A8"))), + of(ChainRequest("eth_call", ListParams("0x343", "0xCB5A0A8"))), + of(ChainRequest("eth_getProof", ListParams("0x343", "test", "0xCB5A0A8"))), + of(ChainRequest("eth_getStorageAt", ListParams("0x343", "test", "0xCB5A0A8"))), + ) + } +} diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/error/UpstreamErrorHandlerTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/error/UpstreamErrorHandlerTest.kt new file mode 100644 index 000000000..5504d12bf --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/error/UpstreamErrorHandlerTest.kt @@ -0,0 +1,25 @@ +package io.emeraldpay.dshackle.upstream.error + +import io.emeraldpay.dshackle.upstream.ChainRequest +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType +import io.emeraldpay.dshackle.upstream.rpcclient.ListParams +import org.junit.jupiter.api.Test +import org.mockito.Mockito.mock +import org.mockito.kotlin.verify + +class UpstreamErrorHandlerTest { + + @Test + fun `use lower bound error handler`() { + val upstream = mock() + val request = ChainRequest("eth_getCode", ListParams("0x343", "0xCB5A0A8")) + val handler = UpstreamErrorHandler + + handler.handle(upstream, request, "missing trie node d5648cc9aef48154159d53800f2f") + + Thread.sleep(100) + + verify(upstream).updateLowerBound(213229736, LowerBoundType.STATE) + } +}