From 6182a91b954273ac10085f19172b36ada3ae2ff0 Mon Sep 17 00:00:00 2001 From: KirillPamPam Date: Fri, 17 Nov 2023 17:11:47 +0400 Subject: [PATCH] Reduce time if no matchers were found (#343) --- .../dshackle/upstream/FilteredApis.kt | 72 +++++---------- .../dshackle/upstream/FilteredApisSpec.groovy | 88 +++---------------- 2 files changed, 33 insertions(+), 127 deletions(-) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/FilteredApis.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/FilteredApis.kt index bd4a473d4..43266c529 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/FilteredApis.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/FilteredApis.kt @@ -32,30 +32,20 @@ import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.locks.Lock import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock -import kotlin.math.max -import kotlin.math.min -import kotlin.math.pow -import kotlin.math.roundToLong -import kotlin.random.Random class FilteredApis( val chain: Chain, private val allUpstreams: List, - private val matcher: Selector.Matcher, + matcher: Selector.Matcher, private val pos: Int, - /** - * Limit of retries - */ - private val retryLimit: Long, - jitter: Int, + private val retries: Int, ) : ApiSource { private val internalMatcher: Selector.Matcher companion object { private val log = LoggerFactory.getLogger(FilteredApis::class.java) - private const val DEFAULT_DELAY_STEP = 100 - private const val MAX_WAIT_MILLIS = 5000L + private const val DEFAULT_RETRY_LIMIT = 3 private const val metricsCode = "select" @@ -78,17 +68,24 @@ class FilteredApis( allUpstreams: List, matcher: Selector.Matcher, pos: Int, - ) : this(chain, allUpstreams, matcher, pos, 10, 7) + ) : this(chain, allUpstreams, matcher, pos, DEFAULT_RETRY_LIMIT) constructor( chain: Chain, allUpstreams: List, matcher: Selector.Matcher, - ) : this(chain, allUpstreams, matcher, 0, 10, 10) + ) : this(chain, allUpstreams, matcher, 0, DEFAULT_RETRY_LIMIT) - private val delay: Int - private val primaryUpstreams: List - private val secondaryUpstreams: List + private val primaryUpstreams: List = allUpstreams.filter { + it.getRole() == UpstreamsConfig.UpstreamRole.PRIMARY + }.let { + startFrom(it, pos) + } + private val secondaryUpstreams: List = allUpstreams.filter { + it.getRole() == UpstreamsConfig.UpstreamRole.SECONDARY + }.let { + startFrom(it, pos) + } private val standardWithFallback: List private val counter: AtomicInteger = AtomicInteger(0) @@ -98,22 +95,6 @@ class FilteredApis( private var upstreamsMatchesResponse: UpstreamsMatchesResponse? = UpstreamsMatchesResponse() init { - delay = if (jitter > 0) { - Random.nextInt(DEFAULT_DELAY_STEP - jitter, DEFAULT_DELAY_STEP + jitter) - } else { - DEFAULT_DELAY_STEP - } - - primaryUpstreams = allUpstreams.filter { - it.getRole() == UpstreamsConfig.UpstreamRole.PRIMARY - }.let { - startFrom(it, pos) - } - secondaryUpstreams = allUpstreams.filter { - it.getRole() == UpstreamsConfig.UpstreamRole.SECONDARY - }.let { - startFrom(it, pos) - } val fallbackUpstreams = allUpstreams.filter { it.getRole() == UpstreamsConfig.UpstreamRole.FALLBACK }.let { @@ -154,29 +135,20 @@ class FilteredApis( } } - fun waitDuration(rawn: Long): Duration { - val n = max(rawn, 1) - val time = min( - (n.toDouble().pow(2.0) * delay).roundToLong(), - MAX_WAIT_MILLIS, - ) - return Duration.ofMillis(time) - } - override fun subscribe(subscriber: Subscriber) { // initially try only standard upstreams val first = Flux.fromIterable(primaryUpstreams.sortedBy { it.getStatus().grpcId }) val second = Flux.fromIterable(secondaryUpstreams.sortedBy { it.getStatus().grpcId }) // if all failed, try both standard and fallback upstreams, repeating in cycle - val retries = (0 until (retryLimit - 1)).map { r -> - Flux.fromIterable(standardWithFallback.sortedBy { it.getStatus().grpcId }) - // add a delay to let upstream to restore if it's a temp failure - // but delay only start of the check, not between upstreams - // i.e. if all upstreams failed -> wait -> check all without waiting in between - .delaySubscription(waitDuration(r + 1)) + val retries = (0 until this.retries).map { + val retryDelay = (it + 1) * 30 + Flux.fromIterable( + standardWithFallback.sortedBy { up -> up.getStatus().grpcId }, + ).delaySubscription(Duration.ofMillis(retryDelay.toLong())) }.let { Flux.concat(it) } - var result = Flux.concat(first, second, retries).take(Duration.ofSeconds(3)) + val size = primaryUpstreams.size + secondaryUpstreams.size + standardWithFallback.size * this.retries + var result = Flux.concat(first, second, retries).take(size.toLong(), false) if (Global.metricsExtended) { var count = 0 diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/FilteredApisSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/FilteredApisSpec.groovy index 1a2cad449..3a629d4ef 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/FilteredApisSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/FilteredApisSpec.groovy @@ -87,7 +87,7 @@ class FilteredApisSpec extends Specification { it.setStatus(UpstreamAvailability.OK) } when: - def iter = new FilteredApis(Chain.ETHEREUM__MAINNET, upstreams, matcher, 0, 1, 0) + def iter = new FilteredApis(Chain.ETHEREUM__MAINNET, upstreams, matcher, 0, 0) iter.request(10) then: StepVerifier.create(iter) @@ -98,7 +98,7 @@ class FilteredApisSpec extends Specification { .verify(Duration.ofSeconds(1)) when: - iter = new FilteredApis(Chain.ETHEREUM__MAINNET, upstreams, matcher, 1, 1, 0) + iter = new FilteredApis(Chain.ETHEREUM__MAINNET, upstreams, matcher, 1, 0) iter.request(10) then: StepVerifier.create(iter) @@ -109,7 +109,7 @@ class FilteredApisSpec extends Specification { .verify(Duration.ofSeconds(1)) when: - iter = new FilteredApis(Chain.ETHEREUM__MAINNET, upstreams, matcher, 1, 2, 0) + iter = new FilteredApis(Chain.ETHEREUM__MAINNET, upstreams, matcher, 1, 2) iter.request(10) then: StepVerifier.create(iter) @@ -119,75 +119,13 @@ class FilteredApisSpec extends Specification { .expectNext(upstreams[2]) .expectNext(upstreams[3]) .expectNext(upstreams[0]) + .expectNext(upstreams[2]) + .expectNext(upstreams[3]) + .expectNext(upstreams[0]) .expectComplete() .verify(Duration.ofSeconds(1)) } - def "Exponential backoff"() { - setup: - def apis = new FilteredApis(Chain.ETHEREUM__MAINNET, [], Selector.empty, 0, 1, 0) - expect: - wait == apis.waitDuration(n).toMillis() as Integer - where: - n | wait - 0 | 100 - 1 | 100 - 2 | 400 - 3 | 900 - 4 | 1600 - 5 | 2500 - 6 | 3600 - 7 | 4900 - 8 | 5000 - 9 | 5000 - 10 | 5000 - -1 | 100 - } - - @Retry - def "Backoff uses jitter"() { - setup: - def apis = new FilteredApis(Chain.ETHEREUM__MAINNET, [], Selector.empty, 0, 1, 20) - when: - def act = apis.waitDuration(1).toMillis() - println act - then: - act >= 80 - act <= 120 - act != 100 - - when: - act = apis.waitDuration(3).toMillis() - println act - then: - act >= 900 - 9 * 20 - act <= 900 + 9 * 20 - act != 900 - } - - def "Makes pause between batches"() { - when: - def api1 = TestingCommons.api() - def api2 = TestingCommons.api() - def up1 = TestingCommons.upstream(api1) - def up2 = TestingCommons.upstream(api2) - then: - StepVerifier.withVirtualTime({ - def apis = new FilteredApis(Chain.ETHEREUM__MAINNET, [up1, up2], Selector.empty, 0, 4, 0) - apis.request(10) - return apis - }) - .expectNext(up1, up2).as("Batch 1") - .expectNoEvent(Duration.ofMillis(100)).as("Wait 1") - .expectNext(up1, up2).as("Batch 2") - .expectNoEvent(Duration.ofMillis(400)).as("Wait 2") - .expectNext(up1, up2).as("Batch 3") - .expectNoEvent(Duration.ofMillis(900)).as("Wait 3") - .expectNext(up1, up2).as("Batch 4") - .expectComplete() - .verify(Duration.ofSeconds(10)) - } - def "Starts with right position"() { setup: def apis = (0..5).collect { @@ -197,7 +135,7 @@ class FilteredApisSpec extends Specification { TestingCommons.upstream(it) } when: - def act = new FilteredApis(Chain.ETHEREUM__MAINNET, ups, Selector.empty, 2, 1, 0) + def act = new FilteredApis(Chain.ETHEREUM__MAINNET, ups, Selector.empty, 2, 0) act.request(10) then: StepVerifier.create(act) @@ -216,7 +154,7 @@ class FilteredApisSpec extends Specification { TestingCommons.upstream(it) } when: - def act = new FilteredApis(Chain.ETHEREUM__MAINNET, ups, Selector.empty, 2, 1, 0) + def act = new FilteredApis(Chain.ETHEREUM__MAINNET, ups, Selector.empty, 2, 0) act.request(3) then: StepVerifier.create(act) @@ -275,17 +213,13 @@ class FilteredApisSpec extends Specification { when: def act = new FilteredApis(Chain.ETHEREUM__MAINNET, [] + fallback + standard, - Selector.empty, 0, 3, 0) + Selector.empty, 0, 1) act.request(10) then: StepVerifier.create(act) .expectNext(standard[0], standard[1]).as("Initial requests") - .expectNext(standard[0], standard[1]).as("Retry with standard") .expectNext(fallback[0]).as("Retry with fallback") - - .expectNext(standard[0], standard[1]).as("Second retry with standard") - .expectNext(fallback[0]).as("Second retry with fallback") .expectComplete() .verify(Duration.ofSeconds(1)) } @@ -315,7 +249,7 @@ class FilteredApisSpec extends Specification { when: def act = new FilteredApis(Chain.ETHEREUM__MAINNET, [] + fallback + standard + secondary, - Selector.empty, 0, 3, 0) + Selector.empty, 0, 2) act.request(11) then: StepVerifier.create(act) @@ -352,7 +286,7 @@ class FilteredApisSpec extends Specification { when: def act = new FilteredApis(Chain.ETHEREUM__MAINNET, [] + lagging + ok, - Selector.empty, 0, 2, 0) + Selector.empty, 0, 1) act.request(4) then: StepVerifier.create(act)