From e8a76944e9816baa9d758f51384ea353af6f2b9e Mon Sep 17 00:00:00 2001 From: Igor Artamonov Date: Fri, 19 Jan 2024 17:53:59 -0500 Subject: [PATCH] problem: standard HTTP Queue fails under a high load with slow upstreams b/c it's limited to 1000 requests in queue solution: increase default queue to 5000 requests --- .../upstream/rpcclient/JsonRpcHttpClient.kt | 9 +- .../rpcclient/JsonRpcHttpClientTest.kt | 85 +++++++++++++++++++ 2 files changed, 93 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClient.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClient.kt index 1934536bc..1525b3061 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClient.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClient.kt @@ -28,6 +28,7 @@ import org.apache.commons.lang3.time.StopWatch import org.slf4j.LoggerFactory import reactor.core.publisher.Mono import reactor.netty.http.client.HttpClient +import reactor.netty.resources.ConnectionProvider import reactor.util.function.Tuple2 import reactor.util.function.Tuples import java.io.ByteArrayInputStream @@ -51,6 +52,12 @@ class JsonRpcHttpClient( companion object { private val log = LoggerFactory.getLogger(JsonRpcHttpClient::class.java) + + // default connection pool has only 1000 of pending connections, which is not always enough + private val connectionProvider = ConnectionProvider.builder("json-rpc-pool") + .maxConnections(500) + .pendingAcquireMaxCount(5000) + .build() } private val parser = ResponseRpcParser() @@ -61,7 +68,7 @@ class JsonRpcHttpClient( override var onHttpError: Consumer? = null init { - var build = HttpClient.create() + var build = HttpClient.create(connectionProvider) .resolver(DefaultAddressResolverGroup.INSTANCE) .compress(compress) .doOnChannelInit(metrics.onChannelInit) diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClientTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClientTest.kt index 6ff45e31c..12ddb3ea3 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClientTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcHttpClientTest.kt @@ -6,19 +6,27 @@ import io.emeraldpay.dshackle.test.TestingCommonsKotlin import io.emeraldpay.etherjar.rpc.RpcResponseError import io.kotest.assertions.throwables.shouldThrowAny import io.kotest.core.spec.style.ShouldSpec +import io.kotest.matchers.comparables.shouldBeGreaterThan import io.kotest.matchers.shouldBe import io.kotest.matchers.types.instanceOf import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.DistributionSummary import io.micrometer.core.instrument.Tag import io.micrometer.core.instrument.Timer +import org.mockserver.configuration.Configuration import org.mockserver.integration.ClientAndServer +import org.mockserver.model.Delay import org.mockserver.model.HttpRequest import org.mockserver.model.HttpResponse import org.mockserver.model.MediaType import org.springframework.util.SocketUtils import reactor.core.Exceptions +import reactor.core.scheduler.Schedulers import java.time.Duration +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger class JsonRpcHttpClientTest : ShouldSpec({ @@ -122,4 +130,81 @@ class JsonRpcHttpClientTest : ShouldSpec({ it.message shouldBe "Something happened" } } + + should("Handle many requests") { + if (System.getenv("CI") == "true") { + // skip on CI, it's too slow + println("Skipping test on CI") + return@should + } + mockServer?.stop() + mockServer = ClientAndServer.startClientAndServer( + Configuration().also { + it.actionHandlerThreadCount(100) + it.nioEventLoopThreadCount(100) + }, + port + ) + val client = JsonRpcHttpClient("localhost:$port", metrics, null, null) + val resp = """ + { + "jsonrpc": "2.0", + "result": 100, + "id": 1 + } + """.trimIndent() + mockServer!!.`when`(HttpRequest.request()) + .respond( + HttpResponse.response() + .withDelay(Delay.milliseconds(1500)) + .withStatusCode(200) + .withBody(resp) + ) + +// val executor = Executors.newFixedThreadPool(2000) +// val count = AtomicInteger(0) +// repeat(2000) { id -> +// executor.execute { +// try { +// client.read(JsonRpcRequest("test", emptyList())) +// .subscribeOn(Schedulers.immediate()) +// .block(Duration.ofSeconds(5)) +// count.incrementAndGet() +// } catch (t: Throwable) { +// val e = Exceptions.unwrap(t) +// if (e.message.equals("Connection prematurely closed BEFORE response")) { +// // ignore, may happen when the Executor is closing +// } else { +// System.err.println("Error for call ${id}: ${e.message}") +// } +// } +// } +// } +// executor.shutdown() +// executor.awaitTermination(60, TimeUnit.SECONDS) + + val wait = CountDownLatch(2000) + val executor = Schedulers.fromExecutor(Executors.newFixedThreadPool(2000)) + val count = AtomicInteger(0) + repeat(2000) { id -> + client.read(JsonRpcRequest("test", emptyList())) + .subscribeOn(executor) + .doFinally { wait.countDown() } + .doOnError { e -> + if (e.message.equals("Connection prematurely closed BEFORE response")) { + // ignore, may happen when the Executor is closing + } else { + System.err.println("Error for call $id: ${e.message}") + } + } + .subscribe { + count.incrementAndGet() + } + } + wait.await(60, TimeUnit.SECONDS) + + println("Count: ${count.get()}") + // a few connection may fail, especially when running on a CI, so check that at least majority of requests were successful + count.get() shouldBeGreaterThan 1950 + } })