diff --git a/apps/server/src/infra/rabbitmq/exchange/files-storage.ts b/apps/server/src/infra/rabbitmq/exchange/files-storage.ts index 768a25e50c4..938070f3f67 100644 --- a/apps/server/src/infra/rabbitmq/exchange/files-storage.ts +++ b/apps/server/src/infra/rabbitmq/exchange/files-storage.ts @@ -8,7 +8,7 @@ export enum FilesStorageEvents { 'LIST_FILES_OF_PARENT' = 'list-files-of-parent', 'DELETE_FILES_OF_PARENT' = 'delete-files-of-parent', 'REMOVE_CREATORID_OF_FILES' = 'remove-creatorId-of-files', - 'DELETE_ONE_FILE' = 'delete-one-file', + 'DELETE_FILES' = 'delete-files', } export enum ScanStatus { diff --git a/apps/server/src/modules/files-storage-client/service/files-storage-client.service.spec.ts b/apps/server/src/modules/files-storage-client/service/files-storage-client.service.spec.ts index 0671d0d1042..1980db764f3 100644 --- a/apps/server/src/modules/files-storage-client/service/files-storage-client.service.spec.ts +++ b/apps/server/src/modules/files-storage-client/service/files-storage-client.service.spec.ts @@ -151,19 +151,21 @@ describe('FilesStorageClientAdapterService', () => { }); }); - describe('deleteOneFile', () => { - describe('when file is deleted successfully', () => { + describe('deleteFiles', () => { + describe('when files are deleted successfully', () => { const setup = () => { const recordId = new ObjectId().toHexString(); - const spy = jest.spyOn(FilesStorageClientMapper, 'mapFileRecordResponseToFileDto').mockImplementation(() => { - return { - id: recordId, - name: 'file', - parentId: 'parentId', - parentType: FileRecordParentType.BoardNode, - }; - }); + const spy = jest + .spyOn(FilesStorageClientMapper, 'mapfileRecordListResponseToDomainFilesDto') + .mockImplementation(() => [ + { + id: recordId, + name: 'file', + parentId: 'parentId', + parentType: FileRecordParentType.BoardNode, + }, + ]); return { recordId, spy }; }; @@ -171,9 +173,9 @@ describe('FilesStorageClientAdapterService', () => { it('Should call all steps.', async () => { const { recordId, spy } = setup(); - await service.deleteOneFile(recordId); + await service.deleteFiles([recordId]); - expect(client.deleteOneFile).toHaveBeenCalledWith(recordId); + expect(client.deleteFiles).toHaveBeenCalledWith([recordId]); expect(spy).toBeCalled(); spy.mockRestore(); @@ -184,7 +186,7 @@ describe('FilesStorageClientAdapterService', () => { const setup = () => { const recordId = new ObjectId().toHexString(); - client.deleteOneFile.mockRejectedValue(new Error()); + client.deleteFiles.mockRejectedValue(new Error()); return { recordId }; }; @@ -192,7 +194,7 @@ describe('FilesStorageClientAdapterService', () => { it('Should call error mapper if throw an error.', async () => { const { recordId } = setup(); - await expect(service.deleteOneFile(recordId)).rejects.toThrowError(); + await expect(service.deleteFiles([recordId])).rejects.toThrowError(); }); }); }); diff --git a/apps/server/src/modules/files-storage-client/service/files-storage-client.service.ts b/apps/server/src/modules/files-storage-client/service/files-storage-client.service.ts index 95a96544a5a..399098a1376 100644 --- a/apps/server/src/modules/files-storage-client/service/files-storage-client.service.ts +++ b/apps/server/src/modules/files-storage-client/service/files-storage-client.service.ts @@ -38,12 +38,12 @@ export class FilesStorageClientAdapterService { return fileInfos; } - async deleteOneFile(fileRecordId: EntityId): Promise { - const response = await this.fileStorageMQProducer.deleteOneFile(fileRecordId); + async deleteFiles(fileRecordIds: EntityId[]): Promise { + const response = await this.fileStorageMQProducer.deleteFiles(fileRecordIds); - const fileInfo = FilesStorageClientMapper.mapFileRecordResponseToFileDto(response); + const fileInfos = FilesStorageClientMapper.mapfileRecordListResponseToDomainFilesDto(response); - return fileInfo; + return fileInfos; } async removeCreatorIdFromFileRecords(creatorId: EntityId): Promise { diff --git a/apps/server/src/modules/files-storage-client/service/files-storage.producer.spec.ts b/apps/server/src/modules/files-storage-client/service/files-storage.producer.spec.ts index 743046d188e..3910585ecab 100644 --- a/apps/server/src/modules/files-storage-client/service/files-storage.producer.spec.ts +++ b/apps/server/src/modules/files-storage-client/service/files-storage.producer.spec.ts @@ -239,7 +239,7 @@ describe('FilesStorageProducer', () => { }); }); - describe('deleteOneFile', () => { + describe('deleteFiles', () => { describe('when valid parameter passed and amqpConnection return with error in response', () => { const setup = () => { const recordId = new ObjectId().toHexString(); @@ -253,7 +253,7 @@ describe('FilesStorageProducer', () => { it('should call error mapper and throw with error', async () => { const { recordId, spy } = setup(); - await expect(service.deleteOneFile(recordId)).rejects.toThrowError(); + await expect(service.deleteFiles([recordId])).rejects.toThrowError(); expect(spy).toBeCalled(); }); }); @@ -267,18 +267,17 @@ describe('FilesStorageProducer', () => { const expectedParams = { exchange: FilesStorageExchange, - routingKey: FilesStorageEvents.DELETE_ONE_FILE, - payload: recordId, + routingKey: FilesStorageEvents.DELETE_FILES, + payload: [recordId], timeout, }; - return { recordId, message, expectedParams }; }; it('should call the ampqConnection.', async () => { const { recordId, expectedParams } = setup(); - await service.deleteOneFile(recordId); + await service.deleteFiles([recordId]); expect(amqpConnection.request).toHaveBeenCalledWith(expectedParams); }); @@ -286,7 +285,7 @@ describe('FilesStorageProducer', () => { it('should return the response message.', async () => { const { recordId, message } = setup(); - const res = await service.deleteOneFile(recordId); + const res = await service.deleteFiles([recordId]); expect(res).toEqual(message); }); diff --git a/apps/server/src/modules/files-storage-client/service/files-storage.producer.ts b/apps/server/src/modules/files-storage-client/service/files-storage.producer.ts index 3b639eebcfd..414ec26697b 100644 --- a/apps/server/src/modules/files-storage-client/service/files-storage.producer.ts +++ b/apps/server/src/modules/files-storage-client/service/files-storage.producer.ts @@ -51,11 +51,11 @@ export class FilesStorageProducer extends RpcMessageProducer { return response; } - async deleteOneFile(payload: EntityId): Promise { - this.logger.debug({ action: 'deleteOneFile:started', payload }); - const response = await this.request(FilesStorageEvents.DELETE_ONE_FILE, payload); + async deleteFiles(payload: EntityId[]): Promise { + this.logger.debug({ action: 'deleteFiles:started', payload }); + const response = await this.request(FilesStorageEvents.DELETE_FILES, payload); - this.logger.debug({ action: 'deleteOneFile:finished', payload }); + this.logger.debug({ action: 'deleteFiles:finished', payload }); return response; } diff --git a/apps/server/src/modules/files-storage/controller/files-storage.consumer.spec.ts b/apps/server/src/modules/files-storage/controller/files-storage.consumer.spec.ts index 9da4532cb89..c8291131252 100644 --- a/apps/server/src/modules/files-storage/controller/files-storage.consumer.spec.ts +++ b/apps/server/src/modules/files-storage/controller/files-storage.consumer.spec.ts @@ -211,7 +211,7 @@ describe('FilesStorageConsumer', () => { }); }); - describe('deleteOneFile()', () => { + describe('deleteFiles()', () => { describe('WHEN valid file exists', () => { const setup = () => { const recordId = new ObjectId().toHexString(); @@ -222,22 +222,22 @@ describe('FilesStorageConsumer', () => { return { recordId, fileRecord }; }; - it('should call filesStorageService.deleteOneFile with params', async () => { + it('should call filesStorageService.deleteFiles with params', async () => { const { recordId, fileRecord } = setup(); - await service.deleteOneFile(recordId); + await service.deleteFiles([recordId]); const result = [fileRecord]; expect(filesStorageService.getFileRecord).toBeCalledWith({ fileRecordId: recordId }); - expect(filesStorageService.deleteFilesOfParent).toBeCalledWith(result); + expect(filesStorageService.delete).toBeCalledWith(result); }); - it('should return an instance of FileRecordResponse', async () => { + it('should return array instances of FileRecordResponse', async () => { const { recordId } = setup(); - const response = await service.deleteOneFile(recordId); + const response = await service.deleteFiles([recordId]); - expect(response.message).toBeInstanceOf(FileRecordResponse); + expect(response.message[0]).toBeInstanceOf(FileRecordResponse); }); }); @@ -253,7 +253,7 @@ describe('FilesStorageConsumer', () => { it('should throw', async () => { const { recordId } = setup(); - await expect(service.deleteOneFile(recordId)).rejects.toThrow('not found'); + await expect(service.deleteFiles([recordId])).rejects.toThrow('not found'); }); }); }); diff --git a/apps/server/src/modules/files-storage/controller/files-storage.consumer.ts b/apps/server/src/modules/files-storage/controller/files-storage.consumer.ts index 722b114e1a5..eb795f4de81 100644 --- a/apps/server/src/modules/files-storage/controller/files-storage.consumer.ts +++ b/apps/server/src/modules/files-storage/controller/files-storage.consumer.ts @@ -1,6 +1,5 @@ import { RabbitPayload, RabbitRPC } from '@golevelup/nestjs-rabbitmq'; -import { CopyFileDO, FileDO, FilesStorageEvents, FilesStorageExchange } from '@infra/rabbitmq'; -import { RpcMessage } from '@infra/rabbitmq/rpc-message'; +import { CopyFileDO, FileDO, FilesStorageEvents, FilesStorageExchange, RpcMessage } from '@infra/rabbitmq'; import { MikroORM, UseRequestContext } from '@mikro-orm/core'; import { Injectable } from '@nestjs/common'; import { EntityId } from '@shared/domain/types'; @@ -75,21 +74,22 @@ export class FilesStorageConsumer { @RabbitRPC({ exchange: FilesStorageExchange, - routingKey: FilesStorageEvents.DELETE_ONE_FILE, - queue: FilesStorageEvents.DELETE_ONE_FILE, + routingKey: FilesStorageEvents.DELETE_FILES, + queue: FilesStorageEvents.DELETE_FILES, }) @UseRequestContext() - public async deleteOneFile(@RabbitPayload() payload: EntityId): Promise> { - this.logger.debug({ action: 'deleteOneFile', payload }); + public async deleteFiles(@RabbitPayload() payload: EntityId[]): Promise> { + this.logger.debug({ action: 'deleteFiles', payload }); - const fileRecord = await this.filesStorageService.getFileRecord({ fileRecordId: payload }); + const promise = payload.map((fileRecordId) => this.filesStorageService.getFileRecord({ fileRecordId })); + const fileRecords = await Promise.all(promise); - await this.previewService.deletePreviews([fileRecord]); - await this.filesStorageService.deleteFilesOfParent([fileRecord]); + await this.previewService.deletePreviews(fileRecords); + await this.filesStorageService.delete(fileRecords); - const response = FilesStorageMapper.mapToFileRecordResponse(fileRecord); + const response = FilesStorageMapper.mapToFileRecordListResponse(fileRecords, fileRecords.length); - return { message: response }; + return { message: response.data }; } @RabbitRPC({ diff --git a/apps/server/src/modules/tldraw/config.ts b/apps/server/src/modules/tldraw/config.ts index ae3bd145074..d1b421463eb 100644 --- a/apps/server/src/modules/tldraw/config.ts +++ b/apps/server/src/modules/tldraw/config.ts @@ -4,7 +4,7 @@ export interface TldrawConfig { TLDRAW_DB_URL: string; NEST_LOG_LEVEL: string; INCOMING_REQUEST_TIMEOUT: number; - TLDRAW_DB_FLUSH_SIZE: string; + TLDRAW_DB_COMPRESS_THRESHOLD: string; CONNECTION_STRING: string; FEATURE_TLDRAW_ENABLED: boolean; TLDRAW_PING_TIMEOUT: number; @@ -24,7 +24,7 @@ const tldrawConfig = { TLDRAW_DB_URL, NEST_LOG_LEVEL: Configuration.get('NEST_LOG_LEVEL') as string, INCOMING_REQUEST_TIMEOUT: Configuration.get('INCOMING_REQUEST_TIMEOUT_API') as number, - TLDRAW_DB_FLUSH_SIZE: Configuration.get('TLDRAW__DB_FLUSH_SIZE') as number, + TLDRAW_DB_COMPRESS_THRESHOLD: Configuration.get('TLDRAW__DB_COMPRESS_THRESHOLD') as number, FEATURE_TLDRAW_ENABLED: Configuration.get('FEATURE_TLDRAW_ENABLED') as boolean, CONNECTION_STRING: Configuration.get('TLDRAW_DB_URL') as string, TLDRAW_PING_TIMEOUT: Configuration.get('TLDRAW__PING_TIMEOUT') as number, diff --git a/apps/server/src/modules/tldraw/loggable/close-connection.loggable.spec.ts b/apps/server/src/modules/tldraw/loggable/close-connection.loggable.spec.ts index 3b32f617c1a..df2fbec507f 100644 --- a/apps/server/src/modules/tldraw/loggable/close-connection.loggable.spec.ts +++ b/apps/server/src/modules/tldraw/loggable/close-connection.loggable.spec.ts @@ -4,7 +4,7 @@ describe('CloseConnectionLoggable', () => { describe('getLogMessage', () => { const setup = () => { const error = new Error('test'); - const loggable = new CloseConnectionLoggable(error); + const loggable = new CloseConnectionLoggable('functionName', error); return { loggable, error }; }; @@ -15,8 +15,8 @@ describe('CloseConnectionLoggable', () => { const message = loggable.getLogMessage(); expect(message).toEqual({ - message: 'Close web socket connection error', - type: 'CLOSE_WEB_SOCKET_CONNECTION_ERROR', + message: 'Close web socket error in functionName', + type: 'CLOSE_WEB_SOCKET_ERROR', error, }); }); diff --git a/apps/server/src/modules/tldraw/loggable/close-connection.loggable.ts b/apps/server/src/modules/tldraw/loggable/close-connection.loggable.ts index 6f19ddaae3a..e1d2c90e0bd 100644 --- a/apps/server/src/modules/tldraw/loggable/close-connection.loggable.ts +++ b/apps/server/src/modules/tldraw/loggable/close-connection.loggable.ts @@ -3,7 +3,7 @@ import { ErrorLogMessage, Loggable, LogMessage, ValidationErrorLogMessage } from export class CloseConnectionLoggable implements Loggable { private error: Error | undefined; - constructor(private readonly err: unknown) { + constructor(private readonly errorLocation: string, private readonly err: unknown) { if (err instanceof Error) { this.error = err; } @@ -11,8 +11,8 @@ export class CloseConnectionLoggable implements Loggable { getLogMessage(): LogMessage | ErrorLogMessage | ValidationErrorLogMessage { return { - message: `Close web socket connection error`, - type: `CLOSE_WEB_SOCKET_CONNECTION_ERROR`, + message: `Close web socket error in ${this.errorLocation}`, + type: `CLOSE_WEB_SOCKET_ERROR`, error: this.error, }; } diff --git a/apps/server/src/modules/tldraw/loggable/file-storage-error.loggable.spec.ts b/apps/server/src/modules/tldraw/loggable/file-storage-error.loggable.spec.ts new file mode 100644 index 00000000000..817edd10f5e --- /dev/null +++ b/apps/server/src/modules/tldraw/loggable/file-storage-error.loggable.spec.ts @@ -0,0 +1,24 @@ +import { FileStorageErrorLoggable } from './file-storage-error.loggable'; + +describe('FileStorageErrorLoggable', () => { + describe('getLogMessage', () => { + const setup = () => { + const error = new Error('test'); + const loggable = new FileStorageErrorLoggable('doc1', error); + + return { loggable, error }; + }; + + it('should return a loggable message', () => { + const { loggable, error } = setup(); + + const message = loggable.getLogMessage(); + + expect(message).toEqual({ + message: 'Error in document doc1: assets could not be synchronized with file storage.', + type: 'FILE_STORAGE_GENERAL_ERROR', + error, + }); + }); + }); +}); diff --git a/apps/server/src/modules/tldraw/loggable/file-storage-error.loggable.ts b/apps/server/src/modules/tldraw/loggable/file-storage-error.loggable.ts new file mode 100644 index 00000000000..3654b608a17 --- /dev/null +++ b/apps/server/src/modules/tldraw/loggable/file-storage-error.loggable.ts @@ -0,0 +1,19 @@ +import { ErrorLogMessage, Loggable, LogMessage, ValidationErrorLogMessage } from '@src/core/logger'; + +export class FileStorageErrorLoggable implements Loggable { + private error: Error | undefined; + + constructor(private readonly docName: string, private readonly err: unknown) { + if (err instanceof Error) { + this.error = err; + } + } + + getLogMessage(): LogMessage | ErrorLogMessage | ValidationErrorLogMessage { + return { + message: `Error in document ${this.docName}: assets could not be synchronized with file storage.`, + type: `FILE_STORAGE_GENERAL_ERROR`, + error: this.error, + }; + } +} diff --git a/apps/server/src/modules/tldraw/loggable/index.ts b/apps/server/src/modules/tldraw/loggable/index.ts index 4b5746f33dc..286e877131c 100644 --- a/apps/server/src/modules/tldraw/loggable/index.ts +++ b/apps/server/src/modules/tldraw/loggable/index.ts @@ -6,3 +6,4 @@ export * from './websocket-close-error.loggable'; export * from './websocket-message-error.loggable'; export * from './ws-shared-doc-error.loggable'; export * from './close-connection.loggable'; +export * from './file-storage-error.loggable'; diff --git a/apps/server/src/modules/tldraw/loggable/websocket-error.loggable.spec.ts b/apps/server/src/modules/tldraw/loggable/websocket-error.loggable.spec.ts index 40c4ae922f3..4e129376cc3 100644 --- a/apps/server/src/modules/tldraw/loggable/websocket-error.loggable.spec.ts +++ b/apps/server/src/modules/tldraw/loggable/websocket-error.loggable.spec.ts @@ -14,7 +14,7 @@ describe('WebsocketErrorLoggable', () => { const message = loggable.getLogMessage(); - expect(message).toEqual({ message: 'Websocket error', error, type: 'WEBSOCKET_ERROR' }); + expect(message).toEqual({ message: 'Websocket error event', error, type: 'WEBSOCKET_ERROR' }); }); }); }); diff --git a/apps/server/src/modules/tldraw/loggable/websocket-error.loggable.ts b/apps/server/src/modules/tldraw/loggable/websocket-error.loggable.ts index ab58549c220..1da725b3518 100644 --- a/apps/server/src/modules/tldraw/loggable/websocket-error.loggable.ts +++ b/apps/server/src/modules/tldraw/loggable/websocket-error.loggable.ts @@ -11,7 +11,7 @@ export class WebsocketErrorLoggable implements Loggable { getLogMessage(): LogMessage | ErrorLogMessage | ValidationErrorLogMessage { return { - message: 'Websocket error', + message: 'Websocket error event', type: 'WEBSOCKET_ERROR', error: this.error, }; diff --git a/apps/server/src/modules/tldraw/repo/tldraw-board.repo.spec.ts b/apps/server/src/modules/tldraw/repo/tldraw-board.repo.spec.ts index a0a82351afb..a7c9b115d7e 100644 --- a/apps/server/src/modules/tldraw/repo/tldraw-board.repo.spec.ts +++ b/apps/server/src/modules/tldraw/repo/tldraw-board.repo.spec.ts @@ -1,19 +1,17 @@ import { Test } from '@nestjs/testing'; import { INestApplication } from '@nestjs/common'; -import WebSocket from 'ws'; import { WsAdapter } from '@nestjs/platform-ws'; import { Doc } from 'yjs'; -import { createMock, DeepMocked } from '@golevelup/ts-jest'; +import { createMock } from '@golevelup/ts-jest'; import { HttpService } from '@nestjs/axios'; import { Logger } from '@src/core/logger'; import { ConfigModule } from '@nestjs/config'; import { MongoMemoryDatabaseModule } from '@infra/database'; import { createConfigModuleOptions } from '@src/config'; -import * as YjsUtils from '../utils/ydoc-utils'; import { TldrawBoardRepo } from './tldraw-board.repo'; import { WsSharedDocDo } from '../domain'; import { TldrawFilesStorageAdapterService, TldrawWsService } from '../service'; -import { TestConnection, tldrawTestConfig } from '../testing'; +import { tldrawTestConfig } from '../testing'; import { TldrawDrawing } from '../entities'; import { TldrawWs } from '../controller'; import { MetricsService } from '../metrics'; @@ -24,11 +22,6 @@ import { TldrawRedisFactory } from '../redis'; describe('TldrawBoardRepo', () => { let app: INestApplication; let repo: TldrawBoardRepo; - let ws: WebSocket; - let logger: DeepMocked; - - const gatewayPort = 3346; - const wsUrl = TestConnection.getWsUrl(gatewayPort); beforeAll(async () => { const testingModule = await Test.createTestingModule({ @@ -63,7 +56,6 @@ describe('TldrawBoardRepo', () => { }).compile(); repo = testingModule.get(TldrawBoardRepo); - logger = testingModule.get(Logger); app = testingModule.createNestApplication(); app.useWebSocketAdapter(new WsAdapter(app)); await app.init(); @@ -84,73 +76,6 @@ describe('TldrawBoardRepo', () => { expect(repo.mdb).toBeDefined(); }); - describe('updateDocument', () => { - describe('when document receives empty update', () => { - const setup = async () => { - const doc = new WsSharedDocDo('TEST2'); - ws = await TestConnection.setupWs(wsUrl, 'TEST2'); - const wsSet: Set = new Set(); - wsSet.add(0); - doc.connections.set(ws, wsSet); - const storeGetYDocSpy = jest.spyOn(repo.mdb, 'getYDoc').mockResolvedValueOnce(new WsSharedDocDo('TEST')); - const storeUpdateSpy = jest.spyOn(repo.mdb, 'storeUpdateTransactional').mockResolvedValueOnce(1); - - return { - doc, - storeUpdateSpy, - storeGetYDocSpy, - }; - }; - - it('should not update db with diff', async () => { - const { doc, storeUpdateSpy, storeGetYDocSpy } = await setup(); - - await repo.updateDocument('TEST2', doc); - - expect(storeUpdateSpy).toHaveBeenCalledTimes(0); - storeUpdateSpy.mockRestore(); - storeGetYDocSpy.mockRestore(); - ws.close(); - }); - }); - - describe('when document receive update', () => { - const setup = async () => { - const clientMessageMock = 'test-message'; - const doc = new WsSharedDocDo('TEST'); - ws = await TestConnection.setupWs(wsUrl, 'TEST'); - const wsSet: Set = new Set(); - wsSet.add(0); - doc.connections.set(ws, wsSet); - const storeUpdateSpy = jest.spyOn(repo.mdb, 'storeUpdateTransactional').mockResolvedValue(1); - const storeGetYDocSpy = jest.spyOn(repo.mdb, 'getYDoc').mockResolvedValueOnce(new WsSharedDocDo('TEST')); - const byteArray = new TextEncoder().encode(clientMessageMock); - const errorLogSpy = jest.spyOn(logger, 'warning'); - - return { - doc, - byteArray, - storeUpdateSpy, - storeGetYDocSpy, - errorLogSpy, - }; - }; - - it('should update db with diff', async () => { - const { doc, byteArray, storeUpdateSpy, storeGetYDocSpy } = await setup(); - - await repo.updateDocument('TEST', doc); - doc.emit('update', [byteArray, undefined, doc]); - - expect(storeUpdateSpy).toHaveBeenCalled(); - expect(storeUpdateSpy).toHaveBeenCalledTimes(1); - storeUpdateSpy.mockRestore(); - storeGetYDocSpy.mockRestore(); - ws.close(); - }); - }); - }); - describe('getYDocFromMdb', () => { describe('when taking doc data from db', () => { const setup = () => { @@ -172,88 +97,17 @@ describe('TldrawBoardRepo', () => { }); }); - describe('updateStoredDocWithDiff', () => { - describe('when the difference between update and current drawing is more than 0', () => { - const setup = (shouldStoreUpdateThrowError: boolean) => { - const calculateDiffSpy = jest.spyOn(YjsUtils, 'calculateDiff').mockReturnValueOnce(1); - const errorLogSpy = jest.spyOn(logger, 'warning'); - const storeUpdateSpy = jest.spyOn(repo.mdb, 'storeUpdateTransactional'); - - if (shouldStoreUpdateThrowError) { - storeUpdateSpy.mockRejectedValueOnce(new Error('test error')); - } else { - storeUpdateSpy.mockResolvedValueOnce(1); - } - - return { - calculateDiffSpy, - errorLogSpy, - storeUpdateSpy, - }; - }; - - it('should call store update method', async () => { - const { calculateDiffSpy, storeUpdateSpy } = setup(false); - // const storeUpdateSpy = jest.spyOn(repo.mdb, 'storeUpdateTransactional').mockResolvedValueOnce(1); - const diffArray = new Uint8Array(); - - await repo.updateStoredDocWithDiff('test', diffArray); - - expect(storeUpdateSpy).toHaveBeenCalled(); - calculateDiffSpy.mockRestore(); - storeUpdateSpy.mockRestore(); - }); - - it('should log error if update fails', async () => { - const { calculateDiffSpy, errorLogSpy, storeUpdateSpy } = setup(true); - // const storeUpdateSpy = jest - // .spyOn(repo.mdb, 'storeUpdateTransactional') - // .mockRejectedValueOnce(new Error('test error')); - const diffArray = new Uint8Array(); - await expect(repo.updateStoredDocWithDiff('test', diffArray)).rejects.toThrow('test error'); - - expect(storeUpdateSpy).toHaveBeenCalled(); - expect(errorLogSpy).toHaveBeenCalled(); - calculateDiffSpy.mockRestore(); - storeUpdateSpy.mockRestore(); - }); - }); - - describe('when the difference between update and current drawing is 0', () => { - const setup = () => { - const calculateDiffSpy = jest.spyOn(YjsUtils, 'calculateDiff').mockReturnValueOnce(0); - const storeUpdateSpy = jest.spyOn(repo.mdb, 'storeUpdateTransactional'); - - return { - calculateDiffSpy, - storeUpdateSpy, - }; - }; - - it('should not call store update method', async () => { - const { storeUpdateSpy, calculateDiffSpy } = setup(); - const diffArray = new Uint8Array(); - - await repo.updateStoredDocWithDiff('test', diffArray); - - expect(storeUpdateSpy).not.toHaveBeenCalled(); - calculateDiffSpy.mockRestore(); - storeUpdateSpy.mockRestore(); - }); - }); - }); - - describe('flushDocument', () => { + describe('compressDocument', () => { const setup = () => { - const flushDocumentSpy = jest.spyOn(repo.mdb, 'flushDocumentTransactional').mockResolvedValueOnce(); + const flushDocumentSpy = jest.spyOn(repo.mdb, 'compressDocumentTransactional').mockResolvedValueOnce(); return { flushDocumentSpy }; }; - it('should call flush method on YMongo', async () => { + it('should call compress method on YMongo', async () => { const { flushDocumentSpy } = setup(); - await repo.flushDocument('test'); + await repo.compressDocument('test'); expect(flushDocumentSpy).toHaveBeenCalled(); flushDocumentSpy.mockRestore(); @@ -262,9 +116,13 @@ describe('TldrawBoardRepo', () => { describe('storeUpdate', () => { const setup = () => { - const storeUpdateSpy = jest.spyOn(repo.mdb, 'storeUpdateTransactional'); + const storeUpdateSpy = jest.spyOn(repo.mdb, 'storeUpdateTransactional').mockResolvedValue(2); + const compressDocumentSpy = jest.spyOn(repo.mdb, 'compressDocumentTransactional').mockResolvedValueOnce(); - return { storeUpdateSpy }; + return { + storeUpdateSpy, + compressDocumentSpy, + }; }; it('should call store update method on YMongo', async () => { @@ -275,5 +133,15 @@ describe('TldrawBoardRepo', () => { expect(storeUpdateSpy).toHaveBeenCalled(); storeUpdateSpy.mockRestore(); }); + + it('should call compressDocument if compress threshold was reached', async () => { + const { storeUpdateSpy, compressDocumentSpy } = setup(); + + await repo.storeUpdate('test', new Uint8Array()); + + expect(storeUpdateSpy).toHaveBeenCalled(); + expect(compressDocumentSpy).toHaveBeenCalled(); + storeUpdateSpy.mockRestore(); + }); }); }); diff --git a/apps/server/src/modules/tldraw/repo/tldraw-board.repo.ts b/apps/server/src/modules/tldraw/repo/tldraw-board.repo.ts index c7eddec7b04..57ea2d408dd 100644 --- a/apps/server/src/modules/tldraw/repo/tldraw-board.repo.ts +++ b/apps/server/src/modules/tldraw/repo/tldraw-board.repo.ts @@ -1,14 +1,17 @@ import { Injectable } from '@nestjs/common'; -import { applyUpdate, Doc, encodeStateAsUpdate, encodeStateVector } from 'yjs'; import { Logger } from '@src/core/logger'; -import { MongoTransactionErrorLoggable } from '../loggable'; -import { calculateDiff } from '../utils'; +import { ConfigService } from '@nestjs/config'; +import { TldrawConfig } from '../config'; import { WsSharedDocDo } from '../domain'; import { YMongodb } from './y-mongodb'; @Injectable() export class TldrawBoardRepo { - constructor(readonly mdb: YMongodb, private readonly logger: Logger) { + constructor( + private readonly configService: ConfigService, + readonly mdb: YMongodb, + private readonly logger: Logger + ) { this.logger.setContext(TldrawBoardRepo.name); } @@ -16,39 +19,21 @@ export class TldrawBoardRepo { await this.mdb.createIndex(); } - public async getYDocFromMdb(docName: string): Promise { + public async getYDocFromMdb(docName: string): Promise { const yDoc = await this.mdb.getYDoc(docName); return yDoc; } - public async updateStoredDocWithDiff(docName: string, diff: Uint8Array): Promise { - const calc = calculateDiff(diff); - if (calc > 0) { - await this.mdb.storeUpdateTransactional(docName, diff).catch((err) => { - this.logger.warning(new MongoTransactionErrorLoggable(err)); - throw err; - }); - } - } - - public async updateDocument(docName: string, ydoc: WsSharedDocDo): Promise { - const persistedYdoc = await this.getYDocFromMdb(docName); - const persistedStateVector = encodeStateVector(persistedYdoc); - const diff = encodeStateAsUpdate(ydoc, persistedStateVector); - await this.updateStoredDocWithDiff(docName, diff); - - applyUpdate(ydoc, encodeStateAsUpdate(persistedYdoc)); - - ydoc.on('update', (update: Uint8Array) => this.mdb.storeUpdateTransactional(docName, update)); - - persistedYdoc.destroy(); - } - - public async flushDocument(docName: string): Promise { - await this.mdb.flushDocumentTransactional(docName); + public async compressDocument(docName: string): Promise { + await this.mdb.compressDocumentTransactional(docName); } public async storeUpdate(docName: string, update: Uint8Array): Promise { - await this.mdb.storeUpdateTransactional(docName, update); + const compressThreshold = this.configService.get('TLDRAW_DB_COMPRESS_THRESHOLD'); + const currentClock = await this.mdb.storeUpdateTransactional(docName, update); + + if (currentClock % compressThreshold === 0) { + await this.compressDocument(docName); + } } } diff --git a/apps/server/src/modules/tldraw/repo/y-mongodb.spec.ts b/apps/server/src/modules/tldraw/repo/y-mongodb.spec.ts index 9c529dd7798..669dc930096 100644 --- a/apps/server/src/modules/tldraw/repo/y-mongodb.spec.ts +++ b/apps/server/src/modules/tldraw/repo/y-mongodb.spec.ts @@ -122,9 +122,10 @@ describe('YMongoDb', () => { }); }); - describe('flushDocumentTransactional', () => { + describe('compressDocumentTransactional', () => { const setup = async () => { - const applyUpdateSpy = jest.spyOn(Yjs, 'applyUpdate').mockReturnValue(); + const applyUpdateSpy = jest.spyOn(Yjs, 'applyUpdate').mockReturnValueOnce(); + const mergeUpdatesSpy = jest.spyOn(Yjs, 'mergeUpdates').mockReturnValueOnce(new Uint8Array([])); const drawing1 = tldrawEntityFactory.build({ clock: 1, part: undefined }); const drawing2 = tldrawEntityFactory.build({ clock: 2, part: undefined }); @@ -136,6 +137,7 @@ describe('YMongoDb', () => { return { applyUpdateSpy, + mergeUpdatesSpy, drawing1, }; }; @@ -143,7 +145,7 @@ describe('YMongoDb', () => { it('should merge multiple documents with the same name in the database into two (one main document and one with update)', async () => { const { applyUpdateSpy, drawing1 } = await setup(); - await mdb.flushDocumentTransactional(drawing1.docName); + await mdb.compressDocumentTransactional(drawing1.docName); const docs = await em.findAndCount(TldrawDrawing, { docName: drawing1.docName }); expect(docs.length).toEqual(2); @@ -173,6 +175,7 @@ describe('YMongoDb', () => { describe('when getting document with well defined parts', () => { const setup = async () => { const applyUpdateSpy = jest.spyOn(Yjs, 'applyUpdate').mockReturnValue(); + const mergeUpdatesSpy = jest.spyOn(Yjs, 'mergeUpdates').mockReturnValue(new Uint8Array([])); const drawing1 = tldrawEntityFactory.build({ clock: 1, part: 1 }); const drawing2 = tldrawEntityFactory.build({ clock: 1, part: 2 }); @@ -183,6 +186,7 @@ describe('YMongoDb', () => { return { applyUpdateSpy, + mergeUpdatesSpy, drawing1, drawing2, drawing3, diff --git a/apps/server/src/modules/tldraw/repo/y-mongodb.ts b/apps/server/src/modules/tldraw/repo/y-mongodb.ts index e9a1657f5a2..faad33396f7 100644 --- a/apps/server/src/modules/tldraw/repo/y-mongodb.ts +++ b/apps/server/src/modules/tldraw/repo/y-mongodb.ts @@ -1,12 +1,13 @@ import { ConfigService } from '@nestjs/config'; import * as promise from 'lib0/promise'; -import { applyUpdate, Doc, encodeStateAsUpdate, encodeStateVector } from 'yjs'; +import { applyUpdate, Doc, encodeStateAsUpdate, encodeStateVector, mergeUpdates } from 'yjs'; import { Injectable } from '@nestjs/common'; import { Logger } from '@src/core/logger'; import { Buffer } from 'buffer'; import * as binary from 'lib0/binary'; import * as encoding from 'lib0/encoding'; import { BulkWriteResult } from 'mongodb'; +import { WsSharedDocDo } from '../domain'; import { MongoTransactionErrorLoggable } from '../loggable'; import { TldrawDrawing } from '../entities'; import { TldrawConfig } from '../config'; @@ -18,7 +19,7 @@ import { KeyFactory } from './key.factory'; export class YMongodb { private readonly maxDocumentSize: number; - private readonly flushSize: number; + private readonly gcEnabled: boolean; private readonly _transact: >(docName: string, fn: () => T) => T; @@ -33,7 +34,7 @@ export class YMongodb { ) { this.logger.setContext(YMongodb.name); - this.flushSize = this.configService.get('TLDRAW_DB_FLUSH_SIZE'); + this.gcEnabled = this.configService.get('TLDRAW_GC_ENABLED'); this.maxDocumentSize = this.configService.get('TLDRAW_MAX_DOCUMENT_SIZE'); // execute a transaction on a database @@ -74,18 +75,14 @@ export class YMongodb { await this.repo.ensureIndexes(); } - public getYDoc(docName: string): Promise { - return this._transact(docName, async (): Promise => { + public getYDoc(docName: string): Promise { + return this._transact(docName, async (): Promise => { const updates = await this.getMongoUpdates(docName); - const ydoc = new Doc(); - ydoc.transact(() => { - for (const element of updates) { - applyUpdate(ydoc, element); - } - }); - if (updates.length > this.flushSize) { - await this.flushDocument(docName, encodeStateAsUpdate(ydoc), encodeStateVector(ydoc)); - } + const mergedUpdates = mergeUpdates(updates); + + const ydoc = new WsSharedDocDo(docName, this.gcEnabled); + applyUpdate(ydoc, mergedUpdates); + return ydoc; }); } @@ -94,14 +91,41 @@ export class YMongodb { return this._transact(docName, () => this.storeUpdate(docName, update)); } - public flushDocumentTransactional(docName: string): Promise { + public compressDocumentTransactional(docName: string): Promise { return this._transact(docName, async () => { const updates = await this.getMongoUpdates(docName); - const { update, sv } = this.mergeUpdates(updates); - await this.flushDocument(docName, update, sv); + const mergedUpdates = mergeUpdates(updates); + + const ydoc = new Doc(); + applyUpdate(ydoc, mergedUpdates); + + const stateAsUpdate = encodeStateAsUpdate(ydoc); + const sv = encodeStateVector(ydoc); + const clock = await this.storeUpdate(docName, stateAsUpdate); + await this.writeStateVector(docName, sv, clock); + await this.clearUpdatesRange(docName, 0, clock); + + ydoc.destroy(); }); } + public async getCurrentUpdateClock(docName: string): Promise { + const updates = await this.getMongoBulkData( + { + ...KeyFactory.createForUpdate(docName, 0), + clock: { + $gte: 0, + $lt: binary.BITS32, + }, + }, + { reverse: true, limit: 1 } + ); + + const clock = this.extractClock(updates); + + return clock; + } + private async clearUpdatesRange(docName: string, from: number, to: number): Promise { return this.repo.del({ docName, @@ -168,23 +192,6 @@ export class YMongodb { return this.convertMongoUpdates(docs); } - private async getCurrentUpdateClock(docName: string): Promise { - const updates = await this.getMongoBulkData( - { - ...KeyFactory.createForUpdate(docName, 0), - clock: { - $gte: 0, - $lt: binary.BITS32, - }, - }, - { reverse: true, limit: 1 } - ); - - const clock = this.extractClock(updates); - - return clock; - } - private async writeStateVector(docName: string, sv: Uint8Array, clock: number): Promise { const encoder = encoding.createEncoder(); encoding.writeVarUint(encoder, clock); @@ -236,30 +243,6 @@ export class YMongodb { return clock + 1; } - /** - * For now this is a helper method that creates a Y.Doc and then re-encodes a document update. - * In the future this will be handled by Yjs without creating a Y.Doc (constant memory consumption). - */ - private mergeUpdates(updates: Array): { update: Uint8Array; sv: Uint8Array } { - const ydoc = new Doc(); - ydoc.transact(() => { - for (const element of updates) { - applyUpdate(ydoc, element); - } - }); - return { update: encodeStateAsUpdate(ydoc), sv: encodeStateVector(ydoc) }; - } - - /** - * Merge all MongoDB documents of the same yjs document together. - */ - private async flushDocument(docName: string, stateAsUpdate: Uint8Array, stateVector: Uint8Array): Promise { - const clock = await this.storeUpdate(docName, stateAsUpdate); - await this.writeStateVector(docName, stateVector, clock); - await this.clearUpdatesRange(docName, 0, clock); - return clock; - } - private isSameClock(doc1: TldrawDrawing, doc2: TldrawDrawing): boolean { return doc1.clock === doc2.clock; } diff --git a/apps/server/src/modules/tldraw/service/tldraw-files-storage.service.spec.ts b/apps/server/src/modules/tldraw/service/tldraw-files-storage.service.spec.ts index 02b8f2e45ab..f8ace211d6e 100644 --- a/apps/server/src/modules/tldraw/service/tldraw-files-storage.service.spec.ts +++ b/apps/server/src/modules/tldraw/service/tldraw-files-storage.service.spec.ts @@ -50,22 +50,22 @@ describe('TldrawFilesStorageAdapterService', () => { const listFilesOfParentSpy = jest .spyOn(filesStorageClientAdapterService, 'listFilesOfParent') .mockResolvedValueOnce(fileDtos); - const deleteOneFileSpy = jest.spyOn(filesStorageClientAdapterService, 'deleteOneFile'); + const deleteFilesSpy = jest.spyOn(filesStorageClientAdapterService, 'deleteFiles'); return { usedAssets, listFilesOfParentSpy, - deleteOneFileSpy, + deleteFilesSpy, }; }; - it('should call deleteOneFile on filesStorageClientAdapterService correct number of times', async () => { - const { usedAssets, listFilesOfParentSpy, deleteOneFileSpy } = setup(); + it('should call deleteFiles on filesStorageClientAdapterService', async () => { + const { usedAssets, listFilesOfParentSpy, deleteFilesSpy } = setup(); await tldrawFilesStorageAdapterService.deleteUnusedFilesForDocument('docname', usedAssets); expect(listFilesOfParentSpy).toHaveBeenCalled(); - expect(deleteOneFileSpy).toHaveBeenCalledTimes(2); + expect(deleteFilesSpy).toHaveBeenCalled(); }); }); @@ -74,21 +74,21 @@ describe('TldrawFilesStorageAdapterService', () => { const listFilesOfParentSpy = jest .spyOn(filesStorageClientAdapterService, 'listFilesOfParent') .mockResolvedValueOnce([]); - const deleteOneFileSpy = jest.spyOn(filesStorageClientAdapterService, 'deleteOneFile'); + const deleteFilesSpy = jest.spyOn(filesStorageClientAdapterService, 'deleteFiles'); return { listFilesOfParentSpy, - deleteOneFileSpy, + deleteFilesSpy, }; }; - it('should not call deleteOneFile on filesStorageClientAdapterService', async () => { - const { listFilesOfParentSpy, deleteOneFileSpy } = setup(); + it('should not call deleteFiles on filesStorageClientAdapterService', async () => { + const { listFilesOfParentSpy, deleteFilesSpy } = setup(); await tldrawFilesStorageAdapterService.deleteUnusedFilesForDocument('docname', []); expect(listFilesOfParentSpy).toHaveBeenCalled(); - expect(deleteOneFileSpy).not.toHaveBeenCalled(); + expect(deleteFilesSpy).not.toHaveBeenCalled(); }); }); }); diff --git a/apps/server/src/modules/tldraw/service/tldraw-files-storage.service.ts b/apps/server/src/modules/tldraw/service/tldraw-files-storage.service.ts index a9f44f5b163..c0263edfcf6 100644 --- a/apps/server/src/modules/tldraw/service/tldraw-files-storage.service.ts +++ b/apps/server/src/modules/tldraw/service/tldraw-files-storage.service.ts @@ -6,21 +6,28 @@ import { TldrawAsset } from '../types'; export class TldrawFilesStorageAdapterService { constructor(private readonly filesStorageClientAdapterService: FilesStorageClientAdapterService) {} - public async deleteUnusedFilesForDocument(docName: string, usedAssets: TldrawAsset[]) { + public async deleteUnusedFilesForDocument(docName: string, usedAssets: TldrawAsset[]): Promise { const fileRecords = await this.filesStorageClientAdapterService.listFilesOfParent(docName); + const fileRecordIdsForDeletion = this.foundAssetsForDeletion(fileRecords, usedAssets); - const deleteFilePromises = fileRecords.map((fileRecord) => - this.createFileDeletionActionWhenAssetNotExists(fileRecord, usedAssets) - ); + if (fileRecordIdsForDeletion.length === 0) { + return; + } - await Promise.allSettled(deleteFilePromises); + await this.filesStorageClientAdapterService.deleteFiles(fileRecordIdsForDeletion); } - private createFileDeletionActionWhenAssetNotExists(fileRecord: FileDto, usedAssets: TldrawAsset[]) { - const foundAsset = usedAssets.find((asset) => this.matchAssetWithFileRecord(asset, fileRecord)); - const promise = foundAsset ? Promise.resolve() : this.filesStorageClientAdapterService.deleteOneFile(fileRecord.id); + private foundAssetsForDeletion(fileRecords: FileDto[], usedAssets: TldrawAsset[]): string[] { + const fileRecordIdsForDeletion: string[] = []; - return promise; + for (const fileRecord of fileRecords) { + const foundAsset = usedAssets.some((asset) => this.matchAssetWithFileRecord(asset, fileRecord)); + if (!foundAsset) { + fileRecordIdsForDeletion.push(fileRecord.id); + } + } + + return fileRecordIdsForDeletion; } private matchAssetWithFileRecord(asset: TldrawAsset, fileRecord: FileDto) { diff --git a/apps/server/src/modules/tldraw/service/tldraw.ws.service.spec.ts b/apps/server/src/modules/tldraw/service/tldraw.ws.service.spec.ts index 695a1257b32..f83b1f1c9b8 100644 --- a/apps/server/src/modules/tldraw/service/tldraw.ws.service.spec.ts +++ b/apps/server/src/modules/tldraw/service/tldraw.ws.service.spec.ts @@ -53,8 +53,9 @@ describe('TldrawWSService', () => { let app: INestApplication; let ws: WebSocket; let service: TldrawWsService; - let boardRepo: TldrawBoardRepo; + let boardRepo: DeepMocked; let logger: DeepMocked; + let tldrawFilesStorageAdapterService: DeepMocked; const gatewayPort = 3346; const wsUrl = TestConnection.getWsUrl(gatewayPort); @@ -73,10 +74,13 @@ describe('TldrawWSService', () => { providers: [ TldrawWs, TldrawWsService, - TldrawBoardRepo, YMongodb, MetricsService, TldrawRedisFactory, + { + provide: TldrawBoardRepo, + useValue: createMock(), + }, { provide: TldrawRepo, useValue: createMock(), @@ -99,6 +103,7 @@ describe('TldrawWSService', () => { service = testingModule.get(TldrawWsService); boardRepo = testingModule.get(TldrawBoardRepo); logger = testingModule.get(Logger); + tldrawFilesStorageAdapterService = testingModule.get(TldrawFilesStorageAdapterService); app = testingModule.createNestApplication(); app.useWebSocketAdapter(new WsAdapter(app)); await app.init(); @@ -499,6 +504,7 @@ describe('TldrawWSService', () => { describe('on websocket error', () => { const setup = async () => { + boardRepo.getYDocFromMdb.mockResolvedValueOnce(new WsSharedDocDo('TEST')); ws = await TestConnection.setupWs(wsUrl, 'TEST'); const errorLogSpy = jest.spyOn(logger, 'warning'); @@ -509,7 +515,6 @@ describe('TldrawWSService', () => { it('should log error', async () => { const { errorLogSpy } = await setup(); - await service.setupWSConnection(ws, 'TEST'); ws.emit('error', new Error('error')); @@ -521,38 +526,66 @@ describe('TldrawWSService', () => { describe('closeConn', () => { describe('when there is no error', () => { const setup = async () => { + boardRepo.getYDocFromMdb.mockResolvedValueOnce(new WsSharedDocDo('TEST')); ws = await TestConnection.setupWs(wsUrl); - const flushDocumentSpy = jest.spyOn(boardRepo, 'flushDocument').mockResolvedValueOnce(); + boardRepo.compressDocument.mockResolvedValueOnce(); const redisUnsubscribeSpy = jest.spyOn(Ioredis.Redis.prototype, 'unsubscribe').mockResolvedValueOnce(1); const closeConnSpy = jest.spyOn(service, 'closeConn'); jest.spyOn(Ioredis.Redis.prototype, 'subscribe').mockResolvedValueOnce({}); return { - flushDocumentSpy, redisUnsubscribeSpy, closeConnSpy, }; }; it('should close connection', async () => { - const { flushDocumentSpy, redisUnsubscribeSpy, closeConnSpy } = await setup(); + const { redisUnsubscribeSpy, closeConnSpy } = await setup(); await service.setupWSConnection(ws, 'TEST'); expect(closeConnSpy).toHaveBeenCalled(); ws.close(); closeConnSpy.mockRestore(); - flushDocumentSpy.mockRestore(); redisUnsubscribeSpy.mockRestore(); }); }); + describe('when deleteUnusedFilesForDocument fails', () => { + const setup = async () => { + ws = await TestConnection.setupWs(wsUrl); + const doc = TldrawWsFactory.createWsSharedDocDo(); + + const errorLogSpy = jest.spyOn(logger, 'warning'); + const storageSpy = jest + .spyOn(tldrawFilesStorageAdapterService, 'deleteUnusedFilesForDocument') + .mockRejectedValueOnce(new Error('error')); + + return { + doc, + errorLogSpy, + storageSpy, + }; + }; + + it('should log error', async () => { + const { doc, errorLogSpy } = await setup(); + + await service.closeConn(doc, ws); + await delay(100); + + expect(errorLogSpy).toHaveBeenCalled(); + ws.close(); + }); + }); + describe('when close connection fails', () => { const setup = async () => { + boardRepo.getYDocFromMdb.mockResolvedValueOnce(new WsSharedDocDo('TEST')); ws = await TestConnection.setupWs(wsUrl); - const flushDocumentSpy = jest.spyOn(boardRepo, 'flushDocument').mockResolvedValueOnce(); + boardRepo.compressDocument.mockResolvedValueOnce(); const redisUnsubscribeSpy = jest.spyOn(Ioredis.Redis.prototype, 'unsubscribe').mockResolvedValueOnce(1); const closeConnSpy = jest.spyOn(service, 'closeConn').mockRejectedValueOnce(new Error('error')); const errorLogSpy = jest.spyOn(logger, 'warning'); @@ -560,7 +593,6 @@ describe('TldrawWSService', () => { jest.spyOn(Ioredis.Redis.prototype, 'subscribe').mockResolvedValueOnce({}); return { - flushDocumentSpy, redisUnsubscribeSpy, closeConnSpy, errorLogSpy, @@ -569,7 +601,7 @@ describe('TldrawWSService', () => { }; it('should log error', async () => { - const { flushDocumentSpy, redisUnsubscribeSpy, closeConnSpy, errorLogSpy, sendSpyError } = await setup(); + const { redisUnsubscribeSpy, closeConnSpy, errorLogSpy, sendSpyError } = await setup(); await service.setupWSConnection(ws, 'TEST'); @@ -580,7 +612,6 @@ describe('TldrawWSService', () => { ws.close(); await delay(100); expect(errorLogSpy).toHaveBeenCalled(); - flushDocumentSpy.mockRestore(); redisUnsubscribeSpy.mockRestore(); closeConnSpy.mockRestore(); sendSpyError.mockRestore(); @@ -593,7 +624,7 @@ describe('TldrawWSService', () => { const doc = TldrawWsFactory.createWsSharedDocDo(); doc.connections.set(ws, new Set()); - const flushDocumentSpy = jest.spyOn(boardRepo, 'flushDocument').mockResolvedValueOnce(); + boardRepo.compressDocument.mockResolvedValueOnce(); const redisUnsubscribeSpy = jest .spyOn(Ioredis.Redis.prototype, 'unsubscribe') .mockImplementationOnce((...args: unknown[]) => { @@ -605,18 +636,16 @@ describe('TldrawWSService', () => { return Promise.resolve(0); }); const errorLogSpy = jest.spyOn(logger, 'warning'); - jest.spyOn(Ioredis.Redis.prototype, 'subscribe').mockResolvedValueOnce({}); return { doc, - flushDocumentSpy, redisUnsubscribeSpy, errorLogSpy, }; }; it('should log error', async () => { - const { doc, errorLogSpy, flushDocumentSpy, redisUnsubscribeSpy } = await setup(); + const { doc, errorLogSpy, redisUnsubscribeSpy } = await setup(); await service.closeConn(doc, ws); @@ -624,7 +653,6 @@ describe('TldrawWSService', () => { expect(redisUnsubscribeSpy).toHaveBeenCalled(); expect(errorLogSpy).toHaveBeenCalled(); - flushDocumentSpy.mockRestore(); redisUnsubscribeSpy.mockRestore(); }); }); @@ -635,7 +663,7 @@ describe('TldrawWSService', () => { const doc = TldrawWsFactory.createWsSharedDocDo(); doc.connections.set(ws, new Set()); - const flushDocumentSpy = jest.spyOn(boardRepo, 'flushDocument').mockResolvedValueOnce(); + boardRepo.compressDocument.mockResolvedValueOnce(); const redisUnsubscribeSpy = jest .spyOn(Ioredis.Redis.prototype, 'unsubscribe') .mockRejectedValue(new Error('error')); @@ -645,7 +673,6 @@ describe('TldrawWSService', () => { return { doc, - flushDocumentSpy, redisUnsubscribeSpy, closeConnSpy, errorLogSpy, @@ -653,7 +680,7 @@ describe('TldrawWSService', () => { }; it('should log error', async () => { - const { doc, errorLogSpy, flushDocumentSpy, redisUnsubscribeSpy, closeConnSpy } = await setup(); + const { doc, errorLogSpy, redisUnsubscribeSpy, closeConnSpy } = await setup(); await service.closeConn(doc, ws); await delay(200); @@ -662,33 +689,31 @@ describe('TldrawWSService', () => { expect(closeConnSpy).toHaveBeenCalled(); expect(errorLogSpy).toHaveBeenCalled(); closeConnSpy.mockRestore(); - flushDocumentSpy.mockRestore(); redisUnsubscribeSpy.mockRestore(); }); }); describe('when updating new document fails', () => { const setup = async () => { + boardRepo.getYDocFromMdb.mockResolvedValueOnce(new WsSharedDocDo('test-update-fail')); ws = await TestConnection.setupWs(wsUrl); const closeConnSpy = jest.spyOn(service, 'closeConn'); const errorLogSpy = jest.spyOn(logger, 'warning'); - const updateDocSpy = jest.spyOn(boardRepo, 'updateDocument'); const sendSpy = jest.spyOn(service, 'send').mockImplementation(() => {}); + jest.spyOn(Ioredis.Redis.prototype, 'subscribe').mockResolvedValueOnce({}); return { closeConnSpy, errorLogSpy, - updateDocSpy, sendSpy, }; }; it('should log error', async () => { - const { sendSpy, errorLogSpy, updateDocSpy, closeConnSpy } = await setup(); - updateDocSpy.mockRejectedValueOnce(new Error('error')); + const { sendSpy, errorLogSpy, closeConnSpy } = await setup(); - await expect(service.setupWSConnection(ws, 'test-update-fail')).rejects.toThrow('error'); + await service.setupWSConnection(ws, 'test-update-fail'); ws.close(); expect(errorLogSpy).toHaveBeenCalled(); @@ -700,6 +725,7 @@ describe('TldrawWSService', () => { describe('when pong not received', () => { const setup = async () => { + boardRepo.getYDocFromMdb.mockResolvedValueOnce(new WsSharedDocDo('TEST')); ws = await TestConnection.setupWs(wsUrl, 'TEST'); const messageHandlerSpy = jest.spyOn(service, 'messageHandler').mockReturnValueOnce(); @@ -738,6 +764,7 @@ describe('TldrawWSService', () => { describe('when pong not received and close connection fails', () => { const setup = async () => { + boardRepo.getYDocFromMdb.mockResolvedValueOnce(new WsSharedDocDo('TEST')); ws = await TestConnection.setupWs(wsUrl, 'TEST'); const messageHandlerSpy = jest.spyOn(service, 'messageHandler').mockReturnValueOnce(); @@ -746,7 +773,6 @@ describe('TldrawWSService', () => { const sendSpy = jest.spyOn(service, 'send').mockImplementation(() => {}); const clearIntervalSpy = jest.spyOn(global, 'clearInterval'); const errorLogSpy = jest.spyOn(logger, 'warning'); - jest.spyOn(boardRepo, 'updateDocument').mockImplementationOnce(() => Promise.resolve()); jest.spyOn(Ioredis.Redis.prototype, 'subscribe').mockResolvedValueOnce({}); return { @@ -825,23 +851,21 @@ describe('TldrawWSService', () => { const doc = TldrawWsFactory.createWsSharedDocDo(); doc.connections.set(ws, new Set()); - const flushDocumentSpy = jest.spyOn(boardRepo, 'flushDocument'); + boardRepo.compressDocument.mockRejectedValueOnce(new Error('error')); const errorLogSpy = jest.spyOn(logger, 'warning'); return { doc, - flushDocumentSpy, errorLogSpy, }; }; it('should log error', async () => { - const { doc, flushDocumentSpy, errorLogSpy } = await setup(); - flushDocumentSpy.mockRejectedValueOnce(new Error('error')); + const { doc, errorLogSpy } = await setup(); await expect(service.closeConn(doc, ws)).rejects.toThrow('error'); - expect(flushDocumentSpy).toHaveBeenCalled(); + expect(boardRepo.compressDocument).toHaveBeenCalled(); expect(errorLogSpy).toHaveBeenCalled(); ws.close(); }); @@ -884,20 +908,24 @@ describe('TldrawWSService', () => { describe('databaseUpdateHandler', () => { const setup = async () => { ws = await TestConnection.setupWs(wsUrl); + boardRepo.storeUpdate.mockResolvedValueOnce(); + }; - const storeUpdateSpy = jest.spyOn(boardRepo, 'storeUpdate').mockResolvedValueOnce(); + it('should call storeUpdate method', async () => { + await setup(); - return { - storeUpdateSpy, - }; - }; + await service.databaseUpdateHandler('test', new Uint8Array(), 'test'); - it('should call send method', async () => { - const { storeUpdateSpy } = await setup(); + expect(boardRepo.storeUpdate).toHaveBeenCalled(); + ws.close(); + }); + + it('should not call storeUpdate when origin is redis', async () => { + await setup(); - await service.databaseUpdateHandler('test', new Uint8Array()); + await service.databaseUpdateHandler('test', new Uint8Array(), 'redis'); - expect(storeUpdateSpy).toHaveBeenCalled(); + expect(boardRepo.storeUpdate).not.toHaveBeenCalled(); ws.close(); }); }); @@ -982,6 +1010,7 @@ describe('TldrawWSService', () => { describe('messageHandler', () => { describe('when message is received', () => { const setup = async (messageValues: number[]) => { + boardRepo.getYDocFromMdb.mockResolvedValueOnce(new WsSharedDocDo('TEST')); ws = await TestConnection.setupWs(wsUrl, 'TEST'); const errorLogSpy = jest.spyOn(logger, 'warning'); @@ -1019,6 +1048,23 @@ describe('TldrawWSService', () => { publishSpy.mockRestore(); }); + it('should log error when messageHandler throws', async () => { + const { messageHandlerSpy, msg, errorLogSpy } = await setup([0, 1]); + messageHandlerSpy.mockImplementationOnce(() => { + throw new Error('error'); + }); + + await service.setupWSConnection(ws, 'TEST'); + ws.emit('message', msg); + + await delay(20); + + expect(errorLogSpy).toHaveBeenCalled(); + ws.close(); + messageHandlerSpy.mockRestore(); + errorLogSpy.mockRestore(); + }); + it('should log error when publish to Redis throws', async () => { const { errorLogSpy, publishSpy } = await setup([1, 1]); publishSpy.mockRejectedValueOnce(new Error('error')); @@ -1034,6 +1080,7 @@ describe('TldrawWSService', () => { describe('getYDoc', () => { describe('when getting yDoc by name', () => { it('should assign to service docs map and return instance', async () => { + boardRepo.getYDocFromMdb.mockResolvedValueOnce(new WsSharedDocDo('get-test')); jest.spyOn(Ioredis.Redis.prototype, 'subscribe').mockResolvedValueOnce({}); const docName = 'get-test'; const doc = await service.getYDoc(docName); @@ -1044,9 +1091,13 @@ describe('TldrawWSService', () => { describe('when subscribing to redis channel', () => { const setup = () => { + boardRepo.getYDocFromMdb.mockResolvedValueOnce(new WsSharedDocDo('test-redis')); + const doc = new WsSharedDocDo('test-redis'); + const redisSubscribeSpy = jest.spyOn(Ioredis.Redis.prototype, 'subscribe').mockResolvedValueOnce(1); const redisOnSpy = jest.spyOn(Ioredis.Redis.prototype, 'on'); const errorLogSpy = jest.spyOn(logger, 'warning'); + boardRepo.getYDocFromMdb.mockResolvedValueOnce(doc); return { redisOnSpy, @@ -1055,15 +1106,15 @@ describe('TldrawWSService', () => { }; }; - it('should register new listener', () => { + it('should register new listener', async () => { const { redisOnSpy, redisSubscribeSpy } = setup(); - const doc = service.getYDoc('test-redis'); + const doc = await service.getYDoc('test-redis'); expect(doc).toBeDefined(); expect(redisOnSpy).toHaveBeenCalled(); redisSubscribeSpy.mockRestore(); - redisSubscribeSpy.mockRestore(); + redisOnSpy.mockRestore(); }); }); @@ -1104,6 +1155,7 @@ describe('TldrawWSService', () => { describe('when subscribing to redis channel throws error', () => { const setup = () => { + boardRepo.getYDocFromMdb.mockResolvedValueOnce(new WsSharedDocDo('test-redis-fail-2')); const redisSubscribeSpy = jest .spyOn(Ioredis.Redis.prototype, 'subscribe') .mockRejectedValue(new Error('error')); diff --git a/apps/server/src/modules/tldraw/service/tldraw.ws.service.ts b/apps/server/src/modules/tldraw/service/tldraw.ws.service.ts index 903a2226573..cc11442842a 100644 --- a/apps/server/src/modules/tldraw/service/tldraw.ws.service.ts +++ b/apps/server/src/modules/tldraw/service/tldraw.ws.service.ts @@ -12,6 +12,7 @@ import { YMap } from 'yjs/dist/src/types/YMap'; import { TldrawRedisFactory } from '../redis'; import { CloseConnectionLoggable, + FileStorageErrorLoggable, RedisPublishErrorLoggable, WebsocketErrorLoggable, WebsocketMessageErrorLoggable, @@ -23,7 +24,7 @@ import { RedisConnectionTypeEnum, TldrawAsset, TldrawShape, - WSConnectionState, + UpdateOrigin, WSMessageType, } from '../types'; import { WsSharedDocDo } from '../domain'; @@ -37,8 +38,6 @@ export class TldrawWsService { private readonly pingTimeout: number; - private readonly gcEnabled: boolean; - public readonly sub: Redis; private readonly pub: Redis; @@ -53,7 +52,6 @@ export class TldrawWsService { ) { this.logger.setContext(TldrawWsService.name); this.pingTimeout = this.configService.get('TLDRAW_PING_TIMEOUT'); - this.gcEnabled = this.configService.get('TLDRAW_GC_ENABLED'); this.sub = this.tldrawRedisFactory.build(RedisConnectionTypeEnum.SUBSCRIBE); this.pub = this.tldrawRedisFactory.build(RedisConnectionTypeEnum.PUBLISH); @@ -61,9 +59,10 @@ export class TldrawWsService { public async closeConn(doc: WsSharedDocDo, ws: WebSocket): Promise { if (doc.connections.has(ws)) { - const controlledIds = doc.connections.get(ws) as Set; + const controlledIds = doc.connections.get(ws); doc.connections.delete(ws); - removeAwarenessStates(doc.awareness, Array.from(controlledIds), null); + removeAwarenessStates(doc.awareness, this.forceToArray(controlledIds), null); + await this.storeStateAndDestroyYDocIfPersisted(doc); this.metricsService.decrementNumberOfUsersOnServerCounter(); } @@ -72,31 +71,34 @@ export class TldrawWsService { } public send(doc: WsSharedDocDo, conn: WebSocket, message: Uint8Array): void { - if (conn.readyState !== WSConnectionState.CONNECTING && conn.readyState !== WSConnectionState.OPEN) { + if (this.isClosedOrClosing(conn)) { this.closeConn(doc, conn).catch((err) => { - this.logger.warning(new CloseConnectionLoggable(err)); + this.logger.warning(new CloseConnectionLoggable('send | isClosedOrClosing', err)); }); } conn.send(message, (err) => { if (err) { this.closeConn(doc, conn).catch((e) => { - this.logger.warning(new CloseConnectionLoggable(e)); + this.logger.warning(new CloseConnectionLoggable('send', e)); }); } }); } public updateHandler(update: Uint8Array, origin, doc: WsSharedDocDo): void { - const isOriginWSConn = doc.connections.has(origin as WebSocket); - if (isOriginWSConn) { + if (this.isFromConnectedWebSocket(doc, origin)) { this.publishUpdateToRedis(doc, update, 'document'); } this.propagateUpdate(update, doc); } - public async databaseUpdateHandler(docName: string, update: Uint8Array) { + public async databaseUpdateHandler(docName: string, update: Uint8Array, origin) { + if (this.isFromRedis(origin)) { + return; + } + await this.tldrawBoardRepo.storeUpdate(docName, update); } @@ -112,12 +114,11 @@ export class TldrawWsService { public async getYDoc(docName: string) { const wsSharedDocDo = await map.setIfUndefined(this.docs, docName, async () => { - const doc = new WsSharedDocDo(docName, this.gcEnabled); + const doc = await this.tldrawBoardRepo.getYDocFromMdb(docName); + this.registerAwarenessUpdateHandler(doc); this.registerUpdateHandler(doc); this.subscribeToRedisChannels(doc); - - await this.updateDocument(docName, doc); this.registerDatabaseUpdateHandler(doc); this.docs.set(docName, doc); @@ -133,45 +134,53 @@ export class TldrawWsService { } public messageHandler(conn: WebSocket, doc: WsSharedDocDo, message: Uint8Array): void { - try { - const encoder = encoding.createEncoder(); - const decoder = decoding.createDecoder(message); - const messageType = decoding.readVarUint(decoder); - switch (messageType) { - case WSMessageType.SYNC: - encoding.writeVarUint(encoder, WSMessageType.SYNC); - readSyncMessage(decoder, encoder, doc, conn); - - // If the `encoder` only contains the type of reply message and no - // message, there is no need to send the message. When `encoder` only - // contains the type of reply, its length is 1. - if (encoding.length(encoder) > 1) { - this.send(doc, conn, encoding.toUint8Array(encoder)); - } - break; - case WSMessageType.AWARENESS: { - const update = decoding.readVarUint8Array(decoder); - this.publishUpdateToRedis(doc, update, 'awareness'); - break; - } - default: - break; + const encoder = encoding.createEncoder(); + const decoder = decoding.createDecoder(message); + const messageType = decoding.readVarUint(decoder); + switch (messageType) { + case WSMessageType.SYNC: + this.handleSyncMessage(doc, encoder, decoder, conn); + break; + case WSMessageType.AWARENESS: { + this.handleAwarenessMessage(doc, decoder); + break; } - } catch (err) { - this.logger.warning(new WebsocketMessageErrorLoggable(err)); - throw err; + default: + break; } } + private handleSyncMessage( + doc: WsSharedDocDo, + encoder: encoding.Encoder, + decoder: decoding.Decoder, + conn: WebSocket + ): void { + encoding.writeVarUint(encoder, WSMessageType.SYNC); + readSyncMessage(decoder, encoder, doc, conn); + + // If the `encoder` only contains the type of reply message and no + // message, there is no need to send the message. When `encoder` only + // contains the type of reply, its length is 1. + if (encoding.length(encoder) > 1) { + this.send(doc, conn, encoding.toUint8Array(encoder)); + } + } + + private handleAwarenessMessage(doc: WsSharedDocDo, decoder: decoding.Decoder) { + const update = decoding.readVarUint8Array(decoder); + this.publishUpdateToRedis(doc, update, 'awareness'); + } + public redisMessageHandler = (channel: Buffer, update: Buffer, doc: WsSharedDocDo): void => { const channelId = channel.toString(); if (channelId === doc.name) { - applyUpdate(doc, update, this.sub); + applyUpdate(doc, update, UpdateOrigin.REDIS); } if (channelId === doc.awarenessChannel) { - applyAwarenessUpdate(doc.awareness, update, this.sub); + applyAwarenessUpdate(doc.awareness, update, UpdateOrigin.REDIS); } }; @@ -187,7 +196,11 @@ export class TldrawWsService { }); ws.on('message', (message: ArrayBufferLike) => { - this.messageHandler(ws, doc, new Uint8Array(message)); + try { + this.messageHandler(ws, doc, new Uint8Array(message)); + } catch (err) { + this.logger.warning(new WebsocketMessageErrorLoggable(err)); + } }); // send initial doc state to client as update @@ -203,14 +216,14 @@ export class TldrawWsService { } this.closeConn(doc, ws).catch((err) => { - this.logger.warning(new CloseConnectionLoggable(err)); + this.logger.warning(new CloseConnectionLoggable('pingInterval', err)); }); clearInterval(pingInterval); }, this.pingTimeout); ws.on('close', () => { this.closeConn(doc, ws).catch((err) => { - this.logger.warning(new CloseConnectionLoggable(err)); + this.logger.warning(new CloseConnectionLoggable('websocket close', err)); }); clearInterval(pingInterval); }); @@ -244,9 +257,13 @@ export class TldrawWsService { // if persisted, we store state and destroy yDoc try { const usedAssets = this.syncDocumentAssetsWithShapes(doc); - await this.filesStorageTldrawAdapterService.deleteUnusedFilesForDocument(doc.name, usedAssets); - await this.tldrawBoardRepo.flushDocument(doc.name); + + await this.tldrawBoardRepo.compressDocument(doc.name); this.unsubscribeFromRedisChannels(doc); + + void this.filesStorageTldrawAdapterService.deleteUnusedFilesForDocument(doc.name, usedAssets).catch((err) => { + this.logger.warning(new FileStorageErrorLoggable(doc.name, err)); + }); doc.destroy(); } catch (err) { this.logger.warning(new WsSharedDocErrorLoggable(doc.name, 'Error while flushing doc', err)); @@ -342,7 +359,7 @@ export class TldrawWsService { } private registerDatabaseUpdateHandler(doc: WsSharedDocDo) { - doc.on('update', (update: Uint8Array) => this.databaseUpdateHandler(doc.name, update)); + doc.on('update', (update: Uint8Array, origin) => this.databaseUpdateHandler(doc.name, update, origin)); } private subscribeToRedisChannels(doc: WsSharedDocDo) { @@ -374,15 +391,6 @@ export class TldrawWsService { }); } - private async updateDocument(docName: string, doc: WsSharedDocDo) { - try { - await this.tldrawBoardRepo.updateDocument(docName, doc); - } catch (err) { - this.logger.warning(new WsSharedDocErrorLoggable(doc.name, 'Error while updating document', err)); - throw err; - } - } - private publishUpdateToRedis(doc: WsSharedDocDo, update: Uint8Array, type: 'awareness' | 'document') { const channel = type === 'awareness' ? doc.awarenessChannel : doc.name; this.pub @@ -402,4 +410,20 @@ export class TldrawWsService { writeUpdate(encoder, encodeStateAsUpdate(doc)); this.send(doc, ws, encoding.toUint8Array(encoder)); } + + private isClosedOrClosing(connection: WebSocket): boolean { + return connection.readyState === WebSocket.CLOSING || connection.readyState === WebSocket.CLOSED; + } + + private forceToArray(connections: Set | undefined): number[] { + return connections ? Array.from(connections) : []; + } + + private isFromConnectedWebSocket(doc: WsSharedDocDo, origin: unknown) { + return origin instanceof WebSocket && doc.connections.has(origin); + } + + private isFromRedis(origin: unknown): boolean { + return typeof origin === 'string' && origin === UpdateOrigin.REDIS; + } } diff --git a/apps/server/src/modules/tldraw/testing/testConfig.ts b/apps/server/src/modules/tldraw/testing/testConfig.ts index fb345fcba1c..e3557bfbaf5 100644 --- a/apps/server/src/modules/tldraw/testing/testConfig.ts +++ b/apps/server/src/modules/tldraw/testing/testConfig.ts @@ -5,7 +5,7 @@ export const tldrawTestConfig = () => { if (!conf.REDIS_URI) { conf.REDIS_URI = 'redis://127.0.0.1:6379'; } - conf.TLDRAW_DB_FLUSH_SIZE = 2; + conf.TLDRAW_DB_COMPRESS_THRESHOLD = 2; conf.TLDRAW_PING_TIMEOUT = 0; conf.TLDRAW_MAX_DOCUMENT_SIZE = 3; return conf; diff --git a/apps/server/src/modules/tldraw/tldraw.module.ts b/apps/server/src/modules/tldraw/tldraw.module.ts index 588afe8eb60..db9320f50d2 100644 --- a/apps/server/src/modules/tldraw/tldraw.module.ts +++ b/apps/server/src/modules/tldraw/tldraw.module.ts @@ -1,13 +1,11 @@ import { Module, NotFoundException } from '@nestjs/common'; import { ConfigModule } from '@nestjs/config'; import { createConfigModuleOptions, DB_PASSWORD, DB_USERNAME } from '@src/config'; -import { CoreModule } from '@src/core'; import { LoggerModule } from '@src/core/logger'; import { MikroOrmModule, MikroOrmModuleSyncOptions } from '@mikro-orm/nestjs'; import { AuthenticationModule } from '@src/modules/authentication/authentication.module'; -import { RabbitMQWrapperModule } from '@infra/rabbitmq'; import { Dictionary, IPrimaryKey } from '@mikro-orm/core'; -import { AuthorizationModule } from '@modules/authorization'; +import { CoreModule } from '@src/core'; import { config, TLDRAW_DB_URL } from './config'; import { TldrawDrawing } from './entities'; import { TldrawController } from './controller'; @@ -23,10 +21,8 @@ const defaultMikroOrmOptions: MikroOrmModuleSyncOptions = { @Module({ imports: [ LoggerModule, - AuthorizationModule, AuthenticationModule, CoreModule, - RabbitMQWrapperModule, MikroOrmModule.forRoot({ ...defaultMikroOrmOptions, type: 'mongo', diff --git a/apps/server/src/modules/tldraw/types/connection-enum.ts b/apps/server/src/modules/tldraw/types/connection-enum.ts index 6a9a4692e03..c8c0cfdd2c3 100644 --- a/apps/server/src/modules/tldraw/types/connection-enum.ts +++ b/apps/server/src/modules/tldraw/types/connection-enum.ts @@ -1,8 +1,3 @@ -export enum WSConnectionState { - CONNECTING = 0, - OPEN = 1, -} - export enum WSMessageType { SYNC = 0, AWARENESS = 1, diff --git a/apps/server/src/modules/tldraw/types/index.ts b/apps/server/src/modules/tldraw/types/index.ts index 00c68e500f7..ed5a4a4b6b5 100644 --- a/apps/server/src/modules/tldraw/types/index.ts +++ b/apps/server/src/modules/tldraw/types/index.ts @@ -4,3 +4,4 @@ export * from './y-transaction-type'; export * from './ws-close-enum'; export * from './awareness-connections-update-type'; export * from './redis-connection-type.enum'; +export * from './update-origin-enum'; diff --git a/apps/server/src/modules/tldraw/types/update-origin-enum.ts b/apps/server/src/modules/tldraw/types/update-origin-enum.ts new file mode 100644 index 00000000000..9b2cc0aa505 --- /dev/null +++ b/apps/server/src/modules/tldraw/types/update-origin-enum.ts @@ -0,0 +1,3 @@ +export enum UpdateOrigin { + REDIS = 'redis', +} diff --git a/apps/server/src/modules/tldraw/utils/index.ts b/apps/server/src/modules/tldraw/utils/index.ts deleted file mode 100644 index a51b9059bc1..00000000000 --- a/apps/server/src/modules/tldraw/utils/index.ts +++ /dev/null @@ -1 +0,0 @@ -export * from './ydoc-utils'; diff --git a/apps/server/src/modules/tldraw/utils/ydoc-utils.ts b/apps/server/src/modules/tldraw/utils/ydoc-utils.ts deleted file mode 100644 index 6d0817ecc9d..00000000000 --- a/apps/server/src/modules/tldraw/utils/ydoc-utils.ts +++ /dev/null @@ -1,2 +0,0 @@ -export const calculateDiff = (diff: Uint8Array): number => - diff.reduce((previousValue, currentValue) => previousValue + currentValue, 0); diff --git a/config/default.schema.json b/config/default.schema.json index ffadc78f5df..12633cc568c 100644 --- a/config/default.schema.json +++ b/config/default.schema.json @@ -1490,7 +1490,7 @@ "PING_TIMEOUT", "SOCKET_PORT", "GC_ENABLED", - "DB_FLUSH_SIZE", + "DB_COMPRESS_THRESHOLD", "MAX_DOCUMENT_SIZE", "ASSETS_ENABLED", "ASSETS_MAX_SIZE", @@ -1509,9 +1509,9 @@ "type": "boolean", "description": "If tldraw garbage collector should be enabled" }, - "DB_FLUSH_SIZE": { + "DB_COMPRESS_THRESHOLD": { "type": "integer", - "description": "DB collection flushing size" + "description": "Mongo documents with same docName compress threshold size" }, "MAX_DOCUMENT_SIZE": { "type": "number", @@ -1535,7 +1535,7 @@ "SOCKET_PORT": 3345, "PING_TIMEOUT": 30000, "GC_ENABLED": true, - "DB_FLUSH_SIZE": 400, + "DB_COMPRESS_THRESHOLD": 400, "MAX_DOCUMENT_SIZE": 15000000, "ASSETS_ENABLED": true, "ASSETS_MAX_SIZE": 10485760, diff --git a/config/test.json b/config/test.json index eb4d6715824..99c4c310e8e 100644 --- a/config/test.json +++ b/config/test.json @@ -70,7 +70,7 @@ "SOCKET_PORT": 3346, "PING_TIMEOUT": 1, "GC_ENABLED": true, - "DB_FLUSH_SIZE": 400, + "DB_COMPRESS_THRESHOLD": 400, "MAX_DOCUMENT_SIZE": 15000000, "ASSETS_ENABLED": true, "ASSETS_MAX_SIZE": 25000000,