Skip to content

Commit

Permalink
Fix thread-safety issues in TCPThroughputBenchmark (#2537)
Browse files Browse the repository at this point in the history
Motivation:

Several thread-safety issues were missed in code review. This patch fixes them.

Modifications:

- Removed the use of an unstructured Task, replaced with eventLoop.execute to
  ServerHandler's EventLoop.
- Stopped ClientHandler reaching into the benchmark object without any
  synchronization, used promises and event loop hops instead.

Result:

Thread safety is back
  • Loading branch information
Lukasa authored Oct 25, 2023
1 parent 935dbdf commit 54c85cb
Showing 1 changed file with 39 additions and 18 deletions.
57 changes: 39 additions & 18 deletions Sources/NIOPerformanceTester/TCPThroughputBenchmark.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,21 @@ final class TCPThroughputBenchmark: Benchmark {
private var clientChannel: Channel!

private var message: ByteBuffer!
private var isDonePromise: EventLoopPromise<Void>!
private var serverEventLoop: EventLoop!

final class ServerHandler: ChannelInboundHandler {
public typealias InboundIn = ByteBuffer
public typealias OutboundOut = ByteBuffer

private var channel: Channel!
private var context: ChannelHandlerContext!

public func channelActive(context: ChannelHandlerContext) {
self.channel = context.channel
self.context = context
}

public func send(_ message: ByteBuffer, times count: Int) {
for _ in 0..<count {
_ = self.channel.writeAndFlush(message.slice())
_ = self.context.writeAndFlush(self.wrapOutboundOut(message.slice()))
}
}
}
Expand All @@ -70,19 +70,30 @@ final class TCPThroughputBenchmark: Benchmark {
public typealias InboundIn = ByteBuffer
public typealias OutboundOut = ByteBuffer

private let benchmark: TCPThroughputBenchmark
private var messagesReceived: Int
private var expectedMessages: Int?
private var completionPromise: EventLoopPromise<Void>?

init(_ benchmark: TCPThroughputBenchmark) {
self.benchmark = benchmark
init() {
self.messagesReceived = 0
}

func prepareRun(expectedMessages: Int, promise: EventLoopPromise<Void>) {
self.expectedMessages = expectedMessages
self.completionPromise = promise
}

public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
self.messagesReceived += 1
if (self.benchmark.messages == self.messagesReceived) {
self.benchmark.isDonePromise.succeed()

if (self.expectedMessages == self.messagesReceived) {
let promise = self.completionPromise

self.messagesReceived = 0
self.expectedMessages = nil
self.completionPromise = nil

promise!.succeed()
}
}
}
Expand All @@ -95,12 +106,12 @@ final class TCPThroughputBenchmark: Benchmark {
func setUp() throws {
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 4)

let connectionEstablished: EventLoopPromise<Void> = self.group.next().makePromise()
let connectionEstablished: EventLoopPromise<EventLoop> = self.group.next().makePromise()

self.serverChannel = try ServerBootstrap(group: self.group)
.childChannelInitializer { channel in
self.serverHandler = ServerHandler()
connectionEstablished.succeed()
connectionEstablished.succeed(channel.eventLoop)
return channel.pipeline.addHandler(self.serverHandler)
}
.bind(host: "127.0.0.1", port: 0)
Expand All @@ -109,7 +120,7 @@ final class TCPThroughputBenchmark: Benchmark {
self.clientChannel = try ClientBootstrap(group: group)
.channelInitializer { channel in
channel.pipeline.addHandler(ByteToMessageHandler(StreamDecoder())).flatMap { _ in
channel.pipeline.addHandler(ClientHandler(self))
channel.pipeline.addHandler(ClientHandler())
}
}
.connect(to: serverChannel.localAddress!)
Expand All @@ -122,7 +133,7 @@ final class TCPThroughputBenchmark: Benchmark {
}
self.message = message

try connectionEstablished.futureResult.wait()
self.serverEventLoop = try connectionEstablished.futureResult.wait()
}

func tearDown() {
Expand All @@ -132,12 +143,22 @@ final class TCPThroughputBenchmark: Benchmark {
}

func run() throws -> Int {
self.isDonePromise = self.group.next().makePromise()
Task {
self.serverHandler.send(self.message, times: self.messages)
let isDonePromise = self.clientChannel.eventLoop.makePromise(of: Void.self)
let clientChannel = self.clientChannel!
let expectedMessages = self.messages

try clientChannel.eventLoop.submit {
try clientChannel.pipeline.syncOperations.handler(type: ClientHandler.self).prepareRun(expectedMessages: expectedMessages, promise: isDonePromise)
}.wait()

let serverHandler = self.serverHandler!
let message = self.message!
let messages = self.messages

self.serverEventLoop.execute {
serverHandler.send(message, times: messages)
}
try self.isDonePromise.futureResult.wait()
self.isDonePromise = nil
try isDonePromise.futureResult.wait()
return 0
}
}

0 comments on commit 54c85cb

Please sign in to comment.