Skip to content

Commit

Permalink
Calculate lower bounds inside range + adaptive update (#485)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored May 23, 2024
1 parent 100578d commit 75d6fb3
Show file tree
Hide file tree
Showing 21 changed files with 301 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ import java.util.concurrent.Executors

@Configuration
open class SchedulersConfig {
private val log = LoggerFactory.getLogger(SchedulersConfig::class.java)
private val threadsMultiplier: Int

init {
val cores = Runtime.getRuntime().availableProcessors()
threadsMultiplier = if (cores < 3) {
1
} else {
cores / 2
}
log.info("Creating schedulers with multiplier: {}...", threadsMultiplier)
companion object {
private val log = LoggerFactory.getLogger(SchedulersConfig::class.java)
val threadsMultiplier = run {
val cores = Runtime.getRuntime().availableProcessors()
if (cores < 3) {
1
} else {
cores / 2
}
}.also { log.info("Creating schedulers with multiplier: {}...", it) }
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import io.emeraldpay.dshackle.upstream.ChainException
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.ChainResponse
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.error.UpstreamErrorHandler
import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcException
import io.emeraldpay.dshackle.upstream.signature.ResponseSigner
import org.slf4j.LoggerFactory
Expand All @@ -51,6 +52,7 @@ class QuorumRequestReader(
signer: ResponseSigner?,
private val tracer: Tracer,
) : RequestReader(signer) {
private val errorHandler = UpstreamErrorHandler

companion object {
private val log = LoggerFactory.getLogger(QuorumRequestReader::class.java)
Expand Down Expand Up @@ -174,6 +176,8 @@ class QuorumRequestReader(
private fun <T> withErrorResume(api: Upstream, key: ChainRequest): Function<Mono<T>, Mono<T>> {
return Function { src ->
src.onErrorResume { err ->
errorHandler.handle(api, key, err.message)

val msgError = "Error during call upstream ${api.getId()} with method ${key.method}"
if (err is ChainCallUpstreamException) {
log.debug(msgError, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.ChainResponse
import io.emeraldpay.dshackle.upstream.Selector
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.error.UpstreamErrorHandler
import io.emeraldpay.dshackle.upstream.signature.ResponseSigner
import org.slf4j.LoggerFactory
import org.springframework.cloud.sleuth.Tracer
Expand All @@ -22,6 +23,7 @@ class BroadcastReader(
private val quorum: CallQuorum,
private val tracer: Tracer,
) : RequestReader(signer) {
private val errorHandler = UpstreamErrorHandler
private val internalMatcher = Selector.MultiMatcher(
listOf(Selector.AvailabilityMatcher(), matcher),
)
Expand Down Expand Up @@ -81,6 +83,8 @@ class BroadcastReader(
.read(key)
.map { BroadcastResponse(it, upstream) }
.onErrorResume {
errorHandler.handle(upstream, key, it.message)

log.warn("Error during execution ${key.method} from upstream ${upstream.getId()} with message - ${it.message}")
Mono.just(
BroadcastResponse(ChainResponse(null, getError(key, it).error), upstream),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import io.emeraldpay.dshackle.foundation.ChainOptions
import io.emeraldpay.dshackle.startup.QuorumForLabels
import io.emeraldpay.dshackle.startup.UpstreamChangeEvent
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks
Expand Down Expand Up @@ -154,5 +155,9 @@ abstract class DefaultUpstream(
return id
}

override fun updateLowerBound(lowerBound: Long, type: LowerBoundType) {
// NOOP
}

data class Status(val lag: Long?, val avail: UpstreamAvailability, val status: UpstreamAvailability)
}
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,10 @@ abstract class Multistream(

override fun nodeId(): Byte = 0

override fun updateLowerBound(lowerBound: Long, type: LowerBoundType) {
// NOOP
}

fun printStatus() {
var height: Long? = null
try {
Expand Down
2 changes: 2 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import io.emeraldpay.dshackle.reader.ChainReader
import io.emeraldpay.dshackle.startup.UpstreamChangeEvent
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import reactor.core.publisher.Flux

interface Upstream : Lifecycle {
Expand All @@ -47,6 +48,7 @@ interface Upstream : Lifecycle {
fun isGrpc(): Boolean
fun getLowerBounds(): Collection<LowerBoundData>
fun getUpstreamSettingsData(): UpstreamSettingsData?
fun updateLowerBound(lowerBound: Long, type: LowerBoundType)

fun <T : Upstream> cast(selfType: Class<T>): T

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,8 @@ class BeaconChainLowerBoundStateDetector : LowerBoundDetector() {
override fun internalDetectLowerBound(): Flux<LowerBoundData> {
return Flux.just(LowerBoundData(1, LowerBoundType.STATE))
}

override fun types(): Set<LowerBoundType> {
return setOf(LowerBoundType.STATE)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package io.emeraldpay.dshackle.upstream.error

import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.ethereum.EthereumLowerBoundStateDetector.Companion.stateErrors
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import io.emeraldpay.dshackle.upstream.rpcclient.ListParams
import org.slf4j.LoggerFactory

object EthereumStateLowerBoundErrorHandler : ErrorHandler {
private val log = LoggerFactory.getLogger(this::class.java)

private val firstTagIndexMethods = setOf(
"eth_call",
"debug_traceCall",
"eth_getBalance",
"eth_estimateGas",
"eth_getCode",
"eth_getTransactionCount",
)
private val secondTagIndexMethods = setOf(
"eth_getProof",
"eth_getStorageAt",
)

private val applicableMethods = firstTagIndexMethods + secondTagIndexMethods

override fun handle(upstream: Upstream, request: ChainRequest, errorMessage: String?) {
try {
if (canHandle(request, errorMessage)) {
parseTagParam(request, tagIndex(request.method))?.let {
upstream.updateLowerBound(it, LowerBoundType.STATE)
}
}
} catch (e: RuntimeException) {
log.warn("Couldn't update the {} lower bound of {}, reason - {}", LowerBoundType.STATE, upstream.getId(), e.message)
}
}

override fun canHandle(request: ChainRequest, errorMessage: String?): Boolean {
return stateErrors.any { errorMessage?.contains(it) ?: false } && applicableMethods.contains(request.method)
}

private fun parseTagParam(request: ChainRequest, tagIndex: Int): Long? {
if (tagIndex != -1 && request.params is ListParams) {
val params = request.params.list
if (params.size >= tagIndex) {
val tag = params[tagIndex]
if (tag is String && tag.startsWith("0x")) {
return tag.substring(2).toLong(16)
}
}
}
return null
}

private fun tagIndex(method: String): Int {
return if (firstTagIndexMethods.contains(method)) {
1
} else if (secondTagIndexMethods.contains(method)) {
2
} else {
-1
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.emeraldpay.dshackle.upstream.error

import io.emeraldpay.dshackle.config.context.SchedulersConfig
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.Upstream
import org.springframework.scheduling.concurrent.CustomizableThreadFactory
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors

interface ErrorHandler {
fun handle(upstream: Upstream, request: ChainRequest, errorMessage: String?)

fun canHandle(request: ChainRequest, errorMessage: String?): Boolean
}

object UpstreamErrorHandler {
private val errorHandlers = listOf(
EthereumStateLowerBoundErrorHandler,
)
private val errorHandlerExecutor = Executors.newFixedThreadPool(
2 * SchedulersConfig.threadsMultiplier,
CustomizableThreadFactory("error-handler-"),
)

fun handle(upstream: Upstream, request: ChainRequest, errorMessage: String?) {
CompletableFuture.runAsync(
{
errorHandlers
.filter { it.canHandle(request, errorMessage) }
.forEach { it.handle(upstream, request, errorMessage) }
},
errorHandlerExecutor,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import reactor.core.publisher.Mono
class EthereumLowerBoundBlockDetector(
private val upstream: Upstream,
) : LowerBoundDetector() {
private val recursiveLowerBound = RecursiveLowerBound(upstream, LowerBoundType.BLOCK, setOf("No block data"))
private val recursiveLowerBound = RecursiveLowerBound(upstream, LowerBoundType.BLOCK, setOf("No block data"), lowerBounds)

override fun period(): Long {
return 3
Expand All @@ -41,4 +41,8 @@ class EthereumLowerBoundBlockDetector(
}
}
}

override fun types(): Set<LowerBoundType> {
return setOf(LowerBoundType.BLOCK)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import reactor.core.publisher.Mono
class EthereumLowerBoundStateDetector(
private val upstream: Upstream,
) : LowerBoundDetector() {
private val recursiveLowerBound = RecursiveLowerBound(upstream, LowerBoundType.STATE, nonRetryableErrors)
private val recursiveLowerBound = RecursiveLowerBound(upstream, LowerBoundType.STATE, stateErrors, lowerBounds)

companion object {
private val nonRetryableErrors = setOf(
val stateErrors = setOf(
"No state available for block", // nethermind
"missing trie node", // geth
"header not found", // optimism, bsc, avalanche
Expand Down Expand Up @@ -62,4 +62,8 @@ class EthereumLowerBoundStateDetector(
}
}
}

override fun types(): Set<LowerBoundType> {
return setOf(LowerBoundType.STATE)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import io.emeraldpay.dshackle.upstream.generic.connectors.ConnectorFactory
import io.emeraldpay.dshackle.upstream.generic.connectors.GenericConnector
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundServiceBuilder
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import org.springframework.context.Lifecycle
import reactor.core.Disposable
import reactor.core.publisher.Flux
Expand Down Expand Up @@ -245,6 +246,10 @@ open class GenericUpstream(

override fun isRunning() = connector.isRunning() || started.get()

override fun updateLowerBound(lowerBound: Long, type: LowerBoundType) {
lowerBoundService.updateLowerBound(lowerBound, type)
}

fun isValid(): Boolean = isUpstreamValid.get()

private fun sendUpstreamStateEvent(eventType: UpstreamChangeEvent.ChangeType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.emeraldpay.dshackle.upstream.lowerbound
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.Sinks
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
Expand All @@ -12,23 +13,27 @@ fun Long.toHex() = "0x${this.toString(16)}"
abstract class LowerBoundDetector {
protected val log = LoggerFactory.getLogger(this::class.java)

private val lowerBounds = ConcurrentHashMap<LowerBoundType, LowerBoundData>()
protected val lowerBounds = ConcurrentHashMap<LowerBoundType, LowerBoundData>()
private val lowerBoundSink = Sinks.many().multicast().directBestEffort<LowerBoundData>()

fun detectLowerBound(): Flux<LowerBoundData> {
val notProcessing = AtomicBoolean(true)

return Flux.interval(
Duration.ofSeconds(15),
Duration.ofMinutes(period()),
return Flux.merge(
lowerBoundSink.asFlux(),
Flux.interval(
Duration.ofSeconds(15),
Duration.ofMinutes(period()),
)
.filter { notProcessing.get() }
.flatMap {
notProcessing.set(false)
internalDetectLowerBound()
.onErrorResume { Mono.just(LowerBoundData.default()) }
.switchIfEmpty(Flux.just(LowerBoundData.default()))
.doFinally { notProcessing.set(true) }
},
)
.filter { notProcessing.get() }
.flatMap {
notProcessing.set(false)
internalDetectLowerBound()
.onErrorResume { Mono.just(LowerBoundData.default()) }
.switchIfEmpty(Flux.just(LowerBoundData.default()))
.doFinally { notProcessing.set(true) }
}
.filter {
it.lowerBound >= (lowerBounds[it.type]?.lowerBound ?: 0)
}
Expand All @@ -42,4 +47,10 @@ abstract class LowerBoundDetector {
protected abstract fun period(): Long

protected abstract fun internalDetectLowerBound(): Flux<LowerBoundData>

abstract fun types(): Set<LowerBoundType>

fun updateLowerBound(lowerBound: Long, type: LowerBoundType) {
lowerBoundSink.emitNext(LowerBoundData(lowerBound, type)) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,24 @@ abstract class LowerBoundService(
private val log = LoggerFactory.getLogger(this::class.java)

private val lowerBounds = ConcurrentHashMap<LowerBoundType, LowerBoundData>()
private val detectors: List<LowerBoundDetector> by lazy { detectors() }

fun detectLowerBounds(): Flux<LowerBoundData> {
return Flux.merge(
detectors().map { it.detectLowerBound() },
detectors.map { it.detectLowerBound() },
)
.doOnNext {
log.info("Lower bound of type ${it.type} is ${it.lowerBound} for upstream ${upstream.getId()} of chain $chain")
lowerBounds[it.type] = it
}
}

fun updateLowerBound(lowerBound: Long, type: LowerBoundType) {
detectors
.filter { it.types().contains(type) }
.forEach { it.updateLowerBound(lowerBound, type) }
}

fun getLowerBounds(): Collection<LowerBoundData> = lowerBounds.values

protected abstract fun detectors(): List<LowerBoundDetector>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class RecursiveLowerBound(
private val upstream: Upstream,
private val type: LowerBoundType,
private val nonRetryableErrors: Set<String>,
private val lowerBounds: Map<LowerBoundType, LowerBoundData>,
) {
private val log = LoggerFactory.getLogger(this::class.java)

Expand All @@ -25,8 +26,11 @@ class RecursiveLowerBound(
val currentHeight = it.getCurrentHeight()
if (currentHeight == null) {
Mono.empty()
} else {
} else if (!lowerBounds.contains(type)) {
Mono.just(LowerBoundBinarySearch(0, currentHeight))
} else {
// next calculations will be carried out only within the last range
Mono.just(LowerBoundBinarySearch(lowerBounds[type]!!.lowerBound, currentHeight))
}
}
.expand { data ->
Expand Down
Loading

0 comments on commit 75d6fb3

Please sign in to comment.