Skip to content
This repository has been archived by the owner on Mar 30, 2024. It is now read-only.

Added support for Notifications that use DispatchSource #59

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions Sources/PostgreSQL/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,15 @@ public final class Connection: ConnInfoInitializable {
public let channel: String
public let payload: String?

/// initializer usable without knowledge of CPostgreSQL
/// required to allow unit testing of classes using Notifications
public init(pid: Int, channel: String, payload: String?) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You do not need to make this init public. If you do @testable import, it should just work. This struct should not be initializable outside of this library.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll remove that change

self.pid = pid
self.channel = channel
self.payload = payload
}

/// internal initializer
init(pgNotify: PGnotify) {
channel = String(cString: pgNotify.relname)
pid = Int(pgNotify.be_pid)
Expand All @@ -171,6 +180,37 @@ public final class Connection: ConnInfoInitializable {
}
}

/// Creates a dispatch read source for this connection that will call `callback` on `queue` when a notification is received
///
/// - Parameter channel: the channel to register for
/// - Parameter queue: the queue to create the DispatchSource on
/// - Parameter callback: the callback
/// - Parameter note: The notification received from the database
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The note and err parameters cannot be commented like this. It should be part of the callback comments. Also I would write out their full names instead of abbreviations.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? If you comment them that way, option clicking on the listen function shows them in a nested table underneath the callback parameter.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok sorry, didn’t know it worked like that.

/// - Parameter err: Any error while reading the notification. If not nil, the source will have been canceled
/// - Returns: the dispatch socket to activate
/// - Throws: if fails to get the socket for the connection
public func makeListenDispatchSource(toChannel channel: String, queue: DispatchQueue, callback: @escaping (_ note: Notification?, _ err: Error?) -> Void) throws -> DispatchSourceRead {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would simply call this function listen(toChannel:queue:callback:)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do. I'd just copied Apple's naming convention. Better to use a swifty one.

guard let sock = Optional.some(PQsocket(self.cConnection)), sock >= 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems a bit strange, why create an optional out of something not optional. Also, there is a callback, I would use this callback in case of an error. This way you can remove throws from this function. So it would change to:

let sock = PQsocket(cConnection)
guard sock >= 0 else {
    let error = PostgreSQLError(code: .ioError, reason: "failed to get socket for connection")
    callback(nil, error)
    return
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The optional allows using it in a guard statement. I try to put all precondition guards at the top of the function. I'll change.

It can't just return because it returns a non-optional DispatchSource. That's why it throws. Better to explicitly generate an error instead of returning nil.

else { throw PostgreSQLError(code: .ioError, reason: "failed to get socket for connection") }
let src = DispatchSource.makeReadSource(fileDescriptor: sock, queue: queue)
src.setEventHandler { [unowned self] in
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using unowned self can cause a crash. It might be better not to retain self here, so I would change it to:

src.setEventHandler { [weak self] in
    guard let strongSelf = self else {
        return
    }

    // Use strongSelf instead of self
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do.

do {
try self.validateConnection()
PQconsumeInput(self.cConnection)
while let pgNotify = PQnotifies(self.cConnection) {
let notification = Notification(pgNotify: pgNotify.pointee)
callback(notification, nil)
PQfreemem(pgNotify)
}
} catch {
callback(nil, error)
src.cancel()
}
}
try self.execute("LISTEN \(channel)")
return src
}

/// Registers as a listener on a specific notification channel.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering the new method should be the way to go, I would mark the old function as deprecated.

///
/// - Parameters:
Expand Down
54 changes: 54 additions & 0 deletions Tests/PostgreSQLTests/PostgreSQLTests.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import XCTest
@testable import PostgreSQL
import Foundation
import Dispatch

class PostgreSQLTests: XCTestCase {
static let allTests = [
Expand Down Expand Up @@ -30,6 +31,8 @@ class PostgreSQLTests: XCTestCase {
("testUnsupportedObject", testUnsupportedObject),
("testNotification", testNotification),
("testNotificationWithPayload", testNotificationWithPayload),
("testDispatchNotification", testDispatchNotification),
("testDispatchNotificationWithPayload", testDispatchNotificationWithPayload),
("testQueryToNode", testQueryToNode)
]

Expand Down Expand Up @@ -779,6 +782,31 @@ class PostgreSQLTests: XCTestCase {
waitForExpectations(timeout: 5)
}

func testDispatchNotification() throws {
let conn1 = try postgreSQL.makeConnection()
let conn2 = try postgreSQL.makeConnection()

let testExpectation = expectation(description: "Receive notification")

let queue = DispatchQueue.global()
var source: DispatchSourceRead?
source = try! conn1.makeListenDispatchSource(toChannel: "test_channel1", queue: queue) { (notification, error) in
XCTAssertEqual(notification?.channel, "test_channel1")
XCTAssertNil(notification?.payload)
XCTAssertNil(error)

testExpectation.fulfill()
source?.cancel()
}
source?.resume()

sleep(1)

try conn2.notify(channel: "test_channel1", payload: nil)

waitForExpectations(timeout: 5)
}

func testNotificationWithPayload() throws {
let conn1 = try postgreSQL.makeConnection()
let conn2 = try postgreSQL.makeConnection()
Expand All @@ -801,6 +829,32 @@ class PostgreSQLTests: XCTestCase {
waitForExpectations(timeout: 5)
}

func testDispatchNotificationWithPayload() throws {
let conn1 = try postgreSQL.makeConnection()
let conn2 = try postgreSQL.makeConnection()

let testExpectation = expectation(description: "Receive notification with payload")

let queue = DispatchQueue.global()
var source: DispatchSourceRead?
source = try! conn1.makeListenDispatchSource(toChannel: "test_channel2", queue: queue) { (notification, error) in
XCTAssertEqual(notification?.channel, "test_channel2")
XCTAssertEqual(notification?.payload, "test_payload")
XCTAssertNotNil(notification?.payload)
XCTAssertNil(error)

testExpectation.fulfill()
source?.cancel()
}
source?.resume()

sleep(1)

try conn2.notify(channel: "test_channel2", payload: "test_payload")

waitForExpectations(timeout: 5)
}

func testQueryToNode() throws {
let conn = try postgreSQL.makeConnection()

Expand Down