diff --git a/CHANGELOG.md b/CHANGELOG.md index 8577ed038..0ac219335 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Track breadcrumbs in Sentry for all analytics events. [#125](https://github.com/verse-pbc/issues/issues/125) - Added functionality to get follows notifications in the Notifications tab. [#127](https://github.com/verse-pbc/issues/issues/127) - Refactored the way the ProfileView downloads data and logs analytics events. [#1748](https://github.com/planetary-social/nos/pull/1748) +- Refactored the way we close relay subscriptions. [#1754](https://github.com/planetary-social/nos/pull/1754) ## [1.1] - 2025-01-03Z diff --git a/Nos/Service/MockRelaySubscriptionManager.swift b/Nos/Service/MockRelaySubscriptionManager.swift index ea1df8e67..b6ac39c8d 100644 --- a/Nos/Service/MockRelaySubscriptionManager.swift +++ b/Nos/Service/MockRelaySubscriptionManager.swift @@ -37,11 +37,10 @@ class MockRelaySubscriptionManager: RelaySubscriptionManager { false } - func decrementSubscriptionCount(for subscriptionID: RelaySubscription.ID) async -> Bool { - false + func decrementSubscriptionCount(for subscriptionID: RelaySubscription.ID) async { } - func forceCloseSubscriptionCount(for subscriptionID: RelaySubscription.ID) async { + func closeSubscription(with subscriptionID: RelaySubscription.ID) async { } func trackConnected(socket: WebSocket) async { diff --git a/Nos/Service/Relay/RelayService.swift b/Nos/Service/Relay/RelayService.swift index 84c264a05..2e72f41c7 100644 --- a/Nos/Service/Relay/RelayService.swift +++ b/Nos/Service/Relay/RelayService.swift @@ -93,48 +93,21 @@ import UIKit // MARK: Closing subscriptions extension RelayService { - func decrementSubscriptionCount(for subscriptionIDs: [String]) { + func decrementSubscriptionCount(for subscriptionIDs: [RelaySubscription.ID]) { for subscriptionID in subscriptionIDs { self.decrementSubscriptionCount(for: subscriptionID) } } - func decrementSubscriptionCount(for subscriptionID: String) { + func decrementSubscriptionCount(for subscriptionID: RelaySubscription.ID) { Task { - let subscriptionStillActive = await subscriptionManager.decrementSubscriptionCount(for: subscriptionID) - if !subscriptionStillActive { - await self.sendCloseToAll(for: subscriptionID) - } - } - } - - private func sendClose(from client: WebSocketClient, subscriptionID: RelaySubscription.ID) async { - do { - await subscriptionManager.forceCloseSubscriptionCount(for: subscriptionID) - let request: [Any] = ["CLOSE", subscriptionID] - let requestData = try JSONSerialization.data(withJSONObject: request) - let requestString = String(decoding: requestData, as: UTF8.self) - client.write(string: requestString) - } catch { - Log.error("Error: Could not send close \(error.localizedDescription)") - } - } - - private func sendCloseToAll(for subscription: RelaySubscription.ID) async { - let sockets = await subscriptionManager.sockets() - for socket in sockets { - await self.sendClose(from: socket, subscriptionID: subscription) + await subscriptionManager.decrementSubscriptionCount(for: subscriptionID) } - Task { await processSubscriptionQueue() } } func closeConnection(to relayAddress: String?) async { guard let address = relayAddress else { return } if let socket = await subscriptionManager.socket(for: address) { - for subscription in await subscriptionManager.active() { - await self.sendClose(from: socket, subscriptionID: subscription.id) - } - await subscriptionManager.close(socket: socket) } } @@ -326,17 +299,8 @@ extension RelayService { } private func processSubscriptionQueue() async { - await clearStaleSubscriptions() - await subscriptionManager.processSubscriptionQueue() } - - private func clearStaleSubscriptions() async { - let staleSubscriptions = await subscriptionManager.staleSubscriptions() - for staleSubscription in staleSubscriptions { - await sendCloseToAll(for: staleSubscription.id) - } - } } // MARK: Parsing Events @@ -351,7 +315,7 @@ extension RelayService { let subscription = await subscriptionManager.subscription(from: subID), subscription.closesAfterResponse { // This is a one-off request. Close it. - await sendClose(from: socket, subscriptionID: subID) + await subscriptionManager.closeSubscription(with: subID) } } diff --git a/Nos/Service/Relay/RelaySubscriptionManager.swift b/Nos/Service/Relay/RelaySubscriptionManager.swift index 5cfe64843..1557e6671 100644 --- a/Nos/Service/Relay/RelaySubscriptionManager.swift +++ b/Nos/Service/Relay/RelaySubscriptionManager.swift @@ -7,9 +7,8 @@ protocol RelaySubscriptionManager { func set(socketQueue: DispatchQueue?, delegate: WebSocketDelegate?) async - @discardableResult - func decrementSubscriptionCount(for subscriptionID: RelaySubscription.ID) async -> Bool - func forceCloseSubscriptionCount(for subscriptionID: RelaySubscription.ID) async + func decrementSubscriptionCount(for subscriptionID: RelaySubscription.ID) async + func closeSubscription(with subscriptionID: RelaySubscription.ID) async func trackConnected(socket: WebSocket) async func processSubscriptionQueue() async func queueSubscription(with filter: Filter, to relayAddress: URL) async -> RelaySubscription @@ -21,7 +20,6 @@ protocol RelaySubscriptionManager { func sockets() async -> [WebSocket] func socket(for address: String) async -> WebSocket? func socket(for url: URL) async -> WebSocket? - func staleSubscriptions() async -> [RelaySubscription] func subscription(from subscriptionID: RelaySubscription.ID) async -> RelaySubscription? func trackError(socket: WebSocket) async } @@ -70,52 +68,56 @@ actor RelaySubscriptionManagerActor: RelaySubscriptionManager { } } - private func removeSubscription(with subscriptionID: RelaySubscription.ID) { - if let subscriptionIndex = self.all.firstIndex( - where: { $0.id == subscriptionID } - ) { - all.remove(at: subscriptionIndex) - } - } - - func forceCloseSubscriptionCount(for subscriptionID: RelaySubscription.ID) { - removeSubscription(with: subscriptionID) - } - /// Lets the manager know that there is one less subscriber for the given subscription. If there are no - /// more subscribers this function returns `true`. + /// more subscribers this function closes the subscription. /// - /// Note that this does not send a close message on the websocket or close the socket. Right now those actions - /// are performed by the RelayService. It's yucky though. Maybe we should make the RelaySubscriptionManager - /// do that in the future. - @discardableResult - func decrementSubscriptionCount(for subscriptionID: RelaySubscription.ID) async -> Bool { + /// Incrementing the subscription count is done by `queueSubscription(with:to:)`. + func decrementSubscriptionCount(for subscriptionID: RelaySubscription.ID) { if let subscription = subscription(from: subscriptionID) { if subscription.referenceCount == 1 { - removeSubscription(with: subscriptionID) - return false + closeSubscription(subscription) } else { subscription.referenceCount -= 1 - return true } } - return false + } + + /// Closes the subscription with the given ID. Sends a "CLOSE" message to the relay and removes the subscription + /// object from management. + func closeSubscription(with subscriptionID: RelaySubscription.ID) { + guard let subscription = subscription(from: subscriptionID) else { + Log.error("Tried to force close non-existent subscription \(subscriptionID)") + return + } + closeSubscription(subscription) + } + + /// Closes the given subscription. Sends a "CLOSE" message to the relay and removes the subscription + /// object from management. + private func closeSubscription(_ subscription: RelaySubscription) { + sendClose(for: subscription) + removeSubscription(with: subscription.id) } - /// Finds stale subscriptions, removes them from the subscription list, and returns them. - func staleSubscriptions() async -> [RelaySubscription] { - var staleSubscriptions = [RelaySubscription]() + /// Remove just removes a subscription from our internal tracking. It doesn't send the relay any notification + /// that we are closing the subscription. + private func removeSubscription(with subscriptionID: RelaySubscription.ID) { + if let subscriptionIndex = self.all.firstIndex( + where: { $0.id == subscriptionID } + ) { + all.remove(at: subscriptionIndex) + } + } + + /// Closes subscriptions that are supposed to close after a response but haven't returned any response for a while. + private func closeStaleSubscriptions() { for subscription in active { if subscription.closesAfterResponse, let filterStartedAt = subscription.subscriptionStartDate, filterStartedAt.distance(to: .now) > 10 { - staleSubscriptions.append(subscription) + closeSubscription(subscription) } } - for subscription in staleSubscriptions { - forceCloseSubscriptionCount(for: subscription.id) - } - return staleSubscriptions } // MARK: - Socket Management @@ -170,7 +172,7 @@ actor RelaySubscriptionManagerActor: RelaySubscriptionManager { socket.disconnect() if let relayAddress = socket.url { for subscription in all where subscription.relayAddress == relayAddress { - forceCloseSubscriptionCount(for: subscription.id) + closeSubscription(with: subscription.id) } socketConnections.removeValue(forKey: relayAddress) } @@ -235,6 +237,8 @@ actor RelaySubscriptionManagerActor: RelaySubscriptionManager { /// /// It's called at appropriate times internally but can also be called externally in a loop. Idempotent. func processSubscriptionQueue() { + closeStaleSubscriptions() + openSockets() var waitingSubscriptions = [RelaySubscription]() @@ -300,7 +304,7 @@ actor RelaySubscriptionManagerActor: RelaySubscriptionManager { if let socket = socket(for: subscription.relayAddress) { requestEvents(from: socket, subscription: subscription) } - } + } /// Takes a RelaySubscription model and makes a websockets request to the given socket func requestEvents(from socket: WebSocketClient, subscription: RelaySubscription) { @@ -315,6 +319,36 @@ actor RelaySubscriptionManagerActor: RelaySubscriptionManager { } } + /// Notifies the relay that we are closing the subscription with the given ID. + private func sendClose(for subscriptionID: RelaySubscription.ID) { + guard let subscription = subscription(from: subscriptionID) else { + Log.error("Tried to close a non-existing subscription \(subscriptionID)") + return + } + sendClose(for: subscription) + } + + /// Notifies the associated relay that we are closing the given subscription. + func sendClose(for subscription: RelaySubscription) { + guard let socket = socket(for: subscription.relayAddress) else { + Log.error("Tried to close a non-existing subscription \(subscription.id)") + return + } + sendClose(from: socket, subscriptionID: subscription.id) + } + + /// Writes a CLOSE message to the given socket, letting the relay know we are done with given subscription ID. + private func sendClose(from socket: WebSocketClient, subscriptionID: RelaySubscription.ID) { + do { + let request: [Any] = ["CLOSE", subscriptionID] + let requestData = try JSONSerialization.data(withJSONObject: request) + let requestString = String(decoding: requestData, as: UTF8.self) + socket.write(string: requestString) + } catch { + Log.error("Error: Could not send close \(error.localizedDescription)") + } + } + func receivedClose(for subscriptionID: RelaySubscription.ID, from socket: WebSocket) { if let subscription = subscription(from: subscriptionID) { // Move this subscription to the end of the queue where it will be retried