Skip to content

Commit

Permalink
Pass all matchers to direct reader (#581)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Oct 23, 2024
1 parent 7503107 commit 697db61
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 107 deletions.
13 changes: 11 additions & 2 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ open class NativeCall(
return ctx.upstream.getLocalReader()
.flatMap { api ->
SpannedReader(api, tracer, LOCAL_READER)
.read(ctx.payload.toChainRequest(ctx.nonce, ctx.forwardedSelector, false))
.read(ctx.payload.toChainRequest(ctx.nonce, ctx.forwardedSelector, false, ctx.upstreamFilter.matcher))
.map {
val result = it.getResult()
val resolvedUpstreamData = it.resolvedUpstreamData.ifEmpty {
Expand Down Expand Up @@ -799,7 +799,16 @@ open class NativeCall(
selector: BlockchainOuterClass.Selector?,
streamRequest: Boolean,
): ChainRequest {
return ChainRequest(method, params, nonce, selector, streamRequest)
return toChainRequest(nonce, selector, streamRequest, Selector.empty)
}

fun toChainRequest(
nonce: Long?,
selector: BlockchainOuterClass.Selector?,
streamRequest: Boolean,
matcher: Selector.Matcher,
): ChainRequest {
return ChainRequest(method, params, nonce, selector, streamRequest, matcher)
}
}
}
10 changes: 9 additions & 1 deletion src/main/kotlin/io/emeraldpay/dshackle/upstream/ChainRequest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ data class ChainRequest(
val nonce: Long?,
val selector: BlockchainOuterClass.Selector?,
val isStreamed: Boolean = false,
val matcher: Selector.Matcher = Selector.empty,
) {

@JvmOverloads constructor(
Expand All @@ -38,7 +39,14 @@ data class ChainRequest(
nonce: Long? = null,
selectors: BlockchainOuterClass.Selector? = null,
isStreamed: Boolean = false,
) : this(method, params, 1, nonce, selectors, isStreamed)
matcher: Selector.Matcher = Selector.empty,
) : this(method, params, 1, nonce, selectors, isStreamed, matcher)

constructor(
method: String,
params: CallParams,
matcher: Selector.Matcher,
) : this(method, params, 1, null, null, false, matcher)

fun toJson(): ByteArray {
return params.toJson(id, method)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package io.emeraldpay.dshackle.upstream.ethereum

import com.fasterxml.jackson.databind.ObjectMapper
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.cache.Caches
import io.emeraldpay.dshackle.cache.CurrentBlockCache
import io.emeraldpay.dshackle.commons.CACHE_BLOCK_BY_HASH_READER
Expand All @@ -34,16 +32,15 @@ import io.emeraldpay.dshackle.reader.RekeyingReader
import io.emeraldpay.dshackle.reader.SpannedReader
import io.emeraldpay.dshackle.upstream.CachingReader
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.Selector
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.ethereum.EthereumDirectReader.Request
import io.emeraldpay.dshackle.upstream.ethereum.EthereumDirectReader.Result
import io.emeraldpay.dshackle.upstream.ethereum.domain.Address
import io.emeraldpay.dshackle.upstream.ethereum.domain.BlockHash
import io.emeraldpay.dshackle.upstream.ethereum.domain.TransactionId
import io.emeraldpay.dshackle.upstream.ethereum.domain.Wei
import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson
import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionJsonSnapshot
import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionLogJson
import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionRefJson
import io.emeraldpay.dshackle.upstream.finalization.FinalizationType
import org.apache.commons.collections4.Factory
import org.springframework.cloud.sleuth.Tracer
Expand All @@ -60,58 +57,35 @@ open class EthereumCachingReader(
private val tracer: Tracer,
) : CachingReader {

private val objectMapper: ObjectMapper = Global.objectMapper
private val balanceCache = CurrentBlockCache<Address, Wei>()
private val directReader = EthereumDirectReader(up, caches, balanceCache, callMethodsFactory, tracer)

private val extractBlock = Function<Result<BlockContainer>, BlockJson<TransactionRefJson>> { result ->
val block = result.data
val existing = block.getParsed(BlockJson::class.java)
if (existing != null) {
existing.withoutTransactionDetails()
} else {
objectMapper
.readValue(block.json, BlockJson::class.java)
.withoutTransactionDetails()
}
}

private val extractTx = Function<Result<TxContainer>, TransactionJsonSnapshot> { result ->
result.data.getParsed(TransactionJsonSnapshot::class.java)
?: objectMapper.readValue(result.data.json, TransactionJsonSnapshot::class.java)
}

private val idToBlockHash = Function<BlockId, BlockHash> { id -> BlockHash.from(id.value) }
private val blockHashToId = Function<BlockHash, BlockId> { hash -> BlockId.from(hash) }

private val txHashToId = Function<TransactionId, TxId> { hash -> TxId.from(hash) }
private val idToTxHash = Function<TxId, TransactionId> { id -> TransactionId.from(id.value) }

private val blocksByIdAsCont = CompoundReader(
SpannedReader(CacheWithUpstreamIdReader(caches.getBlocksByHash()), tracer, CACHE_BLOCK_BY_HASH_READER),
SpannedReader(RekeyingReader(idToBlockHash, directReader.blockReader), tracer, DIRECT_QUORUM_RPC_READER),
)

open fun blockByFinalization(): Reader<FinalizationType, Result<BlockContainer>> {
return SpannedReader(directReader.blockByFinalizationReader, tracer, DIRECT_QUORUM_RPC_READER)
}

open fun blocksByIdAsCont(): Reader<BlockId, Result<BlockContainer>> {
return blocksByIdAsCont
open fun blocksByIdAsCont(matcher: Selector.Matcher): Reader<BlockId, Result<BlockContainer>> {
val idToBlockHash = Function<BlockId, Request<BlockHash>> { id -> Request(BlockHash.from(id.value), matcher) }
return CompoundReader(
SpannedReader(CacheWithUpstreamIdReader(caches.getBlocksByHash()), tracer, CACHE_BLOCK_BY_HASH_READER),
SpannedReader(RekeyingReader(idToBlockHash, directReader.blockReader), tracer, DIRECT_QUORUM_RPC_READER),
)
}

open fun blocksByHeightAsCont(): Reader<Long, Result<BlockContainer>> {
open fun blocksByHeightAsCont(matcher: Selector.Matcher): Reader<Long, Result<BlockContainer>> {
val numToRequest = Function<Long, Request<Long>> { num -> Request(num, matcher) }
return CompoundReader(
SpannedReader(CacheWithUpstreamIdReader(caches.getBlocksByHeight()), tracer, CACHE_BLOCK_BY_HEIGHT_READER),
SpannedReader(directReader.blockByHeightReader, tracer, DIRECT_QUORUM_RPC_READER),
SpannedReader(RekeyingReader(numToRequest, directReader.blockByHeightReader), tracer, DIRECT_QUORUM_RPC_READER),
)
}

open fun logsByHash(): Reader<BlockId, Result<List<TransactionLogJson>>> {
return directReader.logsByHashReader
}

open fun txByHashAsCont(): Reader<TxId, Result<TxContainer>> {
open fun txByHashAsCont(matcher: Selector.Matcher): Reader<TxId, Result<TxContainer>> {
val idToTxHash = Function<TxId, Request<TransactionId>> { id -> Request(TransactionId.from(id.value), matcher) }
return CompoundReader(
CacheWithUpstreamIdReader(SpannedReader(caches.getTxByHash(), tracer, CACHE_TX_BY_HASH_READER)),
SpannedReader(RekeyingReader(idToTxHash, directReader.txReader), tracer, DIRECT_QUORUM_RPC_READER),
Expand All @@ -126,9 +100,9 @@ open class EthereumCachingReader(
)
}

fun receipts(): Reader<TxId, Result<ByteArray>> {
fun receipts(matcher: Selector.Matcher): Reader<TxId, Result<ByteArray>> {
val requested = RekeyingReader(
{ txid: TxId -> TransactionId.from(txid.value) },
{ txid: TxId -> Request(TransactionId.from(txid.value), matcher) },
directReader.receiptReader,
)
return CompoundReader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,32 +61,31 @@ class EthereumDirectReader(
private val objectMapper: ObjectMapper = Global.objectMapper
var requestReaderFactory: RequestReaderFactory = RequestReaderFactory.default()

val blockReader: Reader<BlockHash, Result<BlockContainer>>
val blockByHeightReader: Reader<Long, Result<BlockContainer>>
val txReader: Reader<TransactionId, Result<TxContainer>>
val blockReader: Reader<Request<BlockHash>, Result<BlockContainer>>
val blockByHeightReader: Reader<Request<Long>, Result<BlockContainer>>
val txReader: Reader<Request<TransactionId>, Result<TxContainer>>
val balanceReader: Reader<Address, Result<Wei>>
val receiptReader: Reader<TransactionId, Result<ByteArray>>
val receiptReader: Reader<Request<TransactionId>, Result<ByteArray>>
val logsByHashReader: Reader<BlockId, Result<List<TransactionLogJson>>>
val blockByFinalizationReader: Reader<FinalizationType, Result<BlockContainer>>

init {
blockReader = object : Reader<BlockHash, Result<BlockContainer>> {
override fun read(key: BlockHash): Mono<Result<BlockContainer>> {
val request = ChainRequest("eth_getBlockByHash", ListParams(key.toHex(), false))
return readBlock(request, key.toHex())
blockReader = object : Reader<Request<BlockHash>, Result<BlockContainer>> {
override fun read(key: Request<BlockHash>): Mono<Result<BlockContainer>> {
val request = ChainRequest("eth_getBlockByHash", ListParams(key.requestBy.toHex(), false))
return readBlock(request, key.requestBy.toHex(), key.matcher)
}
}
blockByHeightReader = object : Reader<Long, Result<BlockContainer>> {
override fun read(key: Long): Mono<Result<BlockContainer>> {
val heightMatcher = Selector.HeightMatcher(key)
val request = ChainRequest("eth_getBlockByNumber", ListParams(HexQuantity.from(key).toHex(), false))
return readBlock(request, key.toString(), heightMatcher)
blockByHeightReader = object : Reader<Request<Long>, Result<BlockContainer>> {
override fun read(key: Request<Long>): Mono<Result<BlockContainer>> {
val request = ChainRequest("eth_getBlockByNumber", ListParams(HexQuantity.from(key.requestBy).toHex(), false))
return readBlock(request, key.toString(), key.matcher)
}
}
txReader = object : Reader<TransactionId, Result<TxContainer>> {
override fun read(key: TransactionId): Mono<Result<TxContainer>> {
val request = ChainRequest("eth_getTransactionByHash", ListParams(key.toHex()))
return readWithQuorum(request) // retries were removed because we use NotNullQuorum which handle errors too
txReader = object : Reader<Request<TransactionId>, Result<TxContainer>> {
override fun read(key: Request<TransactionId>): Mono<Result<TxContainer>> {
val request = ChainRequest("eth_getTransactionByHash", ListParams(key.requestBy.toHex()))
return readWithQuorum(request, key.matcher) // retries were removed because we use NotNullQuorum which handle errors too
.timeout(Duration.ofSeconds(5), Mono.error(TimeoutException("Tx not read $key")))
.flatMap { result ->
val tx = objectMapper.readValue(result.data, TransactionJsonSnapshot::class.java)
Expand Down Expand Up @@ -150,10 +149,10 @@ class EthereumDirectReader(
}
}

receiptReader = object : Reader<TransactionId, Result<ByteArray>> {
override fun read(key: TransactionId): Mono<Result<ByteArray>> {
val request = ChainRequest("eth_getTransactionReceipt", ListParams(key.toHex()))
return readWithQuorum(request)
receiptReader = object : Reader<Request<TransactionId>, Result<ByteArray>> {
override fun read(key: Request<TransactionId>): Mono<Result<ByteArray>> {
val request = ChainRequest("eth_getTransactionReceipt", ListParams(key.requestBy.toHex()))
return readWithQuorum(request, key.matcher)
.timeout(Duration.ofSeconds(5), Mono.error(TimeoutException("Receipt not read $key")))
.flatMap { result ->
val receipt = objectMapper.readValue(result.data, TransactionReceiptJson::class.java)
Expand All @@ -164,7 +163,7 @@ class EthereumDirectReader(
caches.cacheReceipt(
Caches.Tag.REQUESTED,
DefaultContainer(
txId = TxId.from(key),
txId = TxId.from(key.requestBy),
blockId = BlockId.from(receipt.blockHash),
height = receipt.blockNumber,
json = result.data,
Expand Down Expand Up @@ -252,14 +251,10 @@ class EthereumDirectReader(
): Mono<Result<ByteArray>> {
return Mono.just(requestReaderFactory)
.map {
val requestMatcher = Selector.Builder()
.withMatcher(matcher)
.forMethod(request.method)
.build()
it.create(
RequestReaderFactory.ReaderData(
up,
Selector.UpstreamFilter(sort, requestMatcher),
Selector.UpstreamFilter(sort, matcher),
callMethodsFactory.create().createQuorumFor(request.method),
null,
tracer,
Expand All @@ -276,4 +271,9 @@ class EthereumDirectReader(
val data: T,
val resolvedUpstreamData: List<Upstream.UpstreamSettingsData>,
)

data class Request<T>(
val requestBy: T,
val matcher: Selector.Matcher,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import io.emeraldpay.dshackle.upstream.ChainRequest
import io.emeraldpay.dshackle.upstream.ChainResponse
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.LogsOracle
import io.emeraldpay.dshackle.upstream.Selector
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.ethereum.hex.HexQuantity
import io.emeraldpay.dshackle.upstream.ethereum.rpc.RpcException
Expand Down Expand Up @@ -87,7 +88,7 @@ class EthereumLocalReader(
} catch (e: IllegalArgumentException) {
throw RpcException(RpcResponseError.CODE_INVALID_METHOD_PARAMS, "[0] must be transaction id")
}
reader.txByHashAsCont()
reader.txByHashAsCont(key.matcher)
.read(hash)
.map { ChainResponse(it.data.json, null, it.resolvedUpstreamData) }
}
Expand All @@ -106,14 +107,14 @@ class EthereumLocalReader(
if (withTx) {
null
} else {
reader.blocksByIdAsCont().read(hash).map {
reader.blocksByIdAsCont(key.matcher).read(hash).map {
ChainResponse(it.data.json, null, it.resolvedUpstreamData)
}
}
}

method == "eth_getBlockByNumber" -> {
getBlockByNumber(params.list)
getBlockByNumber(params.list, key.matcher)
}

method == "eth_getTransactionReceipt" -> {
Expand All @@ -126,7 +127,7 @@ class EthereumLocalReader(
} catch (e: IllegalArgumentException) {
throw RpcException(RpcResponseError.CODE_INVALID_METHOD_PARAMS, "[0] must be transaction id")
}
reader.receipts()
reader.receipts(key.matcher)
.read(hash)
.map { ChainResponse(it.data, null, it.resolvedUpstreamData) }
}
Expand All @@ -141,7 +142,7 @@ class EthereumLocalReader(
return null
}

fun getBlockByNumber(params: List<Any?>): Mono<ChainResponse>? {
fun getBlockByNumber(params: List<Any?>, matcher: Selector.Matcher): Mono<ChainResponse>? {
if (params.size != 2 || params[0] == null || params[1] == null) {
throw RpcException(RpcResponseError.CODE_INVALID_METHOD_PARAMS, "Must provide 2 parameters")
}
Expand Down Expand Up @@ -189,7 +190,7 @@ class EthereumLocalReader(
throw RpcException(RpcResponseError.CODE_INVALID_METHOD_PARAMS, "[0] must be a block number")
}

return reader.blocksByHeightAsCont()
return reader.blocksByHeightAsCont(matcher)
.read(number).map { ChainResponse(it.data.json, null, it.resolvedUpstreamData) }
}

Expand Down
Loading

0 comments on commit 697db61

Please sign in to comment.