Skip to content

Commit

Permalink
Merge pull request #20 from gaetanzanella/feature/tasks
Browse files Browse the repository at this point in the history
Async API
  • Loading branch information
gaetanzanella authored Apr 16, 2023
2 parents 6e1c4dd + d89e9ce commit 2d8d28a
Show file tree
Hide file tree
Showing 22 changed files with 898 additions and 146 deletions.
37 changes: 8 additions & 29 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,50 +23,29 @@ let connection = SSHConnection(
)
)

connection.start(withTimeout: 3.0) { result in
switch result {
case .success:
// Handle connection
case .failure:
// Handle failure
}
}
try await connection.start()
```

Once connected, you can start executing concrete SSH operations on child communication channels. As `SSH Client` means to be a high level interface, you do not directly interact with them. Instead you use interfaces dedicated to your use case.

- SSH shell
```swift
connection.requestShell(withTimeout: 3.0) { result in
switch result {
case .success(let shell):
// Start shell operations
...
}
let shell = try await connection.requestShell()
for try await chunk in shell.data {
// ...
}
```

- SFTP client
```swift
connection.requestSFTPClient(withTimeout: 3.0) { result in
switch result {
case .success(let client):
// Start sftp operations
...
}
}
let sftpClient = try await connection.requestSFTPClient()
// sftp operations
```

- SSH commands
```swift
connection.execute("echo Hello\n", withTimeout: 3.0) { result in
switch result {
case .success(let response):
// Handle response
case .failure:
// Handle failure
}
}
let response = try await connection.execute("echo Hello\n")
// Handle response
```

You keep track of the connection state, using the dedicated `stateUpdateHandler` property:
Expand Down
63 changes: 63 additions & 0 deletions Sources/SSHClient/Async/Completion+Async.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@

import Foundation

func withCheckedResultContinuation<T>(_ operation: (_ completion: @escaping (Result<T, Error>) -> Void) -> Void) async throws -> T {
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<T, Error>) in
operation { result in
switch result {
case .success(let success):
continuation.resume(returning: success)
case .failure(let failure):
continuation.resume(throwing: failure)
}
}
}
}

func withTaskCancellationHandler<T>(_ operation: (_ completion: @escaping (Result<T, Error>) -> Void) -> SSHTask) async throws -> T {
let action = TaskAction()
return try await withTaskCancellationHandler(operation: {
try await withCheckedResultContinuation { completion in
let task = operation(completion)
action.setTask(task)
}
}, onCancel: {
action.cancel()
})
}

// inspired by https://github.com/swift-server/async-http-client/blob/main/Sources/AsyncHTTPClient/AsyncAwait/HTTPClient%2Bexecute.swift#L155
actor TaskAction {
enum State {
case initialized
case task(SSHTask)
case ended
}

private var state: State = .initialized

nonisolated func setTask(_ task: SSHTask) {
Task {
await _setTask(task)
}
}

nonisolated func cancel() {
Task {
await _cancel()
}
}

private func _setTask(_ task: SSHTask) {
state = .task(task)
}

private func _cancel() {
switch state {
case .ended, .initialized:
break
case .task(let task):
task.cancel()
}
}
}
108 changes: 108 additions & 0 deletions Sources/SSHClient/Async/SFTPClient+Async.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@

import Foundation

public extension SFTPFile {
func readAttributes() async throws -> SFTPFileAttributes {
try await withCheckedResultContinuation { completion in
readAttributes(completion: completion)
}
}

func read(from offset: UInt64 = 0,
length: UInt32 = .max) async throws -> Data {
try await withCheckedResultContinuation { completion in
read(from: offset, length: length, completion: completion)
}
}

func write(_ data: Data,
at offset: UInt64 = 0) async throws {
try await withCheckedResultContinuation { completion in
write(data, at: offset, completion: completion)
}
}

func close() async throws {
try await withCheckedResultContinuation { completion in
close(completion: completion)
}
}
}

public extension SFTPClient {
func openFile(filePath: String,
flags: SFTPOpenFileFlags,
attributes: SFTPFileAttributes = .none) async throws -> SFTPFile {
try await withCheckedResultContinuation { completion in
openFile(
filePath: filePath,
flags: flags,
attributes: attributes,
completion: completion
)
}
}

func withFile(filePath: String,
flags: SFTPOpenFileFlags,
attributes: SFTPFileAttributes = .none,
_ closure: @escaping (SFTPFile) async -> Void) async throws {
try await withCheckedResultContinuation { completion in
withFile(
filePath: filePath,
flags: flags,
attributes: attributes, { file, close in
Task {
await closure(file)
close()
}
},
completion: completion
)
}
}

func listDirectory(atPath path: String) async throws -> [SFTPPathComponent] {
try await withCheckedResultContinuation { completion in
listDirectory(atPath: path, completion: completion)
}
}

func getAttributes(at filePath: String) async throws -> SFTPFileAttributes {
try await withCheckedResultContinuation { completion in
getAttributes(at: filePath, completion: completion)
}
}

func createDirectory(atPath path: String,
attributes: SFTPFileAttributes = .none) async throws {
try await withCheckedResultContinuation { completion in
createDirectory(atPath: path, attributes: attributes, completion: completion)
}
}

func moveItem(atPath current: String,
toPath destination: String) async throws {
try await withCheckedResultContinuation { completion in
moveItem(atPath: current, toPath: destination, completion: completion)
}
}

func removeDirectory(atPath path: String) async throws {
try await withCheckedResultContinuation { completion in
removeDirectory(atPath: path, completion: completion)
}
}

func removeFile(atPath path: String) async throws {
try await withCheckedResultContinuation { completion in
removeFile(atPath: path, completion: completion)
}
}

func close() async {
await withCheckedContinuation { continuation in
close(completion: continuation.resume)
}
}
}
88 changes: 88 additions & 0 deletions Sources/SSHClient/Async/SSHConnection+Async.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@

import Foundation

public extension SSHConnection {
typealias AsyncSSHCommandResponse = AsyncThrowingStream<SSHCommandResponseChunk, Error>

func start(withTimeout timeout: TimeInterval? = nil) async throws {
try await withCheckedResultContinuation { completion in
start(withTimeout: timeout, completion: completion)
}
}

func cancel() async {
await withCheckedContinuation { continuation in
cancel(completion: continuation.resume)
}
}

func execute(_ command: SSHCommand,
withTimeout timeout: TimeInterval? = nil) async throws -> SSHCommandResponse {
try await withTaskCancellationHandler { completion in
execute(command, withTimeout: timeout, completion: completion)
}
}

func requestShell(withTimeout timeout: TimeInterval? = nil) async throws -> SSHShell {
try await withTaskCancellationHandler { completion in
requestShell(withTimeout: timeout, completion: completion)
}
}

func requestSFTPClient(withTimeout timeout: TimeInterval? = nil) async throws -> SFTPClient {
try await withTaskCancellationHandler { completion in
requestSFTPClient(withTimeout: timeout, completion: completion)
}
}

func stream(_ command: SSHCommand,
withTimeout timeout: TimeInterval? = nil) async throws -> AsyncSSHCommandResponse {
try await withTaskCancellationHandler { completion in
enum State {
case initializing
case streaming(AsyncSSHCommandResponse.Continuation)
}
let action = TaskAction()
// Each callback are executed on the internal serial ssh connection queue.
// This is thread safe to modify the state inside them.
var state: State = .initializing
let stream = { (responseChunk: SSHCommandResponseChunk) in
switch state {
case .initializing:
let response = AsyncSSHCommandResponse { continuation in
state = .streaming(continuation)
continuation.onTermination = { _ in
action.cancel()
}
continuation.yield(responseChunk)
}
completion(.success(response))
case .streaming(let continuation):
continuation.yield(responseChunk)
}
}
let resultTask = execute(
command,
withTimeout: timeout
) { chunk in
stream(.chunk(chunk))
} onStatus: { st in
stream(.status(st))
} completion: { result in
switch state {
case .initializing:
completion(.failure(SSHConnectionError.unknown))
case .streaming(let continuation):
switch result {
case .success:
continuation.finish()
case .failure(let error):
continuation.finish(throwing: error)
}
}
}
action.setTask(resultTask)
return resultTask
}
}
}
37 changes: 37 additions & 0 deletions Sources/SSHClient/Async/SSHShell+Async.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//
// SSHShell+async.swift
// Atomics
//
// Created by Gaetan Zanella on 10/04/2023.
//

import Foundation

public extension SSHShell {
typealias AsyncBytes = AsyncThrowingStream<Data, Error>

var data: AsyncBytes {
AsyncBytes { continuation in
let readID = addReadListener { continuation.yield($0) }
let closeID = addCloseListener { error in
continuation.finish(throwing: error)
}
continuation.onTermination = { [weak self] _ in
self?.removeReadListener(readID)
self?.removeCloseListener(closeID)
}
}
}

func write(_ data: Data) async throws {
try await withCheckedResultContinuation { completion in
write(data, completion: completion)
}
}

func close() async throws {
try await withCheckedResultContinuation { completion in
close(completion: completion)
}
}
}
Loading

0 comments on commit 2d8d28a

Please sign in to comment.