Skip to content

Commit

Permalink
Predict lower height and use lower height matcher (#552)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Aug 22, 2024
1 parent 32cb726 commit e184bb3
Show file tree
Hide file tree
Showing 30 changed files with 582 additions and 40 deletions.
19 changes: 16 additions & 3 deletions buildSrc/src/main/kotlin/chainsconfig.codegen.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import io.emeraldpay.dshackle.config.ChainsConfig
import io.emeraldpay.dshackle.config.ChainsConfigReader
import io.emeraldpay.dshackle.foundation.ChainOptionsReader
import java.math.BigInteger
import java.time.Duration
import kotlin.math.ceil

open class CodeGen(private val config: ChainsConfig) {
companion object {
Expand All @@ -25,7 +27,7 @@ open class CodeGen(private val config: ChainsConfig) {
builder.addEnumConstant(
"UNSPECIFIED",
TypeSpec.anonymousClassBuilder()
.addSuperclassConstructorParameter("%L, %S, %S, %S, %L, %L, %L", 0, "UNSPECIFIED", "Unknown", "0x0", "BigInteger.ZERO", "emptyList()", "BlockchainType.UNKNOWN")
.addSuperclassConstructorParameter("%L, %S, %S, %S, %L, %L, %L, %L", 0, "UNSPECIFIED", "Unknown", "0x0", "BigInteger.ZERO", "emptyList()", "BlockchainType.UNKNOWN", 0.0)
.build(),
)
for (chain in config) {
Expand All @@ -34,14 +36,15 @@ open class CodeGen(private val config: ChainsConfig) {
.replace(' ', '_'),
TypeSpec.anonymousClassBuilder()
.addSuperclassConstructorParameter(
"%L, %S, %S, %S, %L, %L, %L",
"%L, %S, %S, %S, %L, %L, %L, %L",
chain.grpcId,
chain.code,
chain.blockchain.replaceFirstChar { it.uppercase() } + " " + chain.id.replaceFirstChar { it.uppercase() },
chain.chainId,
"BigInteger(\"" + chain.netVersion + "\")",
"listOf(" + chain.shortNames.map { "\"${it}\"" }.joinToString() + ")",
type(chain.type)
type(chain.type),
averageRemoveSpeed(chain.expectedBlockTime),
)
.build(),
)
Expand Down Expand Up @@ -74,6 +77,7 @@ open class CodeGen(private val config: ChainsConfig) {
.addParameter("netVersion", BigInteger::class)
.addParameter("shortNames", List::class.asClassName().parameterizedBy(String::class.asClassName()))
.addParameter("type", BlockchainType::class)
.addParameter("averageRemoveDataSpeed", Double::class.java)
.build(),
)
.addProperty(
Expand Down Expand Up @@ -111,6 +115,11 @@ open class CodeGen(private val config: ChainsConfig) {
.initializer("type")
.build(),
)
.addProperty(
PropertySpec.builder("averageRemoveDataSpeed", Double::class)
.initializer("averageRemoveDataSpeed")
.build(),
)
).build()
return FileSpec.builder("io.emeraldpay.dshackle", "Chain")
.addType(chainType)
Expand All @@ -130,6 +139,10 @@ open class CodeGen(private val config: ChainsConfig) {
else -> throw IllegalArgumentException("unknown blockchain type $type")
}
}

private fun averageRemoveSpeed(expectedBlockTime: Duration): Double {
return ceil(1000.0/expectedBlockTime.toMillis()*100) / 100
}
}

open class ChainsCodeGenTask : DefaultTask() {
Expand Down
3 changes: 2 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ httpcomponents = "4.5.8"
[libraries]
apache-commons-lang3 = "org.apache.commons:commons-lang3:3.9"
apache-commons-collections4 = "org.apache.commons:commons-collections4:4.3"
apache-commons-math3 = "org.apache.commons:commons-math3:3.6.1"

bitcoinj = "org.bitcoinj:bitcoinj-core:0.15.8"

Expand Down Expand Up @@ -122,7 +123,7 @@ auth0-jwt = "com.auth0:java-jwt:4.4.0"
mockito-inline = "org.mockito:mockito-inline:4.0.0"

[bundles]
apache-commons = ["commons-io", "apache-commons-lang3", "apache-commons-collections4"]
apache-commons = ["commons-io", "apache-commons-lang3", "apache-commons-collections4", "apache-commons-math3"]
grpc = ["grpc-protobuf", "grpc-stub", "grpc-netty", "grpc-proto-util", "grpc-services"]
httpcomponents = ["httpcomponents-httpmime", "httpcomponents-httpclient"]
jackson = ["jackson-core", "jackson-databind", "jackson-datatype-jdk8", "jackson-datatype-jsr310", "jackson-module-kotlin", "jackson-yaml"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ abstract class DefaultUpstream(
// NOOP
}

override fun predictLowerBound(type: LowerBoundType): Long {
return 0
}

protected fun sendUpstreamStateEvent(eventType: UpstreamChangeEvent.ChangeType) {
stateEventStream.emitNext(
UpstreamChangeEvent(chain, this, eventType),
Expand Down
15 changes: 15 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/MatchesResponse.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.emeraldpay.dshackle.upstream

import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType

sealed class MatchesResponse {

fun matched(): Boolean {
Expand All @@ -25,6 +27,13 @@ sealed class MatchesResponse {
.joinToString("; ") { it.getCause()!! }
is NotMatchedResponse -> "Not matched - ${response.getCause()}"
is SameNodeResponse -> "Upstream does not have hash ${this.upstreamHash}"
is LowerHeightResponse -> {
if (this.predictedHeight == 0L) {
"Upstream lower height of type ${this.boundType} cannot be predicted"
} else {
"Upstream lower height ${this.predictedHeight} of type ${this.boundType} is greater than ${this.lowerHeight}"
}
}
else -> null
}

Expand Down Expand Up @@ -71,6 +80,12 @@ sealed class MatchesResponse {

object GrpcResponse : MatchesResponse()

data class LowerHeightResponse(
val lowerHeight: Long,
val predictedHeight: Long,
val boundType: LowerBoundType,
) : MatchesResponse()

data class HeightResponse(
val height: Long,
val currentHeight: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,10 @@ abstract class Multistream(
return getAll().any { it.isAvailable() }
}

override fun predictLowerBound(type: LowerBoundType): Long {
return 0
}

override fun getStatus(): UpstreamAvailability {
return state.getStatus()
}
Expand Down
41 changes: 39 additions & 2 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/Selector.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ import io.emeraldpay.dshackle.upstream.MatchesResponse.CapabilityResponse
import io.emeraldpay.dshackle.upstream.MatchesResponse.ExistsResponse
import io.emeraldpay.dshackle.upstream.MatchesResponse.GrpcResponse
import io.emeraldpay.dshackle.upstream.MatchesResponse.HeightResponse
import io.emeraldpay.dshackle.upstream.MatchesResponse.LowerHeightResponse
import io.emeraldpay.dshackle.upstream.MatchesResponse.NotMatchedResponse
import io.emeraldpay.dshackle.upstream.MatchesResponse.SameNodeResponse
import io.emeraldpay.dshackle.upstream.MatchesResponse.SlotHeightResponse
import io.emeraldpay.dshackle.upstream.MatchesResponse.Success
import io.emeraldpay.dshackle.upstream.finalization.FinalizationType
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import io.emeraldpay.dshackle.upstream.lowerbound.fromProtoType
import org.apache.commons.lang3.StringUtils
import java.util.Collections
Expand Down Expand Up @@ -101,6 +103,16 @@ class Selector {
else -> empty
}
}
it.hasLowerHeightSelector() -> {
if (it.lowerHeightSelector.height > 0) {
LowerHeightMatcher(
it.lowerHeightSelector.height,
it.lowerHeightSelector.lowerBoundType.fromProtoType(),
)
} else {
empty
}
}
else -> empty
}
}.run {
Expand All @@ -112,8 +124,11 @@ class Selector {
private fun getSort(selectors: List<BlockchainOuterClass.Selector>): Sort {
selectors.forEach { selector ->
if (selector.hasHeightSelector()) {
return HeightNumberOrTag.fromHeightSelector(selector.heightSelector)?.getSort() ?: Sort.default
} else if (selector.hasLowerHeightSelector()) {
val heightSort = HeightNumberOrTag.fromHeightSelector(selector.heightSelector)?.getSort() ?: Sort.default
if (heightSort != Sort.default) {
return heightSort
}
} else if (selector.hasLowerHeightSelector() && selector.lowerHeightSelector.height == 0L) {
return Sort(
compareBy(nullsLast()) {
it.getLowerBound(selector.lowerHeightSelector.lowerBoundType.fromProtoType())?.lowerBound
Expand Down Expand Up @@ -546,6 +561,28 @@ class Selector {
}
}

data class LowerHeightMatcher(
private val lowerHeight: Long,
private val boundType: LowerBoundType,
) : Matcher() {
override fun matchesWithCause(up: Upstream): MatchesResponse {
val predictedLowerBound = up.predictLowerBound(boundType)
return if (lowerHeight >= predictedLowerBound && predictedLowerBound != 0L) {
Success
} else {
LowerHeightResponse(lowerHeight, predictedLowerBound, boundType)
}
}

override fun describeInternal(): String {
return "lower height $lowerHeight"
}

override fun toString(): String {
return "Matcher: ${describeInternal()}"
}
}

class HeightMatcher(val height: Long) : Matcher() {

override fun matchesWithCause(up: Upstream): MatchesResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ interface Upstream : Lifecycle {
fun addFinalization(finalization: FinalizationData, upstreamId: String)
fun getUpstreamSettingsData(): UpstreamSettingsData?
fun updateLowerBound(lowerBound: Long, type: LowerBoundType)
fun predictLowerBound(type: LowerBoundType): Long

fun getChain(): Chain

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundDetector
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundService

class BeaconChainLowerBoundService(
chain: Chain,
private val chain: Chain,
upstream: Upstream,
) : LowerBoundService(chain, upstream) {
override fun detectors(): List<LowerBoundDetector> {
return listOf(BeaconChainLowerBoundStateDetector())
return listOf(BeaconChainLowerBoundStateDetector(chain))
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package io.emeraldpay.dshackle.upstream.beaconchain

import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundData
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundDetector
import io.emeraldpay.dshackle.upstream.lowerbound.LowerBoundType
import reactor.core.publisher.Flux

class BeaconChainLowerBoundStateDetector : LowerBoundDetector() {
class BeaconChainLowerBoundStateDetector(
private val chain: Chain,
) : LowerBoundDetector(chain) {

override fun period(): Long {
return 120
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class CosmosLowerBoundService(

class CosmosLowerBoundStateDetector(
private val upstream: Upstream,
) : LowerBoundDetector() {
) : LowerBoundDetector(upstream.getChain()) {

override fun period(): Long {
return 3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import reactor.core.publisher.Mono

class EthereumLowerBoundBlockDetector(
private val upstream: Upstream,
) : LowerBoundDetector() {
) : LowerBoundDetector(upstream.getChain()) {

companion object {
private const val NO_BLOCK_DATA = "No block data"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import reactor.core.publisher.Flux

class EthereumLowerBoundLogsDetector(
private val upstream: Upstream,
) : LowerBoundDetector() {
) : LowerBoundDetector(upstream.getChain()) {

companion object {
const val MAX_OFFSET = 20
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import reactor.core.publisher.Mono

class EthereumLowerBoundStateDetector(
private val upstream: Upstream,
) : LowerBoundDetector() {
) : LowerBoundDetector(upstream.getChain()) {
private val recursiveLowerBound = RecursiveLowerBound(upstream, LowerBoundType.STATE, stateErrors, lowerBounds)

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import reactor.core.publisher.Flux

class EthereumLowerBoundTxDetector(
private val upstream: Upstream,
) : LowerBoundDetector() {
) : LowerBoundDetector(upstream.getChain()) {

companion object {
const val MAX_OFFSET = 20
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,5 +354,9 @@ open class GenericUpstream(
lowerBoundService.updateLowerBound(lowerBound, type)
}

override fun predictLowerBound(type: LowerBoundType): Long {
return lowerBoundService.predictLowerBound(type)
}

fun isValid(): Boolean = isUpstreamValid.get()
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
package io.emeraldpay.dshackle.upstream.lowerbound

import io.emeraldpay.dshackle.Chain
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.Sinks
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean

fun Long.toHex() = "0x${this.toString(16)}"

abstract class LowerBoundDetector {
abstract class LowerBoundDetector(
chain: Chain,
) {
protected val log = LoggerFactory.getLogger(this::class.java)

protected val lowerBounds = ConcurrentHashMap<LowerBoundType, LowerBoundData>()
protected val lowerBounds = LowerBounds(chain)
private val lowerBoundSink = Sinks.many().multicast().directBestEffort<LowerBoundData>()

fun detectLowerBound(): Flux<LowerBoundData> {
Expand All @@ -35,10 +37,10 @@ abstract class LowerBoundDetector {
},
)
.filter {
it.lowerBound >= (lowerBounds[it.type]?.lowerBound ?: 0)
it.lowerBound >= (lowerBounds.getLastBound(it.type)?.lowerBound ?: 0)
}
.map {
lowerBounds[it.type] = it
lowerBounds.updateBound(it)
it
}
}
Expand All @@ -53,4 +55,8 @@ abstract class LowerBoundDetector {
fun updateLowerBound(lowerBound: Long, type: LowerBoundType) {
lowerBoundSink.emitNext(LowerBoundData(lowerBound, type)) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED }
}

fun predictLowerBound(type: LowerBoundType): Long {
return lowerBounds.predictNextBound(type)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ abstract class LowerBoundService(
.forEach { it.updateLowerBound(lowerBound, type) }
}

fun predictLowerBound(type: LowerBoundType): Long {
return detectors
.firstOrNull { it.types().contains(type) }
?.predictLowerBound(type)
?: 0
}

fun getLowerBounds(): Collection<LowerBoundData> = lowerBounds.values

fun getLowerBound(lowerBoundType: LowerBoundType): LowerBoundData? = lowerBounds[lowerBoundType]
Expand Down
Loading

0 comments on commit e184bb3

Please sign in to comment.