Skip to content

Commit

Permalink
problem: some nodes may respond with a JSON RPC setting a non-200 sta…
Browse files Browse the repository at this point in the history
…tus code but Dshackle ignores that JSON (#184)
  • Loading branch information
splix committed Oct 4, 2022
1 parent 752b273 commit db9b331
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,20 @@ import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpHeaders
import io.netty.handler.ssl.SslContextBuilder
import io.netty.resolver.DefaultAddressResolverGroup
import org.apache.commons.lang3.time.StopWatch
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
import reactor.util.function.Tuple2
import reactor.util.function.Tuples
import java.io.ByteArrayInputStream
import java.security.KeyStore
import java.security.cert.CertificateFactory
import java.security.cert.X509Certificate
import java.util.Base64
import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import java.util.function.Function

/**
* JSON RPC client
Expand Down Expand Up @@ -84,52 +88,95 @@ class JsonRpcHttpClient(
this.httpClient = build
}

fun execute(request: ByteArray): Mono<ByteArray> {
fun execute(request: ByteArray): Mono<Tuple2<Int, ByteArray>> {
val response = httpClient
.post()
.uri(target)
.send(Mono.just(request).map { Unpooled.wrappedBuffer(it) })

return response.response { header, bytes ->
if (header.status().code() != 200) {
Mono.error(
JsonRpcException(
JsonRpcResponse.NumberId(-2),
JsonRpcError(
RpcResponseError.CODE_UPSTREAM_INVALID_RESPONSE,
"HTTP Code: ${header.status().code()}"
)
)
)
} else {
bytes.aggregate().asByteArray()
val statusCode = header.status().code()
bytes.aggregate().asByteArray().map {
Tuples.of(statusCode, it)
}
}.single()
}

override fun read(key: JsonRpcRequest): Mono<JsonRpcResponse> {
var startTime: Long = 0
val startTime = StopWatch()
return Mono.just(key)
.map(JsonRpcRequest::toJson)
.doOnNext {
startTime = System.nanoTime()
}
.doOnNext { startTime.start() }
.flatMap(this@JsonRpcHttpClient::execute)
.doOnNext {
if (startTime > 0) {
val now = System.nanoTime()
metrics.timer.record(now - startTime, TimeUnit.NANOSECONDS)
if (startTime.isStarted) {
metrics.timer.record(startTime.nanoTime, TimeUnit.NANOSECONDS)
}
}
.transform(asJsonRpcResponse(key))
.transform(convertErrors(key))
.transform(throwIfError())
}

/**
* The subscribers expect to catch an exception if the response contains JSON RPC Error. Convert it here to JsonRpcException
*/
private fun throwIfError(): Function<Mono<JsonRpcResponse>, Mono<JsonRpcResponse>> {
return Function { resp ->
resp.flatMap {
if (it.hasError()) {
Mono.error(JsonRpcException(it.id, it.error!!))
} else {
Mono.just(it)
}
}
.map(parser::parse)
.onErrorResume { t ->
}
}

/**
* Convert internal exceptions to standard JsonRpcException
*/
private fun convertErrors(key: JsonRpcRequest): Function<Mono<JsonRpcResponse>, Mono<JsonRpcResponse>> {
return Function { resp ->
resp.onErrorResume { t ->
val err = when (t) {
is RpcException -> JsonRpcResponse.error(t.code, t.rpcMessage)
is JsonRpcException -> JsonRpcResponse.error(t.error, JsonRpcResponse.NumberId(1))
else -> JsonRpcResponse.error(1, t.message ?: t.javaClass.name)
is RpcException -> JsonRpcException.from(t)
is JsonRpcException -> t
else -> JsonRpcException(key.id, t.message ?: t.javaClass.name)
}
// here we're measure the internal errors, not upstream errors
metrics.fails.increment()
Mono.just(err)
Mono.error(err)
}
}
}

/**
* Process response from the upstream and convert it to JsonRpcResponse.
* The input is a pair of (Http Status Code, Http Response Body)
*/
private fun asJsonRpcResponse(key: JsonRpcRequest): Function<Mono<Tuple2<Int, ByteArray>>, Mono<JsonRpcResponse>> {
return Function { resp ->
resp.map {
val parsed = parser.parse(it.t2)
val statusCode = it.t1
if (statusCode != 200) {
if (parsed.hasError() && parsed.error!!.code != RpcResponseError.CODE_UPSTREAM_INVALID_RESPONSE) {
// extracted the error details from the HTTP Body
parsed
} else {
// here we got a valid response with ERROR as HTTP Status Code. We assume that HTTP Status has
// a higher priority so return an error here anyway
JsonRpcResponse.error(
RpcResponseError.CODE_UPSTREAM_INVALID_RESPONSE,
"HTTP Code: $statusCode",
JsonRpcResponse.NumberId(key.id)
)
}
} else {
parsed
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.mockserver.model.HttpRequest
import org.mockserver.model.HttpResponse
import org.mockserver.model.MediaType
import org.springframework.util.SocketUtils
import reactor.core.Exceptions
import reactor.test.StepVerifier
import spock.lang.Specification

Expand Down Expand Up @@ -84,7 +85,7 @@ class JsonRpcHttpClientSpec extends Specification {
.withBody("pong")
)
when:
def act = client.execute("ping".bytes).map { new String(it) }
def act = client.execute("ping".bytes).map { new String(it.t2) }
then:
StepVerifier.create(act)
.expectNext("pong")
Expand All @@ -111,13 +112,44 @@ class JsonRpcHttpClientSpec extends Specification {
.withBody("pong")
)
when:
def act = client.execute("ping".bytes).map { new String(it) }
def act = client.read(
new JsonRpcRequest("ping", [])
).block(Duration.ofSeconds(1))
then:
StepVerifier.create(act)
.expectErrorMatches { t ->
t instanceof JsonRpcException && t.error.code == RpcResponseError.CODE_UPSTREAM_INVALID_RESPONSE
}
.verify(Duration.ofSeconds(1))
def t = thrown(RuntimeException) // reactor.core.Exceptions$ReactiveException
t.cause instanceof JsonRpcException
with(((JsonRpcException)t.cause).error) {
code == RpcResponseError.CODE_UPSTREAM_INVALID_RESPONSE
message == "HTTP Code: 500"
}
}

def "Tries to extract message if HTTP error if it still contains a JSON RPC message"() {
setup:
def client = new JsonRpcHttpClient("localhost:${port}", metrics, null, null)

mockServer.when(
HttpRequest.request()
).respond(
HttpResponse.response()
.withStatusCode(500)
.withBody('{' +
'"jsonrpc": "2.0", ' +
'"id": 1, ' +
'"error": {"code": -32603, "message": "Something happened"}' +
'}')
)
when:
def act = client.read(
new JsonRpcRequest("ping", [])
).block(Duration.ofSeconds(1))
then:
def t = thrown(RuntimeException) // reactor.core.Exceptions$ReactiveException
t.cause instanceof JsonRpcException
with(((JsonRpcException)t.cause).error) {
code == -32603
message == "Something happened"
}
}

}

0 comments on commit db9b331

Please sign in to comment.