diff --git a/packages/reflect-yjs/src/mutators.ts b/packages/reflect-yjs/src/mutators.ts index 741de09..229b754 100644 --- a/packages/reflect-yjs/src/mutators.ts +++ b/packages/reflect-yjs/src/mutators.ts @@ -20,24 +20,89 @@ export async function updateYJS( tx: WriteTransaction, {name, update}: {name: string; update: string}, ) { - const existing = await tx.get(yjsProviderServerKey(name)); - const getKeyToSet = - tx.location === 'client' ? yjsProviderClientKey : yjsProviderServerKey; + const existing = await getServerUpdate(name, tx); + const set = tx.location === 'client' ? setClientUpdate : setServerUpdate; if (!existing) { - await tx.set(getKeyToSet(name), update); + await set(name, update, tx); } else { const updates = [base64.toByteArray(existing), base64.toByteArray(update)]; const merged = Y.mergeUpdatesV2(updates); - await tx.set(getKeyToSet(name), base64.fromByteArray(merged)); + await set(name, base64.fromByteArray(merged), tx); } } -export function yjsProviderClientKey(name: string): string { - return `yjs/provider/client/${name}`; +const yjsProviderKeyPrefix = 'yjs/provider/'; + +function yjsProviderClientUpdateKey(name: string): string { + return `${yjsProviderKeyPrefix}client/${name}`; +} + +function yjsProviderServerUpdatePrefix(name: string): string { + return `${yjsProviderKeyPrefix}server/${name}/`; } -export function yjsProviderServerKey(name: string): string { - return `yjs/provider/server/${name}`; +function setClientUpdate(name: string, update: string, tx: WriteTransaction) { + return tx.set(yjsProviderClientUpdateKey(name), update); +} + +async function setServerUpdate( + name: string, + update: string, + tx: WriteTransaction, +) { + const writes = []; + let i = 0; + const existingEntries = tx + .scan({ + prefix: yjsProviderServerUpdatePrefix(name), + }) + .entries(); + for (; i * CHUNK_LENGTH < update.length; i++) { + const next = await existingEntries.next(); + const existing = next.done ? undefined : next.value[1]; + const chunk = update.substring( + i * CHUNK_LENGTH, + i * CHUNK_LENGTH + CHUNK_LENGTH, + ); + if (existing !== chunk) { + writes.push(tx.set(yjsProviderServerKey(name, i), chunk)); + } + } + // If the previous value had more chunks than thew new value, delete these + // additional chunks. + for await (const [key] of existingEntries) { + writes.push(tx.del(key)); + } + await Promise.all(writes); +} + +// Supports updates up to length 10^14 +const CHUNK_LENGTH = 10_000; +export function yjsProviderServerKey(name: string, chunkIndex: number): string { + return `${yjsProviderServerUpdatePrefix(name)}${chunkIndex + .toString(10) + .padStart(10, '0')}`; +} + +export async function getClientUpdate( + name: string, + tx: ReadTransaction, +): Promise { + const v = await tx.get(yjsProviderClientUpdateKey(name)); + return typeof v === 'string' ? v : undefined; +} + +export async function getServerUpdate( + name: string, + tx: ReadTransaction, +): Promise { + const chunks = await tx + .scan({ + prefix: yjsProviderServerUpdatePrefix(name), + }) + .values() + .toArray(); + return chunks.length === 0 ? undefined : chunks.join(''); } export function yjsAwarenessKey( diff --git a/packages/reflect-yjs/src/provider.test.ts b/packages/reflect-yjs/src/provider.test.ts index 0e9aeae..77b31e3 100644 --- a/packages/reflect-yjs/src/provider.test.ts +++ b/packages/reflect-yjs/src/provider.test.ts @@ -53,18 +53,15 @@ suite('Provider', () => { test('subscribes at construction time', () => { const reflect = fakeReflect(); new Provider(reflect, 'test', new Doc()); - expect(reflect.subscribe).toHaveBeenCalledTimes(2); + expect(reflect.subscribe).toHaveBeenCalledTimes(1); }); }); test('destroy unsubscribes', () => { const reflect = new FakeReflect(); reflect.subscribe.mockClear(); - const unsubscribe1 = vi.fn(); - const unsubscribe2 = vi.fn(); - reflect.subscribe - .mockImplementationOnce(() => unsubscribe1) - .mockImplementationOnce(() => unsubscribe2); + const unsubscribe = vi.fn(); + reflect.subscribe.mockImplementationOnce(() => unsubscribe); const p = new Provider( reflect as unknown as Reflect, @@ -73,7 +70,6 @@ suite('Provider', () => { ); p.destroy(); - expect(unsubscribe1).toHaveBeenCalledTimes(1); - expect(unsubscribe2).toHaveBeenCalledTimes(1); + expect(unsubscribe).toHaveBeenCalledTimes(1); }); }); diff --git a/packages/reflect-yjs/src/provider.ts b/packages/reflect-yjs/src/provider.ts index 81f04e6..735ebbd 100644 --- a/packages/reflect-yjs/src/provider.ts +++ b/packages/reflect-yjs/src/provider.ts @@ -3,14 +3,13 @@ import * as base64 from 'base64-js'; import * as Y from 'yjs'; import {Awareness} from './awareness.js'; import type {Mutators} from './mutators.js'; -import {yjsProviderClientKey, yjsProviderServerKey} from './mutators.js'; +import {getClientUpdate, getServerUpdate} from './mutators.js'; export class Provider { readonly #reflect: Reflect; readonly #ydoc: Y.Doc; #awareness: Awareness | null = null; - readonly #cancelUpdateSubscribe: () => void; - readonly #cancelVectorSubscribe: () => void; + readonly #cancelSubscribe: () => void; readonly name: string; #vector: Uint8Array | null = null; @@ -23,33 +22,21 @@ export class Provider { ydoc.on('update', this.#handleUpdate); ydoc.on('destroy', this.#handleDestroy); - this.#cancelUpdateSubscribe = reflect.subscribe( - async tx => { - let v = await tx.get(yjsProviderClientKey(this.name)); - if (typeof v !== 'string') { - v = await tx.get(yjsProviderServerKey(this.name)); - } - return typeof v === 'string' ? v : null; - }, - docStateFromReflect => { - if (docStateFromReflect !== null) { - const update = base64.toByteArray(docStateFromReflect); - Y.applyUpdateV2(ydoc, update); - } - }, - ); - - this.#cancelVectorSubscribe = reflect.subscribe( - async tx => { - const v = await tx.get(yjsProviderServerKey(this.name)); - return typeof v === 'string' ? v : null; - }, - docStateFromServer => { - if (docStateFromServer !== null) { + this.#cancelSubscribe = reflect.subscribe<[string | null, string | null]>( + async tx => [ + (await getServerUpdate(this.name, tx)) ?? null, + (await getClientUpdate(this.name, tx)) ?? null, + ], + ([serverUpdate, clientUpdate]) => { + if (serverUpdate !== null) { this.#vector = Y.encodeStateVectorFromUpdateV2( - base64.toByteArray(docStateFromServer), + base64.toByteArray(serverUpdate), ); } + const update = clientUpdate ?? serverUpdate; + if (update !== null) { + Y.applyUpdateV2(ydoc, base64.toByteArray(update)); + } }, ); } @@ -76,8 +63,7 @@ export class Provider { }; destroy(): void { - this.#cancelUpdateSubscribe(); - this.#cancelVectorSubscribe(); + this.#cancelSubscribe(); this.#vector = null; this.#ydoc.off('destroy', this.#handleDestroy); this.#ydoc.off('update', this.#handleUpdate);