Skip to content

Commit

Permalink
Retain a ref to NIOAsyncWriter until channel active (#2703)
Browse files Browse the repository at this point in the history
Motivation:

NIOAsyncChannel requires users to explicitly close it, this is typically
done by calling `executeThenClose`. If a `NIOAsyncChannel` isn't closed
then its outbound writer will hit a precondition failure on `deinit`.
Not calling `executeThenClose` is a programmer error.

However there are some sharp edges: if NIO never returns the
`NIOAsyncChannel` to the caller (e.g. if a connect attempt fails) then
nothing will finish the writer and precondition will fail in the deinit.
Working around this from a user perspective is non-obvious and requires
keep tracking of all `NIOAsyncChannel`s created from a connect attempt
and closing the unused ones.

We still want to maintain the precondition when users don't close the
channel, one way of achieving this is by defining a point in time at
which NIO hands responsibility of the channel to the user.

Modifications:

- Retain the writer in the outbound writer handler until channel active
- On successful connect attempts, the channel becomes active and the
  connected channel is returned to the caller.
- On failed attempts channel active isn't called so the writer is
  retained until the handler is removed from the pipeline at which point it is
  finished.

Result:

Failed connect attempts don't result in precondition failures when using
NIOAsyncChannel.
  • Loading branch information
glbrntt authored Apr 22, 2024
1 parent bfc1a2c commit 359c461
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 4 deletions.
2 changes: 2 additions & 0 deletions Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ public struct NIOAsyncChannelOutboundWriter<OutboundOut: Sendable>: Sendable {
finishOnDeinit: closeOnDeinit,
delegate: .init(handler: handler)
)

handler.sink = writer.sink
handler.writer = writer.writer

try channel.pipeline.syncOperations.addHandler(handler)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ internal final class NIOAsyncChannelOutboundWriterHandler<OutboundOut: Sendable>
@usableFromInline
var sink: Sink?

/// The writer of the ``NIOAsyncWriter``.
///
/// The reference is retained until `channelActive` is fired. This avoids situations
/// where `deinit` is called on the unfinished writer because the `Channel` was never returned
/// to the caller (e.g. because a connect failed or or happy-eyeballs created multiple
/// channels).
///
/// Effectively `channelActive` is used at the point in time at which NIO cedes ownership of
/// the writer to the caller.
@usableFromInline
var writer: Writer?

/// The channel handler context.
@usableFromInline
var context: ChannelHandlerContext?
Expand Down Expand Up @@ -126,6 +138,14 @@ internal final class NIOAsyncChannelOutboundWriterHandler<OutboundOut: Sendable>
func handlerRemoved(context: ChannelHandlerContext) {
self.context = nil
self.sink?.finish()
self.writer = nil
}

@inlinable
func channelActive(context: ChannelHandlerContext) {
// Drop the writer ref, the caller is responsible for it now.
self.writer = nil
context.fireChannelActive()
}

@inlinable
Expand Down
44 changes: 42 additions & 2 deletions Tests/NIOPosixTests/AsyncChannelBootstrapTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -542,9 +542,9 @@ final class AsyncChannelBootstrapTests: XCTestCase {
port: channel.channel.localAddress!.port!,
proposedALPN: .unknown
)
await XCTAssertThrowsError(
await XCTAssertThrowsError {
try await failedProtocolNegotiation.get()
)
}

// Let's check that we can still open a new connection
let stringNegotiationResult = try await self.makeClientChannelWithProtocolNegotiation(
Expand Down Expand Up @@ -575,6 +575,26 @@ final class AsyncChannelBootstrapTests: XCTestCase {
}
}

func testClientBootstrap_connectFails() async throws {
// Beyond verifying the connect throws, this test allows us to check that 'NIOAsyncChannel'
// doesn't crash on deinit when we never return it to the user.
await XCTAssertThrowsError {
try await ClientBootstrap(
group: .singletonMultiThreadedEventLoopGroup
).connect(unixDomainSocketPath: "testClientBootstrapConnectFails") { channel in
return channel.eventLoop.makeCompletedFuture {
return try NIOAsyncChannel(
wrappingChannelSynchronously: channel,
configuration: .init(
inboundType: ByteBuffer.self,
outboundType: ByteBuffer.self
)
)
}
}
}
}

// MARK: Datagram Bootstrap

func testDatagramBootstrap_withAsyncChannel_andHostPort() async throws {
Expand Down Expand Up @@ -663,6 +683,26 @@ final class AsyncChannelBootstrapTests: XCTestCase {
}
}

func testDatagramBootstrap_connectFails() async throws {
// Beyond verifying the connect throws, this test allows us to check that 'NIOAsyncChannel'
// doesn't crash on deinit when we never return it to the user.
await XCTAssertThrowsError {
try await DatagramBootstrap(
group: .singletonMultiThreadedEventLoopGroup
).connect(unixDomainSocketPath: "testDatagramBootstrapConnectFails") { channel in
return channel.eventLoop.makeCompletedFuture {
return try NIOAsyncChannel(
wrappingChannelSynchronously: channel,
configuration: .init(
inboundType: AddressedEnvelope<ByteBuffer>.self,
outboundType: AddressedEnvelope<ByteBuffer>.self
)
)
}
}
}
}

// MARK: - Pipe Bootstrap

func testPipeBootstrap() async throws {
Expand Down
2 changes: 1 addition & 1 deletion Tests/NIOPosixTests/NIOThreadPoolTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,6 @@ class NIOThreadPoolTest: XCTestCase {
let future = threadPool.runIfActive(eventLoop: eventLoop) {
XCTFail("This shouldn't run because the pool is shutdown.")
}
await XCTAssertThrowsError(try await future.get())
await XCTAssertThrowsError { try await future.get() }
}
}
2 changes: 1 addition & 1 deletion Tests/NIOPosixTests/XCTest+AsyncAwait.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import XCTest

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal func XCTAssertThrowsError<T>(
_ expression: @autoclosure () async throws -> T,
_ expression: () async throws -> T,
file: StaticString = #filePath,
line: UInt = #line,
verify: (Error) -> Void = { _ in }
Expand Down

0 comments on commit 359c461

Please sign in to comment.