Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: implement periodic GC. #46

Merged
merged 2 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 34 additions & 15 deletions packages/reflect-yjs/src/mutators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ export type UpdateYJSArgs = {
validator?: ((doc: Y.Doc) => void) | undefined;
};

const GC_FREQUENCY = 1_000;

export function updateYJS(args?: UpdateYJSArgs | undefined) {
return async function (
tx: WriteTransaction,
Expand All @@ -37,19 +39,25 @@ export function updateYJS(args?: UpdateYJSArgs | undefined) {
const existingServerUpdate = await getServerUpdate(name, tx);
const decodedUpdate = base64.toByteArray(update);
let merged = existingServerUpdate
? Y.mergeUpdatesV2([existingServerUpdate, decodedUpdate])
? Y.mergeUpdatesV2([existingServerUpdate.update, decodedUpdate])
: decodedUpdate;

if (validator) {
// If we have a validator, we need to materialize the doc.
// This is slow, but we'll add features to Reflect in the future to keep this doc
// loaded so we don't have to do it over and over. Currently we cannot because it is
// possible for multiple rooms to be loaded into the same JS context, so global
// variables don't work. We need some shared context that we can stash cross-mutator
// state like this on.
// We have to materialize if there is a validator (in order to run
// the validator) or if it is time to perform a GC (Y.mergeUpdatesV2 does
// not perform GC).
// This is slow, but we'll add features to Reflect in the future to keep this doc
// loaded so we don't have to do it over and over. Currently we cannot because it is
// possible for multiple rooms to be loaded into the same JS context, so global
// variables don't work. We need some shared context that we can stash cross-mutator
// state like this on.
if (
validator ||
(existingServerUpdate &&
existingServerUpdate.updateMeta.count % GC_FREQUENCY === 0)
) {
const doc = new Y.Doc();
Y.applyUpdateV2(doc, merged);
validator(doc);
validator?.(doc);
merged = Y.encodeStateAsUpdateV2(doc);
}
await setServerUpdate(name, merged, tx);
Expand Down Expand Up @@ -109,7 +117,7 @@ async function setServerUpdate(
) {
const existingInfo = (await tx.get(yjsProviderServerUpdateMetaKey(name))) as
| undefined
| ChunkedUpdateMeta;
| ServerUpdateMeta;
const toDelete: Set<string> = existingInfo
? new Set(existingInfo.chunkHashes)
: new Set();
Expand All @@ -121,9 +129,10 @@ async function setServerUpdate(
update,
hashFn,
);
const updateMeta: ChunkedUpdateMeta = {
const updateMeta: ServerUpdateMeta = {
chunkHashes: chunkInfo.sourceAsChunkHashes,
length: update.length,
count: (existingInfo?.count ?? 0) + 1,
};
await tx.set(yjsProviderServerUpdateMetaKey(name), updateMeta);
const writes = [];
Expand Down Expand Up @@ -160,18 +169,25 @@ async function setServerUpdate(
);
}

export type ChunkedUpdateMeta = {
export type ServerUpdateMeta = {
chunkHashes: string[];
length: number;
count: number;
};

async function getServerUpdate(
name: string,
tx: ReadTransaction,
): Promise<Uint8Array | undefined> {
): Promise<
| {
update: Uint8Array;
updateMeta: ServerUpdateMeta;
}
| undefined
> {
const updateMeta = (await tx.get(yjsProviderServerUpdateMetaKey(name))) as
| undefined
| ChunkedUpdateMeta;
| ServerUpdateMeta;
if (updateMeta === undefined) {
return undefined;
}
Expand All @@ -187,7 +203,10 @@ async function getServerUpdate(
const hash = key.substring(chunksPrefixLength, key.length);
chunksByHash.set(hash, base64.toByteArray(value as string));
}
return unchunk(chunksByHash, updateMeta.chunkHashes, updateMeta.length);
return {
update: unchunk(chunksByHash, updateMeta.chunkHashes, updateMeta.length),
updateMeta,
};
}

export function yjsAwarenessKey(
Expand Down
6 changes: 3 additions & 3 deletions packages/reflect-yjs/src/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type {Reflect} from '@rocicorp/reflect/client';
import * as base64 from 'base64-js';
import * as Y from 'yjs';
import {Awareness} from './awareness.js';
import type {ChunkedUpdateMeta, Mutators} from './mutators.js';
import type {ServerUpdateMeta, Mutators} from './mutators.js';
import {
yjsProviderKeyPrefix,
yjsProviderClientUpdateKeyPrefix,
Expand All @@ -19,7 +19,7 @@ export class Provider {
readonly #cancelWatch: () => void;

readonly name: string;
#serverUpdateMeta: ChunkedUpdateMeta | null = null;
#serverUpdateMeta: ServerUpdateMeta | null = null;
#serverUpdateChunks: Map<string, Uint8Array> = new Map();

constructor(reflect: Reflect<Mutators>, name: string, ydoc: Y.Doc) {
Expand Down Expand Up @@ -49,7 +49,7 @@ export class Provider {
base64.toByteArray(diffOp.newValue as string),
);
} else if (key === serverUpdateMetaKey) {
this.#serverUpdateMeta = diffOp.newValue as ChunkedUpdateMeta;
this.#serverUpdateMeta = diffOp.newValue as ServerUpdateMeta;
serverUpdateChange = true;
} else if (key.startsWith(serverUpdateChunkKeyPrefix)) {
this.#serverUpdateChunks.set(
Expand Down