Skip to content

Commit

Permalink
Data management improvements
Browse files Browse the repository at this point in the history
Signed-off-by: Hoang Pham <[email protected]>
  • Loading branch information
hweihwang committed Nov 7, 2024
1 parent 3d37460 commit 8bcc170
Show file tree
Hide file tree
Showing 8 changed files with 568 additions and 36 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
/css/
/vendor/
/node_modules/
/backup/

.php-cs-fixer.cache
.phpunit.result.cache
Expand Down
8 changes: 7 additions & 1 deletion websocket_server/ApiService.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,13 @@ export default class ApiService {
console.log(`[${roomID}] Saving room data to server: ${roomData.length} elements, ${Object.keys(files).length} files`)

const url = `${this.NEXTCLOUD_URL}/index.php/apps/whiteboard/${roomID}`
const body = { data: { elements: roomData, files: this.cleanupFiles(roomData, files) } }
const body = {
data: {
elements: roomData,
files: this.cleanupFiles(roomData, files),
savedAt: Date.now(),
},
}
const options = this.fetchOptions('PUT', null, body, roomID, lastEditedUser)
return this.fetchData(url, options)
}
Expand Down
233 changes: 233 additions & 0 deletions websocket_server/BackupManager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
/**
* SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
* SPDX-License-Identifier: AGPL-3.0-or-later
*/

/* eslint-disable no-console */

import fs from 'fs/promises'
import path from 'path'
import crypto from 'crypto'
import zlib from 'zlib'
import { promisify } from 'util'

const gzip = promisify(zlib.gzip)
const gunzip = promisify(zlib.gunzip)

export default class BackupManager {

constructor(options = {}) {
const { backupDir = './backup', maxBackupsPerRoom = 5 } = options
this.backupDir = backupDir
this.maxBackupsPerRoom = maxBackupsPerRoom
this.locks = new Map()
this.lockTimeout = options.lockTimeout || 5000 // 5 seconds
this.lockRetryInterval = options.lockRetryInterval || 50 // 50ms
this.init()
}

async init() {
try {
await fs.mkdir(this.backupDir, { recursive: true })
await this.cleanupTemporaryFiles()
} catch (error) {
console.error('Failed to initialize BackupManager:', error)
throw error
}
}

async cleanupTemporaryFiles() {
try {
const files = await fs.readdir(this.backupDir)
const tmpFiles = files.filter((f) => f.endsWith('.tmp'))
await Promise.all(
tmpFiles.map((file) =>
fs
.unlink(path.join(this.backupDir, file))
.catch(console.error),
),
)
} catch (error) {
console.error('Failed to cleanup temporary files:', error)
}
}

async acquireLock(roomId) {
const startTime = Date.now()
while (this.locks.get(roomId)) {
if (Date.now() - startTime > this.lockTimeout) {
throw new Error(`Lock acquisition timeout for room ${roomId}`)
}
await new Promise((resolve) =>
setTimeout(resolve, this.lockRetryInterval),
)
}
this.locks.set(roomId, Date.now())
}

async releaseLock(roomId) {
this.locks.delete(roomId)
}

sanitizeRoomId(roomId) {
return roomId.replace(/[^a-zA-Z0-9-_]/g, '_')

Check failure on line 73 in websocket_server/BackupManager.js

View workflow job for this annotation

GitHub Actions / test

Unhandled error

TypeError: roomId.replace is not a function ❯ BackupManager.sanitizeRoomId websocket_server/BackupManager.js:73:17 ❯ BackupManager.getLatestBackup websocket_server/BackupManager.js:127:32 ❯ BackupManager.recoverFromBackup websocket_server/BackupManager.js:213:29 ❯ RoomDataManager.tryRecoverFromBackup websocket_server/RoomDataManager.js:300:47 ❯ RoomDataManager.fetchRoomData websocket_server/RoomDataManager.js:259:22 ❯ processTicksAndRejections node:internal/process/task_queues:95:5 ❯ RoomDataManager.determineLatestData websocket_server/RoomDataManager.js:114:11 ❯ RoomDataManager.syncRoomData websocket_server/RoomDataManager.js:70:24 ❯ SocketManager.joinRoomHandler websocket_server/SocketManager.js:173:16 This error originated in "tests/integration/socket.spec.mjs" test file. It doesn't mean the error was thrown inside the file itself, but while it was running. The latest test that might've caused the error is "join room". It might mean one of the following: - The error was thrown, while Vitest was running this test. - If the error occurred after the test had been completed, this was the last documented test before it was thrown.
}

calculateChecksum(data) {
return crypto
.createHash('sha256')
.update(typeof data === 'string' ? data : JSON.stringify(data))
.digest('hex')
}

async createBackup(roomId, data) {
if (!roomId || !data) {
throw new Error('Invalid backup parameters')
}

const sanitizedRoomId = this.sanitizeRoomId(roomId)

try {
await this.acquireLock(sanitizedRoomId)

const backupData = this.prepareBackupData(sanitizedRoomId, data)
await this.writeBackupFile(sanitizedRoomId, backupData)
await this.cleanupOldBackups(sanitizedRoomId)

return backupData.id
} finally {
await this.releaseLock(sanitizedRoomId)
}
}

prepareBackupData(roomId, data) {
return {
id: crypto.randomUUID(),
timestamp: Date.now(),
roomId,
checksum: this.calculateChecksum(data),
data,
savedAt: data.savedAt || Date.now(),
}
}

async writeBackupFile(roomId, backupData) {
const backupFile = path.join(
this.backupDir,
`${roomId}_${backupData.timestamp}.bak`,
)
const tempFile = `${backupFile}.tmp`

const compressed = await gzip(JSON.stringify(backupData))
await fs.writeFile(tempFile, compressed)
await fs.rename(tempFile, backupFile)
}

async getLatestBackup(roomId) {
const sanitizedRoomId = this.sanitizeRoomId(roomId)
const files = await fs.readdir(this.backupDir)
const roomBackups = files
.filter(
(f) =>
f.startsWith(`${sanitizedRoomId}_`) && f.endsWith('.bak'),
)
.sort()
.reverse()

if (roomBackups.length === 0) return null

try {
const compressed = await fs.readFile(
path.join(this.backupDir, roomBackups[0]),
)
const decompressed = await gunzip(compressed)
const backup = JSON.parse(decompressed.toString())

// Verify checksum
const calculatedChecksum = this.calculateChecksum(backup.data)
if (calculatedChecksum !== backup.checksum) {
throw new Error('Backup data corruption detected')
}

return backup
} catch (error) {
console.error(
`Failed to read latest backup for room ${sanitizedRoomId}:`,
error,
)
throw error
}
}

async cleanupOldBackups(roomId) {
const sanitizedRoomId = this.sanitizeRoomId(roomId)

try {
const files = await fs.readdir(this.backupDir)
const roomBackups = files
.filter(
(f) =>
f.startsWith(`${sanitizedRoomId}_`)
&& f.endsWith('.bak'),
)
.sort()
.reverse()

if (roomBackups.length <= this.maxBackupsPerRoom) {
return
}

const filesToDelete = roomBackups.slice(this.maxBackupsPerRoom)
await Promise.all(
filesToDelete.map((file) =>
fs
.unlink(path.join(this.backupDir, file))
.catch((error) => {
console.error(
`Failed to delete backup ${file}:`,
error,
)
// Don't throw to continue with other deletions
}),
),
)
} catch (error) {
console.error(`Failed to cleanup old backups for ${roomId}:`, error)
// Don't throw as this is a non-critical operation
}
}

async getAllBackups(roomId) {
const sanitizedRoomId = this.sanitizeRoomId(roomId)
const files = await fs.readdir(this.backupDir)
return files
.filter(
(f) =>
f.startsWith(`${sanitizedRoomId}_`) && f.endsWith('.bak'),
)
.sort()
.reverse()
}

async recoverFromBackup(roomId) {
const backup = await this.getLatestBackup(roomId)
if (!backup) {
console.log(`No backup found for room ${roomId}`)
return null
}
return backup.data
}

async isDataFresher(roomId, serverData) {
const latestBackup = await this.getLatestBackup(roomId)

if (!latestBackup) return true

// Simply compare savedAt timestamps
const serverTimestamp = serverData?.savedAt || 0
const backupTimestamp = latestBackup.savedAt || 0

return serverTimestamp >= backupTimestamp
}

}
5 changes: 4 additions & 1 deletion websocket_server/LRUCacheStrategy.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export default class LRUCacheStrategy extends StorageStrategy {
ttlAutopurge: true,
dispose: async (value, key) => {
console.log(`[${key}] Disposing room`)

if (value?.data && value?.lastEditedUser) {
try {
await this.apiService.saveRoomDataToServer(
Expand Down Expand Up @@ -53,7 +54,9 @@ export default class LRUCacheStrategy extends StorageStrategy {
}

getRooms() {
const rooms = Array.from(this.cache.values()).filter((room) => room instanceof Room)
const rooms = Array.from(this.cache.values()).filter(
(room) => room instanceof Room,
)

return rooms
}
Expand Down
Loading

0 comments on commit 8bcc170

Please sign in to comment.