Skip to content

Commit

Permalink
Add BroadcastReader instead of BroadcastQuorum (#263)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Aug 1, 2023
1 parent 6f576fc commit 674c69f
Show file tree
Hide file tree
Showing 12 changed files with 497 additions and 207 deletions.
5 changes: 2 additions & 3 deletions src/main/kotlin/io/emeraldpay/dshackle/commons/constatnts.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,15 @@ const val SPAN_STATUS_MESSAGE = "status.message"
const val SPAN_READER_RESULT = "reader.result"
const val SPAN_REQUEST_API_TYPE = "request.api.type"
const val SPAN_REQUEST_UPSTREAM_ID = "request.upstreamId"
const val SPAN_RESPONSE_UPSTREAM_ID = "response.upstreamId"
const val SPAN_REQUEST_ID = "request.id"
const val SPAN_NO_RESPONSE_MESSAGE = "no-response.message"

const val LOCAL_READER = "localReader"
const val REMOTE_QUORUM_RPC_READER = "remoteQuorumRpcReader"
const val RPC_READER = "rpcReader"
const val API_READER = "apiReader"
const val CACHE_BLOCK_BY_HASH_READER = "cacheBlockByHashReader"
const val DIRECT_QUORUM_RPC_READER = "directQuorumRpcReader"
const val CACHE_HEIGHT_BY_HASH_READER = "cacheHeightByHashReader"
const val CACHE_BLOCK_BY_HEIGHT_READER = "cacheBlockByHeightReader"
const val CACHE_TX_BY_HASH_READER = "cacheTxByHashReader"
const val CACHE_RECEIPTS_READER = "cacheReceiptsReader"
const val BROADCAST_READER = "broadcastReader"

This file was deleted.

32 changes: 6 additions & 26 deletions src/main/kotlin/io/emeraldpay/dshackle/quorum/QuorumRpcReader.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import io.emeraldpay.dshackle.commons.API_READER
import io.emeraldpay.dshackle.commons.SPAN_NO_RESPONSE_MESSAGE
import io.emeraldpay.dshackle.commons.SPAN_REQUEST_API_TYPE
import io.emeraldpay.dshackle.commons.SPAN_REQUEST_UPSTREAM_ID
import io.emeraldpay.dshackle.reader.RpcReader
import io.emeraldpay.dshackle.reader.SpannedReader
import io.emeraldpay.dshackle.upstream.ApiSource
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcError
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcException
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse
Expand All @@ -47,9 +47,9 @@ import java.util.function.Function
class QuorumRpcReader(
private val apiControl: ApiSource,
private val quorum: CallQuorum,
private val signer: ResponseSigner?,
signer: ResponseSigner?,
private val tracer: Tracer
) : QuorumReader {
) : RpcReader(signer) {

companion object {
private val log = LoggerFactory.getLogger(QuorumRpcReader::class.java)
Expand Down Expand Up @@ -158,12 +158,7 @@ class QuorumRpcReader(
private fun withSignatureAndUpstream(api: Upstream, key: JsonRpcRequest, response: JsonRpcResponse): Function<Mono<ByteArray>, Mono<Tuple2<ByteArray, Optional<ResponseSigner.Signature>>>> {
return Function { src ->
src.map {
val signature = response.providedSignature
?: if (key.nonce != null) {
signer?.sign(key.nonce, response.getResult(), api.getId())
} else {
null
}
val signature = getSignature(key, response, api.getId())
Tuples.of(it, Optional.ofNullable(signature))
}
}
Expand All @@ -176,14 +171,7 @@ class QuorumRpcReader(
// when the call failed with an error we want to notify the quorum because
// it may use the error message or other details
//
val cleanErr: JsonRpcException = when (err) {
is RpcException -> JsonRpcException.from(err)
is JsonRpcException -> err
else -> JsonRpcException(
JsonRpcResponse.NumberId(key.id),
JsonRpcError(-32603, "Unhandled internal error: ${err.javaClass}: ${err.message}")
)
}
val cleanErr: JsonRpcException = getError(key, err)
quorum.record(cleanErr, null, api,)
// if it's failed after that, then we don't need more calls, stop api source
if (quorum.isFailed()) {
Expand All @@ -202,8 +190,7 @@ class QuorumRpcReader(
return Mono.just(quorum).flatMap { q ->
if (q.isFailed()) {
val resolvedBy = resolvedBy()?.getId()
val err = q.getError()?.asException(JsonRpcResponse.NumberId(key.id), resolvedBy)
?: JsonRpcException(JsonRpcResponse.NumberId(key.id), JsonRpcError(-32603, "Unhandled Upstream error"), resolvedBy)
val err = handleError(q.getError(), key.id, resolvedBy)
log.warn("Quorum is failed. Method ${key.method}, message ${err.message}")
Mono.error(err)
} else {
Expand All @@ -229,11 +216,4 @@ class QuorumRpcReader(
}
} ?: Mono.error(RpcException(1, "Quorum [$q] is not resolved [isResolved - ${q.isResolved()}]"))
}

class Result(
val value: ByteArray,
val signature: ResponseSigner.Signature?,
val quorum: Int,
val resolvedBy: Upstream?
)
}
118 changes: 118 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/reader/BroadcastReader.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package io.emeraldpay.dshackle.reader

import io.emeraldpay.dshackle.commons.BROADCAST_READER
import io.emeraldpay.dshackle.commons.SPAN_REQUEST_UPSTREAM_ID
import io.emeraldpay.dshackle.upstream.Selector
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcError
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse
import io.emeraldpay.dshackle.upstream.signature.ResponseSigner
import org.slf4j.LoggerFactory
import org.springframework.cloud.sleuth.Tracer
import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicInteger

class BroadcastReader(
private val upstreams: List<Upstream>,
matcher: Selector.Matcher,
signer: ResponseSigner?,
private val tracer: Tracer
) : RpcReader(signer) {
private val internalMatcher = Selector.MultiMatcher(
listOf(Selector.AvailabilityMatcher(), matcher)
)

companion object {
private val log = LoggerFactory.getLogger(BroadcastReader::class.java)
}

override fun attempts(): AtomicInteger {
return AtomicInteger(1)
}

override fun read(key: JsonRpcRequest): Mono<Result> {
return Mono.just(upstreams)
.map { ups ->
ups.filter { internalMatcher.matches(it) }.map { execute(key, it) }
}.flatMap {
Mono.zip(it) { responses ->
analyzeResponses(
key,
getJsonRpcResponses(key.method, responses)
)
}.onErrorResume { err ->
log.error("Broadcast error: ${err.message}")
Mono.error(handleError(null, 0, null))
}.flatMap { broadcastResult ->
if (broadcastResult.result != null) {
Mono.just(
Result(broadcastResult.result, broadcastResult.signature, 0, null)
)
} else {
val err = handleError(broadcastResult.error, key.id, null)
Mono.error(err)
}
}
}
}

private fun analyzeResponses(key: JsonRpcRequest, jsonRpcResponses: List<BroadcastResponse>): BroadcastResult {
val errors = mutableListOf<JsonRpcError>()
jsonRpcResponses.forEach {
val response = it.jsonRpcResponse
if (response.hasResult()) {
val signature = getSignature(key, response, it.upstreamId)
return BroadcastResult(response.getResult(), null, signature)
} else if (response.hasError()) {
errors.add(response.error!!)
}
}

val error = errors.takeIf { it.isNotEmpty() }?.get(0)

return BroadcastResult(error)
}

private fun getJsonRpcResponses(method: String, responses: Array<Any>) =
responses
.map { response ->
(response as BroadcastResponse)
.also { r ->
if (r.jsonRpcResponse.hasResult()) {
log.info(
"Response for $method from upstream ${r.upstreamId}: ${String(r.jsonRpcResponse.getResult())}"
)
}
}
}

private fun execute(
key: JsonRpcRequest,
upstream: Upstream
): Mono<BroadcastResponse> =
SpannedReader(
upstream.getIngressReader(), tracer, BROADCAST_READER, mapOf(SPAN_REQUEST_UPSTREAM_ID to upstream.getId())
)
.read(key)
.map { BroadcastResponse(it, upstream.getId()) }
.onErrorResume {
log.warn("Error during execution ${key.method} from upstream ${upstream.getId()} with message - ${it.message}")
Mono.just(
BroadcastResponse(JsonRpcResponse(null, getError(key, it).error), upstream.getId())
)
}

private class BroadcastResponse(
val jsonRpcResponse: JsonRpcResponse,
val upstreamId: String
)

private class BroadcastResult(
val result: ByteArray?,
val error: JsonRpcError?,
val signature: ResponseSigner.Signature?
) {
constructor(error: JsonRpcError?) : this(null, error, null)
}
}
81 changes: 81 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/reader/RpcReaderFactory.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package io.emeraldpay.dshackle.reader

import io.emeraldpay.dshackle.quorum.CallQuorum
import io.emeraldpay.dshackle.quorum.QuorumRpcReader
import io.emeraldpay.dshackle.reader.RpcReader.Result
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.Selector
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcError
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcException
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse
import io.emeraldpay.dshackle.upstream.signature.ResponseSigner
import io.emeraldpay.etherjar.rpc.RpcException
import org.springframework.cloud.sleuth.Tracer
import java.util.concurrent.atomic.AtomicInteger

abstract class RpcReader(
private val signer: ResponseSigner?,
) : Reader<JsonRpcRequest, Result> {
abstract fun attempts(): AtomicInteger

protected fun getError(key: JsonRpcRequest, err: Throwable) =
when (err) {
is RpcException -> JsonRpcException.from(err)
is JsonRpcException -> err
else -> JsonRpcException(
JsonRpcResponse.NumberId(key.id),
JsonRpcError(-32603, "Unhandled internal error: ${err.javaClass}: ${err.message}")
)
}

protected fun handleError(error: JsonRpcError?, id: Int, resolvedBy: String?) =
error?.asException(JsonRpcResponse.NumberId(id), resolvedBy)
?: JsonRpcException(JsonRpcResponse.NumberId(id), JsonRpcError(-32603, "Unhandled Upstream error"), resolvedBy)

protected fun getSignature(key: JsonRpcRequest, response: JsonRpcResponse, upstreamId: String) =
response.providedSignature
?: if (key.nonce != null) {
signer?.sign(key.nonce, response.getResult(), upstreamId)
} else {
null
}

class Result(
val value: ByteArray,
val signature: ResponseSigner.Signature?,
val quorum: Int,
val resolvedBy: Upstream?
)
}

interface RpcReaderFactory {

companion object {
fun default(): RpcReaderFactory {
return Default()
}
}

fun create(data: RpcReaderData): RpcReader

class Default : RpcReaderFactory {
override fun create(data: RpcReaderData): RpcReader {
if (data.method == "eth_sendRawTransaction") {
return BroadcastReader(data.multistream.getAll(), data.matcher, data.signer, data.tracer)
}
val apis = data.multistream.getApiSource(data.matcher)
return QuorumRpcReader(apis, data.quorum, data.signer, data.tracer)
}
}

data class RpcReaderData(
val multistream: Multistream,
val method: String,
val matcher: Selector.Matcher,
val quorum: CallQuorum,
val signer: ResponseSigner?,
val tracer: Tracer
)
}
Loading

0 comments on commit 674c69f

Please sign in to comment.