Skip to content

Commit

Permalink
Refactor the way we close sockets into the RelaySubscriptionManager
Browse files Browse the repository at this point in the history
  • Loading branch information
mplorentz committed Feb 4, 2025
1 parent f1441e6 commit da77fb6
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 79 deletions.
5 changes: 2 additions & 3 deletions Nos/Service/MockRelaySubscriptionManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,10 @@ class MockRelaySubscriptionManager: RelaySubscriptionManager {
false
}

func decrementSubscriptionCount(for subscriptionID: RelaySubscription.ID) async -> Bool {
false
func decrementSubscriptionCount(for subscriptionID: RelaySubscription.ID) async {
}

func forceCloseSubscriptionCount(for subscriptionID: RelaySubscription.ID) async {
func closeSubscription(with subscriptionID: RelaySubscription.ID) async {
}

func trackConnected(socket: WebSocket) async {
Expand Down
44 changes: 4 additions & 40 deletions Nos/Service/Relay/RelayService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -93,48 +93,21 @@ import UIKit
// MARK: Closing subscriptions
extension RelayService {

func decrementSubscriptionCount(for subscriptionIDs: [String]) {
func decrementSubscriptionCount(for subscriptionIDs: [RelaySubscription.ID]) {
for subscriptionID in subscriptionIDs {
self.decrementSubscriptionCount(for: subscriptionID)
}
}

func decrementSubscriptionCount(for subscriptionID: String) {
func decrementSubscriptionCount(for subscriptionID: RelaySubscription.ID) {
Task {
let subscriptionStillActive = await subscriptionManager.decrementSubscriptionCount(for: subscriptionID)
if !subscriptionStillActive {
await self.sendCloseToAll(for: subscriptionID)
}
}
}

private func sendClose(from client: WebSocketClient, subscriptionID: RelaySubscription.ID) async {
do {
await subscriptionManager.forceCloseSubscriptionCount(for: subscriptionID)
let request: [Any] = ["CLOSE", subscriptionID]
let requestData = try JSONSerialization.data(withJSONObject: request)
let requestString = String(decoding: requestData, as: UTF8.self)
client.write(string: requestString)
} catch {
Log.error("Error: Could not send close \(error.localizedDescription)")
}
}

private func sendCloseToAll(for subscription: RelaySubscription.ID) async {
let sockets = await subscriptionManager.sockets()
for socket in sockets {
await self.sendClose(from: socket, subscriptionID: subscription)
await subscriptionManager.decrementSubscriptionCount(for: subscriptionID)
}
Task { await processSubscriptionQueue() }
}

func closeConnection(to relayAddress: String?) async {
guard let address = relayAddress else { return }
if let socket = await subscriptionManager.socket(for: address) {
for subscription in await subscriptionManager.active() {
await self.sendClose(from: socket, subscriptionID: subscription.id)
}

await subscriptionManager.close(socket: socket)
}
}
Expand Down Expand Up @@ -326,17 +299,8 @@ extension RelayService {
}

private func processSubscriptionQueue() async {
await clearStaleSubscriptions()

await subscriptionManager.processSubscriptionQueue()
}

private func clearStaleSubscriptions() async {
let staleSubscriptions = await subscriptionManager.staleSubscriptions()
for staleSubscription in staleSubscriptions {
await sendCloseToAll(for: staleSubscription.id)
}
}
}

// MARK: Parsing Events
Expand All @@ -351,7 +315,7 @@ extension RelayService {
let subscription = await subscriptionManager.subscription(from: subID),
subscription.closesAfterResponse {
// This is a one-off request. Close it.
await sendClose(from: socket, subscriptionID: subID)
await subscriptionManager.closeSubscription(with: subID)
}
}

Expand Down
107 changes: 71 additions & 36 deletions Nos/Service/Relay/RelaySubscriptionManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ protocol RelaySubscriptionManager {

func set(socketQueue: DispatchQueue?, delegate: WebSocketDelegate?) async

@discardableResult
func decrementSubscriptionCount(for subscriptionID: RelaySubscription.ID) async -> Bool
func forceCloseSubscriptionCount(for subscriptionID: RelaySubscription.ID) async
func decrementSubscriptionCount(for subscriptionID: RelaySubscription.ID) async
func closeSubscription(with subscriptionID: RelaySubscription.ID) async
func trackConnected(socket: WebSocket) async
func processSubscriptionQueue() async
func queueSubscription(with filter: Filter, to relayAddress: URL) async -> RelaySubscription
Expand All @@ -21,7 +20,6 @@ protocol RelaySubscriptionManager {
func sockets() async -> [WebSocket]
func socket(for address: String) async -> WebSocket?
func socket(for url: URL) async -> WebSocket?
func staleSubscriptions() async -> [RelaySubscription]
func subscription(from subscriptionID: RelaySubscription.ID) async -> RelaySubscription?
func trackError(socket: WebSocket) async
}
Expand Down Expand Up @@ -70,52 +68,57 @@ actor RelaySubscriptionManagerActor: RelaySubscriptionManager {
}
}

private func removeSubscription(with subscriptionID: RelaySubscription.ID) {
if let subscriptionIndex = self.all.firstIndex(
where: { $0.id == subscriptionID }
) {
all.remove(at: subscriptionIndex)
}
}

func forceCloseSubscriptionCount(for subscriptionID: RelaySubscription.ID) {
removeSubscription(with: subscriptionID)
}

/// Lets the manager know that there is one less subscriber for the given subscription. If there are no
/// more subscribers this function returns `true`.
/// more subscribers this function closes the subscription.
///
/// Note that this does not send a close message on the websocket or close the socket. Right now those actions
/// are performed by the RelayService. It's yucky though. Maybe we should make the RelaySubscriptionManager
/// do that in the future.
@discardableResult
func decrementSubscriptionCount(for subscriptionID: RelaySubscription.ID) async -> Bool {
/// Incrementing the subscription count is done by `queueSubscription(with:to:)`.
func decrementSubscriptionCount(for subscriptionID: RelaySubscription.ID) {
if let subscription = subscription(from: subscriptionID) {
if subscription.referenceCount == 1 {
removeSubscription(with: subscriptionID)
return false
closeSubscription(subscription)
} else {
subscription.referenceCount -= 1
return true
}
}
return false
}

/// Closes the subscription with the given ID. Sends a "CLOSE" message to the relay and removes the subscription
/// object from management.
func closeSubscription(with subscriptionID: RelaySubscription.ID) {
guard let subscription = subscription(from: subscriptionID) else {
Log.error("Tried to force close non-existent subscription \(subscriptionID)")
return
}
closeSubscription(subscription)
}

/// Finds stale subscriptions, removes them from the subscription list, and returns them.
func staleSubscriptions() async -> [RelaySubscription] {
var staleSubscriptions = [RelaySubscription]()
/// Closes the given subscription. Sends a "CLOSE" message to the relay and removes the subscription
/// object from management.
private func closeSubscription(_ subscription: RelaySubscription) {
sendClose(for: subscription)
removeSubscription(with: subscription.id)
}

/// Remove just removes a subscription from our internal tracking. It doesn't send the relay any notification
/// that we are closing the subscription.
private func removeSubscription(with subscriptionID: RelaySubscription.ID) {
if let subscriptionIndex = self.all.firstIndex(
where: { $0.id == subscriptionID }
) {
all.remove(at: subscriptionIndex)
}
}


Check failure on line 112 in Nos/Service/Relay/RelaySubscriptionManager.swift

View workflow job for this annotation

GitHub Actions / swift_lint

Limit vertical whitespace to a single empty line; currently 2 (vertical_whitespace)
/// Closes subscriptions that are supposed to close after a response but haven't returned any response for a while.
private func closeStaleSubscriptions() {
for subscription in active {
if subscription.closesAfterResponse,
let filterStartedAt = subscription.subscriptionStartDate,
filterStartedAt.distance(to: .now) > 10 {
staleSubscriptions.append(subscription)
closeSubscription(subscription)
}
}
for subscription in staleSubscriptions {
forceCloseSubscriptionCount(for: subscription.id)
}
return staleSubscriptions
}

// MARK: - Socket Management
Expand Down Expand Up @@ -170,7 +173,7 @@ actor RelaySubscriptionManagerActor: RelaySubscriptionManager {
socket.disconnect()
if let relayAddress = socket.url {
for subscription in all where subscription.relayAddress == relayAddress {
forceCloseSubscriptionCount(for: subscription.id)
closeSubscription(with: subscription.id)
}
socketConnections.removeValue(forKey: relayAddress)
}
Expand Down Expand Up @@ -235,6 +238,8 @@ actor RelaySubscriptionManagerActor: RelaySubscriptionManager {
///
/// It's called at appropriate times internally but can also be called externally in a loop. Idempotent.
func processSubscriptionQueue() {
closeStaleSubscriptions()

openSockets()
var waitingSubscriptions = [RelaySubscription]()

Expand Down Expand Up @@ -300,7 +305,7 @@ actor RelaySubscriptionManagerActor: RelaySubscriptionManager {
if let socket = socket(for: subscription.relayAddress) {
requestEvents(from: socket, subscription: subscription)
}
}
}

/// Takes a RelaySubscription model and makes a websockets request to the given socket
func requestEvents(from socket: WebSocketClient, subscription: RelaySubscription) {
Expand All @@ -315,6 +320,36 @@ actor RelaySubscriptionManagerActor: RelaySubscriptionManager {
}
}

/// Notifies the relay that we are closing the subscription with the given ID.
private func sendClose(for subscriptionID: RelaySubscription.ID) {
guard let subscription = subscription(from: subscriptionID) else {
Log.error("Tried to close a non-existing subscription \(subscriptionID)")
return
}
sendClose(for: subscription)
}

/// Notifies the associated relay that we are closing the given subscription.
func sendClose(for subscription: RelaySubscription) {
guard let socket = socket(for: subscription.relayAddress) else {
Log.error("Tried to close a non-existing subscription \(subscription.id)")
return
}
sendClose(from: socket, subscriptionID: subscription.id)
}

/// Writes a CLOSE message to the given socket, letting the relay know we are done with given subscription ID.
private func sendClose(from socket: WebSocketClient, subscriptionID: RelaySubscription.ID) {
do {
let request: [Any] = ["CLOSE", subscriptionID]
let requestData = try JSONSerialization.data(withJSONObject: request)
let requestString = String(decoding: requestData, as: UTF8.self)
socket.write(string: requestString)
} catch {
Log.error("Error: Could not send close \(error.localizedDescription)")
}
}

func receivedClose(for subscriptionID: RelaySubscription.ID, from socket: WebSocket) {
if let subscription = subscription(from: subscriptionID) {
// Move this subscription to the end of the queue where it will be retried
Expand Down

0 comments on commit da77fb6

Please sign in to comment.