Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

problem: treats a fresh Syncing node as a normal one #274

Merged
merged 1 commit into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,15 @@ abstract class DefaultUpstream(
if (forked.get()) {
return UpstreamAvailability.IMMATURE
}
return status.get().status
val value = status.get().status
if (value == UpstreamAvailability.UNAVAILABLE || value == UpstreamAvailability.SYNCING) {
return value
}
// if height is 0, then it's definitely on OK, probably just started with syncing
if (this.getHead().getCurrentHeight() == 0L) {
return UpstreamAvailability.SYNCING
}
return value
}

open fun setStatus(status: UpstreamAvailability) {
Expand All @@ -158,13 +166,21 @@ abstract class DefaultUpstream(
// the status calculation and trust the proposed value as is
return proposed
}
return if (proposed == UpstreamAvailability.OK) {
when {
lag > 6 -> UpstreamAvailability.SYNCING
lag > 1 -> UpstreamAvailability.LAGGING
else -> proposed
return when {
proposed == UpstreamAvailability.OK -> {
// make sure it's actually usable
when {
lag > 6 -> UpstreamAvailability.SYNCING
lag > 1 -> UpstreamAvailability.LAGGING
else -> proposed
}
}
} else proposed
proposed == UpstreamAvailability.LAGGING && lag > 6 -> {
// to large lag, mark as syncing
UpstreamAvailability.SYNCING
}
else -> proposed
}
}

val availabilityByForks: Flux<UpstreamAvailability>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.emeraldpay.dshackle.upstream
import io.emeraldpay.api.Chain
import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.reader.StandardRpcReader
import io.emeraldpay.dshackle.test.FixedHead
import io.emeraldpay.dshackle.test.ForkWatchMock
import io.emeraldpay.dshackle.upstream.calls.DefaultEthereumMethods
import io.kotest.core.spec.style.ShouldSpec
Expand All @@ -15,6 +16,7 @@ class DefaultUpstreamTest : ShouldSpec({

should("Produce current status availability") {
val upstream = DefaultUpstreamTestImpl("test", ForkWatch.Never())
upstream.setLag(3)
upstream.setStatus(UpstreamAvailability.LAGGING)
val act = upstream.availabilityByStatus.take(1)
.collectList().block(Duration.ofSeconds(1))
Expand All @@ -23,15 +25,17 @@ class DefaultUpstreamTest : ShouldSpec({

should("Produce updated status availability") {
val upstream = DefaultUpstreamTestImpl("test", ForkWatch.Never())
upstream.setLag(3)
upstream.setStatus(UpstreamAvailability.LAGGING)
val act = upstream.availabilityByStatus.take(2)

StepVerifier.create(act)
.expectNext(UpstreamAvailability.LAGGING)
.then {
upstream.setLag(10)
upstream.setStatus(UpstreamAvailability.OK)
}
// syncing because we didn't set the height lag
// syncing because lag is too big
.expectNext(UpstreamAvailability.SYNCING)
.expectComplete()
.verify(Duration.ofSeconds(1))
Expand Down Expand Up @@ -115,17 +119,34 @@ class DefaultUpstreamTest : ShouldSpec({
upstream.watchHttpCodes.accept(429)
upstream.getStatus() shouldBe UpstreamAvailability.UNAVAILABLE
}

should("Be Syncing when height is 0") {
val upstream = DefaultUpstreamTestImpl("test", ForkWatch.Never(), head = FixedHead(0))
upstream.setStatus(UpstreamAvailability.LAGGING)
val act = upstream.getStatus()
act shouldBe UpstreamAvailability.SYNCING
}

should("Switch to syncing when lag is too large") {
val upstream = DefaultUpstreamTestImpl("test", ForkWatch.Never())
upstream.setLag(100)
upstream.setStatus(UpstreamAvailability.LAGGING)
val act = upstream.availabilityByStatus.take(1)
.collectList().block(Duration.ofSeconds(1))
act shouldBe listOf(UpstreamAvailability.SYNCING)
}
})

class DefaultUpstreamTestImpl(
id: String,
forkWatch: ForkWatch,
options: UpstreamsConfig.Options = UpstreamsConfig.PartialOptions.getDefaults().build()
options: UpstreamsConfig.Options = UpstreamsConfig.PartialOptions.getDefaults().build(),
private val head: Head = FixedHead(100)
) : DefaultUpstream(
id, Chain.ETHEREUM, forkWatch, options, UpstreamsConfig.UpstreamRole.PRIMARY, DefaultEthereumMethods(Chain.ETHEREUM)
) {
override fun getHead(): Head {
throw NotImplementedError()
return head
}

override fun getIngressReader(): StandardRpcReader {
Expand Down
Loading