From 82602c8adec78ecc8f7c5efa76269c5a52fd1e7a Mon Sep 17 00:00:00 2001 From: "Dr. Brandon Wiley" Date: Fri, 5 Nov 2021 14:53:45 -0500 Subject: [PATCH] Now with protocols --- Package.swift | 3 +- Sources/Transmission/Connection.swift | 392 +---------------- Sources/Transmission/Listener.swift | 52 +-- .../Transmission/TransmissionConnection.swift | 408 ++++++++++++++++++ .../Transmission/TransmissionListener.swift | 60 +++ .../TransmissionTests/TransmissionTests.swift | 4 +- 6 files changed, 486 insertions(+), 433 deletions(-) create mode 100644 Sources/Transmission/TransmissionConnection.swift create mode 100644 Sources/Transmission/TransmissionListener.swift diff --git a/Package.swift b/Package.swift index 77ac202..ca84513 100644 --- a/Package.swift +++ b/Package.swift @@ -52,13 +52,14 @@ let package = Package( .package(url: "https://github.com/OperatorFoundation/Datable", from: "3.1.1"), .package(url: "https://github.com/OperatorFoundation/Transport", from: "2.3.8"), .package(url: "https://github.com/OperatorFoundation/TransmissionLinux", from: "0.4.0"), + .package(url: "https://github.com/OperatorFoundation/SwiftQueue", from: "0.1.1") ], targets: [ // Targets are the basic building blocks of a package. A target can define a module or a test suite. // Targets can depend on other targets in this package, and on products in packages which this package depends on. .target( name: "Transmission", - dependencies: ["Chord", "Datable", "Transport", "TransmissionLinux", .product(name: "Logging", package: "swift-log")] + dependencies: ["Chord", "Datable", "Transport", "TransmissionLinux", "SwiftQueue", .product(name: "Logging", package: "swift-log")] ), .testTarget( name: "TransmissionTests", diff --git a/Sources/Transmission/Connection.swift b/Sources/Transmission/Connection.swift index 77fa7ae..d95e040 100644 --- a/Sources/Transmission/Connection.swift +++ b/Sources/Transmission/Connection.swift @@ -6,388 +6,24 @@ import Logging #if (os(macOS) || os(iOS) || os(watchOS) || os(tvOS)) -public class Connection +public protocol Connection { - var connection: Transport.Connection - var connectLock = DispatchGroup() - var readLock = DispatchGroup() - var writeLock = DispatchGroup() - let log: Logger? + init?(host: String, port: Int, type: ConnectionType, logger: Logger?) + init?(transport: Transport.Connection, logger: Logger?) - public convenience init?(host: String, port: Int, type: ConnectionType = .tcp, logger: Logger? = nil) - { - - let nwhost = NWEndpoint.Host(host) - let port16 = UInt16(port) - let nwport = NWEndpoint.Port(integerLiteral: port16) - - switch type - { - case .tcp: - let nwconnection = NWConnection(host: nwhost, port: nwport, using: .tcp) - self.init(connection: nwconnection, logger: logger) - case .udp: - let nwconnection = NWConnection(host: nwhost, port: nwport, using: .udp) - self.init(connection: nwconnection, logger: logger) - } - } - - public convenience init?(connection: NWConnection, logger: Logger? = nil) - { - self.init(transport: connection, logger: logger) - } - - public init?(transport: Transport.Connection, logger: Logger? = nil) - { - self.log = logger - maybeLog(message: "Initializing Transmission connection", logger: self.log) - - self.connection = transport - - var success = false - - self.connectLock.enter() - self.connection.stateUpdateHandler = - { - (state) in - - switch state - { - case .ready: - success = true - self.connectLock.leave() - return - case .cancelled: - self.failConnect() - return - case .failed(_): - self.failConnect() - return - case .waiting(_): - self.failConnect() - return - default: - return - } - } - - self.connection.start(queue: .global()) - - connectLock.wait() - - guard success else {return nil} - } - - func failConnect() - { - maybeLog(message: "Failed to make a Transmission connection", logger: self.log) - self.connection.stateUpdateHandler = nil - self.connection.cancel() - self.connectLock.leave() - } - // Reads exactly size bytes - public func read(size: Int) -> Data? - { - maybeLog(message: "Transmission read(size:) called \(Thread.current)", logger: self.log) - var result: Data? - - self.readLock.enter() - maybeLog(message: "Transmission read's connection.receive type: \(type(of: self.connection)) size: \(size)", logger: self.log) - self.connection.receive(minimumIncompleteLength: size, maximumLength: size) - { - (maybeData, maybeContext, isComplete, maybeError) in - - maybeLog(message: "entered Transmission read's receive callback", logger: self.log) - - guard maybeError == nil else - { - maybeLog(message: "leaving Transmission read's receive callback with error: \(String(describing: maybeError))", logger: self.log) - self.readLock.leave() - return - } - - if let data = maybeData - { - result = data - } - - maybeLog(message: "leaving Transmission read's receive callback", logger: self.log) - - self.readLock.leave() - } - - readLock.wait() - - maybeLog(message: "Transmission read finished!", logger: self.log) - - return result - } - - // reads up to maxSize bytes - public func read(maxSize: Int) -> Data? - { - maybeLog(message: "Transmission read(maxSize:) called \(Thread.current)", logger: self.log) - var result: Data? - - self.readLock.enter() - maybeLog(message: "Transmission read's connection.receive type: \(type(of: self.connection)) maxSize: \(maxSize)", logger: self.log) - self.connection.receive(minimumIncompleteLength: 1, maximumLength: maxSize) - { - (maybeData, maybeContext, isComplete, maybeError) in - - maybeLog(message: "entered Transmission read's receive callback", logger: self.log) - - guard maybeError == nil else - { - maybeLog(message: "leaving Transmission read's receive callback with error: \(String(describing: maybeError))", logger: self.log) - self.readLock.leave() - return - } - - if let data = maybeData - { - result = data - } - - maybeLog(message: "leaving Transmission read's receive callback", logger: self.log) - - self.readLock.leave() - } - - readLock.wait() - - maybeLog(message: "Transmission read finished!", logger: self.log) - - return result - } - - public func readWithLengthPrefix(prefixSizeInBits: Int) -> Data? - { - maybeLog(message: "Transmission readWithLengthPrefix called \(Thread.current)", logger: self.log) - var result: Data? - - self.readLock.enter() - - var maybeCount: Int? = nil - - let countLock = DispatchGroup() - countLock.enter() - switch prefixSizeInBits - { - case 8: - self.connection.receive(minimumIncompleteLength: 1, maximumLength: 1) - { - (maybeData, maybeContext, isComplete, maybeError) in - - guard maybeError == nil else - { - countLock.leave() - return - } - - if let data = maybeData - { - if let count = data.maybeNetworkUint8 - { - maybeCount = Int(count) - } - } - countLock.leave() - } - case 16: - self.connection.receive(minimumIncompleteLength: 2, maximumLength: 2) - { - (maybeData, maybeContext, isComplete, maybeError) in - - guard maybeError == nil else - { - countLock.leave() - return - } - - if let data = maybeData - { - if let count = data.maybeNetworkUint16 - { - maybeCount = Int(count) - } - } - countLock.leave() - } - case 32: - self.connection.receive(minimumIncompleteLength: 4, maximumLength: 4) - { - (maybeData, maybeContext, isComplete, maybeError) in - - guard maybeError == nil else - { - countLock.leave() - return - } - - if let data = maybeData - { - if let count = data.maybeNetworkUint32 - { - maybeCount = Int(count) - } - } - countLock.leave() - } - case 64: - self.connection.receive(minimumIncompleteLength: 8, maximumLength: 8) - { - (maybeData, maybeContext, isComplete, maybeError) in - - guard maybeError == nil else - { - countLock.leave() - return - } - - if let data = maybeData - { - if let count = data.maybeNetworkUint64 - { - maybeCount = Int(count) - } - } - countLock.leave() - } - default: - countLock.leave() - } - - countLock.wait() + func read(size: Int) -> Data? - guard let size = maybeCount else - { - readLock.leave() - return nil - } - - self.connection.receive(minimumIncompleteLength: size, maximumLength: size) - { - (maybeData, maybeContext, isComplete, maybeError) in - - guard maybeError == nil else - { - self.readLock.leave() - return - } - - if let data = maybeData - { - result = data - } - - self.readLock.leave() - } - - readLock.wait() - - return result - } - - public func write(string: String) -> Bool - { - let data = string.data - return write(data: data) - } - - public func write(data: Data) -> Bool - { - maybeLog(message: "Transmission write called \(Thread.current)", logger: self.log) - var success = false - - self.writeLock.enter() - self.connection.send(content: data, contentContext: NWConnection.ContentContext.defaultMessage, isComplete: false, completion: NWConnection.SendCompletion.contentProcessed( - { - (maybeError) in - - guard maybeError == nil else - { - success = false - self.writeLock.leave() - return - } - - success = true - self.writeLock.leave() - return - })) - - self.writeLock.wait() - - maybeLog(message: "Transmission write finished \(Thread.current)", logger: self.log) - - return success - } - - public func writeWithLengthPrefix(data: Data, prefixSizeInBits: Int) -> Bool - { - var maybeCountData: Data? = nil - - switch prefixSizeInBits - { - case 8: - let count = UInt8(data.count) - maybeCountData = count.maybeNetworkData - case 16: - let count = UInt16(data.count) - maybeCountData = count.maybeNetworkData - case 32: - let count = UInt32(data.count) - maybeCountData = count.maybeNetworkData - case 64: - let count = UInt64(data.count) - maybeCountData = count.maybeNetworkData - default: - return false - } - - guard let countData = maybeCountData else {return false} - - maybeLog(message: "Transmission writeWithLengthPrefix called \(Thread.current)", logger: self.log) - var success = false - - self.writeLock.enter() - - self.connection.send(content: countData, contentContext: NWConnection.ContentContext.defaultMessage, isComplete: false, completion: NWConnection.SendCompletion.contentProcessed( - { - (maybeError) in - - guard maybeError == nil else - { - success = false - self.writeLock.leave() - return - } - - self.connection.send(content: data, contentContext: NWConnection.ContentContext.defaultMessage, isComplete: false, completion: NWConnection.SendCompletion.contentProcessed( - { - (maybeError) in - - guard maybeError == nil else - { - success = false - self.writeLock.leave() - return - } + // reads up to maxSize bytes + func read(maxSize: Int) -> Data? - success = true - self.writeLock.leave() - return - })) - })) + func readWithLengthPrefix(prefixSizeInBits: Int) -> Data? - self.writeLock.wait() + func write(string: String) -> Bool - maybeLog(message: "Transmission writeWithLengthPrefix finished \(Thread.current)", logger: self.log) + func write(data: Data) -> Bool - return success - } + func writeWithLengthPrefix(data: Data, prefixSizeInBits: Int) -> Bool } public enum ConnectionType @@ -396,14 +32,6 @@ public enum ConnectionType case tcp } -public func maybeLog(message: String, logger: Logger? = nil) { - if logger != nil { - logger!.debug("\(message)") - } else { - print(message) - } -} - #else @_exported import TransmissionLinux diff --git a/Sources/Transmission/Listener.swift b/Sources/Transmission/Listener.swift index b063730..eb11a96 100644 --- a/Sources/Transmission/Listener.swift +++ b/Sources/Transmission/Listener.swift @@ -6,54 +6,10 @@ // import Foundation -import Network -import Chord +import Logging -#if (os(macOS) || os(iOS) || os(watchOS) || os(tvOS)) - -public class Listener +public protocol Listener { - let listener: NWListener - let queue: BlockingQueue = BlockingQueue() - let lock: DispatchGroup = DispatchGroup() - - public init?(port: Int, type: ConnectionType = .tcp) - { - let port16 = UInt16(port) - let nwport = NWEndpoint.Port(integerLiteral: port16) - - var params: NWParameters! - switch type - { - case .tcp: - params = NWParameters.tcp - case .udp: - params = NWParameters.udp - } - - guard let listener = try? NWListener(using: params, on: nwport) else {return nil} - self.listener = listener - - self.listener.newConnectionHandler = - { - nwconnection in - - guard let connection = Connection(connection: nwconnection) else {return} - - self.queue.enqueue(element: connection) - } - - self.listener.start(queue: .global()) - } - - public func accept() -> Connection - { - return self.queue.dequeue() - } + init?(port: Int, type: ConnectionType, logger: Logger?) + func accept() -> Transmission.Connection } - -#else - -@_exported import TransmissionLinux - -#endif diff --git a/Sources/Transmission/TransmissionConnection.swift b/Sources/Transmission/TransmissionConnection.swift new file mode 100644 index 0000000..8dc764b --- /dev/null +++ b/Sources/Transmission/TransmissionConnection.swift @@ -0,0 +1,408 @@ +import Foundation +import Network +import Datable +import Transport +import Logging +import SwiftQueue + +#if (os(macOS) || os(iOS) || os(watchOS) || os(tvOS)) + +public class TransmissionConnection: Transmission.Connection +{ + var connection: Transport.Connection + var connectLock = DispatchGroup() + var readLock = DispatchGroup() + var writeLock = DispatchGroup() + let log: Logger? + let states: Queue = Queue() + + public required init?(host: String, port: Int, type: ConnectionType = .tcp, logger: Logger? = nil) + { + + let nwhost = NWEndpoint.Host(host) + let port16 = UInt16(port) + let nwport = NWEndpoint.Port(integerLiteral: port16) + self.log = logger + + switch type + { + case .tcp: + let nwconnection = NWConnection(host: nwhost, port: nwport, using: .tcp) + self.connection = nwconnection + self.connection.stateUpdateHandler = self.handleState + self.connection.start(queue: .global()) + + guard let success = self.states.dequeue() else {return nil} + guard success else {return nil} + case .udp: + let nwconnection = NWConnection(host: nwhost, port: nwport, using: .udp) + self.connection = nwconnection + self.connection.stateUpdateHandler = self.handleState + self.connection.start(queue: .global()) + + guard let success = self.states.dequeue() else {return nil} + guard success else {return nil} + } + } + + public required init?(transport: Transport.Connection, logger: Logger? = nil) + { + self.log = logger + maybeLog(message: "Initializing Transmission connection", logger: self.log) + + self.connection = transport + self.connection.stateUpdateHandler = self.handleState + self.connection.start(queue: .global()) + + guard let success = self.states.dequeue() else {return nil} + guard success else {return nil} + } + + func handleState(state: NWConnection.State) + { + switch state + { + case .ready: + self.states.enqueue(true) + return + case .cancelled: + self.states.enqueue(false) + self.failConnect() + return + case .failed(_): + self.states.enqueue(false) + self.failConnect() + return + case .waiting(_): + self.states.enqueue(false) + self.failConnect() + return + default: + return + } + } + + func failConnect() + { + maybeLog(message: "Failed to make a Transmission connection", logger: self.log) + self.connection.stateUpdateHandler = nil + self.connection.cancel() + } + + // Reads exactly size bytes + public func read(size: Int) -> Data? + { + maybeLog(message: "Transmission read(size:) called \(Thread.current)", logger: self.log) + var result: Data? + + self.readLock.enter() + maybeLog(message: "Transmission read's connection.receive type: \(type(of: self.connection)) size: \(size)", logger: self.log) + self.connection.receive(minimumIncompleteLength: size, maximumLength: size) + { + (maybeData, maybeContext, isComplete, maybeError) in + + maybeLog(message: "entered Transmission read's receive callback", logger: self.log) + + guard maybeError == nil else + { + maybeLog(message: "leaving Transmission read's receive callback with error: \(String(describing: maybeError))", logger: self.log) + self.readLock.leave() + return + } + + if let data = maybeData + { + result = data + } + + maybeLog(message: "leaving Transmission read's receive callback", logger: self.log) + + self.readLock.leave() + } + + readLock.wait() + + maybeLog(message: "Transmission read finished!", logger: self.log) + + return result + } + + // reads up to maxSize bytes + public func read(maxSize: Int) -> Data? + { + maybeLog(message: "Transmission read(maxSize:) called \(Thread.current)", logger: self.log) + var result: Data? + + self.readLock.enter() + maybeLog(message: "Transmission read's connection.receive type: \(type(of: self.connection)) maxSize: \(maxSize)", logger: self.log) + self.connection.receive(minimumIncompleteLength: 1, maximumLength: maxSize) + { + (maybeData, maybeContext, isComplete, maybeError) in + + maybeLog(message: "entered Transmission read's receive callback", logger: self.log) + + guard maybeError == nil else + { + maybeLog(message: "leaving Transmission read's receive callback with error: \(String(describing: maybeError))", logger: self.log) + self.readLock.leave() + return + } + + if let data = maybeData + { + result = data + } + + maybeLog(message: "leaving Transmission read's receive callback", logger: self.log) + + self.readLock.leave() + } + + readLock.wait() + + maybeLog(message: "Transmission read finished!", logger: self.log) + + return result + } + + public func readWithLengthPrefix(prefixSizeInBits: Int) -> Data? + { + maybeLog(message: "Transmission readWithLengthPrefix called \(Thread.current)", logger: self.log) + var result: Data? + + self.readLock.enter() + + var maybeCount: Int? = nil + + let countLock = DispatchGroup() + countLock.enter() + switch prefixSizeInBits + { + case 8: + self.connection.receive(minimumIncompleteLength: 1, maximumLength: 1) + { + (maybeData, maybeContext, isComplete, maybeError) in + + guard maybeError == nil else + { + countLock.leave() + return + } + + if let data = maybeData + { + if let count = data.maybeNetworkUint8 + { + maybeCount = Int(count) + } + } + countLock.leave() + } + case 16: + self.connection.receive(minimumIncompleteLength: 2, maximumLength: 2) + { + (maybeData, maybeContext, isComplete, maybeError) in + + guard maybeError == nil else + { + countLock.leave() + return + } + + if let data = maybeData + { + if let count = data.maybeNetworkUint16 + { + maybeCount = Int(count) + } + } + countLock.leave() + } + case 32: + self.connection.receive(minimumIncompleteLength: 4, maximumLength: 4) + { + (maybeData, maybeContext, isComplete, maybeError) in + + guard maybeError == nil else + { + countLock.leave() + return + } + + if let data = maybeData + { + if let count = data.maybeNetworkUint32 + { + maybeCount = Int(count) + } + } + countLock.leave() + } + case 64: + self.connection.receive(minimumIncompleteLength: 8, maximumLength: 8) + { + (maybeData, maybeContext, isComplete, maybeError) in + + guard maybeError == nil else + { + countLock.leave() + return + } + + if let data = maybeData + { + if let count = data.maybeNetworkUint64 + { + maybeCount = Int(count) + } + } + countLock.leave() + } + default: + countLock.leave() + } + + countLock.wait() + + guard let size = maybeCount else + { + readLock.leave() + return nil + } + + self.connection.receive(minimumIncompleteLength: size, maximumLength: size) + { + (maybeData, maybeContext, isComplete, maybeError) in + + guard maybeError == nil else + { + self.readLock.leave() + return + } + + if let data = maybeData + { + result = data + } + + self.readLock.leave() + } + + readLock.wait() + + return result + } + + public func write(string: String) -> Bool + { + let data = string.data + return write(data: data) + } + + public func write(data: Data) -> Bool + { + maybeLog(message: "Transmission write called \(Thread.current)", logger: self.log) + var success = false + + self.writeLock.enter() + self.connection.send(content: data, contentContext: NWConnection.ContentContext.defaultMessage, isComplete: false, completion: NWConnection.SendCompletion.contentProcessed( + { + (maybeError) in + + guard maybeError == nil else + { + success = false + self.writeLock.leave() + return + } + + success = true + self.writeLock.leave() + return + })) + + self.writeLock.wait() + + maybeLog(message: "Transmission write finished \(Thread.current)", logger: self.log) + + return success + } + + public func writeWithLengthPrefix(data: Data, prefixSizeInBits: Int) -> Bool + { + var maybeCountData: Data? = nil + + switch prefixSizeInBits + { + case 8: + let count = UInt8(data.count) + maybeCountData = count.maybeNetworkData + case 16: + let count = UInt16(data.count) + maybeCountData = count.maybeNetworkData + case 32: + let count = UInt32(data.count) + maybeCountData = count.maybeNetworkData + case 64: + let count = UInt64(data.count) + maybeCountData = count.maybeNetworkData + default: + return false + } + + guard let countData = maybeCountData else {return false} + + maybeLog(message: "Transmission writeWithLengthPrefix called \(Thread.current)", logger: self.log) + var success = false + + self.writeLock.enter() + + self.connection.send(content: countData, contentContext: NWConnection.ContentContext.defaultMessage, isComplete: false, completion: NWConnection.SendCompletion.contentProcessed( + { + (maybeError) in + + guard maybeError == nil else + { + success = false + self.writeLock.leave() + return + } + + self.connection.send(content: data, contentContext: NWConnection.ContentContext.defaultMessage, isComplete: false, completion: NWConnection.SendCompletion.contentProcessed( + { + (maybeError) in + + guard maybeError == nil else + { + success = false + self.writeLock.leave() + return + } + + success = true + self.writeLock.leave() + return + })) + })) + + self.writeLock.wait() + + maybeLog(message: "Transmission writeWithLengthPrefix finished \(Thread.current)", logger: self.log) + + return success + } +} + +public func maybeLog(message: String, logger: Logger? = nil) { + if logger != nil { + logger!.debug("\(message)") + } else { + print(message) + } +} + +#else + +@_exported import TransmissionLinux + +#endif diff --git a/Sources/Transmission/TransmissionListener.swift b/Sources/Transmission/TransmissionListener.swift new file mode 100644 index 0000000..2275ccf --- /dev/null +++ b/Sources/Transmission/TransmissionListener.swift @@ -0,0 +1,60 @@ +// +// Listener.swift +// +// +// Created by Dr. Brandon Wiley on 8/31/20. +// + +import Foundation +import Network +import Chord +import Logging + +#if (os(macOS) || os(iOS) || os(watchOS) || os(tvOS)) + +public class TransmissionListener: Listener +{ + let listener: NWListener + let queue: BlockingQueue = BlockingQueue() + let lock: DispatchGroup = DispatchGroup() + + required public init?(port: Int, type: ConnectionType = .tcp, logger: Logger?) + { + let port16 = UInt16(port) + let nwport = NWEndpoint.Port(integerLiteral: port16) + + var params: NWParameters! + switch type + { + case .tcp: + params = NWParameters.tcp + case .udp: + params = NWParameters.udp + } + + guard let listener = try? NWListener(using: params, on: nwport) else {return nil} + self.listener = listener + + self.listener.newConnectionHandler = + { + nwconnection in + + guard let connection = TransmissionConnection(transport: nwconnection, logger: logger) else {return} + + self.queue.enqueue(element: connection) + } + + self.listener.start(queue: .global()) + } + + public func accept() -> Connection + { + return self.queue.dequeue() + } +} + +#else + +@_exported import TransmissionLinux + +#endif diff --git a/Tests/TransmissionTests/TransmissionTests.swift b/Tests/TransmissionTests/TransmissionTests.swift index 9630896..7012fbf 100644 --- a/Tests/TransmissionTests/TransmissionTests.swift +++ b/Tests/TransmissionTests/TransmissionTests.swift @@ -23,7 +23,7 @@ final class TransmissionTests: XCTestCase func runServer(_ lock: DispatchGroup) { - guard let listener = Listener(port: 1234) else {return} + guard let listener = TransmissionListener(port: 1234, logger: nil) else {return} lock.leave() let connection = listener.accept() @@ -33,7 +33,7 @@ final class TransmissionTests: XCTestCase func runClient() { - let connection = Connection(host: "127.0.0.1", port: 1234) + let connection = TransmissionConnection(host: "127.0.0.1", port: 1234, logger: nil) XCTAssertNotNil(connection) let writeResult = connection!.write(string: "test")