Skip to content

Commit

Permalink
Filter and sort upstreams if latest (#476)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored May 15, 2024
1 parent de4e163 commit b468b40
Show file tree
Hide file tree
Showing 12 changed files with 90 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,16 @@ interface RequestReaderFactory {
class Default : RequestReaderFactory {
override fun create(data: ReaderData): RequestReader {
if (data.quorum is MaximumValueQuorum || data.quorum is BroadcastQuorum) {
return BroadcastReader(data.multistream.getAll(), data.matcher, data.signer, data.quorum, data.tracer)
return BroadcastReader(data.multistream.getAll(), data.upstreamFilter.matcher, data.signer, data.quorum, data.tracer)
}
val apis = data.multistream.getApiSource(data.matcher)
val apis = data.multistream.getApiSource(data.upstreamFilter)
return QuorumRequestReader(apis, data.quorum, data.signer, data.tracer)
}
}

data class ReaderData(
val multistream: Multistream,
val matcher: Selector.Matcher,
val upstreamFilter: Selector.UpstreamFilter,
val quorum: CallQuorum,
val signer: ResponseSigner?,
val tracer: Tracer,
Expand Down
20 changes: 10 additions & 10 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt
Original file line number Diff line number Diff line change
Expand Up @@ -331,12 +331,12 @@ open class NativeCall(
),
)
}
val requestMatcher = requestItem.selectorsList
val upstreamFilter = requestItem.selectorsList
.takeIf { it.isNotEmpty() }
?.run { Mono.just(Selector.convertToMatcher(this, upstream.getHead())) }
?.run { Selector.convertToUpstreamFilter(this) }
// for ethereum the actual block needed for the call may be specified in the call parameters
val callSpecificMatcher: Mono<Selector.Matcher> =
requestMatcher ?: upstream.callSelector?.getMatcher(method, params, upstream.getHead(), passthrough) ?: Mono.empty()
upstreamFilter?.matcher?.let { Mono.just(it) } ?: upstream.callSelector?.getMatcher(method, params, upstream.getHead(), passthrough) ?: Mono.empty()
return callSpecificMatcher.defaultIfEmpty(Selector.empty).map { csm ->
val matcher = Selector.Builder()
.withMatcher(csm)
Expand All @@ -363,7 +363,7 @@ open class NativeCall(
requestItem.id,
nonce,
upstream,
matcher.build(),
Selector.UpstreamFilter(upstreamFilter?.sort ?: Selector.Sort.default, matcher.build()),
callQuorum,
parsedCallDetails(requestItem),
requestDecorator,
Expand Down Expand Up @@ -433,7 +433,7 @@ open class NativeCall(
return Mono.error(RpcException(RpcResponseError.CODE_METHOD_NOT_EXIST, "Unsupported method"))
}
val reader = requestReaderFactory.create(
ReaderData(ctx.upstream, ctx.matcher, ctx.callQuorum, signer, tracer),
ReaderData(ctx.upstream, ctx.upstreamFilter, ctx.callQuorum, signer, tracer),
)
val counter = reader.attempts()

Expand Down Expand Up @@ -572,7 +572,7 @@ open class NativeCall(
val id: Int,
val nonce: Long?,
val upstream: Multistream,
val matcher: Selector.Matcher,
val upstreamFilter: Selector.UpstreamFilter,
val callQuorum: CallQuorum,
val payload: T,
val requestDecorator: RequestDecorator,
Expand All @@ -587,13 +587,13 @@ open class NativeCall(
id: Int,
nonce: Long?,
upstream: Multistream,
matcher: Selector.Matcher,
upstreamFilter: Selector.UpstreamFilter,
callQuorum: CallQuorum,
payload: T,
requestId: String,
requestCount: Int,
) : this(
id, nonce, upstream, matcher, callQuorum, payload,
id, nonce, upstream, upstreamFilter, callQuorum, payload,
NoneRequestDecorator(), NoneResultDecorator(), null, false, requestId, requestCount,
)

Expand All @@ -613,13 +613,13 @@ open class NativeCall(

fun <X> withPayload(payload: X): ValidCallContext<X> {
return ValidCallContext(
id, nonce, upstream, matcher, callQuorum, payload,
id, nonce, upstream, upstreamFilter, callQuorum, payload,
requestDecorator, resultDecorator, forwardedSelector, streamRequest, requestId, requestCount,
)
}

fun getApis(): ApiSource {
return upstream.getApiSource(matcher)
return upstream.getApiSource(upstreamFilter)
}
}

Expand Down
14 changes: 11 additions & 3 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/FilteredApis.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class FilteredApis(
matcher: Selector.Matcher,
private val pos: Int,
private val retries: Int,
sort: Selector.Sort = Selector.Sort.default,
) : ApiSource {
private val internalMatcher: Selector.Matcher

Expand Down Expand Up @@ -69,6 +70,13 @@ class FilteredApis(
pos: Int,
) : this(chain, allUpstreams, matcher, pos, DEFAULT_RETRY_LIMIT)

constructor(
chain: Chain,
allUpstreams: List<Upstream>,
upstreamFilter: Selector.UpstreamFilter,
pos: Int,
) : this(chain, allUpstreams, upstreamFilter.matcher, pos, DEFAULT_RETRY_LIMIT, upstreamFilter.sort)

constructor(
chain: Chain,
allUpstreams: List<Upstream>,
Expand All @@ -79,12 +87,12 @@ class FilteredApis(
it.getRole() == UpstreamsConfig.UpstreamRole.PRIMARY
}.let {
startFrom(it, pos)
}
}.sortedWith(sort.comparator)
private val secondaryUpstreams: List<Upstream> = allUpstreams.filter {
it.getRole() == UpstreamsConfig.UpstreamRole.SECONDARY
}.let {
startFrom(it, pos)
}
}.sortedWith(sort.comparator)
private val standardWithFallback: List<Upstream>

private val counter: AtomicInteger = AtomicInteger(0)
Expand All @@ -98,7 +106,7 @@ class FilteredApis(
it.getRole() == UpstreamsConfig.UpstreamRole.FALLBACK
}.let {
startFrom(it, pos)
}
}.sortedWith(sort.comparator)
standardWithFallback = emptyList<Upstream>()
.plus(primaryUpstreams)
.plus(secondaryUpstreams)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,12 @@ abstract class Multistream(
/**
* Get a source for direct APIs
*/
open fun getApiSource(matcher: Selector.Matcher): ApiSource {
open fun getApiSource(upstreamFilter: Selector.UpstreamFilter): ApiSource {
val i = seq++
if (seq >= Int.MAX_VALUE / 2) {
seq = 0
}
return FilteredApis(chain, getUpstreams(), matcher, i)
return FilteredApis(chain, getUpstreams(), upstreamFilter, i)
}

/**
Expand Down
30 changes: 27 additions & 3 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/Selector.kt
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,19 @@ class Selector {
val anyLabel = AnyLabelMatcher()

@JvmStatic
fun convertToMatcher(selectors: List<BlockchainOuterClass.Selector>, head: Head): Matcher {
return selectors
fun convertToUpstreamFilter(selectors: List<BlockchainOuterClass.Selector>): UpstreamFilter {
val matcher = selectors
.map {
when {
it.hasSlotHeightSelector() -> {
SlotMatcher(it.slotHeightSelector.slotHeight)
}
it.hasHeightSelector() -> {
val height = if (it.heightSelector.height == -1L) head.getCurrentHeight() else it.heightSelector.height
val height = if (it.heightSelector.height == -1L) {
null
} else {
it.heightSelector.height
}
if (height == null) {
empty
} else {
Expand All @@ -61,6 +65,10 @@ class Selector {
}.run {
MultiMatcher(this)
}
val sort = selectors.firstOrNull { it.hasHeightSelector() && it.heightSelector.height == -1L }
?.let { Sort(compareByDescending { it.getHead().getCurrentHeight() }) }
?: Sort.default
return UpstreamFilter(sort, matcher)
}

@JvmStatic
Expand Down Expand Up @@ -153,6 +161,15 @@ class Selector {
}
}

data class Sort(
val comparator: Comparator<Upstream>,
) {
companion object {
@JvmStatic
val default = Sort(compareBy { null })
}
}

abstract class Matcher {
fun matches(up: Upstream): Boolean = matchesWithCause(up).matched()

Expand All @@ -161,6 +178,13 @@ class Selector {
abstract fun describeInternal(): String
}

data class UpstreamFilter(
val sort: Sort,
val matcher: Matcher,
) {
constructor(matcher: Matcher) : this(Sort.default, matcher)
}

data class MultiMatcher(
private val matchers: Collection<Matcher>,
) : Matcher() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ open class BitcoinMultistream(
* Finds an API that executed directly on a remote.
*/
open fun getDirectApi(matcher: Selector.Matcher): Mono<ChainReader> {
val apis = getApiSource(matcher)
val apis = getApiSource(Selector.UpstreamFilter(matcher))
apis.request(1)
return Mono.from(apis)
.map(Upstream::getIngressReader)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class RemoteUnspentReader(
)

override fun read(key: Address): Mono<List<SimpleUnspent>> {
val apis = upstreams.getApiSource(selector)
val apis = upstreams.getApiSource(Selector.UpstreamFilter(selector))
apis.request(1)
return Mono.empty()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ class EthereumDirectReader(
it.create(
RequestReaderFactory.ReaderData(
up,
requestMatcher,
Selector.UpstreamFilter(requestMatcher),
callMethodsFactory.create().createQuorumFor(request.method),
null,
tracer,
Expand Down
34 changes: 17 additions & 17 deletions src/test/groovy/io/emeraldpay/dshackle/rpc/NativeCallSpec.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class NativeCallSpec extends Specification {

def nativeCall = nativeCall()
def ctx = new NativeCall.ValidCallContext<NativeCall.ParsedCallDetails>(
1, null, upstream, Selector.empty, new AlwaysQuorum(),
1, null, upstream, new Selector.UpstreamFilter(Selector.empty), new AlwaysQuorum(),
new NativeCall.ParsedCallDetails("eth_test", new ListParams()), "reqId", 1
)

Expand All @@ -101,7 +101,7 @@ class NativeCallSpec extends Specification {

def nativeCall = nativeCall()
def ctx = new NativeCall.ValidCallContext<NativeCall.ParsedCallDetails>(
15, null, upstream, Selector.empty, new AlwaysQuorum(),
15, null, upstream, new Selector.UpstreamFilter(Selector.empty), new AlwaysQuorum(),
new NativeCall.ParsedCallDetails("eth_test", new ListParams()), "reqId", 1
)

Expand All @@ -126,7 +126,7 @@ class NativeCallSpec extends Specification {
1 * read(_) >> Mono.just(new RequestReader.Result("\"foo\"".bytes, null, 1, new Upstream.UpstreamSettingsData((byte)1, "test", "v"), null))
}
}
def call = new NativeCall.ValidCallContext(1, 10, TestingCommons.multistream(TestingCommons.api()), Selector.empty, quorum,
def call = new NativeCall.ValidCallContext(1, 10, TestingCommons.multistream(TestingCommons.api()), new Selector.UpstreamFilter(Selector.empty), quorum,
new NativeCall.ParsedCallDetails("eth_test", new ListParams()), "reqId", 1)

when:
Expand All @@ -148,7 +148,7 @@ class NativeCallSpec extends Specification {
1 * read(new ChainRequest("eth_test", new ListParams(), 10)) >> Mono.empty()
}
}
def call = new NativeCall.ValidCallContext(1, 10, TestingCommons.multistream(TestingCommons.api()), Selector.empty, quorum,
def call = new NativeCall.ValidCallContext(1, 10, TestingCommons.multistream(TestingCommons.api()), new Selector.UpstreamFilter(Selector.empty), quorum,
new NativeCall.ParsedCallDetails("eth_test", new ListParams()), "reqId", 1)

when:
Expand All @@ -174,7 +174,7 @@ class NativeCallSpec extends Specification {
)
}
}
def call = new NativeCall.ValidCallContext(12, 10, TestingCommons.multistream(TestingCommons.api()), Selector.empty, quorum,
def call = new NativeCall.ValidCallContext(12, 10, TestingCommons.multistream(TestingCommons.api()), new Selector.UpstreamFilter(Selector.empty), quorum,
new NativeCall.ParsedCallDetails("eth_test", new ListParams()), "reqId", 1)

when:
Expand Down Expand Up @@ -420,9 +420,9 @@ class NativeCallSpec extends Specification {
def act = nativeCall.prepareCall(req, multistream)
.collectList().block(Duration.ofSeconds(1)).first()
then:
act.matcher != null
act.matcher instanceof Selector.MultiMatcher
with((Selector.MultiMatcher) act.matcher) {
act.upstreamFilter.matcher != null
act.upstreamFilter.matcher instanceof Selector.MultiMatcher
with((Selector.MultiMatcher) act.upstreamFilter.matcher) {
it.getMatchers().size() >= 1
it.getMatcher(Selector.HeightMatcher) != null
with(it.getMatcher(Selector.HeightMatcher)) {
Expand Down Expand Up @@ -528,7 +528,7 @@ class NativeCallSpec extends Specification {
def "Parse empty params"() {
setup:
def nativeCall = nativeCall()
def ctx = new NativeCall.ValidCallContext(1, null, Stub(Multistream), Selector.empty, new AlwaysQuorum(),
def ctx = new NativeCall.ValidCallContext(1, null, Stub(Multistream), new Selector.UpstreamFilter(Selector.empty), new AlwaysQuorum(),
new NativeCall.ParsedCallDetails("eth_test", new ListParams()), "reqId", 1)
when:
def act = nativeCall.parseParams(ctx)
Expand All @@ -541,7 +541,7 @@ class NativeCallSpec extends Specification {
def "Parse none params"() {
setup:
def nativeCall = nativeCall()
def ctx = new NativeCall.ValidCallContext(1, null, Stub(Multistream), Selector.empty, new AlwaysQuorum(),
def ctx = new NativeCall.ValidCallContext(1, null, Stub(Multistream), new Selector.UpstreamFilter(Selector.empty), new AlwaysQuorum(),
new NativeCall.ParsedCallDetails("eth_test", new ListParams()), "reqId", 1)
when:
def act = nativeCall.parseParams(ctx)
Expand All @@ -554,7 +554,7 @@ class NativeCallSpec extends Specification {
def "Parse single param"() {
setup:
def nativeCall = nativeCall()
def ctx = new NativeCall.ValidCallContext(1, null, Stub(Multistream), Selector.empty, new AlwaysQuorum(),
def ctx = new NativeCall.ValidCallContext(1, null, Stub(Multistream), new Selector.UpstreamFilter(Selector.empty), new AlwaysQuorum(),
new NativeCall.ParsedCallDetails("eth_test", new ListParams(false)), "reqId", 1)
when:
def act = nativeCall.parseParams(ctx)
Expand All @@ -567,7 +567,7 @@ class NativeCallSpec extends Specification {
def "Parse multi param"() {
setup:
def nativeCall = nativeCall()
def ctx = new NativeCall.ValidCallContext(1, null, Stub(Multistream), Selector.empty, new AlwaysQuorum(),
def ctx = new NativeCall.ValidCallContext(1, null, Stub(Multistream), new Selector.UpstreamFilter(Selector.empty), new AlwaysQuorum(),
new NativeCall.ParsedCallDetails("eth_test", new ListParams(false, 123)), "reqId", 1)
when:
def act = nativeCall.parseParams(ctx)
Expand All @@ -580,7 +580,7 @@ class NativeCallSpec extends Specification {
def "Decorate eth_getFilterUpdates params"() {
setup:
def nativeCall = nativeCall()
def ctx = new NativeCall.ValidCallContext(1, null, Stub(Multistream), Selector.empty, new AlwaysQuorum(),
def ctx = new NativeCall.ValidCallContext(1, null, Stub(Multistream), new Selector.UpstreamFilter(Selector.empty), new AlwaysQuorum(),
new NativeCall.ParsedCallDetails("eth_getFilterUpdates", new ListParams("0xabcd")),
new NativeCall.WithFilterIdDecorator(), new NativeCall.NoneResultDecorator(), null, false, "reqId", 1)
when:
Expand Down Expand Up @@ -612,7 +612,7 @@ class NativeCallSpec extends Specification {
1 * read(_) >> Mono.just(new RequestReader.Result("\"0xab\"".bytes, null, 1, new Upstream.UpstreamSettingsData((byte) 255, "", ""), null))
}
}
def call = new NativeCall.ValidCallContext(1, 10, multistream, Selector.empty, quorum,
def call = new NativeCall.ValidCallContext(1, 10, multistream, new Selector.UpstreamFilter(Selector.empty), quorum,
new NativeCall.ParsedCallDetails("eth_getFilterChanges", new ListParams()),
new NativeCall.WithFilterIdDecorator(), new NativeCall.CreateFilterDecorator(), null, false, "reqId", 1)

Expand Down Expand Up @@ -645,7 +645,7 @@ class NativeCallSpec extends Specification {
1 * read(_) >> Mono.just(new RequestReader.Result("\"0xab\"".bytes, null, 1, new Upstream.UpstreamSettingsData((byte) 1, "", ""), null))
}
}
def call = new NativeCall.ValidCallContext(1, 10, multistream, Selector.empty, quorum,
def call = new NativeCall.ValidCallContext(1, 10, multistream, new Selector.UpstreamFilter(Selector.empty), quorum,
new NativeCall.ParsedCallDetails("eth_getFilterChanges", new ListParams()),
new NativeCall.WithFilterIdDecorator(), new NativeCall.CreateFilterDecorator(), null, false, "reqId", 1)

Expand All @@ -667,7 +667,7 @@ class NativeCallSpec extends Specification {

def ctx = new NativeCall.ValidCallContext<NativeCall.ParsedCallDetails>(10, null,
upstream,
Selector.empty, new AlwaysQuorum(),
new Selector.UpstreamFilter(Selector.empty), new AlwaysQuorum(),
new NativeCall.ParsedCallDetails("eth_test", new ListParams()), "reqId", 1)
when:
nativeCall.fetch(ctx)
Expand All @@ -684,7 +684,7 @@ class NativeCallSpec extends Specification {

def ctx = new NativeCall.ValidCallContext<NativeCall.ParsedCallDetails>(10, null,
upstream,
Selector.empty, new AlwaysQuorum(),
new Selector.UpstreamFilter(Selector.empty), new AlwaysQuorum(),
new NativeCall.ParsedCallDetails("eth_test", new ListParams()), "reqId", 1)
when:
def act = nativeCall.fetch(ctx)
Expand Down
Loading

0 comments on commit b468b40

Please sign in to comment.