From 697db618b7750d6ec2cc9d4efad63553d967508a Mon Sep 17 00:00:00 2001
From: KirillPamPam <kirill.mai308@gmail.com>
Date: Wed, 23 Oct 2024 12:43:33 +0400
Subject: [PATCH] Pass all matchers to direct reader (#581)

---
 .../io/emeraldpay/dshackle/rpc/NativeCall.kt  | 13 ++++-
 .../dshackle/upstream/ChainRequest.kt         | 10 +++-
 .../ethereum/EthereumCachingReader.kt         | 56 +++++--------------
 .../upstream/ethereum/EthereumDirectReader.kt | 54 +++++++++---------
 .../upstream/ethereum/EthereumLocalReader.kt  | 13 +++--
 .../ethereum/EthereumCachingReaderSpec.groovy | 18 +++---
 .../ethereum/EthereumLocalReaderSpec.groovy   | 33 +++++------
 .../ethereum/GenericWsHeadSpec.groovy         |  5 +-
 .../ethereum/WsConnectionImplSpec.groovy      |  7 ++-
 9 files changed, 102 insertions(+), 107 deletions(-)

diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt
index d6380d009..358e48d2a 100644
--- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt
+++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt
@@ -442,7 +442,7 @@ open class NativeCall(
         return ctx.upstream.getLocalReader()
             .flatMap { api ->
                 SpannedReader(api, tracer, LOCAL_READER)
-                    .read(ctx.payload.toChainRequest(ctx.nonce, ctx.forwardedSelector, false))
+                    .read(ctx.payload.toChainRequest(ctx.nonce, ctx.forwardedSelector, false, ctx.upstreamFilter.matcher))
                     .map {
                         val result = it.getResult()
                         val resolvedUpstreamData = it.resolvedUpstreamData.ifEmpty {
@@ -799,7 +799,16 @@ open class NativeCall(
             selector: BlockchainOuterClass.Selector?,
             streamRequest: Boolean,
         ): ChainRequest {
-            return ChainRequest(method, params, nonce, selector, streamRequest)
+            return toChainRequest(nonce, selector, streamRequest, Selector.empty)
+        }
+
+        fun toChainRequest(
+            nonce: Long?,
+            selector: BlockchainOuterClass.Selector?,
+            streamRequest: Boolean,
+            matcher: Selector.Matcher,
+        ): ChainRequest {
+            return ChainRequest(method, params, nonce, selector, streamRequest, matcher)
         }
     }
 }
diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ChainRequest.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ChainRequest.kt
index be155b5a5..a7b9cc597 100644
--- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ChainRequest.kt
+++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ChainRequest.kt
@@ -30,6 +30,7 @@ data class ChainRequest(
     val nonce: Long?,
     val selector: BlockchainOuterClass.Selector?,
     val isStreamed: Boolean = false,
+    val matcher: Selector.Matcher = Selector.empty,
 ) {
 
     @JvmOverloads constructor(
@@ -38,7 +39,14 @@ data class ChainRequest(
         nonce: Long? = null,
         selectors: BlockchainOuterClass.Selector? = null,
         isStreamed: Boolean = false,
-    ) : this(method, params, 1, nonce, selectors, isStreamed)
+        matcher: Selector.Matcher = Selector.empty,
+    ) : this(method, params, 1, nonce, selectors, isStreamed, matcher)
+
+    constructor(
+        method: String,
+        params: CallParams,
+        matcher: Selector.Matcher,
+    ) : this(method, params, 1, null, null, false, matcher)
 
     fun toJson(): ByteArray {
         return params.toJson(id, method)
diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumCachingReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumCachingReader.kt
index 282bbd4af..8dece4201 100644
--- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumCachingReader.kt
+++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumCachingReader.kt
@@ -15,8 +15,6 @@
  */
 package io.emeraldpay.dshackle.upstream.ethereum
 
-import com.fasterxml.jackson.databind.ObjectMapper
-import io.emeraldpay.dshackle.Global
 import io.emeraldpay.dshackle.cache.Caches
 import io.emeraldpay.dshackle.cache.CurrentBlockCache
 import io.emeraldpay.dshackle.commons.CACHE_BLOCK_BY_HASH_READER
@@ -34,16 +32,15 @@ import io.emeraldpay.dshackle.reader.RekeyingReader
 import io.emeraldpay.dshackle.reader.SpannedReader
 import io.emeraldpay.dshackle.upstream.CachingReader
 import io.emeraldpay.dshackle.upstream.Multistream
+import io.emeraldpay.dshackle.upstream.Selector
 import io.emeraldpay.dshackle.upstream.calls.CallMethods
+import io.emeraldpay.dshackle.upstream.ethereum.EthereumDirectReader.Request
 import io.emeraldpay.dshackle.upstream.ethereum.EthereumDirectReader.Result
 import io.emeraldpay.dshackle.upstream.ethereum.domain.Address
 import io.emeraldpay.dshackle.upstream.ethereum.domain.BlockHash
 import io.emeraldpay.dshackle.upstream.ethereum.domain.TransactionId
 import io.emeraldpay.dshackle.upstream.ethereum.domain.Wei
-import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson
-import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionJsonSnapshot
 import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionLogJson
-import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionRefJson
 import io.emeraldpay.dshackle.upstream.finalization.FinalizationType
 import org.apache.commons.collections4.Factory
 import org.springframework.cloud.sleuth.Tracer
@@ -60,50 +57,26 @@ open class EthereumCachingReader(
     private val tracer: Tracer,
 ) : CachingReader {
 
-    private val objectMapper: ObjectMapper = Global.objectMapper
     private val balanceCache = CurrentBlockCache<Address, Wei>()
     private val directReader = EthereumDirectReader(up, caches, balanceCache, callMethodsFactory, tracer)
 
-    private val extractBlock = Function<Result<BlockContainer>, BlockJson<TransactionRefJson>> { result ->
-        val block = result.data
-        val existing = block.getParsed(BlockJson::class.java)
-        if (existing != null) {
-            existing.withoutTransactionDetails()
-        } else {
-            objectMapper
-                .readValue(block.json, BlockJson::class.java)
-                .withoutTransactionDetails()
-        }
-    }
-
-    private val extractTx = Function<Result<TxContainer>, TransactionJsonSnapshot> { result ->
-        result.data.getParsed(TransactionJsonSnapshot::class.java)
-            ?: objectMapper.readValue(result.data.json, TransactionJsonSnapshot::class.java)
-    }
-
-    private val idToBlockHash = Function<BlockId, BlockHash> { id -> BlockHash.from(id.value) }
-    private val blockHashToId = Function<BlockHash, BlockId> { hash -> BlockId.from(hash) }
-
-    private val txHashToId = Function<TransactionId, TxId> { hash -> TxId.from(hash) }
-    private val idToTxHash = Function<TxId, TransactionId> { id -> TransactionId.from(id.value) }
-
-    private val blocksByIdAsCont = CompoundReader(
-        SpannedReader(CacheWithUpstreamIdReader(caches.getBlocksByHash()), tracer, CACHE_BLOCK_BY_HASH_READER),
-        SpannedReader(RekeyingReader(idToBlockHash, directReader.blockReader), tracer, DIRECT_QUORUM_RPC_READER),
-    )
-
     open fun blockByFinalization(): Reader<FinalizationType, Result<BlockContainer>> {
         return SpannedReader(directReader.blockByFinalizationReader, tracer, DIRECT_QUORUM_RPC_READER)
     }
 
-    open fun blocksByIdAsCont(): Reader<BlockId, Result<BlockContainer>> {
-        return blocksByIdAsCont
+    open fun blocksByIdAsCont(matcher: Selector.Matcher): Reader<BlockId, Result<BlockContainer>> {
+        val idToBlockHash = Function<BlockId, Request<BlockHash>> { id -> Request(BlockHash.from(id.value), matcher) }
+        return CompoundReader(
+            SpannedReader(CacheWithUpstreamIdReader(caches.getBlocksByHash()), tracer, CACHE_BLOCK_BY_HASH_READER),
+            SpannedReader(RekeyingReader(idToBlockHash, directReader.blockReader), tracer, DIRECT_QUORUM_RPC_READER),
+        )
     }
 
-    open fun blocksByHeightAsCont(): Reader<Long, Result<BlockContainer>> {
+    open fun blocksByHeightAsCont(matcher: Selector.Matcher): Reader<Long, Result<BlockContainer>> {
+        val numToRequest = Function<Long, Request<Long>> { num -> Request(num, matcher) }
         return CompoundReader(
             SpannedReader(CacheWithUpstreamIdReader(caches.getBlocksByHeight()), tracer, CACHE_BLOCK_BY_HEIGHT_READER),
-            SpannedReader(directReader.blockByHeightReader, tracer, DIRECT_QUORUM_RPC_READER),
+            SpannedReader(RekeyingReader(numToRequest, directReader.blockByHeightReader), tracer, DIRECT_QUORUM_RPC_READER),
         )
     }
 
@@ -111,7 +84,8 @@ open class EthereumCachingReader(
         return directReader.logsByHashReader
     }
 
-    open fun txByHashAsCont(): Reader<TxId, Result<TxContainer>> {
+    open fun txByHashAsCont(matcher: Selector.Matcher): Reader<TxId, Result<TxContainer>> {
+        val idToTxHash = Function<TxId, Request<TransactionId>> { id -> Request(TransactionId.from(id.value), matcher) }
         return CompoundReader(
             CacheWithUpstreamIdReader(SpannedReader(caches.getTxByHash(), tracer, CACHE_TX_BY_HASH_READER)),
             SpannedReader(RekeyingReader(idToTxHash, directReader.txReader), tracer, DIRECT_QUORUM_RPC_READER),
@@ -126,9 +100,9 @@ open class EthereumCachingReader(
         )
     }
 
-    fun receipts(): Reader<TxId, Result<ByteArray>> {
+    fun receipts(matcher: Selector.Matcher): Reader<TxId, Result<ByteArray>> {
         val requested = RekeyingReader(
-            { txid: TxId -> TransactionId.from(txid.value) },
+            { txid: TxId -> Request(TransactionId.from(txid.value), matcher) },
             directReader.receiptReader,
         )
         return CompoundReader(
diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumDirectReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumDirectReader.kt
index e22240740..68e03a051 100644
--- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumDirectReader.kt
+++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumDirectReader.kt
@@ -61,32 +61,31 @@ class EthereumDirectReader(
     private val objectMapper: ObjectMapper = Global.objectMapper
     var requestReaderFactory: RequestReaderFactory = RequestReaderFactory.default()
 
-    val blockReader: Reader<BlockHash, Result<BlockContainer>>
-    val blockByHeightReader: Reader<Long, Result<BlockContainer>>
-    val txReader: Reader<TransactionId, Result<TxContainer>>
+    val blockReader: Reader<Request<BlockHash>, Result<BlockContainer>>
+    val blockByHeightReader: Reader<Request<Long>, Result<BlockContainer>>
+    val txReader: Reader<Request<TransactionId>, Result<TxContainer>>
     val balanceReader: Reader<Address, Result<Wei>>
-    val receiptReader: Reader<TransactionId, Result<ByteArray>>
+    val receiptReader: Reader<Request<TransactionId>, Result<ByteArray>>
     val logsByHashReader: Reader<BlockId, Result<List<TransactionLogJson>>>
     val blockByFinalizationReader: Reader<FinalizationType, Result<BlockContainer>>
 
     init {
-        blockReader = object : Reader<BlockHash, Result<BlockContainer>> {
-            override fun read(key: BlockHash): Mono<Result<BlockContainer>> {
-                val request = ChainRequest("eth_getBlockByHash", ListParams(key.toHex(), false))
-                return readBlock(request, key.toHex())
+        blockReader = object : Reader<Request<BlockHash>, Result<BlockContainer>> {
+            override fun read(key: Request<BlockHash>): Mono<Result<BlockContainer>> {
+                val request = ChainRequest("eth_getBlockByHash", ListParams(key.requestBy.toHex(), false))
+                return readBlock(request, key.requestBy.toHex(), key.matcher)
             }
         }
-        blockByHeightReader = object : Reader<Long, Result<BlockContainer>> {
-            override fun read(key: Long): Mono<Result<BlockContainer>> {
-                val heightMatcher = Selector.HeightMatcher(key)
-                val request = ChainRequest("eth_getBlockByNumber", ListParams(HexQuantity.from(key).toHex(), false))
-                return readBlock(request, key.toString(), heightMatcher)
+        blockByHeightReader = object : Reader<Request<Long>, Result<BlockContainer>> {
+            override fun read(key: Request<Long>): Mono<Result<BlockContainer>> {
+                val request = ChainRequest("eth_getBlockByNumber", ListParams(HexQuantity.from(key.requestBy).toHex(), false))
+                return readBlock(request, key.toString(), key.matcher)
             }
         }
-        txReader = object : Reader<TransactionId, Result<TxContainer>> {
-            override fun read(key: TransactionId): Mono<Result<TxContainer>> {
-                val request = ChainRequest("eth_getTransactionByHash", ListParams(key.toHex()))
-                return readWithQuorum(request) // retries were removed because we use NotNullQuorum which handle errors too
+        txReader = object : Reader<Request<TransactionId>, Result<TxContainer>> {
+            override fun read(key: Request<TransactionId>): Mono<Result<TxContainer>> {
+                val request = ChainRequest("eth_getTransactionByHash", ListParams(key.requestBy.toHex()))
+                return readWithQuorum(request, key.matcher) // retries were removed because we use NotNullQuorum which handle errors too
                     .timeout(Duration.ofSeconds(5), Mono.error(TimeoutException("Tx not read $key")))
                     .flatMap { result ->
                         val tx = objectMapper.readValue(result.data, TransactionJsonSnapshot::class.java)
@@ -150,10 +149,10 @@ class EthereumDirectReader(
             }
         }
 
-        receiptReader = object : Reader<TransactionId, Result<ByteArray>> {
-            override fun read(key: TransactionId): Mono<Result<ByteArray>> {
-                val request = ChainRequest("eth_getTransactionReceipt", ListParams(key.toHex()))
-                return readWithQuorum(request)
+        receiptReader = object : Reader<Request<TransactionId>, Result<ByteArray>> {
+            override fun read(key: Request<TransactionId>): Mono<Result<ByteArray>> {
+                val request = ChainRequest("eth_getTransactionReceipt", ListParams(key.requestBy.toHex()))
+                return readWithQuorum(request, key.matcher)
                     .timeout(Duration.ofSeconds(5), Mono.error(TimeoutException("Receipt not read $key")))
                     .flatMap { result ->
                         val receipt = objectMapper.readValue(result.data, TransactionReceiptJson::class.java)
@@ -164,7 +163,7 @@ class EthereumDirectReader(
                             caches.cacheReceipt(
                                 Caches.Tag.REQUESTED,
                                 DefaultContainer(
-                                    txId = TxId.from(key),
+                                    txId = TxId.from(key.requestBy),
                                     blockId = BlockId.from(receipt.blockHash),
                                     height = receipt.blockNumber,
                                     json = result.data,
@@ -252,14 +251,10 @@ class EthereumDirectReader(
     ): Mono<Result<ByteArray>> {
         return Mono.just(requestReaderFactory)
             .map {
-                val requestMatcher = Selector.Builder()
-                    .withMatcher(matcher)
-                    .forMethod(request.method)
-                    .build()
                 it.create(
                     RequestReaderFactory.ReaderData(
                         up,
-                        Selector.UpstreamFilter(sort, requestMatcher),
+                        Selector.UpstreamFilter(sort, matcher),
                         callMethodsFactory.create().createQuorumFor(request.method),
                         null,
                         tracer,
@@ -276,4 +271,9 @@ class EthereumDirectReader(
         val data: T,
         val resolvedUpstreamData: List<Upstream.UpstreamSettingsData>,
     )
+
+    data class Request<T>(
+        val requestBy: T,
+        val matcher: Selector.Matcher,
+    )
 }
diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLocalReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLocalReader.kt
index df3176bef..d73ad9465 100644
--- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLocalReader.kt
+++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLocalReader.kt
@@ -23,6 +23,7 @@ import io.emeraldpay.dshackle.upstream.ChainRequest
 import io.emeraldpay.dshackle.upstream.ChainResponse
 import io.emeraldpay.dshackle.upstream.Head
 import io.emeraldpay.dshackle.upstream.LogsOracle
+import io.emeraldpay.dshackle.upstream.Selector
 import io.emeraldpay.dshackle.upstream.calls.CallMethods
 import io.emeraldpay.dshackle.upstream.ethereum.hex.HexQuantity
 import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcException
@@ -87,7 +88,7 @@ class EthereumLocalReader(
                     } catch (e: IllegalArgumentException) {
                         throw RpcException(RpcResponseError.CODE_INVALID_METHOD_PARAMS, "[0] must be transaction id")
                     }
-                    reader.txByHashAsCont()
+                    reader.txByHashAsCont(key.matcher)
                         .read(hash)
                         .map { ChainResponse(it.data.json, null, it.resolvedUpstreamData) }
                 }
@@ -106,14 +107,14 @@ class EthereumLocalReader(
                     if (withTx) {
                         null
                     } else {
-                        reader.blocksByIdAsCont().read(hash).map {
+                        reader.blocksByIdAsCont(key.matcher).read(hash).map {
                             ChainResponse(it.data.json, null, it.resolvedUpstreamData)
                         }
                     }
                 }
 
                 method == "eth_getBlockByNumber" -> {
-                    getBlockByNumber(params.list)
+                    getBlockByNumber(params.list, key.matcher)
                 }
 
                 method == "eth_getTransactionReceipt" -> {
@@ -126,7 +127,7 @@ class EthereumLocalReader(
                     } catch (e: IllegalArgumentException) {
                         throw RpcException(RpcResponseError.CODE_INVALID_METHOD_PARAMS, "[0] must be transaction id")
                     }
-                    reader.receipts()
+                    reader.receipts(key.matcher)
                         .read(hash)
                         .map { ChainResponse(it.data, null, it.resolvedUpstreamData) }
                 }
@@ -141,7 +142,7 @@ class EthereumLocalReader(
         return null
     }
 
-    fun getBlockByNumber(params: List<Any?>): Mono<ChainResponse>? {
+    fun getBlockByNumber(params: List<Any?>, matcher: Selector.Matcher): Mono<ChainResponse>? {
         if (params.size != 2 || params[0] == null || params[1] == null) {
             throw RpcException(RpcResponseError.CODE_INVALID_METHOD_PARAMS, "Must provide 2 parameters")
         }
@@ -189,7 +190,7 @@ class EthereumLocalReader(
             throw RpcException(RpcResponseError.CODE_INVALID_METHOD_PARAMS, "[0] must be a block number")
         }
 
-        return reader.blocksByHeightAsCont()
+        return reader.blocksByHeightAsCont(matcher)
             .read(number).map { ChainResponse(it.data.json, null, it.resolvedUpstreamData) }
     }
 
diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumCachingReaderSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumCachingReaderSpec.groovy
index a8749ac1b..9de037b06 100644
--- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumCachingReaderSpec.groovy
+++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumCachingReaderSpec.groovy
@@ -94,7 +94,7 @@ class EthereumDirectReaderSpec extends Specification {
             }
         }
         when:
-        def act = reader.blockReader.read(BlockHash.from(hash1))
+        def act = reader.blockReader.read(new EthereumDirectReader.Request<BlockHash>(BlockHash.from(hash1), Selector.empty))
         then:
         StepVerifier.create(act)
                 .expectNextMatches { block ->
@@ -122,7 +122,7 @@ class EthereumDirectReaderSpec extends Specification {
             }
         }
         when:
-        def act = reader.blockReader.read(BlockHash.from(hash1))
+        def act = reader.blockReader.read(new EthereumDirectReader.Request<BlockHash>(BlockHash.from(hash1), Selector.empty))
         then:
         StepVerifier.create(act)
                 .expectComplete()
@@ -155,7 +155,7 @@ class EthereumDirectReaderSpec extends Specification {
             }
         }
         when:
-        def act = reader.blockByHeightReader.read(100)
+        def act = reader.blockByHeightReader.read(new EthereumDirectReader.Request<Long>(100L, Selector.empty))
         then:
         StepVerifier.create(act)
                 .expectNextMatches { block ->
@@ -220,7 +220,7 @@ class EthereumDirectReaderSpec extends Specification {
             }
         }
         when:
-        def act = reader.txReader.read(TransactionId.from(hash1))
+        def act = reader.txReader.read(new EthereumDirectReader.Request<TransactionId>(TransactionId.from(hash1), Selector.empty))
         then:
         StepVerifier.create(act)
                 .expectNextMatches { block ->
@@ -253,7 +253,7 @@ class EthereumDirectReaderSpec extends Specification {
             }
         }
         when:
-        def act = reader.receiptReader.read(TransactionId.from(hash1))
+        def act = reader.receiptReader.read(new EthereumDirectReader.Request<TransactionId>(TransactionId.from(hash1), Selector.empty))
             .block(Duration.ofSeconds(1))
             .with { new String(it.data) }
         then:
@@ -287,7 +287,7 @@ class EthereumDirectReaderSpec extends Specification {
             }
         }
         when:
-        def act = reader.receiptReader.read(TransactionId.from(hash1))
+        def act = reader.receiptReader.read(new EthereumDirectReader.Request<TransactionId>(TransactionId.from(hash1), Selector.empty))
                 .block(Duration.ofSeconds(1))
                 .with { new String(it.data) }
         then:
@@ -312,7 +312,7 @@ class EthereumDirectReaderSpec extends Specification {
             }
         }
         when:
-        def act = reader.txReader.read(TransactionId.from(hash1))
+        def act = reader.txReader.read(new EthereumDirectReader.Request<TransactionId>(TransactionId.from(hash1), Selector.empty))
         then:
         StepVerifier.create(act)
                 .expectComplete()
@@ -411,7 +411,7 @@ class EthereumDirectReaderSpec extends Specification {
             }
         }
         when:
-        def act = ethereumDirectReader.blockReader.read(BlockHash.from(hash1))
+        def act = ethereumDirectReader.blockReader.read(new EthereumDirectReader.Request<BlockHash>(BlockHash.from(hash1), Selector.empty))
         then:
         StepVerifier.create(act)
                 .expectNextMatches { block ->
@@ -451,7 +451,7 @@ class EthereumDirectReaderSpec extends Specification {
             }
         }
         when:
-        def act = ethereumDirectReader.blockByHeightReader.read(100)
+        def act = ethereumDirectReader.blockByHeightReader.read(new EthereumDirectReader.Request<Long>(100L, Selector.empty))
         then:
         StepVerifier.create(act)
                 .expectNextMatches { block ->
diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumLocalReaderSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumLocalReaderSpec.groovy
index 28f832cee..54b228a82 100644
--- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumLocalReaderSpec.groovy
+++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumLocalReaderSpec.groovy
@@ -8,6 +8,7 @@ import io.emeraldpay.dshackle.reader.Reader
 import io.emeraldpay.dshackle.test.TestingCommons
 import io.emeraldpay.dshackle.upstream.EmptyHead
 import io.emeraldpay.dshackle.upstream.Head
+import io.emeraldpay.dshackle.upstream.Selector
 import io.emeraldpay.dshackle.upstream.calls.DefaultEthereumMethods
 import io.emeraldpay.dshackle.upstream.ChainRequest
 import io.emeraldpay.dshackle.upstream.finalization.FinalizationData
@@ -69,9 +70,9 @@ class EthereumLocalReaderSpec extends Specification {
             1 * getCurrentHeight() >> 101L
         }
         def reader = Mock(EthereumCachingReader) {
-            _ * blocksByIdAsCont() >> new EmptyReader<>()
-            _ * txByHashAsCont() >> new EmptyReader<>()
-            1 * blocksByHeightAsCont() >> Mock(Reader) {
+            _ * blocksByIdAsCont(Selector.empty) >> new EmptyReader<>()
+            _ * txByHashAsCont(Selector.empty) >> new EmptyReader<>()
+            1 * blocksByHeightAsCont(Selector.empty) >> Mock(Reader) {
                 1 * read(101L) >> Mono.just(
                         new EthereumDirectReader.Result<>(TestingCommons.blockForEthereum(101L), List.of())
                 )
@@ -81,7 +82,7 @@ class EthereumLocalReaderSpec extends Specification {
         def router = new EthereumLocalReader(reader, methods, head, null)
 
         when:
-        def act = router.getBlockByNumber(["latest", false])
+        def act = router.getBlockByNumber(["latest", false], Selector.empty)
 
         then:
         act != null
@@ -97,9 +98,9 @@ class EthereumLocalReaderSpec extends Specification {
         setup:
         def head = Stub(Head) {}
         def reader = Mock(EthereumCachingReader) {
-            _ * blocksByIdAsCont() >> new EmptyReader<>()
-            _ * txByHashAsCont() >> new EmptyReader<>()
-            1 * blocksByHeightAsCont() >> Mock(Reader) {
+            _ * blocksByIdAsCont(Selector.empty) >> new EmptyReader<>()
+            _ * txByHashAsCont(Selector.empty) >> new EmptyReader<>()
+            1 * blocksByHeightAsCont(Selector.empty) >> Mock(Reader) {
                 1 * read(0L) >> Mono.just(
                         new EthereumDirectReader.Result<>(TestingCommons.blockForEthereum(0L), List.of())
                 )
@@ -109,7 +110,7 @@ class EthereumLocalReaderSpec extends Specification {
         def router = new EthereumLocalReader(reader, methods, head, null)
 
         when:
-        def act = router.getBlockByNumber(["earliest", false])
+        def act = router.getBlockByNumber(["earliest", false], Selector.empty)
 
         then:
         act != null
@@ -125,9 +126,9 @@ class EthereumLocalReaderSpec extends Specification {
         setup:
         def head = Stub(Head) {}
         def reader = Mock(EthereumCachingReader) {
-            _ * blocksByIdAsCont() >> new EmptyReader<>()
-            _ * txByHashAsCont() >> new EmptyReader<>()
-            1 * blocksByHeightAsCont() >> Mock(Reader) {
+            _ * blocksByIdAsCont(Selector.empty) >> new EmptyReader<>()
+            _ * txByHashAsCont(Selector.empty) >> new EmptyReader<>()
+            1 * blocksByHeightAsCont(Selector.empty) >> Mock(Reader) {
                 1 * read(74735L) >> Mono.just(
                         new EthereumDirectReader.Result<>(TestingCommons.blockForEthereum(74735L), List.of())
                 )
@@ -137,7 +138,7 @@ class EthereumLocalReaderSpec extends Specification {
         def router = new EthereumLocalReader(reader, methods, head, null)
 
         when:
-        def act = router.getBlockByNumber(["0x123ef", false])
+        def act = router.getBlockByNumber(["0x123ef", false], Selector.empty)
 
         then:
         act != null
@@ -183,15 +184,15 @@ class EthereumLocalReaderSpec extends Specification {
         setup:
         def head = Mock(Head)
         def reader = Mock(EthereumCachingReader) {
-            _ * blocksByIdAsCont() >> new EmptyReader<>()
-            _ * txByHashAsCont() >> new EmptyReader<>()
-            _ * blocksByHeightAsCont() >> new EmptyReader<>()
+            _ * blocksByIdAsCont(Selector.empty) >> new EmptyReader<>()
+            _ * txByHashAsCont(Selector.empty) >> new EmptyReader<>()
+            _ * blocksByHeightAsCont(Selector.empty) >> new EmptyReader<>()
         }
         def methods = new DefaultEthereumMethods(Chain.ETHEREUM__MAINNET, false)
         def router = new EthereumLocalReader(reader, methods, head, null)
 
         when:
-        def act = router.getBlockByNumber(["0x0", true])
+        def act = router.getBlockByNumber(["0x0", true], Selector.empty)
 
         then:
         act == null
diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadSpec.groovy
index 6913efb25..c01a084e4 100644
--- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadSpec.groovy
+++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadSpec.groovy
@@ -23,6 +23,7 @@ import io.emeraldpay.dshackle.test.GenericUpstreamMock
 import io.emeraldpay.dshackle.test.TestingCommons
 import io.emeraldpay.dshackle.upstream.BlockValidator
 import io.emeraldpay.dshackle.upstream.DefaultUpstream
+import io.emeraldpay.dshackle.upstream.Selector
 import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson
 import io.emeraldpay.dshackle.upstream.forkchoice.AlwaysForkChoice
 import io.emeraldpay.dshackle.upstream.ChainRequest
@@ -132,7 +133,7 @@ class GenericWsHeadSpec extends Specification {
                     new WsSubscriptions.SubscribeData(Flux.error(new RuntimeException()), "id", new AtomicReference<String>("")),
                     new WsSubscriptions.SubscribeData(Flux.fromIterable([secondHeadBlock]), "id", new AtomicReference<String>(""))
             ]
-            1 * it.unsubscribe(new ChainRequest("eth_unsubscribe", new ListParams(""), 2, null, null, false)) >>
+            1 * it.unsubscribe(new ChainRequest("eth_unsubscribe", new ListParams(""), 2, null, null, false, Selector.empty)) >>
                     Mono.just(new ChainResponse("".bytes, null))
         }
 
@@ -452,7 +453,7 @@ class GenericWsHeadSpec extends Specification {
             1 * it.subscribe(_) >> new WsSubscriptions.SubscribeData(
                     Flux.error(new RuntimeException()), "id", new AtomicReference<String>(subId)
             )
-            1 * it.unsubscribe(new ChainRequest("eth_unsubscribe", new ListParams(subId), 2, null, null, false)) >>
+            1 * it.unsubscribe(new ChainRequest("eth_unsubscribe", new ListParams(subId), 2, null, null, false, Selector.empty)) >>
                     Mono.just(new ChainResponse("".bytes, null))
         }
 
diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImplSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImplSpec.groovy
index 0bb9acc56..5ba90d3be 100644
--- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImplSpec.groovy
+++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImplSpec.groovy
@@ -21,6 +21,7 @@ import io.emeraldpay.dshackle.test.GenericUpstreamMock
 import io.emeraldpay.dshackle.test.TestingCommons
 import io.emeraldpay.dshackle.upstream.DefaultUpstream
 import io.emeraldpay.dshackle.upstream.ChainRequest
+import io.emeraldpay.dshackle.upstream.Selector
 import io.emeraldpay.dshackle.upstream.rpcclient.ListParams
 import io.emeraldpay.dshackle.upstream.ethereum.domain.TransactionId
 import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcResponseError
@@ -59,7 +60,7 @@ class WsConnectionImplSpec extends Specification {
 
         when:
         Flux.from(ws.handle(wsApiMock.inbound, wsApiMock.outbound)).subscribe()
-        def act = ws.callRpc(new ChainRequest("eth_getTransactionByHash", new ListParams(["0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200"]), 15, null, null, false))
+        def act = ws.callRpc(new ChainRequest("eth_getTransactionByHash", new ListParams(["0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200"]), 15, null, null, false, Selector.empty))
 
         then:
         StepVerifier.create(act)
@@ -91,7 +92,7 @@ class WsConnectionImplSpec extends Specification {
 
         when:
         Flux.from(ws.handle(wsApiMock.inbound, wsApiMock.outbound)).subscribe()
-        def act = ws.callRpc(new ChainRequest("eth_getTransactionByHash", new ListParams(["0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200"]), 15, null, null, false))
+        def act = ws.callRpc(new ChainRequest("eth_getTransactionByHash", new ListParams(["0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200"]), 15, null, null, false, Selector.empty))
 
         then:
         StepVerifier.create(act)
@@ -125,7 +126,7 @@ class WsConnectionImplSpec extends Specification {
 
         when:
         Flux.from(ws.handle(wsApiMock.inbound, wsApiMock.outbound)).subscribe()
-        def act = ws.callRpc(new ChainRequest("eth_getTransactionByHash", new ListParams(["0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200"]), 15, null, null, false))
+        def act = ws.callRpc(new ChainRequest("eth_getTransactionByHash", new ListParams(["0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200"]), 15, null, null, false, Selector.empty))
 
         then:
         StepVerifier.create(act)