diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClient.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClient.kt index fb425ebf3..4b01cedb5 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClient.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClient.kt @@ -24,9 +24,12 @@ import io.netty.handler.codec.http.HttpHeaderNames import io.netty.handler.codec.http.HttpHeaders import io.netty.handler.ssl.SslContextBuilder import io.netty.resolver.DefaultAddressResolverGroup +import org.apache.commons.lang3.time.StopWatch import org.slf4j.LoggerFactory import reactor.core.publisher.Mono import reactor.netty.http.client.HttpClient +import reactor.util.function.Tuple2 +import reactor.util.function.Tuples import java.io.ByteArrayInputStream import java.security.KeyStore import java.security.cert.CertificateFactory @@ -34,6 +37,7 @@ import java.security.cert.X509Certificate import java.util.Base64 import java.util.concurrent.TimeUnit import java.util.function.Consumer +import java.util.function.Function /** * JSON RPC client @@ -84,52 +88,95 @@ class JsonRpcHttpClient( this.httpClient = build } - fun execute(request: ByteArray): Mono { + fun execute(request: ByteArray): Mono> { val response = httpClient .post() .uri(target) .send(Mono.just(request).map { Unpooled.wrappedBuffer(it) }) return response.response { header, bytes -> - if (header.status().code() != 200) { - Mono.error( - JsonRpcException( - JsonRpcResponse.NumberId(-2), - JsonRpcError( - RpcResponseError.CODE_UPSTREAM_INVALID_RESPONSE, - "HTTP Code: ${header.status().code()}" - ) - ) - ) - } else { - bytes.aggregate().asByteArray() + val statusCode = header.status().code() + bytes.aggregate().asByteArray().map { + Tuples.of(statusCode, it) } }.single() } override fun read(key: JsonRpcRequest): Mono { - var startTime: Long = 0 + val startTime = StopWatch() return Mono.just(key) .map(JsonRpcRequest::toJson) - .doOnNext { - startTime = System.nanoTime() - } + .doOnNext { startTime.start() } .flatMap(this@JsonRpcHttpClient::execute) .doOnNext { - if (startTime > 0) { - val now = System.nanoTime() - metrics.timer.record(now - startTime, TimeUnit.NANOSECONDS) + if (startTime.isStarted) { + metrics.timer.record(startTime.nanoTime, TimeUnit.NANOSECONDS) + } + } + .transform(asJsonRpcResponse(key)) + .transform(convertErrors(key)) + .transform(throwIfError()) + } + + /** + * The subscribers expect to catch an exception if the response contains JSON RPC Error. Convert it here to JsonRpcException + */ + private fun throwIfError(): Function, Mono> { + return Function { resp -> + resp.flatMap { + if (it.hasError()) { + Mono.error(JsonRpcException(it.id, it.error!!)) + } else { + Mono.just(it) } } - .map(parser::parse) - .onErrorResume { t -> + } + } + + /** + * Convert internal exceptions to standard JsonRpcException + */ + private fun convertErrors(key: JsonRpcRequest): Function, Mono> { + return Function { resp -> + resp.onErrorResume { t -> val err = when (t) { - is RpcException -> JsonRpcResponse.error(t.code, t.rpcMessage) - is JsonRpcException -> JsonRpcResponse.error(t.error, JsonRpcResponse.NumberId(1)) - else -> JsonRpcResponse.error(1, t.message ?: t.javaClass.name) + is RpcException -> JsonRpcException.from(t) + is JsonRpcException -> t + else -> JsonRpcException(key.id, t.message ?: t.javaClass.name) } + // here we're measure the internal errors, not upstream errors metrics.fails.increment() - Mono.just(err) + Mono.error(err) } + } + } + + /** + * Process response from the upstream and convert it to JsonRpcResponse. + * The input is a pair of (Http Status Code, Http Response Body) + */ + private fun asJsonRpcResponse(key: JsonRpcRequest): Function>, Mono> { + return Function { resp -> + resp.map { + val parsed = parser.parse(it.t2) + val statusCode = it.t1 + if (statusCode != 200) { + if (parsed.hasError() && parsed.error!!.code != RpcResponseError.CODE_UPSTREAM_INVALID_RESPONSE) { + // extracted the error details from the HTTP Body + parsed + } else { + // here we got a valid response with ERROR as HTTP Status Code. We assume that HTTP Status has + // a higher priority so return an error here anyway + JsonRpcResponse.error( + RpcResponseError.CODE_UPSTREAM_INVALID_RESPONSE, + "HTTP Code: $statusCode", + JsonRpcResponse.NumberId(key.id) + ) + } + } else { + parsed + } + } + } } } diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClientSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClientSpec.groovy index 1e02cdc15..549e81fe5 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClientSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClientSpec.groovy @@ -26,6 +26,7 @@ import org.mockserver.model.HttpRequest import org.mockserver.model.HttpResponse import org.mockserver.model.MediaType import org.springframework.util.SocketUtils +import reactor.core.Exceptions import reactor.test.StepVerifier import spock.lang.Specification @@ -84,7 +85,7 @@ class JsonRpcHttpClientSpec extends Specification { .withBody("pong") ) when: - def act = client.execute("ping".bytes).map { new String(it) } + def act = client.execute("ping".bytes).map { new String(it.t2) } then: StepVerifier.create(act) .expectNext("pong") @@ -111,13 +112,44 @@ class JsonRpcHttpClientSpec extends Specification { .withBody("pong") ) when: - def act = client.execute("ping".bytes).map { new String(it) } + def act = client.read( + new JsonRpcRequest("ping", []) + ).block(Duration.ofSeconds(1)) then: - StepVerifier.create(act) - .expectErrorMatches { t -> - t instanceof JsonRpcException && t.error.code == RpcResponseError.CODE_UPSTREAM_INVALID_RESPONSE - } - .verify(Duration.ofSeconds(1)) + def t = thrown(RuntimeException) // reactor.core.Exceptions$ReactiveException + t.cause instanceof JsonRpcException + with(((JsonRpcException)t.cause).error) { + code == RpcResponseError.CODE_UPSTREAM_INVALID_RESPONSE + message == "HTTP Code: 500" + } + } + + def "Tries to extract message if HTTP error if it still contains a JSON RPC message"() { + setup: + def client = new JsonRpcHttpClient("localhost:${port}", metrics, null, null) + + mockServer.when( + HttpRequest.request() + ).respond( + HttpResponse.response() + .withStatusCode(500) + .withBody('{' + + '"jsonrpc": "2.0", ' + + '"id": 1, ' + + '"error": {"code": -32603, "message": "Something happened"}' + + '}') + ) + when: + def act = client.read( + new JsonRpcRequest("ping", []) + ).block(Duration.ofSeconds(1)) + then: + def t = thrown(RuntimeException) // reactor.core.Exceptions$ReactiveException + t.cause instanceof JsonRpcException + with(((JsonRpcException)t.cause).error) { + code == -32603 + message == "Something happened" + } } }