Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
whyoleg committed May 29, 2024
1 parent 0f89b16 commit 0d3dbfa
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 72 deletions.
11 changes: 5 additions & 6 deletions benchmarks/rsocket-kotlin/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ benchmark {

register("ktorTcp") {
include("KtorTcpRSocketKotlinBenchmark")
param("payloadSize", "0")
param("dispatcher", "IO", "DEFAULT", "UNCONFINED")
param("selectorDispatcher", "IO", "1", "2", "4", "8", "l1", "l2", "l4", "l8")
param("payloadSize", "0", "64")
param("dispatcher", "DEFAULT")
param("selectorDispatcher", "IO")
}
register("ktorTcpPayloadSize") {
include("KtorTcpRSocketKotlinBenchmark")
Expand All @@ -115,12 +115,11 @@ benchmark {

register("nettyTcp") {
include("NettyTcpRSocketKotlinBenchmark")
param("payloadSize", "0")
param("shareGroup", "true", "false")
param("payloadSize", "0", "64")
}
register("nettyQuic") {
include("NettyQuicRSocketKotlinBenchmark")
param("payloadSize", "0")
param("payloadSize", "0", "64")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.rsocket.kotlin.benchmarks.kotlin

import io.ktor.network.selector.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.ktor.tcp.*
import kotlinx.benchmark.*
Expand All @@ -40,44 +39,36 @@ class KtorTcpRSocketKotlinBenchmark : RSocketKotlinBenchmark() {
private val dispatcherV by lazy {
when (dispatcher) {
"DEFAULT" -> Dispatchers.Default
"IO" -> Dispatchers.IO
"UNCONFINED" -> Dispatchers.Unconfined
else -> error("wrong parameter 'dispatcher=$dispatcher'")
}
}

private val selectorDispatcherV by lazy {
when (selectorDispatcher) {
"DEFAULT" -> Dispatchers.Default
"IO" -> Dispatchers.IO
"1" -> newSingleThreadContext("selectorDispatcher")
"2" -> newFixedThreadPoolContext(2, "selectorDispatcher")
"4" -> newFixedThreadPoolContext(4, "selectorDispatcher")
"8" -> newFixedThreadPoolContext(8, "selectorDispatcher")
"l1" -> Dispatchers.IO.limitedParallelism(1)
"l2" -> Dispatchers.IO.limitedParallelism(2)
"l4" -> Dispatchers.IO.limitedParallelism(4)
"l8" -> Dispatchers.IO.limitedParallelism(8)
else -> error("wrong parameter 'selectorDispatcher=$selectorDispatcher'")
}
}
// private val selectorDispatcherV by lazy {
// when (selectorDispatcher) {
// "DEFAULT" -> Dispatchers.Default
// "IO" -> Dispatchers.IO
// "2" -> newFixedThreadPoolContext(2, "selectorDispatcher")
// else -> error("wrong parameter 'selectorDispatcher=$selectorDispatcher'")
// }
// }

private val selector by lazy {
SelectorManager(selectorDispatcherV)
}
// private val selector by lazy {
// SelectorManager(selectorDispatcherV)
// }

override val serverTarget: RSocketServerTarget<*> by lazy {
KtorTcpServerTransport(benchJob) {
dispatcher(dispatcherV)
selectorManager(selector, manage = false)
}.target(port = 9000)
// dispatcher(dispatcherV)
// selectorManager(selector, manage = false)
}.target()
}

override val clientTarget: RSocketClientTarget by lazy {
KtorTcpClientTransport(benchJob) {
dispatcher(dispatcherV)
selectorManager(selector, manage = false)
}.target("0.0.0.0", port = 9000)
override fun clientTarget(serverInstance: RSocketServerInstance): RSocketClientTarget {
return KtorTcpClientTransport(benchJob) {
// dispatcher(dispatcherV)
// selectorManager(selector, manage = false)
}.target((serverInstance as KtorTcpServerInstance).localAddress)
}

@Setup
Expand All @@ -88,13 +79,13 @@ class KtorTcpRSocketKotlinBenchmark : RSocketKotlinBenchmark() {
@TearDown
override fun cleanup() {
super.cleanup()
selector.close()
if (
selectorDispatcherV != Dispatchers.Default &&
selectorDispatcherV != Dispatchers.IO &&
selectorDispatcherV is CloseableCoroutineDispatcher
) {
(selectorDispatcherV as CloseableCoroutineDispatcher).close()
}
// selector.close()
// if (
// selectorDispatcherV != Dispatchers.Default &&
// selectorDispatcherV != Dispatchers.IO &&
// selectorDispatcherV is CloseableCoroutineDispatcher
// ) {
// (selectorDispatcherV as CloseableCoroutineDispatcher).close()
// }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ class LocalRSocketKotlinBenchmark : RSocketKotlinBenchmark() {
}.target("local")
}

override val clientTarget: RSocketClientTarget by lazy {
LocalClientTransport(benchJob) {
override fun clientTarget(serverInstance: RSocketServerInstance): RSocketClientTarget {
return LocalClientTransport(benchJob) {
dispatcher(dispatcherV)
}.target("local")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import kotlin.random.*

@OptIn(ExperimentalStreamsApi::class)
abstract class RSocketKotlinBenchmark : RSocketBenchmark<Payload, Blackhole>() {
protected abstract val clientTarget: RSocketClientTarget
protected abstract fun clientTarget(serverInstance: RSocketServerInstance): RSocketClientTarget
protected abstract val serverTarget: RSocketServerTarget<*>

private val requestStrategy = PrefetchStrategy(64, 0)
Expand All @@ -56,7 +56,7 @@ abstract class RSocketKotlinBenchmark : RSocketBenchmark<Payload, Blackhole>() {
payload = createPayload(payloadSize)
payloadsFlow = flow { repeat(5000) { emit(createPayloadCopy()) } }

RSocketServer().startServer(serverTarget) {
val serverInstance = RSocketServer().startServer(serverTarget) {
RSocketRequestHandler {
requestResponse {
it.close()
Expand All @@ -72,7 +72,7 @@ abstract class RSocketKotlinBenchmark : RSocketBenchmark<Payload, Blackhole>() {
}
}
}
client = RSocketConnector().connect(clientTarget)
client = RSocketConnector().connect(clientTarget(serverInstance))
}

override fun cleanup(): Unit = runBlocking {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,19 @@ class NettyQuicRSocketKotlinBenchmark : RSocketKotlinBenchmark() {
codec {
tokenHandler(InsecureQuicTokenHandler.INSTANCE)
}
}.target(port = 9009)
}.target("127.0.0.1")
}

override val clientTarget: RSocketClientTarget by lazy {
NettyQuicClientTransport(benchJob) {
override fun clientTarget(serverInstance: RSocketServerInstance): RSocketClientTarget {
return NettyQuicClientTransport(benchJob) {
eventLoopGroup(sharedGroup, manage = false)
ssl {
trustManager(InsecureTrustManagerFactory.INSTANCE)
applicationProtocols(*protos)
}
}.target("127.0.0.1", port = 9009)
}.target(
(serverInstance as NettyQuicServerInstance).localAddress
)
}

@Setup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,14 @@ class NettyTcpRSocketKotlinBenchmark : RSocketKotlinBenchmark() {
@Param("0")
override var payloadSize: Int = 0

@Param("true", "false")
var shareGroup: Boolean = true

private val sharedGroup by lazy {
if (shareGroup) NioEventLoopGroup() else null
}

override val serverTarget: RSocketServerTarget<*> by lazy {
NettyTcpServerTransport(benchJob) {
if (sharedGroup != null) {
eventLoopGroup(sharedGroup!!, manage = false)
}
}.target(port = 9000)
NettyTcpServerTransport(benchJob).target()
}

override val clientTarget: RSocketClientTarget by lazy {
NettyTcpClientTransport(benchJob) {
if (sharedGroup != null) {
eventLoopGroup(sharedGroup!!, manage = false)
}
}.target("0.0.0.0", port = 9000)
override fun clientTarget(serverInstance: RSocketServerInstance): RSocketClientTarget {
return NettyTcpClientTransport(benchJob).target(
(serverInstance as NettyTcpServerInstance).localAddress
)
}

@Setup
Expand All @@ -63,6 +50,5 @@ class NettyTcpRSocketKotlinBenchmark : RSocketKotlinBenchmark() {
@TearDown
override fun cleanup() {
super.cleanup()
sharedGroup?.shutdownGracefully()?.await(1000)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private class NettyTcpClientTransportBuilderImpl : NettyTcpClientTransportBuilde
}

return NettyTcpClientTransportImpl(
coroutineContext = context.supervisorContext() + bootstrap.config().group().asCoroutineDispatcher(),
coroutineContext = context.supervisorContext() + Dispatchers.Default, //bootstrap.config().group().asCoroutineDispatcher(),
sslContext = sslContext,
bootstrap = bootstrap,
manageBootstrap = manageEventLoopGroup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private class NettyTcpServerTransportBuilderImpl : NettyTcpServerTransportBuilde
}

return NettyTcpServerTransportImpl(
coroutineContext = context.supervisorContext() + bootstrap.config().childGroup().asCoroutineDispatcher(),
coroutineContext = context.supervisorContext() + Dispatchers.Default, //bootstrap.config().childGroup().asCoroutineDispatcher(),
bootstrap = bootstrap,
sslContext = sslContext,
manageBootstrap = manageEventLoopGroup
Expand Down

0 comments on commit 0d3dbfa

Please sign in to comment.