Skip to content

Commit

Permalink
feat: large document support (#27)
Browse files Browse the repository at this point in the history
Chunks the server update state into 10_000 character chunks.

This works around the 128 KB value size limit for Cloudflare durable storage.

Prior to this change 128 KB was the maximum document size.

Tested with 1, 5 and 10 MB documents.

With this chunking, many edits (adding new lines, editing in the middle of the document, etc), result in all chunks being poked from the server to the clients. So large documents are inefficient but functioning.

We should try to replace this basic chunking approach with a [content-defined chunking approach](https://joshleeb.com/posts/content-defined-chunking.html).
  • Loading branch information
grgbkr authored Dec 7, 2023
1 parent 7b3ca3e commit 57dcfb1
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 46 deletions.
83 changes: 74 additions & 9 deletions packages/reflect-yjs/src/mutators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,89 @@ export async function updateYJS(
tx: WriteTransaction,
{name, update}: {name: string; update: string},
) {
const existing = await tx.get<string>(yjsProviderServerKey(name));
const getKeyToSet =
tx.location === 'client' ? yjsProviderClientKey : yjsProviderServerKey;
const existing = await getServerUpdate(name, tx);
const set = tx.location === 'client' ? setClientUpdate : setServerUpdate;
if (!existing) {
await tx.set(getKeyToSet(name), update);
await set(name, update, tx);
} else {
const updates = [base64.toByteArray(existing), base64.toByteArray(update)];
const merged = Y.mergeUpdatesV2(updates);
await tx.set(getKeyToSet(name), base64.fromByteArray(merged));
await set(name, base64.fromByteArray(merged), tx);
}
}

export function yjsProviderClientKey(name: string): string {
return `yjs/provider/client/${name}`;
const yjsProviderKeyPrefix = 'yjs/provider/';

function yjsProviderClientUpdateKey(name: string): string {
return `${yjsProviderKeyPrefix}client/${name}`;
}

function yjsProviderServerUpdatePrefix(name: string): string {
return `${yjsProviderKeyPrefix}server/${name}/`;
}

export function yjsProviderServerKey(name: string): string {
return `yjs/provider/server/${name}`;
function setClientUpdate(name: string, update: string, tx: WriteTransaction) {
return tx.set(yjsProviderClientUpdateKey(name), update);
}

async function setServerUpdate(
name: string,
update: string,
tx: WriteTransaction,
) {
const writes = [];
let i = 0;
const existingEntries = tx
.scan({
prefix: yjsProviderServerUpdatePrefix(name),
})
.entries();
for (; i * CHUNK_LENGTH < update.length; i++) {
const next = await existingEntries.next();
const existing = next.done ? undefined : next.value[1];
const chunk = update.substring(
i * CHUNK_LENGTH,
i * CHUNK_LENGTH + CHUNK_LENGTH,
);
if (existing !== chunk) {
writes.push(tx.set(yjsProviderServerKey(name, i), chunk));
}
}
// If the previous value had more chunks than thew new value, delete these
// additional chunks.
for await (const [key] of existingEntries) {
writes.push(tx.del(key));
}
await Promise.all(writes);
}

// Supports updates up to length 10^14
const CHUNK_LENGTH = 10_000;
export function yjsProviderServerKey(name: string, chunkIndex: number): string {
return `${yjsProviderServerUpdatePrefix(name)}${chunkIndex
.toString(10)
.padStart(10, '0')}`;
}

export async function getClientUpdate(
name: string,
tx: ReadTransaction,
): Promise<string | undefined> {
const v = await tx.get(yjsProviderClientUpdateKey(name));
return typeof v === 'string' ? v : undefined;
}

export async function getServerUpdate(
name: string,
tx: ReadTransaction,
): Promise<string | undefined> {
const chunks = await tx
.scan({
prefix: yjsProviderServerUpdatePrefix(name),
})
.values()
.toArray();
return chunks.length === 0 ? undefined : chunks.join('');
}

export function yjsAwarenessKey(
Expand Down
12 changes: 4 additions & 8 deletions packages/reflect-yjs/src/provider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,15 @@ suite('Provider', () => {
test('subscribes at construction time', () => {
const reflect = fakeReflect();
new Provider(reflect, 'test', new Doc());
expect(reflect.subscribe).toHaveBeenCalledTimes(2);
expect(reflect.subscribe).toHaveBeenCalledTimes(1);
});
});

test('destroy unsubscribes', () => {
const reflect = new FakeReflect();
reflect.subscribe.mockClear();
const unsubscribe1 = vi.fn();
const unsubscribe2 = vi.fn();
reflect.subscribe
.mockImplementationOnce(() => unsubscribe1)
.mockImplementationOnce(() => unsubscribe2);
const unsubscribe = vi.fn();
reflect.subscribe.mockImplementationOnce(() => unsubscribe);

const p = new Provider(
reflect as unknown as Reflect<Mutators>,
Expand All @@ -73,7 +70,6 @@ suite('Provider', () => {
);

p.destroy();
expect(unsubscribe1).toHaveBeenCalledTimes(1);
expect(unsubscribe2).toHaveBeenCalledTimes(1);
expect(unsubscribe).toHaveBeenCalledTimes(1);
});
});
44 changes: 15 additions & 29 deletions packages/reflect-yjs/src/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ import * as base64 from 'base64-js';
import * as Y from 'yjs';
import {Awareness} from './awareness.js';
import type {Mutators} from './mutators.js';
import {yjsProviderClientKey, yjsProviderServerKey} from './mutators.js';
import {getClientUpdate, getServerUpdate} from './mutators.js';

export class Provider {
readonly #reflect: Reflect<Mutators>;
readonly #ydoc: Y.Doc;
#awareness: Awareness | null = null;
readonly #cancelUpdateSubscribe: () => void;
readonly #cancelVectorSubscribe: () => void;
readonly #cancelSubscribe: () => void;

readonly name: string;
#vector: Uint8Array | null = null;
Expand All @@ -23,33 +22,21 @@ export class Provider {
ydoc.on('update', this.#handleUpdate);
ydoc.on('destroy', this.#handleDestroy);

this.#cancelUpdateSubscribe = reflect.subscribe(
async tx => {
let v = await tx.get(yjsProviderClientKey(this.name));
if (typeof v !== 'string') {
v = await tx.get(yjsProviderServerKey(this.name));
}
return typeof v === 'string' ? v : null;
},
docStateFromReflect => {
if (docStateFromReflect !== null) {
const update = base64.toByteArray(docStateFromReflect);
Y.applyUpdateV2(ydoc, update);
}
},
);

this.#cancelVectorSubscribe = reflect.subscribe(
async tx => {
const v = await tx.get(yjsProviderServerKey(this.name));
return typeof v === 'string' ? v : null;
},
docStateFromServer => {
if (docStateFromServer !== null) {
this.#cancelSubscribe = reflect.subscribe<[string | null, string | null]>(
async tx => [
(await getServerUpdate(this.name, tx)) ?? null,
(await getClientUpdate(this.name, tx)) ?? null,
],
([serverUpdate, clientUpdate]) => {
if (serverUpdate !== null) {
this.#vector = Y.encodeStateVectorFromUpdateV2(
base64.toByteArray(docStateFromServer),
base64.toByteArray(serverUpdate),
);
}
const update = clientUpdate ?? serverUpdate;
if (update !== null) {
Y.applyUpdateV2(ydoc, base64.toByteArray(update));
}
},
);
}
Expand All @@ -76,8 +63,7 @@ export class Provider {
};

destroy(): void {
this.#cancelUpdateSubscribe();
this.#cancelVectorSubscribe();
this.#cancelSubscribe();
this.#vector = null;
this.#ydoc.off('destroy', this.#handleDestroy);
this.#ydoc.off('update', this.#handleUpdate);
Expand Down

0 comments on commit 57dcfb1

Please sign in to comment.