diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt
index d6380d009..358e48d2a 100644
--- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt
+++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt
@@ -442,7 +442,7 @@ open class NativeCall(
return ctx.upstream.getLocalReader()
.flatMap { api ->
SpannedReader(api, tracer, LOCAL_READER)
- .read(ctx.payload.toChainRequest(ctx.nonce, ctx.forwardedSelector, false))
+ .read(ctx.payload.toChainRequest(ctx.nonce, ctx.forwardedSelector, false, ctx.upstreamFilter.matcher))
.map {
val result = it.getResult()
val resolvedUpstreamData = it.resolvedUpstreamData.ifEmpty {
@@ -799,7 +799,16 @@ open class NativeCall(
selector: BlockchainOuterClass.Selector?,
streamRequest: Boolean,
): ChainRequest {
- return ChainRequest(method, params, nonce, selector, streamRequest)
+ return toChainRequest(nonce, selector, streamRequest, Selector.empty)
+ }
+
+ fun toChainRequest(
+ nonce: Long?,
+ selector: BlockchainOuterClass.Selector?,
+ streamRequest: Boolean,
+ matcher: Selector.Matcher,
+ ): ChainRequest {
+ return ChainRequest(method, params, nonce, selector, streamRequest, matcher)
}
}
}
diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ChainRequest.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ChainRequest.kt
index be155b5a5..a7b9cc597 100644
--- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ChainRequest.kt
+++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ChainRequest.kt
@@ -30,6 +30,7 @@ data class ChainRequest(
val nonce: Long?,
val selector: BlockchainOuterClass.Selector?,
val isStreamed: Boolean = false,
+ val matcher: Selector.Matcher = Selector.empty,
) {
@JvmOverloads constructor(
@@ -38,7 +39,14 @@ data class ChainRequest(
nonce: Long? = null,
selectors: BlockchainOuterClass.Selector? = null,
isStreamed: Boolean = false,
- ) : this(method, params, 1, nonce, selectors, isStreamed)
+ matcher: Selector.Matcher = Selector.empty,
+ ) : this(method, params, 1, nonce, selectors, isStreamed, matcher)
+
+ constructor(
+ method: String,
+ params: CallParams,
+ matcher: Selector.Matcher,
+ ) : this(method, params, 1, null, null, false, matcher)
fun toJson(): ByteArray {
return params.toJson(id, method)
diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumCachingReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumCachingReader.kt
index 282bbd4af..8dece4201 100644
--- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumCachingReader.kt
+++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumCachingReader.kt
@@ -15,8 +15,6 @@
*/
package io.emeraldpay.dshackle.upstream.ethereum
-import com.fasterxml.jackson.databind.ObjectMapper
-import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.cache.Caches
import io.emeraldpay.dshackle.cache.CurrentBlockCache
import io.emeraldpay.dshackle.commons.CACHE_BLOCK_BY_HASH_READER
@@ -34,16 +32,15 @@ import io.emeraldpay.dshackle.reader.RekeyingReader
import io.emeraldpay.dshackle.reader.SpannedReader
import io.emeraldpay.dshackle.upstream.CachingReader
import io.emeraldpay.dshackle.upstream.Multistream
+import io.emeraldpay.dshackle.upstream.Selector
import io.emeraldpay.dshackle.upstream.calls.CallMethods
+import io.emeraldpay.dshackle.upstream.ethereum.EthereumDirectReader.Request
import io.emeraldpay.dshackle.upstream.ethereum.EthereumDirectReader.Result
import io.emeraldpay.dshackle.upstream.ethereum.domain.Address
import io.emeraldpay.dshackle.upstream.ethereum.domain.BlockHash
import io.emeraldpay.dshackle.upstream.ethereum.domain.TransactionId
import io.emeraldpay.dshackle.upstream.ethereum.domain.Wei
-import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson
-import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionJsonSnapshot
import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionLogJson
-import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionRefJson
import io.emeraldpay.dshackle.upstream.finalization.FinalizationType
import org.apache.commons.collections4.Factory
import org.springframework.cloud.sleuth.Tracer
@@ -60,50 +57,26 @@ open class EthereumCachingReader(
private val tracer: Tracer,
) : CachingReader {
- private val objectMapper: ObjectMapper = Global.objectMapper
private val balanceCache = CurrentBlockCache
()
private val directReader = EthereumDirectReader(up, caches, balanceCache, callMethodsFactory, tracer)
- private val extractBlock = Function, BlockJson> { result ->
- val block = result.data
- val existing = block.getParsed(BlockJson::class.java)
- if (existing != null) {
- existing.withoutTransactionDetails()
- } else {
- objectMapper
- .readValue(block.json, BlockJson::class.java)
- .withoutTransactionDetails()
- }
- }
-
- private val extractTx = Function, TransactionJsonSnapshot> { result ->
- result.data.getParsed(TransactionJsonSnapshot::class.java)
- ?: objectMapper.readValue(result.data.json, TransactionJsonSnapshot::class.java)
- }
-
- private val idToBlockHash = Function { id -> BlockHash.from(id.value) }
- private val blockHashToId = Function { hash -> BlockId.from(hash) }
-
- private val txHashToId = Function { hash -> TxId.from(hash) }
- private val idToTxHash = Function { id -> TransactionId.from(id.value) }
-
- private val blocksByIdAsCont = CompoundReader(
- SpannedReader(CacheWithUpstreamIdReader(caches.getBlocksByHash()), tracer, CACHE_BLOCK_BY_HASH_READER),
- SpannedReader(RekeyingReader(idToBlockHash, directReader.blockReader), tracer, DIRECT_QUORUM_RPC_READER),
- )
-
open fun blockByFinalization(): Reader> {
return SpannedReader(directReader.blockByFinalizationReader, tracer, DIRECT_QUORUM_RPC_READER)
}
- open fun blocksByIdAsCont(): Reader> {
- return blocksByIdAsCont
+ open fun blocksByIdAsCont(matcher: Selector.Matcher): Reader> {
+ val idToBlockHash = Function> { id -> Request(BlockHash.from(id.value), matcher) }
+ return CompoundReader(
+ SpannedReader(CacheWithUpstreamIdReader(caches.getBlocksByHash()), tracer, CACHE_BLOCK_BY_HASH_READER),
+ SpannedReader(RekeyingReader(idToBlockHash, directReader.blockReader), tracer, DIRECT_QUORUM_RPC_READER),
+ )
}
- open fun blocksByHeightAsCont(): Reader> {
+ open fun blocksByHeightAsCont(matcher: Selector.Matcher): Reader> {
+ val numToRequest = Function> { num -> Request(num, matcher) }
return CompoundReader(
SpannedReader(CacheWithUpstreamIdReader(caches.getBlocksByHeight()), tracer, CACHE_BLOCK_BY_HEIGHT_READER),
- SpannedReader(directReader.blockByHeightReader, tracer, DIRECT_QUORUM_RPC_READER),
+ SpannedReader(RekeyingReader(numToRequest, directReader.blockByHeightReader), tracer, DIRECT_QUORUM_RPC_READER),
)
}
@@ -111,7 +84,8 @@ open class EthereumCachingReader(
return directReader.logsByHashReader
}
- open fun txByHashAsCont(): Reader> {
+ open fun txByHashAsCont(matcher: Selector.Matcher): Reader> {
+ val idToTxHash = Function> { id -> Request(TransactionId.from(id.value), matcher) }
return CompoundReader(
CacheWithUpstreamIdReader(SpannedReader(caches.getTxByHash(), tracer, CACHE_TX_BY_HASH_READER)),
SpannedReader(RekeyingReader(idToTxHash, directReader.txReader), tracer, DIRECT_QUORUM_RPC_READER),
@@ -126,9 +100,9 @@ open class EthereumCachingReader(
)
}
- fun receipts(): Reader> {
+ fun receipts(matcher: Selector.Matcher): Reader> {
val requested = RekeyingReader(
- { txid: TxId -> TransactionId.from(txid.value) },
+ { txid: TxId -> Request(TransactionId.from(txid.value), matcher) },
directReader.receiptReader,
)
return CompoundReader(
diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumDirectReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumDirectReader.kt
index e22240740..68e03a051 100644
--- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumDirectReader.kt
+++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumDirectReader.kt
@@ -61,32 +61,31 @@ class EthereumDirectReader(
private val objectMapper: ObjectMapper = Global.objectMapper
var requestReaderFactory: RequestReaderFactory = RequestReaderFactory.default()
- val blockReader: Reader>
- val blockByHeightReader: Reader>
- val txReader: Reader>
+ val blockReader: Reader, Result>
+ val blockByHeightReader: Reader, Result>
+ val txReader: Reader, Result>
val balanceReader: Reader>
- val receiptReader: Reader>
+ val receiptReader: Reader, Result>
val logsByHashReader: Reader>>
val blockByFinalizationReader: Reader>
init {
- blockReader = object : Reader> {
- override fun read(key: BlockHash): Mono> {
- val request = ChainRequest("eth_getBlockByHash", ListParams(key.toHex(), false))
- return readBlock(request, key.toHex())
+ blockReader = object : Reader, Result> {
+ override fun read(key: Request): Mono> {
+ val request = ChainRequest("eth_getBlockByHash", ListParams(key.requestBy.toHex(), false))
+ return readBlock(request, key.requestBy.toHex(), key.matcher)
}
}
- blockByHeightReader = object : Reader> {
- override fun read(key: Long): Mono> {
- val heightMatcher = Selector.HeightMatcher(key)
- val request = ChainRequest("eth_getBlockByNumber", ListParams(HexQuantity.from(key).toHex(), false))
- return readBlock(request, key.toString(), heightMatcher)
+ blockByHeightReader = object : Reader, Result> {
+ override fun read(key: Request): Mono> {
+ val request = ChainRequest("eth_getBlockByNumber", ListParams(HexQuantity.from(key.requestBy).toHex(), false))
+ return readBlock(request, key.toString(), key.matcher)
}
}
- txReader = object : Reader> {
- override fun read(key: TransactionId): Mono> {
- val request = ChainRequest("eth_getTransactionByHash", ListParams(key.toHex()))
- return readWithQuorum(request) // retries were removed because we use NotNullQuorum which handle errors too
+ txReader = object : Reader, Result> {
+ override fun read(key: Request): Mono> {
+ val request = ChainRequest("eth_getTransactionByHash", ListParams(key.requestBy.toHex()))
+ return readWithQuorum(request, key.matcher) // retries were removed because we use NotNullQuorum which handle errors too
.timeout(Duration.ofSeconds(5), Mono.error(TimeoutException("Tx not read $key")))
.flatMap { result ->
val tx = objectMapper.readValue(result.data, TransactionJsonSnapshot::class.java)
@@ -150,10 +149,10 @@ class EthereumDirectReader(
}
}
- receiptReader = object : Reader> {
- override fun read(key: TransactionId): Mono> {
- val request = ChainRequest("eth_getTransactionReceipt", ListParams(key.toHex()))
- return readWithQuorum(request)
+ receiptReader = object : Reader, Result> {
+ override fun read(key: Request): Mono> {
+ val request = ChainRequest("eth_getTransactionReceipt", ListParams(key.requestBy.toHex()))
+ return readWithQuorum(request, key.matcher)
.timeout(Duration.ofSeconds(5), Mono.error(TimeoutException("Receipt not read $key")))
.flatMap { result ->
val receipt = objectMapper.readValue(result.data, TransactionReceiptJson::class.java)
@@ -164,7 +163,7 @@ class EthereumDirectReader(
caches.cacheReceipt(
Caches.Tag.REQUESTED,
DefaultContainer(
- txId = TxId.from(key),
+ txId = TxId.from(key.requestBy),
blockId = BlockId.from(receipt.blockHash),
height = receipt.blockNumber,
json = result.data,
@@ -252,14 +251,10 @@ class EthereumDirectReader(
): Mono> {
return Mono.just(requestReaderFactory)
.map {
- val requestMatcher = Selector.Builder()
- .withMatcher(matcher)
- .forMethod(request.method)
- .build()
it.create(
RequestReaderFactory.ReaderData(
up,
- Selector.UpstreamFilter(sort, requestMatcher),
+ Selector.UpstreamFilter(sort, matcher),
callMethodsFactory.create().createQuorumFor(request.method),
null,
tracer,
@@ -276,4 +271,9 @@ class EthereumDirectReader(
val data: T,
val resolvedUpstreamData: List,
)
+
+ data class Request(
+ val requestBy: T,
+ val matcher: Selector.Matcher,
+ )
}
diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLocalReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLocalReader.kt
index df3176bef..d73ad9465 100644
--- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLocalReader.kt
+++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumLocalReader.kt
@@ -23,6 +23,7 @@ import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.ChainResponse
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.LogsOracle
+import io.emeraldpay.dshackle.upstream.Selector
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.ethereum.hex.HexQuantity
import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcException
@@ -87,7 +88,7 @@ class EthereumLocalReader(
} catch (e: IllegalArgumentException) {
throw RpcException(RpcResponseError.CODE_INVALID_METHOD_PARAMS, "[0] must be transaction id")
}
- reader.txByHashAsCont()
+ reader.txByHashAsCont(key.matcher)
.read(hash)
.map { ChainResponse(it.data.json, null, it.resolvedUpstreamData) }
}
@@ -106,14 +107,14 @@ class EthereumLocalReader(
if (withTx) {
null
} else {
- reader.blocksByIdAsCont().read(hash).map {
+ reader.blocksByIdAsCont(key.matcher).read(hash).map {
ChainResponse(it.data.json, null, it.resolvedUpstreamData)
}
}
}
method == "eth_getBlockByNumber" -> {
- getBlockByNumber(params.list)
+ getBlockByNumber(params.list, key.matcher)
}
method == "eth_getTransactionReceipt" -> {
@@ -126,7 +127,7 @@ class EthereumLocalReader(
} catch (e: IllegalArgumentException) {
throw RpcException(RpcResponseError.CODE_INVALID_METHOD_PARAMS, "[0] must be transaction id")
}
- reader.receipts()
+ reader.receipts(key.matcher)
.read(hash)
.map { ChainResponse(it.data, null, it.resolvedUpstreamData) }
}
@@ -141,7 +142,7 @@ class EthereumLocalReader(
return null
}
- fun getBlockByNumber(params: List): Mono? {
+ fun getBlockByNumber(params: List, matcher: Selector.Matcher): Mono? {
if (params.size != 2 || params[0] == null || params[1] == null) {
throw RpcException(RpcResponseError.CODE_INVALID_METHOD_PARAMS, "Must provide 2 parameters")
}
@@ -189,7 +190,7 @@ class EthereumLocalReader(
throw RpcException(RpcResponseError.CODE_INVALID_METHOD_PARAMS, "[0] must be a block number")
}
- return reader.blocksByHeightAsCont()
+ return reader.blocksByHeightAsCont(matcher)
.read(number).map { ChainResponse(it.data.json, null, it.resolvedUpstreamData) }
}
diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumCachingReaderSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumCachingReaderSpec.groovy
index a8749ac1b..9de037b06 100644
--- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumCachingReaderSpec.groovy
+++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumCachingReaderSpec.groovy
@@ -94,7 +94,7 @@ class EthereumDirectReaderSpec extends Specification {
}
}
when:
- def act = reader.blockReader.read(BlockHash.from(hash1))
+ def act = reader.blockReader.read(new EthereumDirectReader.Request(BlockHash.from(hash1), Selector.empty))
then:
StepVerifier.create(act)
.expectNextMatches { block ->
@@ -122,7 +122,7 @@ class EthereumDirectReaderSpec extends Specification {
}
}
when:
- def act = reader.blockReader.read(BlockHash.from(hash1))
+ def act = reader.blockReader.read(new EthereumDirectReader.Request(BlockHash.from(hash1), Selector.empty))
then:
StepVerifier.create(act)
.expectComplete()
@@ -155,7 +155,7 @@ class EthereumDirectReaderSpec extends Specification {
}
}
when:
- def act = reader.blockByHeightReader.read(100)
+ def act = reader.blockByHeightReader.read(new EthereumDirectReader.Request(100L, Selector.empty))
then:
StepVerifier.create(act)
.expectNextMatches { block ->
@@ -220,7 +220,7 @@ class EthereumDirectReaderSpec extends Specification {
}
}
when:
- def act = reader.txReader.read(TransactionId.from(hash1))
+ def act = reader.txReader.read(new EthereumDirectReader.Request(TransactionId.from(hash1), Selector.empty))
then:
StepVerifier.create(act)
.expectNextMatches { block ->
@@ -253,7 +253,7 @@ class EthereumDirectReaderSpec extends Specification {
}
}
when:
- def act = reader.receiptReader.read(TransactionId.from(hash1))
+ def act = reader.receiptReader.read(new EthereumDirectReader.Request(TransactionId.from(hash1), Selector.empty))
.block(Duration.ofSeconds(1))
.with { new String(it.data) }
then:
@@ -287,7 +287,7 @@ class EthereumDirectReaderSpec extends Specification {
}
}
when:
- def act = reader.receiptReader.read(TransactionId.from(hash1))
+ def act = reader.receiptReader.read(new EthereumDirectReader.Request(TransactionId.from(hash1), Selector.empty))
.block(Duration.ofSeconds(1))
.with { new String(it.data) }
then:
@@ -312,7 +312,7 @@ class EthereumDirectReaderSpec extends Specification {
}
}
when:
- def act = reader.txReader.read(TransactionId.from(hash1))
+ def act = reader.txReader.read(new EthereumDirectReader.Request(TransactionId.from(hash1), Selector.empty))
then:
StepVerifier.create(act)
.expectComplete()
@@ -411,7 +411,7 @@ class EthereumDirectReaderSpec extends Specification {
}
}
when:
- def act = ethereumDirectReader.blockReader.read(BlockHash.from(hash1))
+ def act = ethereumDirectReader.blockReader.read(new EthereumDirectReader.Request(BlockHash.from(hash1), Selector.empty))
then:
StepVerifier.create(act)
.expectNextMatches { block ->
@@ -451,7 +451,7 @@ class EthereumDirectReaderSpec extends Specification {
}
}
when:
- def act = ethereumDirectReader.blockByHeightReader.read(100)
+ def act = ethereumDirectReader.blockByHeightReader.read(new EthereumDirectReader.Request(100L, Selector.empty))
then:
StepVerifier.create(act)
.expectNextMatches { block ->
diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumLocalReaderSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumLocalReaderSpec.groovy
index 28f832cee..54b228a82 100644
--- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumLocalReaderSpec.groovy
+++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumLocalReaderSpec.groovy
@@ -8,6 +8,7 @@ import io.emeraldpay.dshackle.reader.Reader
import io.emeraldpay.dshackle.test.TestingCommons
import io.emeraldpay.dshackle.upstream.EmptyHead
import io.emeraldpay.dshackle.upstream.Head
+import io.emeraldpay.dshackle.upstream.Selector
import io.emeraldpay.dshackle.upstream.calls.DefaultEthereumMethods
import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.finalization.FinalizationData
@@ -69,9 +70,9 @@ class EthereumLocalReaderSpec extends Specification {
1 * getCurrentHeight() >> 101L
}
def reader = Mock(EthereumCachingReader) {
- _ * blocksByIdAsCont() >> new EmptyReader<>()
- _ * txByHashAsCont() >> new EmptyReader<>()
- 1 * blocksByHeightAsCont() >> Mock(Reader) {
+ _ * blocksByIdAsCont(Selector.empty) >> new EmptyReader<>()
+ _ * txByHashAsCont(Selector.empty) >> new EmptyReader<>()
+ 1 * blocksByHeightAsCont(Selector.empty) >> Mock(Reader) {
1 * read(101L) >> Mono.just(
new EthereumDirectReader.Result<>(TestingCommons.blockForEthereum(101L), List.of())
)
@@ -81,7 +82,7 @@ class EthereumLocalReaderSpec extends Specification {
def router = new EthereumLocalReader(reader, methods, head, null)
when:
- def act = router.getBlockByNumber(["latest", false])
+ def act = router.getBlockByNumber(["latest", false], Selector.empty)
then:
act != null
@@ -97,9 +98,9 @@ class EthereumLocalReaderSpec extends Specification {
setup:
def head = Stub(Head) {}
def reader = Mock(EthereumCachingReader) {
- _ * blocksByIdAsCont() >> new EmptyReader<>()
- _ * txByHashAsCont() >> new EmptyReader<>()
- 1 * blocksByHeightAsCont() >> Mock(Reader) {
+ _ * blocksByIdAsCont(Selector.empty) >> new EmptyReader<>()
+ _ * txByHashAsCont(Selector.empty) >> new EmptyReader<>()
+ 1 * blocksByHeightAsCont(Selector.empty) >> Mock(Reader) {
1 * read(0L) >> Mono.just(
new EthereumDirectReader.Result<>(TestingCommons.blockForEthereum(0L), List.of())
)
@@ -109,7 +110,7 @@ class EthereumLocalReaderSpec extends Specification {
def router = new EthereumLocalReader(reader, methods, head, null)
when:
- def act = router.getBlockByNumber(["earliest", false])
+ def act = router.getBlockByNumber(["earliest", false], Selector.empty)
then:
act != null
@@ -125,9 +126,9 @@ class EthereumLocalReaderSpec extends Specification {
setup:
def head = Stub(Head) {}
def reader = Mock(EthereumCachingReader) {
- _ * blocksByIdAsCont() >> new EmptyReader<>()
- _ * txByHashAsCont() >> new EmptyReader<>()
- 1 * blocksByHeightAsCont() >> Mock(Reader) {
+ _ * blocksByIdAsCont(Selector.empty) >> new EmptyReader<>()
+ _ * txByHashAsCont(Selector.empty) >> new EmptyReader<>()
+ 1 * blocksByHeightAsCont(Selector.empty) >> Mock(Reader) {
1 * read(74735L) >> Mono.just(
new EthereumDirectReader.Result<>(TestingCommons.blockForEthereum(74735L), List.of())
)
@@ -137,7 +138,7 @@ class EthereumLocalReaderSpec extends Specification {
def router = new EthereumLocalReader(reader, methods, head, null)
when:
- def act = router.getBlockByNumber(["0x123ef", false])
+ def act = router.getBlockByNumber(["0x123ef", false], Selector.empty)
then:
act != null
@@ -183,15 +184,15 @@ class EthereumLocalReaderSpec extends Specification {
setup:
def head = Mock(Head)
def reader = Mock(EthereumCachingReader) {
- _ * blocksByIdAsCont() >> new EmptyReader<>()
- _ * txByHashAsCont() >> new EmptyReader<>()
- _ * blocksByHeightAsCont() >> new EmptyReader<>()
+ _ * blocksByIdAsCont(Selector.empty) >> new EmptyReader<>()
+ _ * txByHashAsCont(Selector.empty) >> new EmptyReader<>()
+ _ * blocksByHeightAsCont(Selector.empty) >> new EmptyReader<>()
}
def methods = new DefaultEthereumMethods(Chain.ETHEREUM__MAINNET, false)
def router = new EthereumLocalReader(reader, methods, head, null)
when:
- def act = router.getBlockByNumber(["0x0", true])
+ def act = router.getBlockByNumber(["0x0", true], Selector.empty)
then:
act == null
diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadSpec.groovy
index 6913efb25..c01a084e4 100644
--- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadSpec.groovy
+++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadSpec.groovy
@@ -23,6 +23,7 @@ import io.emeraldpay.dshackle.test.GenericUpstreamMock
import io.emeraldpay.dshackle.test.TestingCommons
import io.emeraldpay.dshackle.upstream.BlockValidator
import io.emeraldpay.dshackle.upstream.DefaultUpstream
+import io.emeraldpay.dshackle.upstream.Selector
import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson
import io.emeraldpay.dshackle.upstream.forkchoice.AlwaysForkChoice
import io.emeraldpay.dshackle.upstream.ChainRequest
@@ -132,7 +133,7 @@ class GenericWsHeadSpec extends Specification {
new WsSubscriptions.SubscribeData(Flux.error(new RuntimeException()), "id", new AtomicReference("")),
new WsSubscriptions.SubscribeData(Flux.fromIterable([secondHeadBlock]), "id", new AtomicReference(""))
]
- 1 * it.unsubscribe(new ChainRequest("eth_unsubscribe", new ListParams(""), 2, null, null, false)) >>
+ 1 * it.unsubscribe(new ChainRequest("eth_unsubscribe", new ListParams(""), 2, null, null, false, Selector.empty)) >>
Mono.just(new ChainResponse("".bytes, null))
}
@@ -452,7 +453,7 @@ class GenericWsHeadSpec extends Specification {
1 * it.subscribe(_) >> new WsSubscriptions.SubscribeData(
Flux.error(new RuntimeException()), "id", new AtomicReference(subId)
)
- 1 * it.unsubscribe(new ChainRequest("eth_unsubscribe", new ListParams(subId), 2, null, null, false)) >>
+ 1 * it.unsubscribe(new ChainRequest("eth_unsubscribe", new ListParams(subId), 2, null, null, false, Selector.empty)) >>
Mono.just(new ChainResponse("".bytes, null))
}
diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImplSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImplSpec.groovy
index 0bb9acc56..5ba90d3be 100644
--- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImplSpec.groovy
+++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImplSpec.groovy
@@ -21,6 +21,7 @@ import io.emeraldpay.dshackle.test.GenericUpstreamMock
import io.emeraldpay.dshackle.test.TestingCommons
import io.emeraldpay.dshackle.upstream.DefaultUpstream
import io.emeraldpay.dshackle.upstream.ChainRequest
+import io.emeraldpay.dshackle.upstream.Selector
import io.emeraldpay.dshackle.upstream.rpcclient.ListParams
import io.emeraldpay.dshackle.upstream.ethereum.domain.TransactionId
import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcResponseError
@@ -59,7 +60,7 @@ class WsConnectionImplSpec extends Specification {
when:
Flux.from(ws.handle(wsApiMock.inbound, wsApiMock.outbound)).subscribe()
- def act = ws.callRpc(new ChainRequest("eth_getTransactionByHash", new ListParams(["0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200"]), 15, null, null, false))
+ def act = ws.callRpc(new ChainRequest("eth_getTransactionByHash", new ListParams(["0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200"]), 15, null, null, false, Selector.empty))
then:
StepVerifier.create(act)
@@ -91,7 +92,7 @@ class WsConnectionImplSpec extends Specification {
when:
Flux.from(ws.handle(wsApiMock.inbound, wsApiMock.outbound)).subscribe()
- def act = ws.callRpc(new ChainRequest("eth_getTransactionByHash", new ListParams(["0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200"]), 15, null, null, false))
+ def act = ws.callRpc(new ChainRequest("eth_getTransactionByHash", new ListParams(["0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200"]), 15, null, null, false, Selector.empty))
then:
StepVerifier.create(act)
@@ -125,7 +126,7 @@ class WsConnectionImplSpec extends Specification {
when:
Flux.from(ws.handle(wsApiMock.inbound, wsApiMock.outbound)).subscribe()
- def act = ws.callRpc(new ChainRequest("eth_getTransactionByHash", new ListParams(["0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200"]), 15, null, null, false))
+ def act = ws.callRpc(new ChainRequest("eth_getTransactionByHash", new ListParams(["0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200"]), 15, null, null, false, Selector.empty))
then:
StepVerifier.create(act)