Skip to content

Commit

Permalink
Reduce time if no matchers were found (#343)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Nov 17, 2023
1 parent 92f6e05 commit 6182a91
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 127 deletions.
72 changes: 22 additions & 50 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/FilteredApis.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,30 +32,20 @@ import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.math.max
import kotlin.math.min
import kotlin.math.pow
import kotlin.math.roundToLong
import kotlin.random.Random

class FilteredApis(
val chain: Chain,
private val allUpstreams: List<Upstream>,
private val matcher: Selector.Matcher,
matcher: Selector.Matcher,
private val pos: Int,
/**
* Limit of retries
*/
private val retryLimit: Long,
jitter: Int,
private val retries: Int,
) : ApiSource {
private val internalMatcher: Selector.Matcher

companion object {
private val log = LoggerFactory.getLogger(FilteredApis::class.java)

private const val DEFAULT_DELAY_STEP = 100
private const val MAX_WAIT_MILLIS = 5000L
private const val DEFAULT_RETRY_LIMIT = 3

private const val metricsCode = "select"

Expand All @@ -78,17 +68,24 @@ class FilteredApis(
allUpstreams: List<Upstream>,
matcher: Selector.Matcher,
pos: Int,
) : this(chain, allUpstreams, matcher, pos, 10, 7)
) : this(chain, allUpstreams, matcher, pos, DEFAULT_RETRY_LIMIT)

constructor(
chain: Chain,
allUpstreams: List<Upstream>,
matcher: Selector.Matcher,
) : this(chain, allUpstreams, matcher, 0, 10, 10)
) : this(chain, allUpstreams, matcher, 0, DEFAULT_RETRY_LIMIT)

private val delay: Int
private val primaryUpstreams: List<Upstream>
private val secondaryUpstreams: List<Upstream>
private val primaryUpstreams: List<Upstream> = allUpstreams.filter {
it.getRole() == UpstreamsConfig.UpstreamRole.PRIMARY
}.let {
startFrom(it, pos)
}
private val secondaryUpstreams: List<Upstream> = allUpstreams.filter {
it.getRole() == UpstreamsConfig.UpstreamRole.SECONDARY
}.let {
startFrom(it, pos)
}
private val standardWithFallback: List<Upstream>

private val counter: AtomicInteger = AtomicInteger(0)
Expand All @@ -98,22 +95,6 @@ class FilteredApis(
private var upstreamsMatchesResponse: UpstreamsMatchesResponse? = UpstreamsMatchesResponse()

init {
delay = if (jitter > 0) {
Random.nextInt(DEFAULT_DELAY_STEP - jitter, DEFAULT_DELAY_STEP + jitter)
} else {
DEFAULT_DELAY_STEP
}

primaryUpstreams = allUpstreams.filter {
it.getRole() == UpstreamsConfig.UpstreamRole.PRIMARY
}.let {
startFrom(it, pos)
}
secondaryUpstreams = allUpstreams.filter {
it.getRole() == UpstreamsConfig.UpstreamRole.SECONDARY
}.let {
startFrom(it, pos)
}
val fallbackUpstreams = allUpstreams.filter {
it.getRole() == UpstreamsConfig.UpstreamRole.FALLBACK
}.let {
Expand Down Expand Up @@ -154,29 +135,20 @@ class FilteredApis(
}
}

fun waitDuration(rawn: Long): Duration {
val n = max(rawn, 1)
val time = min(
(n.toDouble().pow(2.0) * delay).roundToLong(),
MAX_WAIT_MILLIS,
)
return Duration.ofMillis(time)
}

override fun subscribe(subscriber: Subscriber<in Upstream>) {
// initially try only standard upstreams
val first = Flux.fromIterable(primaryUpstreams.sortedBy { it.getStatus().grpcId })
val second = Flux.fromIterable(secondaryUpstreams.sortedBy { it.getStatus().grpcId })
// if all failed, try both standard and fallback upstreams, repeating in cycle
val retries = (0 until (retryLimit - 1)).map { r ->
Flux.fromIterable(standardWithFallback.sortedBy { it.getStatus().grpcId })
// add a delay to let upstream to restore if it's a temp failure
// but delay only start of the check, not between upstreams
// i.e. if all upstreams failed -> wait -> check all without waiting in between
.delaySubscription(waitDuration(r + 1))
val retries = (0 until this.retries).map {
val retryDelay = (it + 1) * 30
Flux.fromIterable(
standardWithFallback.sortedBy { up -> up.getStatus().grpcId },
).delaySubscription(Duration.ofMillis(retryDelay.toLong()))
}.let { Flux.concat(it) }

var result = Flux.concat(first, second, retries).take(Duration.ofSeconds(3))
val size = primaryUpstreams.size + secondaryUpstreams.size + standardWithFallback.size * this.retries
var result = Flux.concat(first, second, retries).take(size.toLong(), false)

if (Global.metricsExtended) {
var count = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class FilteredApisSpec extends Specification {
it.setStatus(UpstreamAvailability.OK)
}
when:
def iter = new FilteredApis(Chain.ETHEREUM__MAINNET, upstreams, matcher, 0, 1, 0)
def iter = new FilteredApis(Chain.ETHEREUM__MAINNET, upstreams, matcher, 0, 0)
iter.request(10)
then:
StepVerifier.create(iter)
Expand All @@ -98,7 +98,7 @@ class FilteredApisSpec extends Specification {
.verify(Duration.ofSeconds(1))

when:
iter = new FilteredApis(Chain.ETHEREUM__MAINNET, upstreams, matcher, 1, 1, 0)
iter = new FilteredApis(Chain.ETHEREUM__MAINNET, upstreams, matcher, 1, 0)
iter.request(10)
then:
StepVerifier.create(iter)
Expand All @@ -109,7 +109,7 @@ class FilteredApisSpec extends Specification {
.verify(Duration.ofSeconds(1))

when:
iter = new FilteredApis(Chain.ETHEREUM__MAINNET, upstreams, matcher, 1, 2, 0)
iter = new FilteredApis(Chain.ETHEREUM__MAINNET, upstreams, matcher, 1, 2)
iter.request(10)
then:
StepVerifier.create(iter)
Expand All @@ -119,75 +119,13 @@ class FilteredApisSpec extends Specification {
.expectNext(upstreams[2])
.expectNext(upstreams[3])
.expectNext(upstreams[0])
.expectNext(upstreams[2])
.expectNext(upstreams[3])
.expectNext(upstreams[0])
.expectComplete()
.verify(Duration.ofSeconds(1))
}

def "Exponential backoff"() {
setup:
def apis = new FilteredApis(Chain.ETHEREUM__MAINNET, [], Selector.empty, 0, 1, 0)
expect:
wait == apis.waitDuration(n).toMillis() as Integer
where:
n | wait
0 | 100
1 | 100
2 | 400
3 | 900
4 | 1600
5 | 2500
6 | 3600
7 | 4900
8 | 5000
9 | 5000
10 | 5000
-1 | 100
}

@Retry
def "Backoff uses jitter"() {
setup:
def apis = new FilteredApis(Chain.ETHEREUM__MAINNET, [], Selector.empty, 0, 1, 20)
when:
def act = apis.waitDuration(1).toMillis()
println act
then:
act >= 80
act <= 120
act != 100

when:
act = apis.waitDuration(3).toMillis()
println act
then:
act >= 900 - 9 * 20
act <= 900 + 9 * 20
act != 900
}

def "Makes pause between batches"() {
when:
def api1 = TestingCommons.api()
def api2 = TestingCommons.api()
def up1 = TestingCommons.upstream(api1)
def up2 = TestingCommons.upstream(api2)
then:
StepVerifier.withVirtualTime({
def apis = new FilteredApis(Chain.ETHEREUM__MAINNET, [up1, up2], Selector.empty, 0, 4, 0)
apis.request(10)
return apis
})
.expectNext(up1, up2).as("Batch 1")
.expectNoEvent(Duration.ofMillis(100)).as("Wait 1")
.expectNext(up1, up2).as("Batch 2")
.expectNoEvent(Duration.ofMillis(400)).as("Wait 2")
.expectNext(up1, up2).as("Batch 3")
.expectNoEvent(Duration.ofMillis(900)).as("Wait 3")
.expectNext(up1, up2).as("Batch 4")
.expectComplete()
.verify(Duration.ofSeconds(10))
}

def "Starts with right position"() {
setup:
def apis = (0..5).collect {
Expand All @@ -197,7 +135,7 @@ class FilteredApisSpec extends Specification {
TestingCommons.upstream(it)
}
when:
def act = new FilteredApis(Chain.ETHEREUM__MAINNET, ups, Selector.empty, 2, 1, 0)
def act = new FilteredApis(Chain.ETHEREUM__MAINNET, ups, Selector.empty, 2, 0)
act.request(10)
then:
StepVerifier.create(act)
Expand All @@ -216,7 +154,7 @@ class FilteredApisSpec extends Specification {
TestingCommons.upstream(it)
}
when:
def act = new FilteredApis(Chain.ETHEREUM__MAINNET, ups, Selector.empty, 2, 1, 0)
def act = new FilteredApis(Chain.ETHEREUM__MAINNET, ups, Selector.empty, 2, 0)
act.request(3)
then:
StepVerifier.create(act)
Expand Down Expand Up @@ -275,17 +213,13 @@ class FilteredApisSpec extends Specification {
when:
def act = new FilteredApis(Chain.ETHEREUM__MAINNET,
[] + fallback + standard,
Selector.empty, 0, 3, 0)
Selector.empty, 0, 1)
act.request(10)
then:
StepVerifier.create(act)
.expectNext(standard[0], standard[1]).as("Initial requests")

.expectNext(standard[0], standard[1]).as("Retry with standard")
.expectNext(fallback[0]).as("Retry with fallback")

.expectNext(standard[0], standard[1]).as("Second retry with standard")
.expectNext(fallback[0]).as("Second retry with fallback")
.expectComplete()
.verify(Duration.ofSeconds(1))
}
Expand Down Expand Up @@ -315,7 +249,7 @@ class FilteredApisSpec extends Specification {
when:
def act = new FilteredApis(Chain.ETHEREUM__MAINNET,
[] + fallback + standard + secondary,
Selector.empty, 0, 3, 0)
Selector.empty, 0, 2)
act.request(11)
then:
StepVerifier.create(act)
Expand Down Expand Up @@ -352,7 +286,7 @@ class FilteredApisSpec extends Specification {
when:
def act = new FilteredApis(Chain.ETHEREUM__MAINNET,
[] + lagging + ok,
Selector.empty, 0, 2, 0)
Selector.empty, 0, 1)
act.request(4)
then:
StepVerifier.create(act)
Expand Down

0 comments on commit 6182a91

Please sign in to comment.