Skip to content

Commit

Permalink
problem: standard HTTP Queue fails under a high load with slow upstre…
Browse files Browse the repository at this point in the history
…ams b/c it's limited to 1000 requests in queue

solution: increase default queue to 5000 requests
  • Loading branch information
splix committed Jan 19, 2024
1 parent 6fd701b commit e8a7694
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.commons.lang3.time.StopWatch
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
import reactor.netty.resources.ConnectionProvider
import reactor.util.function.Tuple2
import reactor.util.function.Tuples
import java.io.ByteArrayInputStream
Expand All @@ -51,6 +52,12 @@ class JsonRpcHttpClient(

companion object {
private val log = LoggerFactory.getLogger(JsonRpcHttpClient::class.java)

// default connection pool has only 1000 of pending connections, which is not always enough
private val connectionProvider = ConnectionProvider.builder("json-rpc-pool")
.maxConnections(500)
.pendingAcquireMaxCount(5000)
.build()
}

private val parser = ResponseRpcParser()
Expand All @@ -61,7 +68,7 @@ class JsonRpcHttpClient(
override var onHttpError: Consumer<Int>? = null

init {
var build = HttpClient.create()
var build = HttpClient.create(connectionProvider)
.resolver(DefaultAddressResolverGroup.INSTANCE)
.compress(compress)
.doOnChannelInit(metrics.onChannelInit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,27 @@ import io.emeraldpay.dshackle.test.TestingCommonsKotlin
import io.emeraldpay.etherjar.rpc.RpcResponseError
import io.kotest.assertions.throwables.shouldThrowAny
import io.kotest.core.spec.style.ShouldSpec
import io.kotest.matchers.comparables.shouldBeGreaterThan
import io.kotest.matchers.shouldBe
import io.kotest.matchers.types.instanceOf
import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.DistributionSummary
import io.micrometer.core.instrument.Tag
import io.micrometer.core.instrument.Timer
import org.mockserver.configuration.Configuration
import org.mockserver.integration.ClientAndServer
import org.mockserver.model.Delay
import org.mockserver.model.HttpRequest
import org.mockserver.model.HttpResponse
import org.mockserver.model.MediaType
import org.springframework.util.SocketUtils
import reactor.core.Exceptions
import reactor.core.scheduler.Schedulers
import java.time.Duration
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

class JsonRpcHttpClientTest : ShouldSpec({

Expand Down Expand Up @@ -122,4 +130,81 @@ class JsonRpcHttpClientTest : ShouldSpec({
it.message shouldBe "Something happened"
}
}

should("Handle many requests") {
if (System.getenv("CI") == "true") {
// skip on CI, it's too slow
println("Skipping test on CI")
return@should
}
mockServer?.stop()
mockServer = ClientAndServer.startClientAndServer(
Configuration().also {
it.actionHandlerThreadCount(100)
it.nioEventLoopThreadCount(100)
},
port
)
val client = JsonRpcHttpClient("localhost:$port", metrics, null, null)
val resp = """
{
"jsonrpc": "2.0",
"result": 100,
"id": 1
}
""".trimIndent()
mockServer!!.`when`(HttpRequest.request())
.respond(
HttpResponse.response()
.withDelay(Delay.milliseconds(1500))
.withStatusCode(200)
.withBody(resp)
)

// val executor = Executors.newFixedThreadPool(2000)
// val count = AtomicInteger(0)
// repeat(2000) { id ->
// executor.execute {
// try {
// client.read(JsonRpcRequest("test", emptyList()))
// .subscribeOn(Schedulers.immediate())
// .block(Duration.ofSeconds(5))
// count.incrementAndGet()
// } catch (t: Throwable) {
// val e = Exceptions.unwrap(t)
// if (e.message.equals("Connection prematurely closed BEFORE response")) {
// // ignore, may happen when the Executor is closing
// } else {
// System.err.println("Error for call ${id}: ${e.message}")
// }
// }
// }
// }
// executor.shutdown()
// executor.awaitTermination(60, TimeUnit.SECONDS)

val wait = CountDownLatch(2000)
val executor = Schedulers.fromExecutor(Executors.newFixedThreadPool(2000))
val count = AtomicInteger(0)
repeat(2000) { id ->
client.read(JsonRpcRequest("test", emptyList()))
.subscribeOn(executor)
.doFinally { wait.countDown() }
.doOnError { e ->
if (e.message.equals("Connection prematurely closed BEFORE response")) {
// ignore, may happen when the Executor is closing
} else {
System.err.println("Error for call $id: ${e.message}")
}
}
.subscribe {
count.incrementAndGet()
}
}
wait.await(60, TimeUnit.SECONDS)

println("Count: ${count.get()}")
// a few connection may fail, especially when running on a CI, so check that at least majority of requests were successful
count.get() shouldBeGreaterThan 1950
}
})

0 comments on commit e8a7694

Please sign in to comment.