Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

problem: standard HTTP Queue fails under a high load #281

Merged
merged 1 commit into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.commons.lang3.time.StopWatch
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
import reactor.netty.resources.ConnectionProvider
import reactor.util.function.Tuple2
import reactor.util.function.Tuples
import java.io.ByteArrayInputStream
Expand All @@ -51,6 +52,12 @@ class JsonRpcHttpClient(

companion object {
private val log = LoggerFactory.getLogger(JsonRpcHttpClient::class.java)

// default connection pool has only 1000 of pending connections, which is not always enough
private val connectionProvider = ConnectionProvider.builder("json-rpc-pool")
.maxConnections(500)
.pendingAcquireMaxCount(5000)
.build()
}

private val parser = ResponseRpcParser()
Expand All @@ -61,7 +68,7 @@ class JsonRpcHttpClient(
override var onHttpError: Consumer<Int>? = null

init {
var build = HttpClient.create()
var build = HttpClient.create(connectionProvider)
.resolver(DefaultAddressResolverGroup.INSTANCE)
.compress(compress)
.doOnChannelInit(metrics.onChannelInit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,27 @@ import io.emeraldpay.dshackle.test.TestingCommonsKotlin
import io.emeraldpay.etherjar.rpc.RpcResponseError
import io.kotest.assertions.throwables.shouldThrowAny
import io.kotest.core.spec.style.ShouldSpec
import io.kotest.matchers.comparables.shouldBeGreaterThan
import io.kotest.matchers.shouldBe
import io.kotest.matchers.types.instanceOf
import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.DistributionSummary
import io.micrometer.core.instrument.Tag
import io.micrometer.core.instrument.Timer
import org.mockserver.configuration.Configuration
import org.mockserver.integration.ClientAndServer
import org.mockserver.model.Delay
import org.mockserver.model.HttpRequest
import org.mockserver.model.HttpResponse
import org.mockserver.model.MediaType
import org.springframework.util.SocketUtils
import reactor.core.Exceptions
import reactor.core.scheduler.Schedulers
import java.time.Duration
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

class JsonRpcHttpClientTest : ShouldSpec({

Expand Down Expand Up @@ -122,4 +130,81 @@ class JsonRpcHttpClientTest : ShouldSpec({
it.message shouldBe "Something happened"
}
}

should("Handle many requests") {
if (System.getenv("CI") == "true") {
// skip on CI, it's too slow
println("Skipping test on CI")
return@should
}
mockServer?.stop()
mockServer = ClientAndServer.startClientAndServer(
Configuration().also {
it.actionHandlerThreadCount(100)
it.nioEventLoopThreadCount(100)
},
port
)
val client = JsonRpcHttpClient("localhost:$port", metrics, null, null)
val resp = """
{
"jsonrpc": "2.0",
"result": 100,
"id": 1
}
""".trimIndent()
mockServer!!.`when`(HttpRequest.request())
.respond(
HttpResponse.response()
.withDelay(Delay.milliseconds(1500))
.withStatusCode(200)
.withBody(resp)
)

// val executor = Executors.newFixedThreadPool(2000)
// val count = AtomicInteger(0)
// repeat(2000) { id ->
// executor.execute {
// try {
// client.read(JsonRpcRequest("test", emptyList()))
// .subscribeOn(Schedulers.immediate())
// .block(Duration.ofSeconds(5))
// count.incrementAndGet()
// } catch (t: Throwable) {
// val e = Exceptions.unwrap(t)
// if (e.message.equals("Connection prematurely closed BEFORE response")) {
// // ignore, may happen when the Executor is closing
// } else {
// System.err.println("Error for call ${id}: ${e.message}")
// }
// }
// }
// }
// executor.shutdown()
// executor.awaitTermination(60, TimeUnit.SECONDS)

val wait = CountDownLatch(2000)
val executor = Schedulers.fromExecutor(Executors.newFixedThreadPool(2000))
val count = AtomicInteger(0)
repeat(2000) { id ->
client.read(JsonRpcRequest("test", emptyList()))
.subscribeOn(executor)
.doFinally { wait.countDown() }
.doOnError { e ->
if (e.message.equals("Connection prematurely closed BEFORE response")) {
// ignore, may happen when the Executor is closing
} else {
System.err.println("Error for call $id: ${e.message}")
}
}
.subscribe {
count.incrementAndGet()
}
}
wait.await(60, TimeUnit.SECONDS)

println("Count: ${count.get()}")
// a few connection may fail, especially when running on a CI, so check that at least majority of requests were successful
count.get() shouldBeGreaterThan 1950
}
})
Loading