Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the way we close sockets into the RelaySubscriptionManager #1754

Merged
merged 3 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Track breadcrumbs in Sentry for all analytics events. [#125](https://github.com/verse-pbc/issues/issues/125)
- Added functionality to get follows notifications in the Notifications tab. [#127](https://github.com/verse-pbc/issues/issues/127)
- Refactored the way the ProfileView downloads data and logs analytics events. [#1748](https://github.com/planetary-social/nos/pull/1748)
- Refactored the way we close relay subscriptions. [#1754](https://github.com/planetary-social/nos/pull/1754)

## [1.1] - 2025-01-03Z

Expand Down
5 changes: 2 additions & 3 deletions Nos/Service/MockRelaySubscriptionManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,10 @@ class MockRelaySubscriptionManager: RelaySubscriptionManager {
false
}

func decrementSubscriptionCount(for subscriptionID: RelaySubscription.ID) async -> Bool {
false
func decrementSubscriptionCount(for subscriptionID: RelaySubscription.ID) async {
}

func forceCloseSubscriptionCount(for subscriptionID: RelaySubscription.ID) async {
func closeSubscription(with subscriptionID: RelaySubscription.ID) async {
}

func trackConnected(socket: WebSocket) async {
Expand Down
44 changes: 4 additions & 40 deletions Nos/Service/Relay/RelayService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -93,48 +93,21 @@ import UIKit
// MARK: Closing subscriptions
extension RelayService {

func decrementSubscriptionCount(for subscriptionIDs: [String]) {
func decrementSubscriptionCount(for subscriptionIDs: [RelaySubscription.ID]) {
for subscriptionID in subscriptionIDs {
self.decrementSubscriptionCount(for: subscriptionID)
}
}

func decrementSubscriptionCount(for subscriptionID: String) {
func decrementSubscriptionCount(for subscriptionID: RelaySubscription.ID) {
Task {
let subscriptionStillActive = await subscriptionManager.decrementSubscriptionCount(for: subscriptionID)
if !subscriptionStillActive {
await self.sendCloseToAll(for: subscriptionID)
}
}
}

private func sendClose(from client: WebSocketClient, subscriptionID: RelaySubscription.ID) async {
do {
await subscriptionManager.forceCloseSubscriptionCount(for: subscriptionID)
let request: [Any] = ["CLOSE", subscriptionID]
let requestData = try JSONSerialization.data(withJSONObject: request)
let requestString = String(decoding: requestData, as: UTF8.self)
client.write(string: requestString)
} catch {
Log.error("Error: Could not send close \(error.localizedDescription)")
}
}

private func sendCloseToAll(for subscription: RelaySubscription.ID) async {
let sockets = await subscriptionManager.sockets()
for socket in sockets {
await self.sendClose(from: socket, subscriptionID: subscription)
await subscriptionManager.decrementSubscriptionCount(for: subscriptionID)
}
Task { await processSubscriptionQueue() }
}

func closeConnection(to relayAddress: String?) async {
guard let address = relayAddress else { return }
if let socket = await subscriptionManager.socket(for: address) {
for subscription in await subscriptionManager.active() {
await self.sendClose(from: socket, subscriptionID: subscription.id)
}

await subscriptionManager.close(socket: socket)
}
}
Expand Down Expand Up @@ -326,17 +299,8 @@ extension RelayService {
}

private func processSubscriptionQueue() async {
await clearStaleSubscriptions()

await subscriptionManager.processSubscriptionQueue()
}

private func clearStaleSubscriptions() async {
let staleSubscriptions = await subscriptionManager.staleSubscriptions()
for staleSubscription in staleSubscriptions {
await sendCloseToAll(for: staleSubscription.id)
}
}
}

// MARK: Parsing Events
Expand All @@ -351,7 +315,7 @@ extension RelayService {
let subscription = await subscriptionManager.subscription(from: subID),
subscription.closesAfterResponse {
// This is a one-off request. Close it.
await sendClose(from: socket, subscriptionID: subID)
await subscriptionManager.closeSubscription(with: subID)
}
}

Expand Down
106 changes: 70 additions & 36 deletions Nos/Service/Relay/RelaySubscriptionManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ protocol RelaySubscriptionManager {

func set(socketQueue: DispatchQueue?, delegate: WebSocketDelegate?) async

@discardableResult
func decrementSubscriptionCount(for subscriptionID: RelaySubscription.ID) async -> Bool
func forceCloseSubscriptionCount(for subscriptionID: RelaySubscription.ID) async
func decrementSubscriptionCount(for subscriptionID: RelaySubscription.ID) async
func closeSubscription(with subscriptionID: RelaySubscription.ID) async
func trackConnected(socket: WebSocket) async
func processSubscriptionQueue() async
func queueSubscription(with filter: Filter, to relayAddress: URL) async -> RelaySubscription
Expand All @@ -21,7 +20,6 @@ protocol RelaySubscriptionManager {
func sockets() async -> [WebSocket]
func socket(for address: String) async -> WebSocket?
func socket(for url: URL) async -> WebSocket?
func staleSubscriptions() async -> [RelaySubscription]
func subscription(from subscriptionID: RelaySubscription.ID) async -> RelaySubscription?
func trackError(socket: WebSocket) async
}
Expand Down Expand Up @@ -70,52 +68,56 @@ actor RelaySubscriptionManagerActor: RelaySubscriptionManager {
}
}

private func removeSubscription(with subscriptionID: RelaySubscription.ID) {
if let subscriptionIndex = self.all.firstIndex(
where: { $0.id == subscriptionID }
) {
all.remove(at: subscriptionIndex)
}
}

func forceCloseSubscriptionCount(for subscriptionID: RelaySubscription.ID) {
removeSubscription(with: subscriptionID)
}

/// Lets the manager know that there is one less subscriber for the given subscription. If there are no
/// more subscribers this function returns `true`.
/// more subscribers this function closes the subscription.
///
/// Note that this does not send a close message on the websocket or close the socket. Right now those actions
/// are performed by the RelayService. It's yucky though. Maybe we should make the RelaySubscriptionManager
/// do that in the future.
@discardableResult
func decrementSubscriptionCount(for subscriptionID: RelaySubscription.ID) async -> Bool {
/// Incrementing the subscription count is done by `queueSubscription(with:to:)`.
func decrementSubscriptionCount(for subscriptionID: RelaySubscription.ID) {
if let subscription = subscription(from: subscriptionID) {
if subscription.referenceCount == 1 {
removeSubscription(with: subscriptionID)
return false
closeSubscription(subscription)
} else {
subscription.referenceCount -= 1
return true
}
}
return false
}

/// Closes the subscription with the given ID. Sends a "CLOSE" message to the relay and removes the subscription
/// object from management.
func closeSubscription(with subscriptionID: RelaySubscription.ID) {
guard let subscription = subscription(from: subscriptionID) else {
Log.error("Tried to force close non-existent subscription \(subscriptionID)")
return
}
closeSubscription(subscription)
}

/// Closes the given subscription. Sends a "CLOSE" message to the relay and removes the subscription
/// object from management.
private func closeSubscription(_ subscription: RelaySubscription) {
sendClose(for: subscription)
removeSubscription(with: subscription.id)
}

/// Finds stale subscriptions, removes them from the subscription list, and returns them.
func staleSubscriptions() async -> [RelaySubscription] {
var staleSubscriptions = [RelaySubscription]()
/// Remove just removes a subscription from our internal tracking. It doesn't send the relay any notification
/// that we are closing the subscription.
private func removeSubscription(with subscriptionID: RelaySubscription.ID) {
if let subscriptionIndex = self.all.firstIndex(
where: { $0.id == subscriptionID }
) {
all.remove(at: subscriptionIndex)
}
}

/// Closes subscriptions that are supposed to close after a response but haven't returned any response for a while.
private func closeStaleSubscriptions() {
for subscription in active {
if subscription.closesAfterResponse,
let filterStartedAt = subscription.subscriptionStartDate,
filterStartedAt.distance(to: .now) > 10 {
staleSubscriptions.append(subscription)
closeSubscription(subscription)
}
}
for subscription in staleSubscriptions {
forceCloseSubscriptionCount(for: subscription.id)
}
return staleSubscriptions
}

// MARK: - Socket Management
Expand Down Expand Up @@ -170,7 +172,7 @@ actor RelaySubscriptionManagerActor: RelaySubscriptionManager {
socket.disconnect()
if let relayAddress = socket.url {
for subscription in all where subscription.relayAddress == relayAddress {
forceCloseSubscriptionCount(for: subscription.id)
closeSubscription(with: subscription.id)
}
socketConnections.removeValue(forKey: relayAddress)
}
Expand Down Expand Up @@ -235,6 +237,8 @@ actor RelaySubscriptionManagerActor: RelaySubscriptionManager {
///
/// It's called at appropriate times internally but can also be called externally in a loop. Idempotent.
func processSubscriptionQueue() {
closeStaleSubscriptions()

openSockets()
var waitingSubscriptions = [RelaySubscription]()

Expand Down Expand Up @@ -300,7 +304,7 @@ actor RelaySubscriptionManagerActor: RelaySubscriptionManager {
if let socket = socket(for: subscription.relayAddress) {
requestEvents(from: socket, subscription: subscription)
}
}
}

/// Takes a RelaySubscription model and makes a websockets request to the given socket
func requestEvents(from socket: WebSocketClient, subscription: RelaySubscription) {
Expand All @@ -315,6 +319,36 @@ actor RelaySubscriptionManagerActor: RelaySubscriptionManager {
}
}

/// Notifies the relay that we are closing the subscription with the given ID.
private func sendClose(for subscriptionID: RelaySubscription.ID) {
guard let subscription = subscription(from: subscriptionID) else {
Log.error("Tried to close a non-existing subscription \(subscriptionID)")
return
}
sendClose(for: subscription)
}

/// Notifies the associated relay that we are closing the given subscription.
func sendClose(for subscription: RelaySubscription) {
guard let socket = socket(for: subscription.relayAddress) else {
Log.error("Tried to close a non-existing subscription \(subscription.id)")
return
}
sendClose(from: socket, subscriptionID: subscription.id)
}

/// Writes a CLOSE message to the given socket, letting the relay know we are done with given subscription ID.
private func sendClose(from socket: WebSocketClient, subscriptionID: RelaySubscription.ID) {
do {
let request: [Any] = ["CLOSE", subscriptionID]
let requestData = try JSONSerialization.data(withJSONObject: request)
let requestString = String(decoding: requestData, as: UTF8.self)
socket.write(string: requestString)
} catch {
Log.error("Error: Could not send close \(error.localizedDescription)")
}
}

func receivedClose(for subscriptionID: RelaySubscription.ID, from socket: WebSocket) {
if let subscription = subscription(from: subscriptionID) {
// Move this subscription to the end of the queue where it will be retried
Expand Down