Skip to content

Commit

Permalink
problem: doesn't pass error details from upstream
Browse files Browse the repository at this point in the history
fix: #42
  • Loading branch information
splix committed Aug 14, 2020
1 parent 14f106d commit dee17bc
Show file tree
Hide file tree
Showing 30 changed files with 325 additions and 103 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ dependencies {
testImplementation "io.projectreactor:reactor-test:$reactorVersion"
testImplementation 'org.objenesis:objenesis:3.1'
testImplementation 'org.mock-server:mockserver-netty:5.10'
testImplementation "nl.jqno.equalsverifier:equalsverifier:3.3"
}

compileKotlin {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class ProxyServer(
.addAllItems(call.items)
.build()
val jsons = nativeCall
.nativeCall(Mono.just(request))
.nativeCallResult(Mono.just(request))
.transform(writeRpcJson.toJsons(call))
return if (call.type == ProxyCall.RpcType.SINGLE) {
jsons.next()
Expand Down
19 changes: 12 additions & 7 deletions src/main/kotlin/io/emeraldpay/dshackle/proxy/WriteRpcJson.kt
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ open class WriteRpcJson() {
/**
* Convert Dshackle protobuf based responses to JSON RPC formatted as strings
*/
open fun toJsons(call: ProxyCall): Function<Flux<BlockchainOuterClass.NativeCallReplyItem>, Flux<String>> {
open fun toJsons(call: ProxyCall): Function<Flux<NativeCall.CallResult>, Flux<String>> {
return Function { flux ->
flux
.flatMap { response ->
Expand Down Expand Up @@ -70,19 +70,24 @@ open class WriteRpcJson() {
}
}

open fun toJson(call: ProxyCall, response: BlockchainOuterClass.NativeCallReplyItem): String? {
val id = call.ids[response.id] ?: return null;
val json = if (response.succeed) {
JsonRpcResponse.ok(response.payload.toByteArray(), JsonRpcResponse.Id.from(id))
open fun toJson(call: ProxyCall, response: NativeCall.CallResult): String? {
val id = call.ids[response.id]?.let {
JsonRpcResponse.Id.from(it)
} ?: return null;
val json = if (response.isError()) {
val error = response.error!!
error.upstreamError?.let { upstreamError ->
JsonRpcResponse.error(upstreamError, id)
} ?: JsonRpcResponse.error(-32002, error.message, id)
} else {
JsonRpcResponse.error(-32002, response.errorMessage, JsonRpcResponse.Id.from(id))
JsonRpcResponse.ok(response.result!!, id)
}
return objectMapper.writeValueAsString(json)
}

fun toJson(call: ProxyCall, error: NativeCall.CallFailure): String? {
val id = call.ids[error.id] ?: return null;
val json = JsonRpcResponse.error(-32002, error.reason.message ?: "", JsonRpcResponse.Id.from(id))
val json = JsonRpcResponse.error(-32003, error.reason.message ?: "", JsonRpcResponse.Id.from(id))
return objectMapper.writeValueAsString(json)
}

Expand Down
10 changes: 9 additions & 1 deletion src/main/kotlin/io/emeraldpay/dshackle/quorum/AlwaysQuorum.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ package io.emeraldpay.dshackle.quorum

import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcError
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcException
import io.infinitape.etherjar.rpc.RpcException

open class AlwaysQuorum: CallQuorum {

private var resolved = false
private var result: ByteArray? = null
private var rpcError: JsonRpcError? = null

override fun init(head: Head) {
}
Expand All @@ -42,10 +45,15 @@ open class AlwaysQuorum: CallQuorum {
return true
}

override fun record(error: RpcException, upstream: Upstream) {
override fun record(error: JsonRpcException, upstream: Upstream) {
this.rpcError = error.error
}

override fun getResult(): ByteArray? {
return result
}

override fun getError(): JsonRpcError? {
return rpcError
}
}
5 changes: 4 additions & 1 deletion src/main/kotlin/io/emeraldpay/dshackle/quorum/CallQuorum.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package io.emeraldpay.dshackle.quorum

import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcError
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcException
import io.infinitape.etherjar.domain.TransactionId
import io.infinitape.etherjar.rpc.RpcException
import io.infinitape.etherjar.rpc.json.BlockJson
Expand All @@ -34,8 +36,9 @@ interface CallQuorum {
fun isFailed(): Boolean

fun record(response: ByteArray, upstream: Upstream): Boolean
fun record(error: RpcException, upstream: Upstream)
fun record(error: JsonRpcException, upstream: Upstream)
fun getResult(): ByteArray?
fun getError(): JsonRpcError?

companion object {
fun untilResolved(cq: CallQuorum): Predicate<Any> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package io.emeraldpay.dshackle.quorum
import com.fasterxml.jackson.databind.ObjectMapper
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcException
import io.infinitape.etherjar.rpc.JacksonRpcConverter
import io.infinitape.etherjar.rpc.RpcException

Expand Down Expand Up @@ -55,7 +56,7 @@ open class NonEmptyQuorum(
tries++
}

override fun record(error: RpcException, upstream: Upstream) {
override fun record(error: JsonRpcException, upstream: Upstream) {
tries++
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ package io.emeraldpay.dshackle.quorum

import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcError
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcException
import io.infinitape.etherjar.rpc.RpcException
import java.util.concurrent.atomic.AtomicReference

class NotLaggingQuorum(val maxLag: Long = 0): CallQuorum {

private val result: AtomicReference<ByteArray> = AtomicReference()
private val failed = AtomicReference(false)
private var rpcError: JsonRpcError? = null

override fun init(head: Head) {
}
Expand All @@ -46,7 +49,8 @@ class NotLaggingQuorum(val maxLag: Long = 0): CallQuorum {
return false
}

override fun record(error: RpcException, upstream: Upstream) {
override fun record(error: JsonRpcException, upstream: Upstream) {
this.rpcError = error.error
val lagging = upstream.getLag() > maxLag
if (!lagging && result.get() == null) {
failed.set(true)
Expand All @@ -56,4 +60,8 @@ class NotLaggingQuorum(val maxLag: Long = 0): CallQuorum {
override fun getResult(): ByteArray {
return result.get()
}

override fun getError(): JsonRpcError? {
return rpcError
}
}
16 changes: 12 additions & 4 deletions src/main/kotlin/io/emeraldpay/dshackle/quorum/QuorumRpcReader.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package io.emeraldpay.dshackle.quorum

import io.emeraldpay.dshackle.reader.Reader
import io.emeraldpay.dshackle.upstream.ApiSource
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcException
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse
import io.infinitape.etherjar.rpc.RpcException
Expand Down Expand Up @@ -56,8 +57,10 @@ class QuorumRpcReader(

val defaultResult: Mono<Result> = Mono.just(quorum).flatMap { q ->
if (q.isFailed()) {
//TODO record and return actual error details
Mono.error<Result>(RpcException(-32000, "Upstream error"))
Mono.error<Result>(
q.getError()?.asException(JsonRpcResponse.IntId(1))
?: RpcException(-32000, "Unknown Upstream error")
)
} else {
log.warn("Empty result for ${key.method} as ${q}")
Mono.empty<Result>()
Expand All @@ -74,9 +77,14 @@ class QuorumRpcReader(
.flatMap { response ->
response.requireResult()
.onErrorResume { err ->
if (err is RpcException) {
if (err is RpcException || err is JsonRpcException) {
// on error notify quorum, it may use error message or other details
quorum.record(err, api)
val cleanErr: JsonRpcException = when (err) {
is RpcException -> JsonRpcException.from(err)
is JsonRpcException -> err
else -> throw IllegalStateException("Cannot convert from exception", err)
}
quorum.record(cleanErr, api)
// it it's failed after that, then we don't need more calls, stop api source
if (quorum.isFailed()) {
apis.resolve()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package io.emeraldpay.dshackle.quorum
import com.fasterxml.jackson.databind.ObjectMapper
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcError
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcException
import io.infinitape.etherjar.rpc.JacksonRpcConverter
import io.infinitape.etherjar.rpc.RpcException
import org.slf4j.LoggerFactory
Expand All @@ -28,6 +30,7 @@ abstract class ValueAwareQuorum<T>(
): CallQuorum {

private val log = LoggerFactory.getLogger(ValueAwareQuorum::class.java)
private var rpcError: JsonRpcError? = null

fun extractValue(response: ByteArray, clazz: Class<T>): T? {
return Global.objectMapper.readValue(response.inputStream(), clazz)
Expand All @@ -45,12 +48,16 @@ abstract class ValueAwareQuorum<T>(
return isResolved();
}

override fun record(error: RpcException, upstream: Upstream) {
recordError(null, error.rpcMessage, upstream)
override fun record(error: JsonRpcException, upstream: Upstream) {
this.rpcError = error.error
recordError(null, error.error.message, upstream)
}

abstract fun recordValue(response: ByteArray, responseValue: T?, upstream: Upstream)

abstract fun recordError(response: ByteArray?, errorMessage: String?, upstream: Upstream)

override fun getError(): JsonRpcError? {
return rpcError
}
}
21 changes: 14 additions & 7 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeCall.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import io.emeraldpay.dshackle.upstream.*
import io.emeraldpay.dshackle.quorum.AlwaysQuorum
import io.emeraldpay.dshackle.quorum.CallQuorum
import io.emeraldpay.dshackle.quorum.QuorumReaderFactory
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcError
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcException
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse
import io.emeraldpay.grpc.Chain
Expand All @@ -48,6 +50,12 @@ open class NativeCall(
var quorumReaderFactory: QuorumReaderFactory = QuorumReaderFactory.default()

open fun nativeCall(requestMono: Mono<BlockchainOuterClass.NativeCallRequest>): Flux<BlockchainOuterClass.NativeCallReplyItem> {
return nativeCallResult(requestMono)
.map(this::buildResponse)
.onErrorResume(this::processException)
}

open fun nativeCallResult(requestMono: Mono<BlockchainOuterClass.NativeCallRequest>): Flux<CallResult> {
return requestMono.flatMapMany(this::prepareCall)
.map(this::parseParams)
.parallel()
Expand All @@ -56,8 +64,6 @@ open class NativeCall(
.doOnError { e -> log.warn("Error during native call: ${e.message}") }
}
.sequential()
.map(this::buildResponse)
.onErrorResume(this::processException)
}

fun parseParams(it: CallContext<RawCallDetails>): CallContext<ParsedCallDetails> {
Expand Down Expand Up @@ -201,13 +207,14 @@ open class NativeCall(

open class CallFailure(val id: Int, val reason: Throwable) : Exception("Failed to call $id: ${reason.message}")

open class CallError(val id: Int, val message: String) {
open class CallError(val id: Int, val message: String, val upstreamError: JsonRpcError?) {
companion object {
fun from(t: Throwable): CallError {
return when (t) {
is RpcException -> CallError(t.code, t.rpcMessage)
is CallFailure -> CallError(t.id, t.reason.message ?: "Upstream Error")
else -> CallError(1, t.message ?: "Upstream Error")
is JsonRpcException -> CallError(t.id.asInt(), t.error.message, t.error)
is RpcException -> CallError(t.code, t.rpcMessage, null)
is CallFailure -> CallError(t.id, t.reason.message ?: "Upstream Error", null)
else -> CallError(1, t.message ?: "Upstream Error", null)
}
}
}
Expand All @@ -220,7 +227,7 @@ open class NativeCall(
}

fun fail(id: Int, errorCore: Int, errorMessage: String): CallResult {
return CallResult(id, null, CallError(errorCore, errorMessage))
return CallResult(id, null, CallError(errorCore, errorMessage, null))
}

fun fail(id: Int, error: Throwable): CallResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class EthereumRpcHead(
.timeout(Defaults.timeout, Mono.error(Exception("Block number not received")))
.flatMap {
if (it.error != null) {
Mono.error(it.error.asException())
Mono.error(it.error.asException(null))
} else {
val value = it.getResultAsProcessedString()
Mono.just(HexQuantity.from(value))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Copyright (c) 2020 EmeraldPay, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.emeraldpay.dshackle.upstream.rpcclient

import io.infinitape.etherjar.rpc.RpcException

class JsonRpcError(val code: Int, val message: String, val details: Any?) {

constructor(code: Int, message: String) : this(code, message, null)

companion object {
@JvmStatic
fun from(err: RpcException): JsonRpcError {
return JsonRpcError(
err.code, err.rpcMessage, err.details
)
}
}

fun asException(id: JsonRpcResponse.Id?): JsonRpcException {
return JsonRpcException(id ?: JsonRpcResponse.IntId(-1), this)
}

override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is JsonRpcError) return false

if (code != other.code) return false
if (message != other.message) return false
if (details != other.details) return false

return true
}

override fun hashCode(): Int {
var result = code
result = 31 * result + message.hashCode()
result = 31 * result + (details?.hashCode() ?: 0)
return result
}


}
Loading

0 comments on commit dee17bc

Please sign in to comment.