diff --git a/packages/reflect-yjs/src/mutators.ts b/packages/reflect-yjs/src/mutators.ts index d2836cf..00764f8 100644 --- a/packages/reflect-yjs/src/mutators.ts +++ b/packages/reflect-yjs/src/mutators.ts @@ -27,6 +27,8 @@ export type UpdateYJSArgs = { validator?: ((doc: Y.Doc) => void) | undefined; }; +const GC_FREQUENCY = 1_000; + export function updateYJS(args?: UpdateYJSArgs | undefined) { return async function ( tx: WriteTransaction, @@ -37,19 +39,25 @@ export function updateYJS(args?: UpdateYJSArgs | undefined) { const existingServerUpdate = await getServerUpdate(name, tx); const decodedUpdate = base64.toByteArray(update); let merged = existingServerUpdate - ? Y.mergeUpdatesV2([existingServerUpdate, decodedUpdate]) + ? Y.mergeUpdatesV2([existingServerUpdate.update, decodedUpdate]) : decodedUpdate; - if (validator) { - // If we have a validator, we need to materialize the doc. - // This is slow, but we'll add features to Reflect in the future to keep this doc - // loaded so we don't have to do it over and over. Currently we cannot because it is - // possible for multiple rooms to be loaded into the same JS context, so global - // variables don't work. We need some shared context that we can stash cross-mutator - // state like this on. + // We have to materialize if there is a validator (in order to run + // the validator) or if it is time to perform a GC (Y.mergeUpdatesV2 does + // not perform GC). + // This is slow, but we'll add features to Reflect in the future to keep this doc + // loaded so we don't have to do it over and over. Currently we cannot because it is + // possible for multiple rooms to be loaded into the same JS context, so global + // variables don't work. We need some shared context that we can stash cross-mutator + // state like this on. + if ( + validator || + (existingServerUpdate && + existingServerUpdate.updateMeta.count % GC_FREQUENCY === 0) + ) { const doc = new Y.Doc(); Y.applyUpdateV2(doc, merged); - validator(doc); + validator?.(doc); merged = Y.encodeStateAsUpdateV2(doc); } await setServerUpdate(name, merged, tx); @@ -109,7 +117,7 @@ async function setServerUpdate( ) { const existingInfo = (await tx.get(yjsProviderServerUpdateMetaKey(name))) as | undefined - | ChunkedUpdateMeta; + | ServerUpdateMeta; const toDelete: Set = existingInfo ? new Set(existingInfo.chunkHashes) : new Set(); @@ -121,9 +129,10 @@ async function setServerUpdate( update, hashFn, ); - const updateMeta: ChunkedUpdateMeta = { + const updateMeta: ServerUpdateMeta = { chunkHashes: chunkInfo.sourceAsChunkHashes, length: update.length, + count: (existingInfo?.count ?? 0) + 1, }; await tx.set(yjsProviderServerUpdateMetaKey(name), updateMeta); const writes = []; @@ -160,18 +169,25 @@ async function setServerUpdate( ); } -export type ChunkedUpdateMeta = { +export type ServerUpdateMeta = { chunkHashes: string[]; length: number; + count: number; }; async function getServerUpdate( name: string, tx: ReadTransaction, -): Promise { +): Promise< + | { + update: Uint8Array; + updateMeta: ServerUpdateMeta; + } + | undefined +> { const updateMeta = (await tx.get(yjsProviderServerUpdateMetaKey(name))) as | undefined - | ChunkedUpdateMeta; + | ServerUpdateMeta; if (updateMeta === undefined) { return undefined; } @@ -187,7 +203,10 @@ async function getServerUpdate( const hash = key.substring(chunksPrefixLength, key.length); chunksByHash.set(hash, base64.toByteArray(value as string)); } - return unchunk(chunksByHash, updateMeta.chunkHashes, updateMeta.length); + return { + update: unchunk(chunksByHash, updateMeta.chunkHashes, updateMeta.length), + updateMeta, + }; } export function yjsAwarenessKey( diff --git a/packages/reflect-yjs/src/provider.ts b/packages/reflect-yjs/src/provider.ts index e034c46..94f9373 100644 --- a/packages/reflect-yjs/src/provider.ts +++ b/packages/reflect-yjs/src/provider.ts @@ -2,7 +2,7 @@ import type {Reflect} from '@rocicorp/reflect/client'; import * as base64 from 'base64-js'; import * as Y from 'yjs'; import {Awareness} from './awareness.js'; -import type {ChunkedUpdateMeta, Mutators} from './mutators.js'; +import type {ServerUpdateMeta, Mutators} from './mutators.js'; import { yjsProviderKeyPrefix, yjsProviderClientUpdateKeyPrefix, @@ -19,7 +19,7 @@ export class Provider { readonly #cancelWatch: () => void; readonly name: string; - #serverUpdateMeta: ChunkedUpdateMeta | null = null; + #serverUpdateMeta: ServerUpdateMeta | null = null; #serverUpdateChunks: Map = new Map(); constructor(reflect: Reflect, name: string, ydoc: Y.Doc) { @@ -49,7 +49,7 @@ export class Provider { base64.toByteArray(diffOp.newValue as string), ); } else if (key === serverUpdateMetaKey) { - this.#serverUpdateMeta = diffOp.newValue as ChunkedUpdateMeta; + this.#serverUpdateMeta = diffOp.newValue as ServerUpdateMeta; serverUpdateChange = true; } else if (key.startsWith(serverUpdateChunkKeyPrefix)) { this.#serverUpdateChunks.set(