From ede8207a090e34e2d5aad498ef65521d97ab0d53 Mon Sep 17 00:00:00 2001 From: Hoang Pham Date: Fri, 1 Nov 2024 17:22:02 +0700 Subject: [PATCH] Data management improvements Signed-off-by: Hoang Pham --- .gitignore | 1 + websocket_server/ApiService.js | 8 +- websocket_server/BackupManager.js | 233 +++++++++++++++++++ websocket_server/LRUCacheStrategy.js | 5 +- websocket_server/RoomDataManager.js | 319 ++++++++++++++++++++++++--- websocket_server/ServerManager.js | 4 +- websocket_server/SocketManager.js | 7 +- websocket_server/Utils.js | 22 ++ 8 files changed, 563 insertions(+), 36 deletions(-) create mode 100644 websocket_server/BackupManager.js diff --git a/.gitignore b/.gitignore index 4416880..1379b05 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ /css/ /vendor/ /node_modules/ +/backup/ .php-cs-fixer.cache .phpunit.result.cache diff --git a/websocket_server/ApiService.js b/websocket_server/ApiService.js index ce8c039..fc40697 100644 --- a/websocket_server/ApiService.js +++ b/websocket_server/ApiService.js @@ -59,7 +59,13 @@ export default class ApiService { console.log(`[${roomID}] Saving room data to server: ${roomData.length} elements, ${Object.keys(files).length} files`) const url = `${this.NEXTCLOUD_URL}/index.php/apps/whiteboard/${roomID}` - const body = { data: { elements: roomData, files: this.cleanupFiles(roomData, files) } } + const body = { + data: { + elements: roomData, + files: this.cleanupFiles(roomData, files), + savedAt: Date.now(), + }, + } const options = this.fetchOptions('PUT', null, body, roomID, lastEditedUser) return this.fetchData(url, options) } diff --git a/websocket_server/BackupManager.js b/websocket_server/BackupManager.js new file mode 100644 index 0000000..880dc15 --- /dev/null +++ b/websocket_server/BackupManager.js @@ -0,0 +1,233 @@ +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +/* eslint-disable no-console */ + +import fs from 'fs/promises' +import path from 'path' +import crypto from 'crypto' +import zlib from 'zlib' +import { promisify } from 'util' + +const gzip = promisify(zlib.gzip) +const gunzip = promisify(zlib.gunzip) + +export default class BackupManager { + + constructor(options = {}) { + const { backupDir = './backup', maxBackupsPerRoom = 5 } = options + this.backupDir = backupDir + this.maxBackupsPerRoom = maxBackupsPerRoom + this.locks = new Map() + this.lockTimeout = options.lockTimeout || 5000 // 5 seconds + this.lockRetryInterval = options.lockRetryInterval || 50 // 50ms + this.init() + } + + async init() { + try { + await fs.mkdir(this.backupDir, { recursive: true }) + await this.cleanupTemporaryFiles() + } catch (error) { + console.error('Failed to initialize BackupManager:', error) + throw error + } + } + + async cleanupTemporaryFiles() { + try { + const files = await fs.readdir(this.backupDir) + const tmpFiles = files.filter((f) => f.endsWith('.tmp')) + await Promise.all( + tmpFiles.map((file) => + fs + .unlink(path.join(this.backupDir, file)) + .catch(console.error), + ), + ) + } catch (error) { + console.error('Failed to cleanup temporary files:', error) + } + } + + async acquireLock(roomId) { + const startTime = Date.now() + while (this.locks.get(roomId)) { + if (Date.now() - startTime > this.lockTimeout) { + throw new Error(`Lock acquisition timeout for room ${roomId}`) + } + await new Promise((resolve) => + setTimeout(resolve, this.lockRetryInterval), + ) + } + this.locks.set(roomId, Date.now()) + } + + async releaseLock(roomId) { + this.locks.delete(roomId) + } + + sanitizeRoomId(roomId) { + return roomId.replace(/[^a-zA-Z0-9-_]/g, '_') + } + + calculateChecksum(data) { + return crypto + .createHash('sha256') + .update(typeof data === 'string' ? data : JSON.stringify(data)) + .digest('hex') + } + + async createBackup(roomId, data) { + if (!roomId || !data) { + throw new Error('Invalid backup parameters') + } + + const sanitizedRoomId = this.sanitizeRoomId(roomId) + + try { + await this.acquireLock(sanitizedRoomId) + + const backupData = this.prepareBackupData(sanitizedRoomId, data) + await this.writeBackupFile(sanitizedRoomId, backupData) + await this.cleanupOldBackups(sanitizedRoomId) + + return backupData.id + } finally { + await this.releaseLock(sanitizedRoomId) + } + } + + prepareBackupData(roomId, data) { + return { + id: crypto.randomUUID(), + timestamp: Date.now(), + roomId, + checksum: this.calculateChecksum(data), + data, + savedAt: data.savedAt || Date.now(), + } + } + + async writeBackupFile(roomId, backupData) { + const backupFile = path.join( + this.backupDir, + `${roomId}_${backupData.timestamp}.bak`, + ) + const tempFile = `${backupFile}.tmp` + + const compressed = await gzip(JSON.stringify(backupData)) + await fs.writeFile(tempFile, compressed) + await fs.rename(tempFile, backupFile) + } + + async getLatestBackup(roomId) { + const sanitizedRoomId = this.sanitizeRoomId(roomId) + const files = await fs.readdir(this.backupDir) + const roomBackups = files + .filter( + (f) => + f.startsWith(`${sanitizedRoomId}_`) && f.endsWith('.bak'), + ) + .sort() + .reverse() + + if (roomBackups.length === 0) return null + + try { + const compressed = await fs.readFile( + path.join(this.backupDir, roomBackups[0]), + ) + const decompressed = await gunzip(compressed) + const backup = JSON.parse(decompressed.toString()) + + // Verify checksum + const calculatedChecksum = this.calculateChecksum(backup.data) + if (calculatedChecksum !== backup.checksum) { + throw new Error('Backup data corruption detected') + } + + return backup + } catch (error) { + console.error( + `Failed to read latest backup for room ${sanitizedRoomId}:`, + error, + ) + throw error + } + } + + async cleanupOldBackups(roomId) { + const sanitizedRoomId = this.sanitizeRoomId(roomId) + + try { + const files = await fs.readdir(this.backupDir) + const roomBackups = files + .filter( + (f) => + f.startsWith(`${sanitizedRoomId}_`) + && f.endsWith('.bak'), + ) + .sort() + .reverse() + + if (roomBackups.length <= this.maxBackupsPerRoom) { + return + } + + const filesToDelete = roomBackups.slice(this.maxBackupsPerRoom) + await Promise.all( + filesToDelete.map((file) => + fs + .unlink(path.join(this.backupDir, file)) + .catch((error) => { + console.error( + `Failed to delete backup ${file}:`, + error, + ) + // Don't throw to continue with other deletions + }), + ), + ) + } catch (error) { + console.error(`Failed to cleanup old backups for ${roomId}:`, error) + // Don't throw as this is a non-critical operation + } + } + + async getAllBackups(roomId) { + const sanitizedRoomId = this.sanitizeRoomId(roomId) + const files = await fs.readdir(this.backupDir) + return files + .filter( + (f) => + f.startsWith(`${sanitizedRoomId}_`) && f.endsWith('.bak'), + ) + .sort() + .reverse() + } + + async recoverFromBackup(roomId) { + const backup = await this.getLatestBackup(roomId) + if (!backup) { + console.log(`No backup found for room ${roomId}`) + return null + } + return backup.data + } + + async isDataFresher(roomId, serverData) { + const latestBackup = await this.getLatestBackup(roomId) + + if (!latestBackup) return true + + // Simply compare savedAt timestamps + const serverTimestamp = serverData?.savedAt || 0 + const backupTimestamp = latestBackup.savedAt || 0 + + return serverTimestamp >= backupTimestamp + } + +} diff --git a/websocket_server/LRUCacheStrategy.js b/websocket_server/LRUCacheStrategy.js index 324d8a9..73d6610 100644 --- a/websocket_server/LRUCacheStrategy.js +++ b/websocket_server/LRUCacheStrategy.js @@ -20,6 +20,7 @@ export default class LRUCacheStrategy extends StorageStrategy { ttlAutopurge: true, dispose: async (value, key) => { console.log(`[${key}] Disposing room`) + if (value?.data && value?.lastEditedUser) { try { await this.apiService.saveRoomDataToServer( @@ -53,7 +54,9 @@ export default class LRUCacheStrategy extends StorageStrategy { } getRooms() { - const rooms = Array.from(this.cache.values()).filter((room) => room instanceof Room) + const rooms = Array.from(this.cache.values()).filter( + (room) => room instanceof Room, + ) return rooms } diff --git a/websocket_server/RoomDataManager.js b/websocket_server/RoomDataManager.js index ff602c9..67266b8 100644 --- a/websocket_server/RoomDataManager.js +++ b/websocket_server/RoomDataManager.js @@ -1,67 +1,322 @@ -/* eslint-disable no-console */ - /** * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors * SPDX-License-Identifier: AGPL-3.0-or-later */ import Room from './Room.js' +import Utils from './Utils.js' +import ApiService from './ApiService.js' +import BackupManager from './BackupManager.js' +import StorageManager from './StorageManager.js' + +/** + * @typedef {object} RoomData + * @property {Array} [elements] - Array of room elements + * @property {object} [files] - Object containing file information + * @property {number} [savedAt] - Timestamp of when the data was saved + */ + +/** + * @typedef {{ + * inputData: RoomData, + * currentData: RoomData, + * jwtToken: string, + * }} SyncOptions + */ +/** + * Manages room data synchronization, backup, and storage operations + * @class + */ export default class RoomDataManager { - constructor(storageManager, apiService) { + /** + * Default configuration for room data + * @static + * @readonly + */ + static CONFIG = Object.freeze({ + defaultData: { + elements: [], + files: {}, + }, + }) + + /** + * @param {StorageManager} storageManager - Manager for room storage operations + * @param {ApiService} apiService - Service for API communications + * @param {BackupManager} backupManager - Manager for backup operations + */ + constructor(storageManager, apiService, backupManager) { this.storageManager = storageManager this.apiService = apiService + this.backupManager = backupManager } + /** + * Synchronizes room data across different sources + * @param {string} roomId - Unique identifier for the room + * @param {RoomData} data - Room data to synchronize + * @param {Array} users - Array of users in the room + * @param {object} lastEditedUser - User who last edited the room + * @param {string} jwtToken - JWT token for authentication + * @return {Promise} Updated room instance or null if empty + * @throws {Error} When synchronization fails + */ async syncRoomData(roomId, data, users, lastEditedUser, jwtToken) { - console.log(`[${roomId}] Syncing room data`) + Utils.logOperation(roomId, 'Starting sync', { + hasInputData: !!data, + hasToken: !!jwtToken, + }) + + try { + const room = await this.getOrCreateRoom(roomId) + const updatedData = await this.determineLatestData(roomId, { + inputData: data, + currentData: room.data, + jwtToken, + }) + + if (updatedData) { + await this.updateRoomWithData( + room, + updatedData, + users, + lastEditedUser, + ) + return room.isEmpty() + ? await this.handleEmptyRoom(roomId) + : room + } - let room = await this.storageManager.get(roomId) + return room.isEmpty() ? null : room + } catch (error) { + Utils.logError(roomId, 'Room sync failed', error) + throw error + } + } + + /** + * Determines the most recent data from available sources + * @param {string} roomId - Room identifier + * @param {SyncOptions} options - Sync options containing input, current data and token + * @return {Promise} Most recent room data + */ + async determineLatestData(roomId, { inputData, currentData, jwtToken }) { + Utils.logOperation(roomId, 'Determining latest data', { + hasInputData: !!inputData, + hasCurrentData: !!currentData, + hasToken: !!jwtToken, + }) - if (!room) { - room = new Room(roomId) - await this.storageManager.set(roomId, room) + if (inputData) { + Utils.logOperation(roomId, 'Using input data') + return this.normalizeRoomData(inputData) } - if (!data && !room.data) { - data = await this.fetchRoomDataFromServer(roomId, jwtToken) + if (jwtToken) { + return await this.fetchRoomData(roomId, jwtToken) } - const files = data?.files - const elements = data?.elements ?? data - if (elements) room.setData(elements) - if (lastEditedUser) room.updateLastEditedUser(lastEditedUser) - if (users) room.setUsers(users) - if (files) room.setFiles(files) + if (currentData) { + Utils.logOperation(roomId, 'Using current room data') + return this.normalizeRoomData(currentData) + } + + return RoomDataManager.CONFIG.defaultData + } + + /** + * Normalizes room data to ensure consistent format + * @param {*} data - Raw room data to normalize + * @return {RoomData} Normalized room data + */ + normalizeRoomData(data) { + if (!data) { + return RoomDataManager.CONFIG.defaultData + } + + const normalized = { + elements: [], + files: {}, + savedAt: Date.now(), + } - await this.storageManager.set(roomId, room) + if (Array.isArray(data)) { + normalized.elements = [...data] + } else if (typeof data === 'object') { + normalized.elements = Array.isArray(data.elements) + ? [...data.elements] + : data.elements + ? Object.values(data.elements) + : [] + normalized.files = { ...(data.files || {}) } + normalized.savedAt = data.savedAt || Date.now() + } + + return normalized + } - console.log(`[${roomId}] Room data synced. Users: ${room.users.size}, Last edited by: ${room.lastEditedUser}, files: ${Object.keys(room.files).length}`) + /** + * Updates room with new data and creates backup + * @param {Room} room - Room instance to update + * @param {RoomData} data - New room data + * @param {Array} users - Updated user list + * @param {object} lastEditedUser - User who last edited + * @return {Promise} + */ + async updateRoomWithData(room, data, users, lastEditedUser) { + await this.updateRoom(room, data, users, lastEditedUser) + await this.storageManager.set(room.id, room) + this.createRoomBackup(room.id, room) + } + + /** + * Updates room properties with new data + * @param {Room} room - Room instance to update + * @param {RoomData} data - New room data + * @param {Array} users - Updated user list + * @param {object} lastEditedUser - User who last edited + * @return {Promise} + */ + async updateRoom(room, data, users, lastEditedUser) { + if (data.elements) room.setData(data.elements) + if (data.files) room.setFiles(data.files) + if (users) room.setUsers(users) + if (lastEditedUser) room.updateLastEditedUser(lastEditedUser) - if (room.isEmpty()) { - await this.storageManager.delete(roomId) - console.log(`[${roomId}] Room is empty, removed from cache`) - return null + Utils.logOperation(room.id, 'Room updated', { + elementsCount: room.data?.length || 0, + filesCount: Object.keys(room.files || {}).length, + }) + } + + /** + * Creates a backup of room data + * @param {string} roomId - Room identifier + * @param {Room} room - Room instance to backup + * @return {Promise} + */ + async createRoomBackup(roomId, room) { + const backupData = { + elements: Array.isArray(room.data) ? [...room.data] : [], + files: { ...room.files }, + savedAt: Date.now(), } - return room + try { + await this.backupManager.createBackup(roomId, backupData) + Utils.logOperation(roomId, 'Backup created', { + elementsCount: backupData.elements.length, + }) + } catch (error) { + Utils.logError(roomId, 'Backup creation failed', error) + } } - async fetchRoomDataFromServer(roomId, jwtToken) { - console.log(`[${roomId}] No data provided or existing, fetching from server...`) + /** + * Fetches and validates room data from server + * @param {string} roomId - Room identifier + * @param {string} jwtToken - JWT token for authentication + * @return {Promise} Room data or null if fetch fails + */ + async fetchRoomData(roomId, jwtToken) { + Utils.logOperation(roomId, 'Fetching server data') + try { - const result = await this.apiService.getRoomDataFromServer(roomId, jwtToken) - console.log(`[${roomId}] Fetched data from server: \n`, result) - return result?.data || { elements: [], files: {} } + const result = await this.apiService.getRoomDataFromServer( + roomId, + jwtToken, + ) + + if (!this.isValidServerData(result)) { + Utils.logOperation( + roomId, + 'Server data is invalid, recovering from backup', + ) + return await this.tryRecoverFromBackup(roomId) + } + + const serverData = result.data + const backupData = await this.tryRecoverFromBackup(roomId) + + if ( + backupData + && (await this.backupManager.isDataFresher(roomId, serverData)) + ) { + Utils.logOperation( + roomId, + 'Server data is fresher than backup, using server data', + ) + return this.normalizeRoomData(serverData) + } + + Utils.logOperation( + roomId, + 'Server data is older than backup, using backup data', + ) + return backupData + ? this.normalizeRoomData(backupData) + : this.normalizeRoomData(serverData) } catch (error) { - console.error(`[${roomId}] Failed to fetch data from server:`, error) - return { elements: [], files: {} } + Utils.logError(roomId, 'Server fetch failed, using backup', error) + return await this.tryRecoverFromBackup(roomId) } } - async removeAllRoomData() { - await this.storageManager.clear() + /** + * Retrieves existing room or creates new one + * @param {string} roomId - Room identifier + * @return {Promise} Room instance + */ + async getOrCreateRoom(roomId) { + return (await this.storageManager.get(roomId)) || new Room(roomId) + } + + /** + * Validates server response data structure + * @param {object} result - Server response + * @return {boolean} Whether data is valid + */ + isValidServerData(result) { + return ( + result?.data + && (Array.isArray(result.data.elements) + || typeof result.data.elements === 'object') + ) + } + + /** + * Attempts to recover room data from backup + * @param {string} roomId - Room identifier + * @return {Promise} Recovered data or null + */ + async tryRecoverFromBackup(roomId) { + const backupData = await this.backupManager.recoverFromBackup(roomId) + if (backupData) { + Utils.logOperation(roomId, 'Recovered from backup') + } + return backupData + } + + /** + * Handles empty room cleanup + * @param {string} roomId - Room identifier + * @return {Promise} + */ + async handleEmptyRoom(roomId) { + await this.cleanupEmptyRoom(roomId) + return null + } + + /** + * Removes empty room from storage + * @param {string} roomId - Room identifier + * @return {Promise} + */ + async cleanupEmptyRoom(roomId) { + await this.storageManager.delete(roomId) + Utils.logOperation(roomId, 'Empty room removed from cache') } } diff --git a/websocket_server/ServerManager.js b/websocket_server/ServerManager.js index 4068bbb..20c008f 100644 --- a/websocket_server/ServerManager.js +++ b/websocket_server/ServerManager.js @@ -16,6 +16,7 @@ import RoomDataManager from './RoomDataManager.js' import AppManager from './AppManager.js' import SocketManager from './SocketManager.js' import Utils from './Utils.js' +import BackupManager from './BackupManager.js' export default class ServerManager { @@ -24,8 +25,9 @@ export default class ServerManager { this.closing = false this.tokenGenerator = new SharedTokenGenerator() this.apiService = new ApiService(this.tokenGenerator) + this.backupManager = new BackupManager({}) this.storageManager = StorageManager.create(this.config.storageStrategy, this.apiService) - this.roomDataManager = new RoomDataManager(this.storageManager, this.apiService) + this.roomDataManager = new RoomDataManager(this.storageManager, this.apiService, this.backupManager) this.appManager = new AppManager(this.storageManager) this.server = this.createConfiguredServer(this.appManager.getApp()) this.socketManager = new SocketManager(this.server, this.roomDataManager, this.storageManager) diff --git a/websocket_server/SocketManager.js b/websocket_server/SocketManager.js index de45e32..4f19c73 100644 --- a/websocket_server/SocketManager.js +++ b/websocket_server/SocketManager.js @@ -201,8 +201,13 @@ export default class SocketManager { socket.broadcast.to(roomID).emit('client-broadcast', encryptedData, iv) + // Process the room data update asynchronously + this.processRoomDataUpdate(roomID, encryptedData, socket.id) + } + + async processRoomDataUpdate(roomID, encryptedData, socketId) { const decryptedData = JSON.parse(Utils.convertArrayBufferToString(encryptedData)) - const socketData = await this.socketDataManager.getSocketData(socket.id) + const socketData = await this.socketDataManager.getSocketData(socketId) if (!socketData) return const userSocketsAndIds = await this.getUserSocketsAndIds(roomID) diff --git a/websocket_server/Utils.js b/websocket_server/Utils.js index ac0d606..d230806 100644 --- a/websocket_server/Utils.js +++ b/websocket_server/Utils.js @@ -3,6 +3,8 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ +/* eslint-disable no-console */ + export default class Utils { static convertStringToArrayBuffer(string) { @@ -17,4 +19,24 @@ export default class Utils { return value === 'true' } + /** + * Logs operation details + * @param {string} roomId - Room identifier + * @param {string} message - Log message + * @param {object} [data] - Additional data to log + */ + static logOperation(roomId, message, data = {}) { + console.log(`[${roomId}] ${message}:`, data) + } + + /** + * Logs error details + * @param {string} roomId - Room identifier + * @param {string} message - Error message + * @param {Error} error - Error object + */ + static logError(roomId, message, error) { + console.error(`[${roomId}] ${message}:`, error) + } + }