From e184bb3547698d6910566caadcbcaae1287ea36f Mon Sep 17 00:00:00 2001 From: KirillPamPam Date: Thu, 22 Aug 2024 19:52:57 +0400 Subject: [PATCH] Predict lower height and use lower height matcher (#552) --- .../kotlin/chainsconfig.codegen.gradle.kts | 19 +- gradle/libs.versions.toml | 3 +- .../dshackle/upstream/DefaultUpstream.kt | 4 + .../dshackle/upstream/MatchesResponse.kt | 15 ++ .../dshackle/upstream/Multistream.kt | 4 + .../emeraldpay/dshackle/upstream/Selector.kt | 41 +++- .../emeraldpay/dshackle/upstream/Upstream.kt | 1 + .../BeaconChainLowerBoundService.kt | 4 +- .../BeaconChainLowerBoundStateDetector.kt | 5 +- .../cosmos/CosmosLowerBoundService.kt | 2 +- .../EthereumLowerBoundBlockDetector.kt | 2 +- .../EthereumLowerBoundLogsDetector.kt | 2 +- .../EthereumLowerBoundStateDetector.kt | 2 +- .../ethereum/EthereumLowerBoundTxDetector.kt | 2 +- .../upstream/generic/GenericUpstream.kt | 4 + .../upstream/lowerbound/LowerBoundDetector.kt | 16 +- .../upstream/lowerbound/LowerBoundService.kt | 7 + .../upstream/lowerbound/LowerBounds.kt | 124 ++++++++++ .../detector/RecursiveLowerBound.kt | 17 +- .../near/NearLowerBoundStateDetector.kt | 2 +- .../PolkadotLowerBoundStateDetector.kt | 2 +- .../solana/SolanaLowerBoundSlotDetector.kt | 2 +- .../starknet/StarknetLowerBoundService.kt | 4 +- .../StarknetLowerBoundStateDetector.kt | 5 +- .../dshackle/upstream/SelectorTest.kt | 34 ++- .../lowerbound/LowerBoundServiceTest.kt | 55 +++++ .../lowerbound/LowerBoundsPredictionTest.kt | 227 ++++++++++++++++++ .../RecursiveLowerBoundServiceTest.kt | 13 +- .../solana/SolanaLowerBoundServiceTest.kt | 1 + .../StarknetLowerBoundStateDetectorTest.kt | 3 +- 30 files changed, 582 insertions(+), 40 deletions(-) create mode 100644 src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBounds.kt create mode 100644 src/test/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundServiceTest.kt create mode 100644 src/test/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundsPredictionTest.kt rename src/test/kotlin/io/emeraldpay/dshackle/upstream/{ => lowerbound}/RecursiveLowerBoundServiceTest.kt (95%) diff --git a/buildSrc/src/main/kotlin/chainsconfig.codegen.gradle.kts b/buildSrc/src/main/kotlin/chainsconfig.codegen.gradle.kts index cf8d318b9..57c0c5168 100644 --- a/buildSrc/src/main/kotlin/chainsconfig.codegen.gradle.kts +++ b/buildSrc/src/main/kotlin/chainsconfig.codegen.gradle.kts @@ -11,6 +11,8 @@ import io.emeraldpay.dshackle.config.ChainsConfig import io.emeraldpay.dshackle.config.ChainsConfigReader import io.emeraldpay.dshackle.foundation.ChainOptionsReader import java.math.BigInteger +import java.time.Duration +import kotlin.math.ceil open class CodeGen(private val config: ChainsConfig) { companion object { @@ -25,7 +27,7 @@ open class CodeGen(private val config: ChainsConfig) { builder.addEnumConstant( "UNSPECIFIED", TypeSpec.anonymousClassBuilder() - .addSuperclassConstructorParameter("%L, %S, %S, %S, %L, %L, %L", 0, "UNSPECIFIED", "Unknown", "0x0", "BigInteger.ZERO", "emptyList()", "BlockchainType.UNKNOWN") + .addSuperclassConstructorParameter("%L, %S, %S, %S, %L, %L, %L, %L", 0, "UNSPECIFIED", "Unknown", "0x0", "BigInteger.ZERO", "emptyList()", "BlockchainType.UNKNOWN", 0.0) .build(), ) for (chain in config) { @@ -34,14 +36,15 @@ open class CodeGen(private val config: ChainsConfig) { .replace(' ', '_'), TypeSpec.anonymousClassBuilder() .addSuperclassConstructorParameter( - "%L, %S, %S, %S, %L, %L, %L", + "%L, %S, %S, %S, %L, %L, %L, %L", chain.grpcId, chain.code, chain.blockchain.replaceFirstChar { it.uppercase() } + " " + chain.id.replaceFirstChar { it.uppercase() }, chain.chainId, "BigInteger(\"" + chain.netVersion + "\")", "listOf(" + chain.shortNames.map { "\"${it}\"" }.joinToString() + ")", - type(chain.type) + type(chain.type), + averageRemoveSpeed(chain.expectedBlockTime), ) .build(), ) @@ -74,6 +77,7 @@ open class CodeGen(private val config: ChainsConfig) { .addParameter("netVersion", BigInteger::class) .addParameter("shortNames", List::class.asClassName().parameterizedBy(String::class.asClassName())) .addParameter("type", BlockchainType::class) + .addParameter("averageRemoveDataSpeed", Double::class.java) .build(), ) .addProperty( @@ -111,6 +115,11 @@ open class CodeGen(private val config: ChainsConfig) { .initializer("type") .build(), ) + .addProperty( + PropertySpec.builder("averageRemoveDataSpeed", Double::class) + .initializer("averageRemoveDataSpeed") + .build(), + ) ).build() return FileSpec.builder("io.emeraldpay.dshackle", "Chain") .addType(chainType) @@ -130,6 +139,10 @@ open class CodeGen(private val config: ChainsConfig) { else -> throw IllegalArgumentException("unknown blockchain type $type") } } + + private fun averageRemoveSpeed(expectedBlockTime: Duration): Double { + return ceil(1000.0/expectedBlockTime.toMillis()*100) / 100 + } } open class ChainsCodeGenTask : DefaultTask() { diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index d7fe359f3..5727b18db 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -16,6 +16,7 @@ httpcomponents = "4.5.8" [libraries] apache-commons-lang3 = "org.apache.commons:commons-lang3:3.9" apache-commons-collections4 = "org.apache.commons:commons-collections4:4.3" +apache-commons-math3 = "org.apache.commons:commons-math3:3.6.1" bitcoinj = "org.bitcoinj:bitcoinj-core:0.15.8" @@ -122,7 +123,7 @@ auth0-jwt = "com.auth0:java-jwt:4.4.0" mockito-inline = "org.mockito:mockito-inline:4.0.0" [bundles] -apache-commons = ["commons-io", "apache-commons-lang3", "apache-commons-collections4"] +apache-commons = ["commons-io", "apache-commons-lang3", "apache-commons-collections4", "apache-commons-math3"] grpc = ["grpc-protobuf", "grpc-stub", "grpc-netty", "grpc-proto-util", "grpc-services"] httpcomponents = ["httpcomponents-httpmime", "httpcomponents-httpclient"] jackson = ["jackson-core", "jackson-databind", "jackson-datatype-jdk8", "jackson-datatype-jsr310", "jackson-module-kotlin", "jackson-yaml"] diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt index b5d6b896e..eb350888b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt @@ -168,6 +168,10 @@ abstract class DefaultUpstream( // NOOP } + override fun predictLowerBound(type: LowerBoundType): Long { + return 0 + } + protected fun sendUpstreamStateEvent(eventType: UpstreamChangeEvent.ChangeType) { stateEventStream.emitNext( UpstreamChangeEvent(chain, this, eventType), diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/MatchesResponse.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/MatchesResponse.kt index 302534333..f8026ce08 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/MatchesResponse.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/MatchesResponse.kt @@ -1,5 +1,7 @@ package io.emeraldpay.dshackle.upstream +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType + sealed class MatchesResponse { fun matched(): Boolean { @@ -25,6 +27,13 @@ sealed class MatchesResponse { .joinToString("; ") { it.getCause()!! } is NotMatchedResponse -> "Not matched - ${response.getCause()}" is SameNodeResponse -> "Upstream does not have hash ${this.upstreamHash}" + is LowerHeightResponse -> { + if (this.predictedHeight == 0L) { + "Upstream lower height of type ${this.boundType} cannot be predicted" + } else { + "Upstream lower height ${this.predictedHeight} of type ${this.boundType} is greater than ${this.lowerHeight}" + } + } else -> null } @@ -71,6 +80,12 @@ sealed class MatchesResponse { object GrpcResponse : MatchesResponse() + data class LowerHeightResponse( + val lowerHeight: Long, + val predictedHeight: Long, + val boundType: LowerBoundType, + ) : MatchesResponse() + data class HeightResponse( val height: Long, val currentHeight: Long, diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt index 56208c2d9..27113e268 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt @@ -257,6 +257,10 @@ abstract class Multistream( return getAll().any { it.isAvailable() } } + override fun predictLowerBound(type: LowerBoundType): Long { + return 0 + } + override fun getStatus(): UpstreamAvailability { return state.getStatus() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Selector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Selector.kt index fa47b83ba..9ae865394 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Selector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Selector.kt @@ -23,11 +23,13 @@ import io.emeraldpay.dshackle.upstream.MatchesResponse.CapabilityResponse import io.emeraldpay.dshackle.upstream.MatchesResponse.ExistsResponse import io.emeraldpay.dshackle.upstream.MatchesResponse.GrpcResponse import io.emeraldpay.dshackle.upstream.MatchesResponse.HeightResponse +import io.emeraldpay.dshackle.upstream.MatchesResponse.LowerHeightResponse import io.emeraldpay.dshackle.upstream.MatchesResponse.NotMatchedResponse import io.emeraldpay.dshackle.upstream.MatchesResponse.SameNodeResponse import io.emeraldpay.dshackle.upstream.MatchesResponse.SlotHeightResponse import io.emeraldpay.dshackle.upstream.MatchesResponse.Success import io.emeraldpay.dshackle.upstream.finalization.FinalizationType +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType import io.emeraldpay.dshackle.upstream.lowerbound.fromProtoType import org.apache.commons.lang3.StringUtils import java.util.Collections @@ -101,6 +103,16 @@ class Selector { else -> empty } } + it.hasLowerHeightSelector() -> { + if (it.lowerHeightSelector.height > 0) { + LowerHeightMatcher( + it.lowerHeightSelector.height, + it.lowerHeightSelector.lowerBoundType.fromProtoType(), + ) + } else { + empty + } + } else -> empty } }.run { @@ -112,8 +124,11 @@ class Selector { private fun getSort(selectors: List): Sort { selectors.forEach { selector -> if (selector.hasHeightSelector()) { - return HeightNumberOrTag.fromHeightSelector(selector.heightSelector)?.getSort() ?: Sort.default - } else if (selector.hasLowerHeightSelector()) { + val heightSort = HeightNumberOrTag.fromHeightSelector(selector.heightSelector)?.getSort() ?: Sort.default + if (heightSort != Sort.default) { + return heightSort + } + } else if (selector.hasLowerHeightSelector() && selector.lowerHeightSelector.height == 0L) { return Sort( compareBy(nullsLast()) { it.getLowerBound(selector.lowerHeightSelector.lowerBoundType.fromProtoType())?.lowerBound @@ -546,6 +561,28 @@ class Selector { } } + data class LowerHeightMatcher( + private val lowerHeight: Long, + private val boundType: LowerBoundType, + ) : Matcher() { + override fun matchesWithCause(up: Upstream): MatchesResponse { + val predictedLowerBound = up.predictLowerBound(boundType) + return if (lowerHeight >= predictedLowerBound && predictedLowerBound != 0L) { + Success + } else { + LowerHeightResponse(lowerHeight, predictedLowerBound, boundType) + } + } + + override fun describeInternal(): String { + return "lower height $lowerHeight" + } + + override fun toString(): String { + return "Matcher: ${describeInternal()}" + } + } + class HeightMatcher(val height: Long) : Matcher() { override fun matchesWithCause(up: Upstream): MatchesResponse { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt index 396d41c34..8457b60e7 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt @@ -55,6 +55,7 @@ interface Upstream : Lifecycle { fun addFinalization(finalization: FinalizationData, upstreamId: String) fun getUpstreamSettingsData(): UpstreamSettingsData? fun updateLowerBound(lowerBound: Long, type: LowerBoundType) + fun predictLowerBound(type: LowerBoundType): Long fun getChain(): Chain diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainLowerBoundService.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainLowerBoundService.kt index 910542b43..adbf24fc0 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainLowerBoundService.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainLowerBoundService.kt @@ -6,10 +6,10 @@ import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundDetector import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundService class BeaconChainLowerBoundService( - chain: Chain, + private val chain: Chain, upstream: Upstream, ) : LowerBoundService(chain, upstream) { override fun detectors(): List { - return listOf(BeaconChainLowerBoundStateDetector()) + return listOf(BeaconChainLowerBoundStateDetector(chain)) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainLowerBoundStateDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainLowerBoundStateDetector.kt index 4581b7993..da2e84b59 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainLowerBoundStateDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/beaconchain/BeaconChainLowerBoundStateDetector.kt @@ -1,11 +1,14 @@ package io.emeraldpay.dshackle.upstream.beaconchain +import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundDetector import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType import reactor.core.publisher.Flux -class BeaconChainLowerBoundStateDetector : LowerBoundDetector() { +class BeaconChainLowerBoundStateDetector( + private val chain: Chain, +) : LowerBoundDetector(chain) { override fun period(): Long { return 120 diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosLowerBoundService.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosLowerBoundService.kt index 7f49e5d55..a5cdc691c 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosLowerBoundService.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/cosmos/CosmosLowerBoundService.kt @@ -24,7 +24,7 @@ class CosmosLowerBoundService( class CosmosLowerBoundStateDetector( private val upstream: Upstream, -) : LowerBoundDetector() { +) : LowerBoundDetector(upstream.getChain()) { override fun period(): Long { return 3 diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundBlockDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundBlockDetector.kt index c703a63a1..ea37df1a7 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundBlockDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundBlockDetector.kt @@ -16,7 +16,7 @@ import reactor.core.publisher.Mono class EthereumLowerBoundBlockDetector( private val upstream: Upstream, -) : LowerBoundDetector() { +) : LowerBoundDetector(upstream.getChain()) { companion object { private const val NO_BLOCK_DATA = "No block data" diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundLogsDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundLogsDetector.kt index 4b2a1308d..ed875bd2c 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundLogsDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundLogsDetector.kt @@ -12,7 +12,7 @@ import reactor.core.publisher.Flux class EthereumLowerBoundLogsDetector( private val upstream: Upstream, -) : LowerBoundDetector() { +) : LowerBoundDetector(upstream.getChain()) { companion object { const val MAX_OFFSET = 20 diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundStateDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundStateDetector.kt index a49ddd930..7fb6b985d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundStateDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundStateDetector.kt @@ -15,7 +15,7 @@ import reactor.core.publisher.Mono class EthereumLowerBoundStateDetector( private val upstream: Upstream, -) : LowerBoundDetector() { +) : LowerBoundDetector(upstream.getChain()) { private val recursiveLowerBound = RecursiveLowerBound(upstream, LowerBoundType.STATE, stateErrors, lowerBounds) companion object { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundTxDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundTxDetector.kt index ffa2329fc..d86c74328 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundTxDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLowerBoundTxDetector.kt @@ -13,7 +13,7 @@ import reactor.core.publisher.Flux class EthereumLowerBoundTxDetector( private val upstream: Upstream, -) : LowerBoundDetector() { +) : LowerBoundDetector(upstream.getChain()) { companion object { const val MAX_OFFSET = 20 diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt index 340d6a0db..4d57763c1 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt @@ -354,5 +354,9 @@ open class GenericUpstream( lowerBoundService.updateLowerBound(lowerBound, type) } + override fun predictLowerBound(type: LowerBoundType): Long { + return lowerBoundService.predictLowerBound(type) + } + fun isValid(): Boolean = isUpstreamValid.get() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundDetector.kt index e7cad448d..3409b6eba 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundDetector.kt @@ -1,19 +1,21 @@ package io.emeraldpay.dshackle.upstream.lowerbound +import io.emeraldpay.dshackle.Chain import org.slf4j.LoggerFactory import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.publisher.Sinks import java.time.Duration -import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicBoolean fun Long.toHex() = "0x${this.toString(16)}" -abstract class LowerBoundDetector { +abstract class LowerBoundDetector( + chain: Chain, +) { protected val log = LoggerFactory.getLogger(this::class.java) - protected val lowerBounds = ConcurrentHashMap() + protected val lowerBounds = LowerBounds(chain) private val lowerBoundSink = Sinks.many().multicast().directBestEffort() fun detectLowerBound(): Flux { @@ -35,10 +37,10 @@ abstract class LowerBoundDetector { }, ) .filter { - it.lowerBound >= (lowerBounds[it.type]?.lowerBound ?: 0) + it.lowerBound >= (lowerBounds.getLastBound(it.type)?.lowerBound ?: 0) } .map { - lowerBounds[it.type] = it + lowerBounds.updateBound(it) it } } @@ -53,4 +55,8 @@ abstract class LowerBoundDetector { fun updateLowerBound(lowerBound: Long, type: LowerBoundType) { lowerBoundSink.emitNext(LowerBoundData(lowerBound, type)) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } } + + fun predictLowerBound(type: LowerBoundType): Long { + return lowerBounds.predictNextBound(type) + } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundService.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundService.kt index d23f81af3..945d99189 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundService.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundService.kt @@ -33,6 +33,13 @@ abstract class LowerBoundService( .forEach { it.updateLowerBound(lowerBound, type) } } + fun predictLowerBound(type: LowerBoundType): Long { + return detectors + .firstOrNull { it.types().contains(type) } + ?.predictLowerBound(type) + ?: 0 + } + fun getLowerBounds(): Collection = lowerBounds.values fun getLowerBound(lowerBoundType: LowerBoundType): LowerBoundData? = lowerBounds[lowerBoundType] diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBounds.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBounds.kt new file mode 100644 index 000000000..307360674 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBounds.kt @@ -0,0 +1,124 @@ +package io.emeraldpay.dshackle.upstream.lowerbound + +import com.google.common.util.concurrent.AtomicDouble +import io.emeraldpay.dshackle.Chain +import org.apache.commons.math3.stat.regression.SimpleRegression +import java.time.Instant +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentLinkedDeque +import kotlin.math.roundToLong + +class LowerBounds( + chain: Chain, +) { + companion object { + private const val MAX_BOUNDS = 3 + } + + private val averageSpeed = chain.averageRemoveDataSpeed + + private val lowerBounds = ConcurrentHashMap() + + fun updateBound(newBound: LowerBoundData) { + if (lowerBounds.containsKey(newBound.type)) { + val lowerBoundCoeffs = lowerBounds[newBound.type]!! + + // we add only bounds with different timestamps + if (newBound.timestamp != lowerBoundCoeffs.getLastBound().timestamp) { + if (newBound.lowerBound == 1L) { + // this is the fully archival node, so there is no need to accumulate bounds and calculate the coeffs + lowerBoundCoeffs.updateCoeffs(0.0, 1.0) + lowerBoundCoeffs.clearBounds() + lowerBoundCoeffs.addBound(newBound) + } else { + // accumulate up to MAX_BOUNDS and preserve this size + if (lowerBoundCoeffs.boundsSize() == MAX_BOUNDS) { + lowerBoundCoeffs.removeFirst() + } + lowerBoundCoeffs.addBound(newBound) + if (lowerBoundCoeffs.boundsSize() < MAX_BOUNDS) { + // calculate coeffs based on the average speed until we accumulate al least MAX_BOUNDS bounds + lowerBoundCoeffs.updateCoeffs(averageSpeed, calculateB(newBound)) + } else { + // having MAX_BOUNDS bounds we can use linear regression + lowerBoundCoeffs.train() + } + } + } + } else { + // add new bound if it hasn't existed yet + lowerBounds[newBound.type] = LowerBoundCoeffs() + .apply { + addBound(newBound) + if (newBound.lowerBound == 1L) { + // this is the fully archival node + updateCoeffs(0.0, 1.0) + } else { + // otherwise we calculate the coeffs based on the average speed + updateCoeffs(averageSpeed, calculateB(newBound)) + } + } + } + } + + fun predictNextBound(type: LowerBoundType): Long { + val lowerBoundCoeffs = lowerBounds[type] ?: return 0 + + val xTime = Instant.now().epochSecond + + return (lowerBoundCoeffs.k.get() * xTime + lowerBoundCoeffs.b.get()).roundToLong() + } + + fun getLastBound(type: LowerBoundType): LowerBoundData? { + return lowerBounds[type]?.getLastBound() + } + + fun getAllBounds(type: LowerBoundType): List { + return lowerBounds[type]?.lowerBounds?.toList() ?: emptyList() + } + + private fun calculateB(bound: LowerBoundData): Double { + return bound.lowerBound.toDouble() - (averageSpeed * bound.timestamp) + } + + // to predict the next lower bound we use linear regression, y = kx + b, + // where x - current time, y - the predicted bound, k and b - coefficients + // to achieve that we accumulate up to max bounds (3 by default) and then calculate the coefficients using the regression lib + // having these coeffs we can predict the next bound in the predictNextBound() method + private class LowerBoundCoeffs { + val lowerBounds = ConcurrentLinkedDeque() + val k = AtomicDouble() + val b = AtomicDouble() + + fun addBound(bound: LowerBoundData) { + lowerBounds.add(bound) + } + + fun updateCoeffs(newK: Double, newB: Double) { + k.set(newK) + b.set(newB) + } + + fun clearBounds() { + lowerBounds.clear() + } + + fun removeFirst() { + lowerBounds.removeFirst() + } + + fun boundsSize(): Int = lowerBounds.size + + fun getLastBound(): LowerBoundData = lowerBounds.last + + fun train() { + val regression = SimpleRegression() + + lowerBounds.forEach { + regression.addObservation(doubleArrayOf(it.timestamp.toDouble()), it.lowerBound.toDouble()) + } + + updateCoeffs(regression.slope, regression.intercept) + } + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/detector/RecursiveLowerBound.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/detector/RecursiveLowerBound.kt index 611dd996f..dce37336f 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/detector/RecursiveLowerBound.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/detector/RecursiveLowerBound.kt @@ -4,6 +4,7 @@ import io.emeraldpay.dshackle.upstream.ChainResponse import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType +import io.emeraldpay.dshackle.upstream.lowerbound.LowerBounds import org.slf4j.LoggerFactory import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -17,7 +18,7 @@ class RecursiveLowerBound( private val upstream: Upstream, private val type: LowerBoundType, private val nonRetryableErrors: Set, - private val lowerBounds: Map, + private val lowerBounds: LowerBounds, ) { private val log = LoggerFactory.getLogger(this::class.java) @@ -56,13 +57,13 @@ class RecursiveLowerBound( fun recursiveDetectLowerBoundWithOffset(maxLimit: Int, hasData: (Long) -> Mono): Flux { val visitedBlocks = HashSet() - return Mono.justOrEmpty(lowerBounds[type]?.lowerBound) - .flatMapMany { + return Mono.justOrEmpty(lowerBounds.getLastBound(type)?.lowerBound) + .flatMapMany { bound -> // at first, we try to check the current bound to prevent huge calculations - hasData(it!!) - .retryWhen(retrySpec(it, nonRetryableErrors)) + hasData(bound!!) + .retryWhen(retrySpec(bound, nonRetryableErrors)) .flatMap(ChainResponse::requireResult) - .map { LowerBoundData(lowerBounds[type]!!.lowerBound, type) } + .map { LowerBoundData(bound, type) } .onErrorResume { Mono.empty() } }.switchIfEmpty( initialRange() @@ -150,11 +151,11 @@ class RecursiveLowerBound( val currentHeight = it.getCurrentHeight() if (currentHeight == null) { Mono.empty() - } else if (!lowerBounds.contains(type)) { + } else if (lowerBounds.getLastBound(type) == null) { Mono.just(LowerBoundBinarySearchData(0, currentHeight)) } else { // next calculations will be carried out only within the last range - Mono.just(LowerBoundBinarySearchData(lowerBounds[type]!!.lowerBound, currentHeight)) + Mono.just(LowerBoundBinarySearchData(lowerBounds.getLastBound(type)!!.lowerBound, currentHeight)) } } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearLowerBoundStateDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearLowerBoundStateDetector.kt index dda819749..e3c9e12fd 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearLowerBoundStateDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/near/NearLowerBoundStateDetector.kt @@ -11,7 +11,7 @@ import reactor.core.publisher.Flux class NearLowerBoundStateDetector( private val upstream: Upstream, -) : LowerBoundDetector() { +) : LowerBoundDetector(upstream.getChain()) { override fun period(): Long { return 3 diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotLowerBoundStateDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotLowerBoundStateDetector.kt index 2b8c79f3a..2192953ae 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotLowerBoundStateDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotLowerBoundStateDetector.kt @@ -13,7 +13,7 @@ import reactor.core.publisher.Flux class PolkadotLowerBoundStateDetector( private val upstream: Upstream, -) : LowerBoundDetector() { +) : LowerBoundDetector(upstream.getChain()) { private val recursiveLowerBound = RecursiveLowerBound(upstream, LowerBoundType.STATE, nonRetryableErrors, lowerBounds) companion object { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaLowerBoundSlotDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaLowerBoundSlotDetector.kt index 8597d9f3d..cfa679ff6 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaLowerBoundSlotDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaLowerBoundSlotDetector.kt @@ -16,7 +16,7 @@ import kotlin.math.max class SolanaLowerBoundSlotDetector( private val upstream: Upstream, -) : LowerBoundDetector() { +) : LowerBoundDetector(upstream.getChain()) { private val reader = upstream.getIngressReader() override fun period(): Long { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetLowerBoundService.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetLowerBoundService.kt index e228eb22e..d134876ca 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetLowerBoundService.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetLowerBoundService.kt @@ -6,10 +6,10 @@ import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundDetector import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundService class StarknetLowerBoundService( - chain: Chain, + private val chain: Chain, upstream: Upstream, ) : LowerBoundService(chain, upstream) { override fun detectors(): List { - return listOf(StarknetLowerBoundStateDetector()) + return listOf(StarknetLowerBoundStateDetector(chain)) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetLowerBoundStateDetector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetLowerBoundStateDetector.kt index 2cd10ca94..89cd2f117 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetLowerBoundStateDetector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetLowerBoundStateDetector.kt @@ -1,11 +1,14 @@ package io.emeraldpay.dshackle.upstream.starknet +import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundDetector import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType import reactor.core.publisher.Flux -class StarknetLowerBoundStateDetector : LowerBoundDetector() { +class StarknetLowerBoundStateDetector( + chain: Chain, +) : LowerBoundDetector(chain) { override fun period(): Long { return 120 diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/SelectorTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/SelectorTest.kt index a4edb3bc7..f110cad9f 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/SelectorTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/SelectorTest.kt @@ -195,6 +195,7 @@ class SelectorTest { BlockchainOuterClass.Selector.newBuilder() .setLowerHeightSelector( BlockchainOuterClass.LowerHeightSelector.newBuilder() + .setHeight(100050003) .setLowerBoundType(BlockchainOuterClass.LowerBoundType.LOWER_BOUND_BLOCK) .build(), ) @@ -204,14 +205,45 @@ class SelectorTest { val upstreamFilter = Selector.convertToUpstreamFilter(requestSelectors) val actual = ups.sortedWith(upstreamFilter.sort.comparator) + val actualMatcher = Selector.MultiMatcher(listOf(Selector.LowerHeightMatcher(100050003, LowerBoundType.BLOCK))) assertEquals( - listOf(up2, up3, up1), + upstreamFilter.matcher, + actualMatcher, + ) + assertEquals( + listOf(up1, up3, up2), actual, ) } + @ParameterizedTest + @MethodSource("lowerHeightData") + fun `test lower height matcher`( + lowerHeight: Long, + predicted: Long, + expected: MatchesResponse, + ) { + val up = mock { + on { predictLowerBound(LowerBoundType.STATE) } doReturn predicted + } + val matcher = Selector.LowerHeightMatcher(lowerHeight, LowerBoundType.STATE) + + val actualResponse = matcher.matchesWithCause(up) + + assertEquals(expected, actualResponse) + } + companion object { + @JvmStatic + fun lowerHeightData(): List = + listOf( + of(10000, 400, MatchesResponse.Success), + of(10000, 50000, MatchesResponse.LowerHeightResponse(10000, 50000, LowerBoundType.STATE)), + of(5000, 5000, MatchesResponse.Success), + of(3000, 0, MatchesResponse.LowerHeightResponse(3000, 0, LowerBoundType.STATE)), + ) + @JvmStatic fun data(): List = listOf( diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundServiceTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundServiceTest.kt new file mode 100644 index 000000000..f5d166bc9 --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundServiceTest.kt @@ -0,0 +1,55 @@ +package io.emeraldpay.dshackle.upstream.lowerbound + +import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.upstream.Upstream +import org.assertj.core.api.Assertions.assertThat +import org.junit.Test +import org.mockito.kotlin.any +import org.mockito.kotlin.doReturn +import org.mockito.kotlin.mock +import org.mockito.kotlin.never +import org.mockito.kotlin.verify + +class LowerBoundServiceTest { + + @Test + fun `predict lower bound`() { + val detector = mock { + on { predictLowerBound(LowerBoundType.STATE) } doReturn 4000 + on { types() } doReturn setOf(LowerBoundType.STATE) + } + val boundService = LowerBoundServiceMock(mock(), listOf(detector)) + + val bound = boundService.predictLowerBound(LowerBoundType.STATE) + + verify(detector).types() + verify(detector).predictLowerBound(LowerBoundType.STATE) + + assertThat(bound).isEqualTo(4000) + } + + @Test + fun `the predicted lower bound is 0 if there is no such bound type`() { + val detector = mock { + on { types() } doReturn setOf(LowerBoundType.STATE) + } + val boundService = LowerBoundServiceMock(mock(), listOf(detector)) + + val bound = boundService.predictLowerBound(LowerBoundType.BLOCK) + + verify(detector).types() + verify(detector, never()).predictLowerBound(any()) + + assertThat(bound).isEqualTo(0) + } + + private class LowerBoundServiceMock( + upstream: Upstream, + private val detectors: List, + ) : LowerBoundService(Chain.ETHEREUM__MAINNET, upstream) { + + override fun detectors(): List { + return detectors + } + } +} diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundsPredictionTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundsPredictionTest.kt new file mode 100644 index 000000000..5015bfa10 --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/LowerBoundsPredictionTest.kt @@ -0,0 +1,227 @@ +package io.emeraldpay.dshackle.upstream.lowerbound + +import io.emeraldpay.dshackle.Chain +import org.assertj.core.api.Assertions.assertThat +import org.junit.Test +import java.time.Instant +import java.time.temporal.ChronoUnit + +class LowerBoundsPredictionTest { + + @Test + fun `first archival lower bound data, get it and predict the next bound`() { + val lowerBounds = LowerBounds(Chain.ETHEREUM__MAINNET) + val newLowerBound = LowerBoundData(1L, 1000, LowerBoundType.STATE) + + lowerBounds.updateBound(newLowerBound) + + val lastBound = lowerBounds.getLastBound(LowerBoundType.STATE) + val predictedNextBound = lowerBounds.predictNextBound(LowerBoundType.STATE) + val allBounds = lowerBounds.getAllBounds(LowerBoundType.STATE) + + assertThat(lastBound).isEqualTo(newLowerBound) + assertThat(predictedNextBound).isEqualTo(1) + assertThat(allBounds).isEqualTo(listOf(newLowerBound)) + } + + @Test + fun `if no bound the default values`() { + val lowerBounds = LowerBounds(Chain.ETHEREUM__MAINNET) + + val lastBound = lowerBounds.getLastBound(LowerBoundType.STATE) + val predictedNextBound = lowerBounds.predictNextBound(LowerBoundType.STATE) + val allBounds = lowerBounds.getAllBounds(LowerBoundType.STATE) + + assertThat(lastBound).isNull() + assertThat(predictedNextBound).isEqualTo(0) + assertThat(allBounds).isEmpty() + } + + @Test + fun `sequential archival lower bound data and get only the last`() { + val lowerBounds = LowerBounds(Chain.ETHEREUM__MAINNET) + val newLowerBound = LowerBoundData(1L, 1000, LowerBoundType.STATE) + val nextNewLowerBound = LowerBoundData(1L, 1005, LowerBoundType.STATE) + + lowerBounds.updateBound(newLowerBound) + + val lastBound = lowerBounds.getLastBound(LowerBoundType.STATE) + val predictedNextBound = lowerBounds.predictNextBound(LowerBoundType.STATE) + val allBounds = lowerBounds.getAllBounds(LowerBoundType.STATE) + + assertThat(lastBound).isEqualTo(newLowerBound) + assertThat(predictedNextBound).isEqualTo(1) + assertThat(allBounds).isEqualTo(listOf(newLowerBound)) + + lowerBounds.updateBound(nextNewLowerBound) + + val newLastBound = lowerBounds.getLastBound(LowerBoundType.STATE) + val newPredictedNextBound = lowerBounds.predictNextBound(LowerBoundType.STATE) + val newAllBounds = lowerBounds.getAllBounds(LowerBoundType.STATE) + + assertThat(newLastBound).isEqualTo(nextNewLowerBound) + assertThat(newPredictedNextBound).isEqualTo(1) + assertThat(newAllBounds).isEqualTo(listOf(nextNewLowerBound)) + } + + @Test + fun `don't update the lower bounds if the same timestamp`() { + val lowerBounds = LowerBounds(Chain.ETHEREUM__MAINNET) + val newLowerBound = LowerBoundData(1L, 1000, LowerBoundType.STATE) + + lowerBounds.updateBound(newLowerBound) + lowerBounds.updateBound(LowerBoundData(100000L, 1000, LowerBoundType.STATE)) + + val lastBound = lowerBounds.getLastBound(LowerBoundType.STATE) + val predictedNextBound = lowerBounds.predictNextBound(LowerBoundType.STATE) + val allBounds = lowerBounds.getAllBounds(LowerBoundType.STATE) + + assertThat(lastBound).isEqualTo(newLowerBound) + assertThat(predictedNextBound).isEqualTo(1) + assertThat(allBounds).isEqualTo(listOf(newLowerBound)) + } + + @Test + fun `always get the last bound`() { + val lowerBounds = LowerBounds(Chain.ETHEREUM__MAINNET) + val lowerBound1 = LowerBoundData(1000L, 1000, LowerBoundType.STATE) + val lowerBound2 = LowerBoundData(1005L, 1005, LowerBoundType.STATE) + val lowerBound3 = LowerBoundData(1010L, 1010, LowerBoundType.STATE) + + lowerBounds.updateBound(lowerBound1) + + assertThat(lowerBounds.getLastBound(LowerBoundType.STATE)).isEqualTo(lowerBound1) + assertThat(lowerBounds.getAllBounds(LowerBoundType.STATE)).isEqualTo(listOf(lowerBound1)) + + lowerBounds.updateBound(lowerBound2) + + assertThat(lowerBounds.getLastBound(LowerBoundType.STATE)).isEqualTo(lowerBound2) + assertThat(lowerBounds.getAllBounds(LowerBoundType.STATE)).isEqualTo(listOf(lowerBound1, lowerBound2)) + + lowerBounds.updateBound(lowerBound3) + + assertThat(lowerBounds.getLastBound(LowerBoundType.STATE)).isEqualTo(lowerBound3) + assertThat(lowerBounds.getAllBounds(LowerBoundType.STATE)).isEqualTo(listOf(lowerBound1, lowerBound2, lowerBound3)) + } + + @Test + fun `preserve the maximum number of bounds`() { + val lowerBounds = LowerBounds(Chain.ETHEREUM__MAINNET) + val lowerBound1 = LowerBoundData(1000L, 1000, LowerBoundType.STATE) + val lowerBound2 = LowerBoundData(1005L, 1005, LowerBoundType.STATE) + val lowerBound3 = LowerBoundData(1010L, 1010, LowerBoundType.STATE) + val lowerBound4 = LowerBoundData(1050L, 1050, LowerBoundType.STATE) + val lowerBound5 = LowerBoundData(1060L, 1060, LowerBoundType.STATE) + + lowerBounds.updateBound(lowerBound1) + lowerBounds.updateBound(lowerBound2) + lowerBounds.updateBound(lowerBound3) + + assertThat(lowerBounds.getLastBound(LowerBoundType.STATE)).isEqualTo(lowerBound3) + assertThat(lowerBounds.getAllBounds(LowerBoundType.STATE)).isEqualTo(listOf(lowerBound1, lowerBound2, lowerBound3)) + + lowerBounds.updateBound(lowerBound4) + + assertThat(lowerBounds.getLastBound(LowerBoundType.STATE)).isEqualTo(lowerBound4) + assertThat(lowerBounds.getAllBounds(LowerBoundType.STATE)).isEqualTo(listOf(lowerBound2, lowerBound3, lowerBound4)) + + lowerBounds.updateBound(lowerBound5) + + assertThat(lowerBounds.getLastBound(LowerBoundType.STATE)).isEqualTo(lowerBound5) + assertThat(lowerBounds.getAllBounds(LowerBoundType.STATE)).isEqualTo(listOf(lowerBound3, lowerBound4, lowerBound5)) + } + + @Test + fun `if get the archival bound then remove previous ones`() { + val lowerBounds = LowerBounds(Chain.ETHEREUM__MAINNET) + val lowerBound1 = LowerBoundData(1000L, 1000, LowerBoundType.STATE) + val lowerBound2 = LowerBoundData(1005L, 1005, LowerBoundType.STATE) + val lowerBound3 = LowerBoundData(1, 1010, LowerBoundType.STATE) + + lowerBounds.updateBound(lowerBound1) + lowerBounds.updateBound(lowerBound2) + + assertThat(lowerBounds.getLastBound(LowerBoundType.STATE)).isEqualTo(lowerBound2) + assertThat(lowerBounds.getAllBounds(LowerBoundType.STATE)).isEqualTo(listOf(lowerBound1, lowerBound2)) + + lowerBounds.updateBound(lowerBound3) + + assertThat(lowerBounds.getLastBound(LowerBoundType.STATE)).isEqualTo(lowerBound3) + assertThat(lowerBounds.getAllBounds(LowerBoundType.STATE)).isEqualTo(listOf(lowerBound3)) + } + + @Test + fun `predict the same bound if all bounds are equal to each other`() { + val lowerBounds = LowerBounds(Chain.ETHEREUM__MAINNET) + val lowerBound1 = LowerBoundData(15060L, 1000, LowerBoundType.STATE) + val lowerBound2 = LowerBoundData(15060L, 2000, LowerBoundType.STATE) + val lowerBound3 = LowerBoundData(15060L, 3000, LowerBoundType.STATE) + + lowerBounds.updateBound(lowerBound1) + lowerBounds.updateBound(lowerBound2) + lowerBounds.updateBound(lowerBound3) + + assertThat(lowerBounds.getLastBound(LowerBoundType.STATE)).isEqualTo(lowerBound3) + assertThat(lowerBounds.predictNextBound(LowerBoundType.STATE)).isEqualTo(15060L) + assertThat(lowerBounds.getAllBounds(LowerBoundType.STATE)).isEqualTo(listOf(lowerBound1, lowerBound2, lowerBound3)) + } + + @Test + fun `predict the next bound based on different bounds`() { + val now = Instant.now() + val lowerBounds = LowerBounds(Chain.BSC__MAINNET) + val lowerBound1 = LowerBoundData(37995846, now.minus(9, ChronoUnit.MINUTES).epochSecond, LowerBoundType.STATE) + val lowerBound2 = LowerBoundData(37995906, now.minus(6, ChronoUnit.MINUTES).epochSecond, LowerBoundType.STATE) + val lowerBound3 = LowerBoundData(37995966, now.minus(3, ChronoUnit.MINUTES).epochSecond, LowerBoundType.STATE) + + lowerBounds.updateBound(lowerBound1) + lowerBounds.updateBound(lowerBound2) + lowerBounds.updateBound(lowerBound3) + + val predicted = lowerBounds.predictNextBound(LowerBoundType.STATE) + + assertThat(predicted) + .isLessThan(37996030) + .isGreaterThan(37996020) + } + + @Test + fun `predict the next bound based on average speed`() { + val now = Instant.now() + val lowerBounds = LowerBounds(Chain.BSC__MAINNET) + val lowerBound1 = LowerBoundData(37995966, now.minus(3, ChronoUnit.MINUTES).epochSecond, LowerBoundType.STATE) + + lowerBounds.updateBound(lowerBound1) + + val predicted = lowerBounds.predictNextBound(LowerBoundType.STATE) + println(predicted) + + assertThat(predicted) + .isLessThan(37996030) + .isGreaterThan(37996020) + } + + @Test + fun `update different bounds`() { + val lowerBounds = LowerBounds(Chain.ETHEREUM__MAINNET) + val lowerBoundState1 = LowerBoundData(15060L, 1010, LowerBoundType.STATE) + val lowerBoundState2 = LowerBoundData(16060L, 1020, LowerBoundType.STATE) + val lowerBoundState3 = LowerBoundData(17060L, 1030, LowerBoundType.STATE) + val lowerBoundBlock1 = LowerBoundData(20000, 1010, LowerBoundType.BLOCK) + val lowerBoundBlock2 = LowerBoundData(21000, 1020, LowerBoundType.BLOCK) + val lowerBoundBlock3 = LowerBoundData(22000, 1030, LowerBoundType.BLOCK) + + lowerBounds.updateBound(lowerBoundState1) + lowerBounds.updateBound(lowerBoundState2) + lowerBounds.updateBound(lowerBoundState3) + lowerBounds.updateBound(lowerBoundBlock1) + lowerBounds.updateBound(lowerBoundBlock2) + lowerBounds.updateBound(lowerBoundBlock3) + + assertThat(lowerBounds.getLastBound(LowerBoundType.STATE)).isEqualTo(lowerBoundState3) + assertThat(lowerBounds.getAllBounds(LowerBoundType.STATE)).isEqualTo(listOf(lowerBoundState1, lowerBoundState2, lowerBoundState3)) + + assertThat(lowerBounds.getLastBound(LowerBoundType.BLOCK)).isEqualTo(lowerBoundBlock3) + assertThat(lowerBounds.getAllBounds(LowerBoundType.BLOCK)).isEqualTo(listOf(lowerBoundBlock1, lowerBoundBlock2, lowerBoundBlock3)) + } +} diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/RecursiveLowerBoundServiceTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/RecursiveLowerBoundServiceTest.kt similarity index 95% rename from src/test/kotlin/io/emeraldpay/dshackle/upstream/RecursiveLowerBoundServiceTest.kt rename to src/test/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/RecursiveLowerBoundServiceTest.kt index 437e5e7f7..3a0a03ed6 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/RecursiveLowerBoundServiceTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/lowerbound/RecursiveLowerBoundServiceTest.kt @@ -1,15 +1,15 @@ -package io.emeraldpay.dshackle.upstream +package io.emeraldpay.dshackle.upstream.lowerbound import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.reader.ChainReader +import io.emeraldpay.dshackle.upstream.ChainRequest +import io.emeraldpay.dshackle.upstream.ChainResponse +import io.emeraldpay.dshackle.upstream.Head +import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.ethereum.EthereumLowerBoundService import io.emeraldpay.dshackle.upstream.ethereum.EthereumLowerBoundTxDetector.Companion.MAX_OFFSET import io.emeraldpay.dshackle.upstream.ethereum.ZERO_ADDRESS -import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData -import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundService -import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType -import io.emeraldpay.dshackle.upstream.lowerbound.toHex import io.emeraldpay.dshackle.upstream.polkadot.PolkadotLowerBoundService import io.emeraldpay.dshackle.upstream.rpcclient.ListParams import org.assertj.core.api.Assertions.assertThat @@ -77,6 +77,7 @@ class RecursiveLowerBoundServiceTest { on { getId() } doReturn "id" on { getHead() } doReturn head on { getIngressReader() } doReturn reader + on { getChain() } doReturn Chain.UNSPECIFIED } val detector = EthereumLowerBoundService(Chain.UNSPECIFIED, upstream) @@ -130,6 +131,7 @@ class RecursiveLowerBoundServiceTest { val upstream = mock { on { getHead() } doReturn head on { getIngressReader() } doReturn reader + on { getChain() } doReturn Chain.STARKNET__MAINNET } val detector = PolkadotLowerBoundService(Chain.UNSPECIFIED, upstream) @@ -162,6 +164,7 @@ class RecursiveLowerBoundServiceTest { val upstream = mock { on { getHead() } doReturn head on { getIngressReader() } doReturn reader + on { getChain() } doReturn Chain.UNSPECIFIED } val detector = detectorClass.getConstructor(Chain::class.java, Upstream::class.java).newInstance(Chain.UNSPECIFIED, upstream) diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaLowerBoundServiceTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaLowerBoundServiceTest.kt index 6df90e152..7d0be676b 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaLowerBoundServiceTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaLowerBoundServiceTest.kt @@ -54,6 +54,7 @@ class SolanaLowerBoundServiceTest { } val upstream = mock { on { getIngressReader() } doReturn reader + on { getChain() } doReturn Chain.UNSPECIFIED } val detector = SolanaLowerBoundService(Chain.UNSPECIFIED, upstream) diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetLowerBoundStateDetectorTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetLowerBoundStateDetectorTest.kt index 81ba18454..473aefbae 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetLowerBoundStateDetectorTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetLowerBoundStateDetectorTest.kt @@ -1,5 +1,6 @@ package io.emeraldpay.dshackle.upstream.starknet +import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType import org.junit.jupiter.api.Test @@ -10,7 +11,7 @@ class StarknetLowerBoundStateDetectorTest { @Test fun `starknet lower block is 1`() { - val detector = StarknetLowerBoundStateDetector() + val detector = StarknetLowerBoundStateDetector(Chain.STARKNET__MAINNET) StepVerifier.withVirtualTime { detector.detectLowerBound() } .expectSubscription()