diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt index 7edce49bf..d82cce039 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt @@ -142,7 +142,15 @@ abstract class DefaultUpstream( if (forked.get()) { return UpstreamAvailability.IMMATURE } - return status.get().status + val value = status.get().status + if (value == UpstreamAvailability.UNAVAILABLE || value == UpstreamAvailability.SYNCING) { + return value + } + // if height is 0, then it's definitely on OK, probably just started with syncing + if (this.getHead().getCurrentHeight() == 0L) { + return UpstreamAvailability.SYNCING + } + return value } open fun setStatus(status: UpstreamAvailability) { @@ -158,13 +166,21 @@ abstract class DefaultUpstream( // the status calculation and trust the proposed value as is return proposed } - return if (proposed == UpstreamAvailability.OK) { - when { - lag > 6 -> UpstreamAvailability.SYNCING - lag > 1 -> UpstreamAvailability.LAGGING - else -> proposed + return when { + proposed == UpstreamAvailability.OK -> { + // make sure it's actually usable + when { + lag > 6 -> UpstreamAvailability.SYNCING + lag > 1 -> UpstreamAvailability.LAGGING + else -> proposed + } } - } else proposed + proposed == UpstreamAvailability.LAGGING && lag > 6 -> { + // to large lag, mark as syncing + UpstreamAvailability.SYNCING + } + else -> proposed + } } val availabilityByForks: Flux diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstreamTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstreamTest.kt index 1cbe191d6..9dea02f37 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstreamTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstreamTest.kt @@ -3,6 +3,7 @@ package io.emeraldpay.dshackle.upstream import io.emeraldpay.api.Chain import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.reader.StandardRpcReader +import io.emeraldpay.dshackle.test.FixedHead import io.emeraldpay.dshackle.test.ForkWatchMock import io.emeraldpay.dshackle.upstream.calls.DefaultEthereumMethods import io.kotest.core.spec.style.ShouldSpec @@ -15,6 +16,7 @@ class DefaultUpstreamTest : ShouldSpec({ should("Produce current status availability") { val upstream = DefaultUpstreamTestImpl("test", ForkWatch.Never()) + upstream.setLag(3) upstream.setStatus(UpstreamAvailability.LAGGING) val act = upstream.availabilityByStatus.take(1) .collectList().block(Duration.ofSeconds(1)) @@ -23,15 +25,17 @@ class DefaultUpstreamTest : ShouldSpec({ should("Produce updated status availability") { val upstream = DefaultUpstreamTestImpl("test", ForkWatch.Never()) + upstream.setLag(3) upstream.setStatus(UpstreamAvailability.LAGGING) val act = upstream.availabilityByStatus.take(2) StepVerifier.create(act) .expectNext(UpstreamAvailability.LAGGING) .then { + upstream.setLag(10) upstream.setStatus(UpstreamAvailability.OK) } - // syncing because we didn't set the height lag + // syncing because lag is too big .expectNext(UpstreamAvailability.SYNCING) .expectComplete() .verify(Duration.ofSeconds(1)) @@ -115,17 +119,34 @@ class DefaultUpstreamTest : ShouldSpec({ upstream.watchHttpCodes.accept(429) upstream.getStatus() shouldBe UpstreamAvailability.UNAVAILABLE } + + should("Be Syncing when height is 0") { + val upstream = DefaultUpstreamTestImpl("test", ForkWatch.Never(), head = FixedHead(0)) + upstream.setStatus(UpstreamAvailability.LAGGING) + val act = upstream.getStatus() + act shouldBe UpstreamAvailability.SYNCING + } + + should("Switch to syncing when lag is too large") { + val upstream = DefaultUpstreamTestImpl("test", ForkWatch.Never()) + upstream.setLag(100) + upstream.setStatus(UpstreamAvailability.LAGGING) + val act = upstream.availabilityByStatus.take(1) + .collectList().block(Duration.ofSeconds(1)) + act shouldBe listOf(UpstreamAvailability.SYNCING) + } }) class DefaultUpstreamTestImpl( id: String, forkWatch: ForkWatch, - options: UpstreamsConfig.Options = UpstreamsConfig.PartialOptions.getDefaults().build() + options: UpstreamsConfig.Options = UpstreamsConfig.PartialOptions.getDefaults().build(), + private val head: Head = FixedHead(100) ) : DefaultUpstream( id, Chain.ETHEREUM, forkWatch, options, UpstreamsConfig.UpstreamRole.PRIMARY, DefaultEthereumMethods(Chain.ETHEREUM) ) { override fun getHead(): Head { - throw NotImplementedError() + return head } override fun getIngressReader(): StandardRpcReader {