Skip to content

Commit

Permalink
Update package
Browse files Browse the repository at this point in the history
  • Loading branch information
vmanot committed Oct 28, 2024
1 parent 453be6b commit 07731ee
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 126 deletions.
33 changes: 26 additions & 7 deletions Sources/Merge/Intramodular/Process/SystemShell.swift
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,41 @@ extension SystemShell {
input: String? = nil,
environment: Environment = .zsh
) async throws -> Process.RunResult {
let (launchPath, arguments) = try await environment.resolve(command: command)
let process = try await _AsyncProcess(
command: command,
input: input,
environment: environmentVariables,
currentDirectoryURL: currentDirectoryURL,
options: options
)

let process = try _AsyncProcess(
return try await process.run()
}
}

extension _AsyncProcess {
public convenience init(
command: String,
input: String? = nil,
shell: SystemShell.Environment = .zsh,
environment: [String: String]? = nil,
currentDirectoryURL: URL? = nil,
options: [_AsyncProcess.Option]?
) async throws {
let (launchPath, arguments) = try await shell.resolve(command: command)

try self.init(
executableURL: URL(fileURLWithPath: launchPath),
arguments: arguments,
environment: self.environmentVariables.merging(environmentVariables, uniquingKeysWith: { $1 }),
currentDirectoryURL: currentDirectoryURL ?? self.currentDirectoryURL,
environment: environment ?? ProcessInfo.processInfo.environment,
currentDirectoryURL: currentDirectoryURL,
options: options
)

if let input = input?.data(using: .utf8), !input.isEmpty, let handle = process.standardInputPipe?.fileHandleForWriting {
if let input = input?.data(using: .utf8), !input.isEmpty, let handle = standardInputPipe?.fileHandleForWriting {
try? handle.write(contentsOf: input)
try? handle.close()
}

return try await process.run()
}
}
#else
Expand Down
244 changes: 125 additions & 119 deletions Sources/Merge/Intramodular/Process/_AsyncProcess.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import System
@available(macOS 11.0, iOS 14.0, watchOS 7.0, tvOS 14.0, *)
@available(macCatalyst, unavailable)
public class _AsyncProcess: Logging {
public let options: [Option]
public let options: Set<Option>
#if os(macOS)
public let process: Process
#endif
Expand All @@ -27,8 +27,6 @@ public class _AsyncProcess: Logging {
package var _standardOutputPipe: Pipe?
package var _standardErrorPipe: Pipe?

private var standardOutputCache = ""

@_OSUnfairLocked
private var _resolvedRunResult: Result<_ProcessRunResult, Error>?

Expand All @@ -40,12 +38,14 @@ public class _AsyncProcess: Logging {
existingProcess: Process?,
options: [_AsyncProcess.Option]
) throws {
let options: Set<_AsyncProcess.Option> = Set(options ?? [])

if let existingProcess {
assert(!existingProcess.isRunning)

self.process = existingProcess
} else {
if Set(options).isSuperset(of: [._useAppleScript, ._useAppleScript]) {
if options.isSuperset(of: [._useAppleScript, ._useAppleScript]) {
throw Never.Reason.unsupported
} else if options.contains(._useAuthorizationExecuteWithPrivileges) {
self.process = _SecAuthorizedProcess()
Expand All @@ -56,7 +56,7 @@ public class _AsyncProcess: Logging {
}
}

self.options = options
self.options = Set(options)

_registerAndSetUpIO(existingProcess: existingProcess)
}
Expand All @@ -68,9 +68,9 @@ public class _AsyncProcess: Logging {
currentDirectoryURL: URL?,
options: [_AsyncProcess.Option]? = nil
) throws {
let options: [_AsyncProcess.Option] = options ?? []
let options: Set<_AsyncProcess.Option> = Set(options ?? [])

if Set(options).isSuperset(of: [._useAppleScript, ._useAppleScript]) {
if options.isSuperset(of: [._useAppleScript, ._useAppleScript]) {
self.process = _SecAuthorizedProcess()
} else if options.contains(._useAuthorizationExecuteWithPrivileges) {
self.process = _SecAuthorizedProcess()
Expand Down Expand Up @@ -106,7 +106,40 @@ public class _AsyncProcess: Logging {

#if os(macOS) || targetEnvironment(macCatalyst)
extension _AsyncProcess {
var standardInputPipe: Pipe? {
public var isRunning: Bool {
state == .running
}

public var state: State {
if process.isRunning {
return .running
}

var terminationReason: ProcessTerminationError?

if process is _SecAuthorizedProcess {
if !processDidStart.isOpen {
return .notLaunch
}
} else {
if !processDidStart.isOpen && process.processIdentifier == 0 {
return .notLaunch
}
}

terminationReason = ProcessTerminationError(_from: process)

if let terminationReason = terminationReason {
return .terminated(
status: Int(process.terminationStatus),
reason: terminationReason.reason
)
}

return .notLaunch
}

public var standardInputPipe: Pipe? {
if state == .notLaunch, _standardInputPipe == nil {
_standardInputPipe = Pipe()
process.standardInput = _standardInputPipe
Expand Down Expand Up @@ -231,6 +264,79 @@ extension _AsyncProcess {

assert(!process.isRunning)
}

private func _run() async throws {
do {
_dumpCallStackIfNeeded()

guard !isWaiting else {
return
}

isWaiting = true

func readData() async throws {
if !options.contains(._useAuthorizationExecuteWithPrivileges) {
try await _readStdoutStderrUntilEnd()
} else {
try await _readStdoutStderrUntilEnd(ignoreStderr: true)
}
}

let readStdoutStderrTask = Task<Void, Error>.detached(priority: .high) {
try await readData()
}

try? await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
@MutexProtected
var didResume: Bool = false

process.terminationHandler = { (process: Process) in
Task<Void, Never>.detached(priority: .userInitiated) {
await Task.yield()

if let terminationError = process.terminationError {
continuation.resume(throwing: terminationError)
} else {
assert(!process.isRunning)

continuation.resume()
}

$didResume.assignedValue = true
}
}

do {
_willRunRightAfterThis()

try process.run()
} catch {
continuation.resume(throwing: error)
}

_didJustExit(didResume: { didResume })
}

try await readStdoutStderrTask.value

assert(!process.isRunning)

do {
try _standardOutputPipe?.fileHandleForReading.close()
try _standardErrorPipe?.fileHandleForReading.close()
try _standardInputPipe?.fileHandleForWriting.close()
} catch {
runtimeIssue("Failed to close a pipe.")
}

_stashRunResultAndTeardown(error: nil)
} catch {
_stashRunResultAndTeardown(error: error)

throw error
}
}

private func _readStdoutStderrUntilEnd(
ignoreStderr: Bool = false
Expand Down Expand Up @@ -351,11 +457,21 @@ extension _AsyncProcess {

let pipeName: Process.PipeName = try self.name(of: pipe)

let forwardStdoutStderr: Bool = options.contains(._forwardStdoutStderr)

switch pipeName {
case .standardOutput:
_publishers.standardOutputPublisher.send(data)

if forwardStdoutStderr {
FileHandle.standardOutput.write(data)
}
case .standardError:
_publishers.standardErrorPublisher.send(data)

if forwardStdoutStderr {
FileHandle.standardOutput.write(data)
}
default:
break
}
Expand All @@ -373,80 +489,7 @@ extension _AsyncProcess {
self._standardErrorString += dataAsString
}
}

private func _run() async throws {
do {
_dumpCallStackIfNeeded()

guard !isWaiting else {
return
}

isWaiting = true

func readData() async throws {
if !options.contains(._useAuthorizationExecuteWithPrivileges) {
try await _readStdoutStderrUntilEnd()
} else {
try await _readStdoutStderrUntilEnd(ignoreStderr: true)
}
}

let readStdoutStderrTask = Task<Void, Error>.detached(priority: .high) {
try await readData()
}

try? await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
@MutexProtected
var didResume: Bool = false

process.terminationHandler = { (process: Process) in
Task<Void, Never>.detached(priority: .userInitiated) {
await Task.yield()

if let terminationError = process.terminationError {
continuation.resume(throwing: terminationError)
} else {
assert(!process.isRunning)

continuation.resume()
}

$didResume.assignedValue = true
}
}

do {
_willRunRightAfterThis()

try process.run()
} catch {
continuation.resume(throwing: error)
}

_didJustExit(didResume: { didResume })
}

try await readStdoutStderrTask.value

assert(!process.isRunning)

do {
try _standardOutputPipe?.fileHandleForReading.close()
try _standardErrorPipe?.fileHandleForReading.close()
try _standardInputPipe?.fileHandleForWriting.close()
} catch {
runtimeIssue("Failed to close a pipe.")
}

_stashRunResultAndTeardown(error: nil)
} catch {
_stashRunResultAndTeardown(error: error)

throw error
}
}

private func _dumpCallStackIfNeeded() {
do {
if Thread._isMainThread {
Expand Down Expand Up @@ -550,44 +593,6 @@ extension _AsyncProcess {
}
#endif

#if os(macOS) || targetEnvironment(macCatalyst)
@available(macCatalyst, unavailable)
extension _AsyncProcess {
public var isRunning: Bool {
state == .running
}

public var state: State {
if process.isRunning {
return .running
}

var terminationReason: ProcessTerminationError?

if process is _SecAuthorizedProcess {
if !processDidStart.isOpen {
return .notLaunch
}
} else {
if !processDidStart.isOpen && process.processIdentifier == 0 {
return .notLaunch
}
}

terminationReason = ProcessTerminationError(_from: process)

if let terminationReason = terminationReason {
return .terminated(
status: Int(process.terminationStatus),
reason: terminationReason.reason
)
}

return .notLaunch
}
}
#endif

// MARK: - Initializers

#if os(macOS) || targetEnvironment(macCatalyst)
Expand Down Expand Up @@ -669,6 +674,7 @@ extension _AsyncProcess {
public enum Option: Hashable {
case _useAppleScript
case _useAuthorizationExecuteWithPrivileges
case _forwardStdoutStderr
}
}

Expand Down

0 comments on commit 07731ee

Please sign in to comment.