Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: change mutation updates to not be based on server vector #37

Merged
merged 7 commits into from
Dec 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions packages/reflect-yjs/src/mutators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export type UpdateYJSArgs = {
export function updateYJS(args?: UpdateYJSArgs | undefined) {
return async function (
tx: WriteTransaction,
{name, update}: {name: string; update: string},
{name, id, update}: {name: string; id: string; update: string},
) {
const {validator} = args ?? {};
if (tx.location === 'server') {
Expand All @@ -51,7 +51,7 @@ export function updateYJS(args?: UpdateYJSArgs | undefined) {
if (validator) {
throw new Error('validator only supported on server');
}
await setClientUpdate(name, update, tx);
await setClientUpdate(name, id, update, tx);
}
};
}
Expand All @@ -60,11 +60,15 @@ export function yjsProviderKeyPrefix(name: string): string {
return `'yjs/provider/${name}/`;
}

export function yjsProviderClientUpdateKey(name: string): string {
return `${yjsProviderKeyPrefix(name)}client`;
export function yjsProviderClientUpdateKeyPrefix(name: string): string {
return `${yjsProviderKeyPrefix(name)}/client/`;
}

function yjsProviderServerUpdateKeyPrefix(name: string): string {
function yjsProviderClientUpdateKey(name: string, id: string): string {
return `${yjsProviderClientUpdateKeyPrefix(name)}${id}`;
}

export function yjsProviderServerUpdateKeyPrefix(name: string): string {
return `${yjsProviderKeyPrefix(name)}/server/`;
}

Expand All @@ -83,8 +87,13 @@ export function yjsProviderServerChunkKey(
return `${yjsProviderServerUpdateChunkKeyPrefix(name)}${chunkHash}`;
}

function setClientUpdate(name: string, update: string, tx: WriteTransaction) {
return tx.set(yjsProviderClientUpdateKey(name), update);
function setClientUpdate(
name: string,
id: string,
update: string,
tx: WriteTransaction,
) {
return tx.set(yjsProviderClientUpdateKey(name, id), update);
}

const AVG_CHUNK_SIZE_B = 1024;
Expand Down Expand Up @@ -147,14 +156,6 @@ async function setServerUpdate(
);
}

export async function getClientUpdate(
name: string,
tx: ReadTransaction,
): Promise<string | undefined> {
const v = await tx.get(yjsProviderClientUpdateKey(name));
return typeof v === 'string' ? v : undefined;
}

export type ChunkedUpdateMeta = {
chunkHashes: string[];
length: number;
Expand Down
64 changes: 22 additions & 42 deletions packages/reflect-yjs/src/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import * as Y from 'yjs';
import {Awareness} from './awareness.js';
import type {ChunkedUpdateMeta, Mutators} from './mutators.js';
import {
yjsProviderClientUpdateKey,
yjsProviderKeyPrefix,
yjsProviderClientUpdateKeyPrefix,
yjsProviderServerUpdateChunkKeyPrefix,
yjsProviderServerUpdateMetaKey,
} from './mutators.js';
import {unchunk} from './chunk.js';
import {uuidv4} from 'lib0/random.js';

export class Provider {
readonly #reflect: Reflect<Mutators>;
Expand All @@ -18,37 +19,34 @@ export class Provider {
readonly #cancelWatch: () => void;

readonly name: string;
#clientUpdate: Uint8Array | null = null;
#serverUpdate: Uint8Array | null = null;
#serverUpdateMeta: ChunkedUpdateMeta | null = null;
#serverUpdateChunks: Map<string, Uint8Array> = new Map();
#vector: Uint8Array | null = null;

constructor(reflect: Reflect<Mutators>, name: string, ydoc: Y.Doc) {
this.#reflect = reflect;
this.name = name;
this.#ydoc = ydoc;

ydoc.on('update', this.#handleUpdate);
ydoc.on('updateV2', this.#handleUpdateV2);
ydoc.on('destroy', this.#handleDestroy);

const clientUpdateKey = yjsProviderClientUpdateKey(name);
const clientUpdateKeyPrefix = yjsProviderClientUpdateKeyPrefix(name);
const serverUpdateMetaKey = yjsProviderServerUpdateMetaKey(name);
const serverUpdateChunkKeyPrefix =
yjsProviderServerUpdateChunkKeyPrefix(name);

let isInitial = true;
this.#cancelWatch = reflect.experimentalWatch(
diff => {
const newClientUpdates: Uint8Array[] = [];
let serverUpdateChange = false;
for (const diffOp of diff) {
const {key} = diffOp;
switch (diffOp.op) {
case 'add':
case 'change':
if (key === clientUpdateKey) {
this.#clientUpdate = base64.toByteArray(
diffOp.newValue as string,
if (key.startsWith(clientUpdateKeyPrefix)) {
newClientUpdates.push(
base64.toByteArray(diffOp.newValue as string),
);
} else if (key === serverUpdateMetaKey) {
this.#serverUpdateMeta = diffOp.newValue as ChunkedUpdateMeta;
Expand All @@ -61,9 +59,7 @@ export class Provider {
}
break;
case 'del':
if (key === clientUpdateKey) {
this.#clientUpdate = null;
} else if (key === serverUpdateMetaKey) {
if (key === serverUpdateMetaKey) {
this.#serverUpdateMeta = null;
serverUpdateChange = true;
} else if (key.startsWith(serverUpdateChunkKeyPrefix)) {
Expand All @@ -74,28 +70,16 @@ export class Provider {
break;
}
}
if (serverUpdateChange) {
if (this.#serverUpdateMeta === null) {
this.#serverUpdate = null;
this.#vector = null;
} else {
this.#serverUpdate = unchunk(
this.#serverUpdateChunks,
this.#serverUpdateMeta.chunkHashes,
this.#serverUpdateMeta.length,
);
this.#vector = Y.encodeStateVectorFromUpdateV2(this.#serverUpdate);
Y.applyUpdateV2(ydoc, this.#serverUpdate, this);
}
if (serverUpdateChange && this.#serverUpdateMeta !== null) {
const serverUpdate = unchunk(
this.#serverUpdateChunks,
this.#serverUpdateMeta.chunkHashes,
this.#serverUpdateMeta.length,
);
Y.applyUpdateV2(ydoc, serverUpdate, this);
}
if (isInitial) {
isInitial = false;
// Only apply client update on initial load of document.
// All other client updates will have originated from this ydoc
// and thus not need to be applied.
if (this.#clientUpdate) {
Y.applyUpdateV2(ydoc, this.#clientUpdate, this);
}
for (const clientUpdate of newClientUpdates) {
Y.applyUpdateV2(ydoc, clientUpdate, this);
}
},
{
Expand All @@ -112,16 +96,14 @@ export class Provider {
return this.#awareness;
}

#handleUpdate = async (_update: unknown, origin: unknown) => {
#handleUpdateV2 = async (updateV2: Uint8Array, origin: unknown) => {
if (origin === this) {
return;
}
const diffUpdate = this.#vector
? Y.encodeStateAsUpdateV2(this.#ydoc, this.#vector)
: Y.encodeStateAsUpdateV2(this.#ydoc);
await this.#reflect.mutate.updateYJS({
name: this.name,
update: base64.fromByteArray(diffUpdate),
id: uuidv4(),
update: base64.fromByteArray(updateV2),
});
};

Expand All @@ -131,12 +113,10 @@ export class Provider {

destroy(): void {
this.#cancelWatch();
this.#clientUpdate = null;
this.#serverUpdateMeta = null;
this.#serverUpdateChunks.clear();
this.#vector = null;
this.#ydoc.off('destroy', this.#handleDestroy);
this.#ydoc.off('update', this.#handleUpdate);
this.#ydoc.off('updateV2', this.#handleUpdateV2);
this.#awareness?.destroy();
}
}