From ad99461cfc0c2fe12a7cafb14b1a746f5e2b646a Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Wed, 22 Jan 2025 21:34:55 +0800 Subject: [PATCH] fix: should wait a while before clearing resource --- src/y-socket-io/utils.js | 17 +++++ src/y-socket-io/y-socket-io.js | 122 +++++++++++++++++++++++---------- 2 files changed, 101 insertions(+), 38 deletions(-) create mode 100644 src/y-socket-io/utils.js diff --git a/src/y-socket-io/utils.js b/src/y-socket-io/utils.js new file mode 100644 index 0000000..159f5da --- /dev/null +++ b/src/y-socket-io/utils.js @@ -0,0 +1,17 @@ +/** + * Basically Promise.withResolvers() + * @template T + * @see https://developer.mozilla.org/docs/Web/JavaScript/Reference/Global_Objects/Promise/withResolvers + */ +export function promiseWithResolvers() { + /** @type {(value: T | PromiseLike) => void} */ + let res = () => {} + /** @type {(reason?: Error) => void} */ + let rej = () => {} + /** @type {Promise} */ + const promise = new Promise((resolve, reject) => { + res = resolve + rej = reject + }) + return { promise, resolve: res, reject: rej } +} diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 040dd0f..8ccc0ea 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -13,12 +13,14 @@ import { isDeepStrictEqual } from 'util' import { User } from './user.js' import { createModuleLogger } from 'lib0/logging' import toobusy from 'toobusy-js' +import { promiseWithResolvers } from './utils.js' const logSocketIO = createModuleLogger('@y/socket-io/server') const PERSIST_INTERVAL = number.parseInt(env.getConf('y-socket-io-server-persist-interval') || '3000') const MAX_PERSIST_INTERVAL = number.parseInt(env.getConf('y-socket-io-server-max-persist-interval') || '30000') const REVALIDATE_TIMEOUT = number.parseInt(env.getConf('y-socket-io-server-revalidate-timeout') || '60000') const WORKER_DISABLED = env.getConf('y-worker-disabled') === 'true' +const DEFAULT_CLEAR_TIMEOUT = number.parseInt(env.getConf('y-socket-io-default-clear-timeout') || '30000') process.on('SIGINT', function () { // calling .shutdown allows your process to exit normally @@ -137,11 +139,17 @@ export class YSocketIO { */ namespacePersistentMap = new Map() /** - * @type {Map void>} + * @type {Map, resolve: () => void }>} * @private * @readonly */ awaitingPersistMap = new Map() + /** + * @type {Map} + * @private + * @readonly + */ + awaitingCleanupNamespace = new Map() /** * YSocketIO constructor. @@ -213,6 +221,12 @@ export class YSocketIO { 'index', redisPrefix ) + const prevAwaitCleanup = this.awaitingCleanupNamespace.get(namespace) + if (prevAwaitCleanup) { + clearTimeout(prevAwaitCleanup) + this.cleanupNamespace(namespace, stream) + } + if (!this.namespaceMap.has(namespace)) { this.namespaceMap.set(namespace, socket.nsp) } @@ -346,13 +360,9 @@ export class YSocketIO { if (!ns) continue const nsp = this.namespaceMap.get(ns) if (nsp?.sockets.size === 0 && stream) { - this.subscriber.unsubscribe(stream, this.redisMessageSubscriber) - this.namespaceStreamMap.delete(ns) - this.streamNamespaceMap.delete(stream) - this.namespaceMap.delete(ns) - this.namespaceDocMap.get(ns)?.ydoc.destroy() - this.namespaceDocMap.delete(ns) - this.namespacePersistentMap.delete(ns) + this.cleanupNamespace(ns, stream, DEFAULT_CLEAR_TIMEOUT) + const doc = this.namespaceDocMap.get(ns) + if (doc) this.debouncedPersist(ns, doc.ydoc, true) } } }) @@ -398,18 +408,13 @@ export class YSocketIO { * @param {Array} messages */ redisMessageSubscriber = async (stream, messages) => { + console.log('[DEBUG]', { stream, messages }) const namespace = this.streamNamespaceMap.get(stream) if (!namespace) return const nsp = this.namespaceMap.get(namespace) if (!nsp) return if (nsp.sockets.size === 0 && this.subscriber) { - this.subscriber.unsubscribe(stream, this.redisMessageSubscriber) - this.namespaceStreamMap.delete(namespace) - this.streamNamespaceMap.delete(stream) - this.namespaceMap.delete(namespace) - this.namespaceDocMap.get(namespace)?.ydoc.destroy() - this.namespaceDocMap.delete(namespace) - this.namespacePersistentMap.delete(namespace) + this.cleanupNamespace(namespace, stream, DEFAULT_CLEAR_TIMEOUT) } /** @type {Uint8Array[]} */ @@ -463,9 +468,9 @@ export class YSocketIO { const lastPersistCalledAt = this.namespacePersistentMap.get(namespace) ?? 0 const now = Date.now() const shouldPersist = now - lastPersistCalledAt > MAX_PERSIST_INTERVAL - if (changed || shouldPersist) { + if (changed || shouldPersist || nsp.sockets.size === 0) { this.namespacePersistentMap.set(namespace, now) - this.debouncedPersist(namespace, doc.ydoc) + this.debouncedPersist(namespace, doc.ydoc, nsp.sockets.size === 0) } this.namespaceDocMap.get(namespace)?.ydoc.destroy() this.namespaceDocMap.set(namespace, doc) @@ -474,10 +479,17 @@ export class YSocketIO { /** * @param {string} namespace * @param {Y.Doc} doc + * @param {boolean=} immediate */ - async debouncedPersist (namespace, doc) { + debouncedPersist (namespace, doc, immediate = false) { this.debouncedPersistDocMap.set(namespace, doc) - if (this.debouncedPersistMap.has(namespace)) return + if (this.debouncedPersistMap.has(namespace)) { + if (!immediate) return + clearTimeout(this.debouncedPersistMap.get(namespace) || undefined) + } + const timeoutInterval = immediate + ? 0 + : PERSIST_INTERVAL + (Math.random() - 0.5) * PERSIST_INTERVAL const timeout = setTimeout( async () => { try { @@ -485,28 +497,24 @@ export class YSocketIO { const doc = this.debouncedPersistDocMap.get(namespace) logSocketIO(`trying to persist ${namespace}`) if (!doc) return - /** @type {Promise | null} */ - let workerPromise = null if (this.client.persistWorker) { - workerPromise = new Promise((resolve) => { - assert(this.client?.persistWorker) - this.awaitingPersistMap.set(namespace, resolve) - - const docState = Y.encodeStateAsUpdateV2(doc) - const buf = new Uint8Array(new SharedArrayBuffer(docState.length)) - buf.set(docState) - this.client.persistWorker.postMessage({ - room: namespace, - docstate: buf - }) + /** @type {ReturnType>} */ + const { promise, resolve } = promiseWithResolvers() + assert(this.client?.persistWorker) + this.awaitingPersistMap.set(namespace, { promise, resolve }) + + const docState = Y.encodeStateAsUpdateV2(doc) + const buf = new Uint8Array(new SharedArrayBuffer(docState.length)) + buf.set(docState) + this.client.persistWorker.postMessage({ + room: namespace, + docstate: buf }) - if (workerPromise) { - await workerPromise - } + await promise } else { await this.client.store.persistDoc(namespace, 'index', doc) } - await this.client.trimRoomStream(namespace, 'index', true) + await this.client.trimRoomStream(namespace, 'index') } catch (e) { console.error(e) } finally { @@ -514,7 +522,7 @@ export class YSocketIO { this.debouncedPersistMap.delete(namespace) } }, - PERSIST_INTERVAL + (Math.random() - 0.5) * PERSIST_INTERVAL + timeoutInterval ) this.debouncedPersistMap.set(namespace, timeout) @@ -608,7 +616,45 @@ export class YSocketIO { registerPersistWorkerResolve () { if (!this.client?.persistWorker) return this.client.persistWorker.on('message', ({ event, room }) => { - if (event === 'persisted') this.awaitingPersistMap.get(room)?.() + if (event === 'persisted') this.awaitingPersistMap.get(room)?.resolve() }) } + + /** + * @param {string} namespace + * @param {string} stream + * @param {number=} removeAfterWait + */ + cleanupNamespace (namespace, stream, removeAfterWait) { + if (!removeAfterWait) { + this.awaitingCleanupNamespace.delete(namespace) + return this.cleanupNamespaceImpl(namespace, stream) + } + if (this.awaitingCleanupNamespace.has(namespace)) return + + const timer = setTimeout(async () => { + const awaitingPersist = this.awaitingPersistMap.get(namespace) + if (awaitingPersist) await awaitingPersist.promise + this.cleanupNamespaceImpl(namespace, stream) + this.awaitingCleanupNamespace.delete(namespace) + logSocketIO(`no active connection, namespace: ${namespace} cleared`) + }, removeAfterWait) + this.awaitingCleanupNamespace.set(namespace, timer) + } + + /** + * @param {string} namespace + * @param {string} stream + * @private + */ + cleanupNamespaceImpl (namespace, stream) { + this.subscriber?.unsubscribe(stream, this.redisMessageSubscriber) + this.namespaceStreamMap.delete(namespace) + this.streamNamespaceMap.delete(stream) + this.namespaceMap.delete(namespace) + this.namespaceDocMap.get(namespace)?.ydoc.destroy() + this.namespaceDocMap.delete(namespace) + this.namespacePersistentMap.delete(namespace) + this.client?.trimRoomStream(namespace, 'index', true) + } }