Skip to content

Commit

Permalink
Add support for unidirectional NIOPipeBootstrap (#2560)
Browse files Browse the repository at this point in the history
* Add support for unidirectional `NIOPipeBootstrap`

# Motivation
In some scenarios, it is useful to only have either an input or output side for a `PipeChannel`. This fixes #2444.

# Modification
This PR adds new methods to `NIOPipeBootstrap` that make either the input or the output optional. Furthermore, I am intentionally breaking the API for the new async methods since those haven't shipped yet to reflect the same API there.

# Result
It is now possible to bootstrap a `PipeChannel` with either the input or output side closed.

* Docs and naming
  • Loading branch information
FranzBusch authored Oct 24, 2023
1 parent 2683889 commit 935dbdf
Show file tree
Hide file tree
Showing 6 changed files with 455 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,18 @@ internal final class NIOAsyncChannelOutboundWriterHandler<OutboundOut: Sendable>
self.sink?.setWritability(to: context.channel.isWritable)
context.fireChannelWritabilityChanged()
}

@inlinable
func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
switch event {
case ChannelEvent.outputClosed:
self.sink?.finish()
default:
break
}

context.fireUserInboundEventTriggered(event)
}
}

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
Expand Down
6 changes: 1 addition & 5 deletions Sources/NIOCrashTester/OutputGrepper.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ internal struct OutputGrepper {

internal static func make(group: EventLoopGroup) -> OutputGrepper {
let processToChannel = Pipe()
let deadPipe = Pipe() // just so we have an output...

let eventLoop = group.next()
let outputPromise = eventLoop.makePromise(of: ProgramOutput.self)
Expand All @@ -34,13 +33,10 @@ internal struct OutputGrepper {
channel.pipeline.addHandlers([ByteToMessageHandler(NewlineFramer()),
GrepHandler(promise: outputPromise)])
}
.takingOwnershipOfDescriptors(input: dup(processToChannel.fileHandleForReading.fileDescriptor),
output: dup(deadPipe.fileHandleForWriting.fileDescriptor))
.takingOwnershipOfDescriptor(input: dup(processToChannel.fileHandleForReading.fileDescriptor))
let processOutputPipe = NIOFileHandle(descriptor: dup(processToChannel.fileHandleForWriting.fileDescriptor))
processToChannel.fileHandleForReading.closeFile()
processToChannel.fileHandleForWriting.closeFile()
deadPipe.fileHandleForReading.closeFile()
deadPipe.fileHandleForWriting.closeFile()
channelFuture.cascadeFailure(to: outputPromise)
return OutputGrepper(result: outputPromise.futureResult,
processOutputPipe: processOutputPipe)
Expand Down
229 changes: 158 additions & 71 deletions Sources/NIOPosix/Bootstrap.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2055,53 +2055,63 @@ public final class NIOPipeBootstrap {
/// - output: The _Unix file descriptor_ for the output (ie. the write side).
/// - Returns: an `EventLoopFuture<Channel>` to deliver the `Channel`.
public func takingOwnershipOfDescriptors(input: CInt, output: CInt) -> EventLoopFuture<Channel> {
precondition(input >= 0 && output >= 0 && input != output,
"illegal file descriptor pair. The file descriptors \(input), \(output) " +
"must be distinct and both positive integers.")
let eventLoop = group.next()
do {
try self.validateFileDescriptorIsNotAFile(input)
try self.validateFileDescriptorIsNotAFile(output)
} catch {
return eventLoop.makeFailedFuture(error)
}
self._takingOwnershipOfDescriptors(input: input, output: output)
}

let channelInitializer = self.channelInitializer ?? { _ in eventLoop.makeSucceededFuture(()) }
let channel: PipeChannel
do {
let inputFH = NIOFileHandle(descriptor: input)
let outputFH = NIOFileHandle(descriptor: output)
channel = try PipeChannel(eventLoop: eventLoop as! SelectableEventLoop,
inputPipe: inputFH,
outputPipe: outputFH)
} catch {
return eventLoop.makeFailedFuture(error)
}
/// Create the `PipeChannel` with the provided input file descriptor.
///
/// The input file descriptor must be distinct.
///
/// - Note: If this method returns a succeeded future, SwiftNIO will close `input` when the `Channel`
/// becomes inactive. You _must not_ do any further operations to `input`, including `close`.
/// If this method returns a failed future, you still own the file descriptor and are responsible for
/// closing them.
///
/// - Parameters:
/// - input: The _Unix file descriptor_ for the input (ie. the read side).
/// - Returns: an `EventLoopFuture<Channel>` to deliver the `Channel`.
public func takingOwnershipOfDescriptor(
input: CInt
) -> EventLoopFuture<Channel> {
self._takingOwnershipOfDescriptors(input: input, output: nil)
}

func setupChannel() -> EventLoopFuture<Channel> {
eventLoop.assertInEventLoop()
return self._channelOptions.applyAllChannelOptions(to: channel).flatMap {
channelInitializer(channel)
}.flatMap {
eventLoop.assertInEventLoop()
let promise = eventLoop.makePromise(of: Void.self)
channel.registerAlreadyConfigured0(promise: promise)
return promise.futureResult
}.map {
channel
}.flatMapError { error in
channel.close0(error: error, mode: .all, promise: nil)
return channel.eventLoop.makeFailedFuture(error)
}
}
/// Create the `PipeChannel` with the provided output file descriptor.
///
/// The output file descriptor must be distinct.
///
/// - Note: If this method returns a succeeded future, SwiftNIO will close `output` when the `Channel`
/// becomes inactive. You _must not_ do any further operations to `output`, including `close`.
/// If this method returns a failed future, you still own the file descriptor and are responsible for
/// closing them.
///
/// - Parameters:
/// - output: The _Unix file descriptor_ for the output (ie. the write side).
/// - Returns: an `EventLoopFuture<Channel>` to deliver the `Channel`.
public func takingOwnershipOfDescriptor(
output: CInt
) -> EventLoopFuture<Channel> {
self._takingOwnershipOfDescriptors(input: nil, output: output)
}

if eventLoop.inEventLoop {
return setupChannel()
} else {
return eventLoop.flatSubmit {
setupChannel()
private func _takingOwnershipOfDescriptors(input: CInt?, output: CInt?) -> EventLoopFuture<Channel> {
let channelInitializer: @Sendable (Channel) -> EventLoopFuture<Channel> = {
let eventLoop = self.group.next()
let channelInitializer = self.channelInitializer
return { channel in
if let channelInitializer = channelInitializer {
return channelInitializer(channel).map { channel }
} else {
return eventLoop.makeSucceededFuture(channel)
}
}
}

}()
return self._takingOwnershipOfDescriptors(
input: input,
output: output,
channelInitializer: channelInitializer
)
}

@available(*, deprecated, renamed: "takingOwnershipOfDescriptor(inputOutput:)")
Expand Down Expand Up @@ -2154,9 +2164,7 @@ extension NIOPipeBootstrap {

/// Create the `PipeChannel` with the provided input and output file descriptors.
///
/// The input and output file descriptors must be distinct. If you have a single file descriptor, consider using
/// `ClientBootstrap.withConnectedSocket(descriptor:)` if it's a socket or
/// `NIOPipeBootstrap.takingOwnershipOfDescriptor` if it is not a socket.
/// The input and output file descriptors must be distinct.
///
/// - Note: If this method returns a succeeded future, SwiftNIO will close `input` and `output`
/// when the `Channel` becomes inactive. You _must not_ do any further operations `input` or
Expand All @@ -2179,61 +2187,140 @@ extension NIOPipeBootstrap {
try await self._takingOwnershipOfDescriptors(
input: input,
output: output,
channelInitializer: channelInitializer,
postRegisterTransformation: { $0.makeSucceededFuture($1) }
channelInitializer: channelInitializer
)
}

/// Create the `PipeChannel` with the provided input file descriptor.
///
/// The input file descriptor must be distinct.
///
/// - Note: If this method returns a succeeded future, SwiftNIO will close `input` when the `Channel`
/// becomes inactive. You _must not_ do any further operations to `input`, including `close`.
/// If this method returns a failed future, you still own the file descriptor and are responsible for
/// closing them.
///
/// - Parameters:
/// - input: The _Unix file descriptor_ for the input (ie. the read side).
/// - channelInitializer: A closure to initialize the channel. The return value of this closure is returned from the `connect`
/// method.
/// - Returns: The result of the channel initializer.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
func _takingOwnershipOfDescriptors<ChannelInitializerResult, PostRegistrationTransformationResult: Sendable>(
public func takingOwnershipOfDescriptor<Output: Sendable>(
input: CInt,
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Output>
) async throws -> Output {
try await self._takingOwnershipOfDescriptors(
input: input,
output: nil,
channelInitializer: channelInitializer
)
}

/// Create the `PipeChannel` with the provided output file descriptor.
///
/// The output file descriptor must be distinct.
///
/// - Note: If this method returns a succeeded future, SwiftNIO will close `output` when the `Channel`
/// becomes inactive. You _must not_ do any further operations to `output`, including `close`.
/// If this method returns a failed future, you still own the file descriptor and are responsible for
/// closing them.
///
/// - Parameters:
/// - output: The _Unix file descriptor_ for the output (ie. the write side).
/// - channelInitializer: A closure to initialize the channel. The return value of this closure is returned from the `connect`
/// method.
/// - Returns: The result of the channel initializer.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public func takingOwnershipOfDescriptor<Output: Sendable>(
output: CInt,
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<ChannelInitializerResult>,
postRegisterTransformation: @escaping @Sendable (EventLoop, ChannelInitializerResult) -> EventLoopFuture<PostRegistrationTransformationResult>
) async throws -> PostRegistrationTransformationResult {
precondition(input >= 0 && output >= 0 && input != output,
"illegal file descriptor pair. The file descriptors \(input), \(output) " +
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<Output>
) async throws -> Output {
try await self._takingOwnershipOfDescriptors(
input: nil,
output: output,
channelInitializer: channelInitializer
)
}

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
func _takingOwnershipOfDescriptors<ChannelInitializerResult: Sendable>(
input: CInt?,
output: CInt?,
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<ChannelInitializerResult>
) async throws -> ChannelInitializerResult {
try await self._takingOwnershipOfDescriptors(
input: input,
output: output,
channelInitializer: channelInitializer
).get()
}

func _takingOwnershipOfDescriptors<ChannelInitializerResult: Sendable>(
input: CInt?,
output: CInt?,
channelInitializer: @escaping @Sendable (Channel) -> EventLoopFuture<ChannelInitializerResult>
) -> EventLoopFuture<ChannelInitializerResult> {
precondition(input ?? 0 >= 0 && output ?? 0 >= 0 && input != output,
"illegal file descriptor pair. The file descriptors \(String(describing: input)), \(String(describing: output)) " +
"must be distinct and both positive integers.")
precondition(!(input == nil && output == nil), "Either input or output has to be set")
let eventLoop = group.next()
let channelOptions = self._channelOptions
try self.validateFileDescriptorIsNotAFile(input)
try self.validateFileDescriptorIsNotAFile(output)

let channelInitializer = { (channel: Channel) -> EventLoopFuture<ChannelInitializerResult> in
let initializer = self.channelInitializer ?? { _ in eventLoop.makeSucceededFuture(()) }
return initializer(channel).flatMap { channelInitializer(channel) }
let channel: PipeChannel
let inputFileHandle: NIOFileHandle?
let outputFileHandle: NIOFileHandle?
do {
if let input = input {
try self.validateFileDescriptorIsNotAFile(input)
}
if let output = output {
try self.validateFileDescriptorIsNotAFile(output)
}

inputFileHandle = input.flatMap { NIOFileHandle(descriptor: $0) }
outputFileHandle = output.flatMap { NIOFileHandle(descriptor: $0) }
channel = try PipeChannel(
eventLoop: eventLoop as! SelectableEventLoop,
inputPipe: inputFileHandle,
outputPipe: outputFileHandle
)
} catch {
return eventLoop.makeFailedFuture(error)
}

let inputFileHandle = NIOFileHandle(descriptor: input)
let outputFileHandle = NIOFileHandle(descriptor: output)
let channel = try PipeChannel(
eventLoop: eventLoop as! SelectableEventLoop,
inputPipe: inputFileHandle,
outputPipe: outputFileHandle
)

@Sendable
func setupChannel() -> EventLoopFuture<PostRegistrationTransformationResult> {
func setupChannel() -> EventLoopFuture<ChannelInitializerResult> {
eventLoop.assertInEventLoop()
return channelOptions.applyAllChannelOptions(to: channel).flatMap { _ -> EventLoopFuture<ChannelInitializerResult> in
channelInitializer(channel)
}.flatMap { result in
eventLoop.assertInEventLoop()
let promise = eventLoop.makePromise(of: Void.self)
channel.registerAlreadyConfigured0(promise: promise)
return promise.futureResult.flatMap { postRegisterTransformation(eventLoop, result) }
return promise.futureResult.map { result }
}.flatMap { result -> EventLoopFuture<ChannelInitializerResult> in
if inputFileHandle == nil {
return channel.close(mode: .input).map { result }
}
if outputFileHandle == nil {
return channel.close(mode: .output).map { result }
}
return channel.selectableEventLoop.makeSucceededFuture(result)
}.flatMapError { error in
channel.close0(error: error, mode: .all, promise: nil)
return channel.eventLoop.makeFailedFuture(error)
}
}

if eventLoop.inEventLoop {
return try await setupChannel().get()
return setupChannel()
} else {
return try await eventLoop.flatSubmit {
return eventLoop.flatSubmit {
setupChannel()
}.get()
}
}
}
}
Expand Down
Loading

0 comments on commit 935dbdf

Please sign in to comment.