From 7cb44fc92668e54d637f91dff58ab466b02b9863 Mon Sep 17 00:00:00 2001 From: Sergej Hoffmann <97111299+SevenWaysDP@users.noreply.github.com> Date: Tue, 26 Nov 2024 11:07:32 +0100 Subject: [PATCH] BC-8431 - Broken tldraw docs when reloading (#38) --- src/infra/redis/ioredis.adapter.spec.ts | 2 +- src/infra/redis/ioredis.adapter.ts | 10 +--------- src/infra/y-redis/api.service.ts | 9 +++++++-- src/infra/y-redis/subscriber.service.ts | 7 ++----- 4 files changed, 11 insertions(+), 17 deletions(-) diff --git a/src/infra/redis/ioredis.adapter.spec.ts b/src/infra/redis/ioredis.adapter.spec.ts index 5925e1f3..7b4c7ce2 100644 --- a/src/infra/redis/ioredis.adapter.spec.ts +++ b/src/infra/redis/ioredis.adapter.spec.ts @@ -255,7 +255,7 @@ describe(IoRedisAdapter.name, () => { // @ts-ignore const id = readBufferReply[0][0].toString(); - const expectedProps = ['COUNT', 1000, 'BLOCK', 1000, 'STREAMS', computeRedisRoomStreamName, '0']; + const expectedProps = ['STREAMS', computeRedisRoomStreamName, '0']; const expectedResult = [ { diff --git a/src/infra/redis/ioredis.adapter.ts b/src/infra/redis/ioredis.adapter.ts index 2c98b0ee..d90c1d5a 100644 --- a/src/infra/redis/ioredis.adapter.ts +++ b/src/infra/redis/ioredis.adapter.ts @@ -105,15 +105,7 @@ export class IoRedisAdapter implements RedisAdapter { } public async readMessagesFromStream(streamName: string): Promise { - const reads = await this.redis.xreadBuffer( - 'COUNT', - 1000, // Adjust the count as needed - 'BLOCK', - 1000, // Adjust the block time as needed - 'STREAMS', - streamName, - '0', - ); + const reads = await this.redis.xreadBuffer('STREAMS', streamName, '0'); const streamReplyRes = mapToStreamMessagesReply(reads); diff --git a/src/infra/y-redis/api.service.ts b/src/infra/y-redis/api.service.ts index 154a2c17..8af2d418 100644 --- a/src/infra/y-redis/api.service.ts +++ b/src/infra/y-redis/api.service.ts @@ -94,7 +94,6 @@ export class Api { public async getDoc(room: string, docid: string): Promise { const end = MetricsService.methodDurationHistogram.startTimer(); - let docChanged = false; const roomComputed = computeRedisRoomStreamName(room, docid, this.redisPrefix); @@ -123,13 +122,19 @@ export class Api { end(); - return { + const response = { ydoc, awareness, redisLastId: docMessages?.lastId.toString() ?? '0', storeReferences: docstate?.references ?? null, docChanged, }; + + if (ydoc.store.pendingStructs !== null) { + console.warn(`Document ${room} has pending structs ${JSON.stringify(ydoc.store.pendingStructs)}.`); + } + + return response; } public async destroy(): Promise { diff --git a/src/infra/y-redis/subscriber.service.ts b/src/infra/y-redis/subscriber.service.ts index b1fa6a03..8614e648 100644 --- a/src/infra/y-redis/subscriber.service.ts +++ b/src/infra/y-redis/subscriber.service.ts @@ -5,6 +5,7 @@ The original code from the `y-redis` repository is licensed under the AGPL-3.0 license. https://github.com/yjs/y-redis */ +import * as map from 'lib0/map'; import { RedisService } from '../redis/redis.service.js'; import { Api, createApiClient } from './api.service.js'; import { isSmallerRedisId } from './helper.js'; @@ -50,13 +51,9 @@ export class Subscriber { } public subscribe(stream: string, f: SubscriptionHandler): { redisId: string } { - const sub = this.subscribers.get(stream) ?? { fs: new Set(), id: '0', nextId: null }; + const sub = map.setIfUndefined(this.subscribers, stream, () => ({ fs: new Set(), id: '0', nextId: null })); sub.fs.add(f); - if (!this.subscribers.has(stream)) { - this.subscribers.set(stream, sub); - } - return { redisId: sub.id, };