Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance storage service with document hashing and improve subscriber ID handling #36

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/infra/redis/ioredis.adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ export class IoRedisAdapter implements RedisAdapter {
// This issue is not critical, as no data will be lost if this happens.
await this.redis
.multi()
.xtrim(task.stream, 'MINID', lastId - redisMinMessageLifetime)
.xadd(this.redisWorkerStreamName, '*', 'compact', task.stream)
.xtrim(task.stream, 'MINID', lastId - redisMinMessageLifetime) // trim current messages (https://redis.io/docs/latest/commands/xtrim/)
.xadd(this.redisWorkerStreamName, '*', 'compact', task.stream) // new worker entry with current date (https://redis.io/docs/latest/commands/xadd/)
.xreadgroup(
'GROUP',
this.redisWorkerGroupName,
Expand Down
24 changes: 21 additions & 3 deletions src/infra/storage/storage.service.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Client } from 'minio';
import { randomUUID } from 'crypto';
import { hash, randomUUID } from 'node:crypto';
import { Stream } from 'stream';
import * as Y from 'yjs';
import { Logger } from '../logger/index.js';
import { DocumentStorage } from '../y-redis/storage.js';
import { StorageConfig } from './storage.config.js';

const getDocHash = (doc: Uint8Array): string => {
const hashId = hash('md5', doc, 'hex');

return hashId;
};

export const encodeS3ObjectName = (room: string, docid: string, r = ''): string =>
`${encodeURIComponent(room)}/${encodeURIComponent(docid)}/${r}`;

Expand Down Expand Up @@ -38,7 +44,13 @@ export class StorageService implements DocumentStorage, OnModuleInit {

public async persistDoc(room: string, docname: string, ydoc: Y.Doc): Promise<void> {
const objectName = encodeS3ObjectName(room, docname, randomUUID());
await this.client.putObject(this.config.S3_BUCKET, objectName, Buffer.from(Y.encodeStateAsUpdateV2(ydoc)));

const updates = Y.encodeStateAsUpdateV2(ydoc);

const hashId = getDocHash(updates);
this.logger.log('hashId by persist ' + hashId);

await this.client.putObject(this.config.S3_BUCKET, objectName, Buffer.from(updates));
}

public async retrieveDoc(room: string, docname: string): Promise<{ doc: Uint8Array; references: string[] } | null> {
Expand Down Expand Up @@ -69,7 +81,13 @@ export class StorageService implements DocumentStorage, OnModuleInit {
updates = updates.filter((update) => update != null);
this.logger.log('retrieved doc room=' + room + ' docname=' + docname + ' updatesLen=' + updates.length);

return { doc: Y.mergeUpdatesV2(updates), references };
const ydoc = Y.mergeUpdatesV2(updates);

const hashId = getDocHash(ydoc);

this.logger.log('hashId by retrieve ' + hashId);

return { doc: ydoc, references };
}

public async retrieveStateVector(room: string, docname: string): Promise<Uint8Array | null> {
Expand Down
4 changes: 2 additions & 2 deletions src/infra/y-redis/subscriber.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ describe('SubscriberService', () => {
const id = '1';
const stream = 'test';

subscriber.subscribers.set(stream, { fs: new Set(), id: '2', nextId: null });
subscriber.subscribers.set(stream, { fs: new Set(), id: '2', nextId: null, otherId: '1' });

subscriber.ensureSubId(stream, id);

Expand All @@ -102,7 +102,7 @@ describe('SubscriberService', () => {
const id = '3';
const stream = 'test';

subscriber.subscribers.set(stream, { fs: new Set(), id: '2', nextId: null });
subscriber.subscribers.set(stream, { fs: new Set(), id: '2', nextId: null, otherId: '1' });

subscriber.ensureSubId(stream, id);

Expand Down
41 changes: 31 additions & 10 deletions src/infra/y-redis/subscriber.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@
The original code from the `y-redis` repository is licensed under the AGPL-3.0 license.
https://github.com/yjs/y-redis
*/
import { randomUUID } from 'crypto';
import * as encoding from 'lib0/encoding';
import * as map from 'lib0/map';
import { RedisService } from '../redis/redis.service.js';
import { Api, createApiClient } from './api.service.js';
import { isSmallerRedisId } from './helper.js';
import * as protocol from './protocol.js';
import { DocumentStorage } from './storage.js';

export const running = true;
Expand All @@ -17,12 +21,14 @@ export const run = async (subscriber: Subscriber): Promise<void> => {
await subscriber.run();
}
};
const isAwarenessUpdate = (message: Buffer): boolean => message[0] === protocol.messageAwareness;

type SubscriptionHandler = (stream: string, message: Uint8Array[]) => void;
interface Subscriptions {
fs: Set<SubscriptionHandler>;
id: string;
nextId?: string | null;
otherId?: string;
}
export const createSubscriber = async (
store: DocumentStorage,
Expand All @@ -49,17 +55,16 @@ export class Subscriber {
}
}

public subscribe(stream: string, f: SubscriptionHandler): { redisId: string } {
const sub = this.subscribers.get(stream) ?? { fs: new Set<SubscriptionHandler>(), id: '0', nextId: null };
public subscribe(stream: string, f: SubscriptionHandler): Subscriptions {
const sub = map.setIfUndefined(this.subscribers, stream, () => ({
fs: new Set(),
id: '0',
nextId: null,
otherId: randomUUID(),
}));
sub.fs.add(f);

if (!this.subscribers.has(stream)) {
this.subscribers.set(stream, sub);
}

return {
redisId: sub.id,
};
return sub;
}

public unsubscribe(stream: string, f: SubscriptionHandler): void {
Expand All @@ -83,7 +88,23 @@ export class Subscriber {

for (const message of messages) {
const sub = this.subscribers.get(message.stream);
if (sub == null) continue;
if (sub == null) {
console.log('Subscriber not found for stream', message.stream);
continue;
}
const message0 =
message.messages.length === 1
? message.messages[0]
: encoding.encode((encoder) =>
message.messages.forEach((message) => {
encoding.writeUint8Array(encoder, message);
}),
);
const message1 = Buffer.from(message0.slice(0, message0.byteLength));

if (!isAwarenessUpdate(message1)) {
console.log('id', sub.id, 'message.lastId', message.lastId, 'nextId', sub.nextId, 'otherId', sub.otherId);
}
sub.id = message.lastId;
if (sub.nextId != null) {
sub.id = sub.nextId;
Expand Down
6 changes: 4 additions & 2 deletions src/infra/y-redis/ws.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,8 @@ describe('ws service', () => {
storeReferences: [],
docChanged: true,
});
subscriber.subscribe.mockReturnValueOnce({ redisId: '1-2' });
// @ts-ignore
subscriber.subscribe.mockReturnValueOnce({ id: '1-2', otherId: 'otherId' });
const redisStream = computeRedisRoomStreamName(user.room ?? '', 'index', client.redisPrefix);

await openCallback(ws, subscriber, client, redisMessageSubscriber);
Expand All @@ -448,7 +449,8 @@ describe('ws service', () => {
storeReferences: [],
docChanged: true,
});
subscriber.subscribe.mockReturnValueOnce({ redisId: '1-2' });
// @ts-ignore
subscriber.subscribe.mockReturnValueOnce({ id: '1-2', otherId: 'otherId' });

await openCallback(ws, subscriber, client, redisMessageSubscriber);

Expand Down
31 changes: 26 additions & 5 deletions src/infra/y-redis/ws.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,22 @@ export const openCallback = async (
const stream = computeRedisRoomStreamName(user.room, 'index', client.redisPrefix);
user.subs.add(stream);
ws.subscribe(stream);
user.initialRedisSubId = subscriber.subscribe(stream, redisMessageSubscriber).redisId;
const { id, otherId } = subscriber.subscribe(stream, redisMessageSubscriber);

user.initialRedisSubId = id;

console.log('new subscriber otherId', otherId);
const indexDoc = await client.getDoc(user.room, 'index');
if (indexDoc.ydoc.store.clients.size === 0) {
if (initDocCallback) {
initDocCallback(user.room, 'index', client);
}
}
if (user.isClosed) return;
// awareness is destroyed here to avoid memory leaks, see: https://github.com/yjs/y-redis/issues/24
indexDoc.awareness.destroy();
console.log('send initial state', indexDoc.ydoc);
console.log('send share', indexDoc.ydoc.share.entries());
ws.cork(() => {
ws.send(protocol.encodeSyncStep1(Y.encodeStateVector(indexDoc.ydoc)), true, false);
ws.send(protocol.encodeSyncStep2(Y.encodeStateAsUpdate(indexDoc.ydoc)), true, true);
Expand All @@ -161,12 +169,21 @@ export const openCallback = async (
}
});

// awareness is destroyed here to avoid memory leaks, see: https://github.com/yjs/y-redis/issues/24
indexDoc.awareness.destroy();

console.log(
'redisLastId vs. userInitialRedisId and otherId',
indexDoc.redisLastId,
user.initialRedisSubId,
otherId,
);
if (isSmallerRedisId(indexDoc.redisLastId, user.initialRedisSubId)) {
// our subscription is newer than the content that we received from the api
// need to renew subscription id and make sure that we catch the latest content.
console.log(
'renew subscription redisLastId vs. userInitialRedisId and otherId',
indexDoc.redisLastId,
user.initialRedisSubId,
otherId,
);
subscriber.ensureSubId(stream, indexDoc.redisLastId);
}
} catch (error) {
Expand Down Expand Up @@ -232,7 +249,11 @@ export const closeCallback = (
): void => {
try {
const user = ws.getUserData();
if (!user.room) return;
if (!user.room) {
console.log('User has no room', user);

return;
}

user.awarenessId &&
client.addMessage(
Expand Down
Loading