From 9896f03907f2a1c1dc26b785a2a10ea45ca493b1 Mon Sep 17 00:00:00 2001 From: Max Bischof Date: Wed, 18 Sep 2024 15:12:27 +0200 Subject: [PATCH 1/3] Only persist in s3 when doc is not deleted --- src/api.js | 13 ++++++++++++- src/redis/io-redis.js | 14 ++++++++++++++ src/redis/node-redis.js | 13 +++++++++++++ 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/src/api.js b/src/api.js index 597f90c..a198141 100644 --- a/src/api.js +++ b/src/api.js @@ -237,6 +237,10 @@ export class Api { const tasks = [] const reclaimedTasks = await this.redis.reclaimTasks(this.consumername, this.redisTaskDebounce, tryClaimCount) + const deletedDocEntries = await this.redis.getDeletedDocEntries() + const deletedDocNames = deletedDocEntries?.map(entry => { + return entry.message.docName + }) reclaimedTasks?.messages.forEach(m => { const stream = m?.message.compact @@ -252,6 +256,10 @@ export class Api { const streamlen = await this.redis.tryClearTask(task) if (streamlen === 0) { logWorker('Stream still empty, removing recurring task from queue ', { stream: task.stream }) + + const deleteEntryId = deletedDocEntries.find(entry => entry.message.docName === task.stream)?.id.toString() + + if (deleteEntryId) this.redis.deleteDeleteDocEntry(deleteEntryId) } else { const { room, docid } = decodeRedisRoomStreamName(task.stream, this.prefix) // @todo, make sure that awareness by this.getDoc is eventually destroyed, or doesn't @@ -270,8 +278,11 @@ export class Api { } catch (e) { console.error(e) } + logWorker('persisting doc') - await this.store.persistDoc(room, docid, ydoc) + if(!deletedDocNames.includes(task.stream)) { + await this.store.persistDoc(room, docid, ydoc) + } } await promise.all([ storeReferences && docChanged ? this.store.deleteReferences(room, docid, storeReferences) : promise.resolve(), diff --git a/src/redis/io-redis.js b/src/redis/io-redis.js index fb11726..ec5e5a1 100644 --- a/src/redis/io-redis.js +++ b/src/redis/io-redis.js @@ -132,6 +132,20 @@ export class IoRedisAdapter { return reclaimedTasksRes } + async getDeletedDocEntries() { + const deletedDocEntries = await this.redis.xrange('delete', '-', '+') + const transformedDeletedTasks = transformStreamMessagesReply(deletedDocEntries) + + return transformedDeletedTasks + } + + /** + * @param {string} id + */ + async deleteDeleteDocEntry(id) { + this.redis.xdel('delete', id) + } + /** * @param {{ stream: import("ioredis").RedisKey; id: any; }} task */ diff --git a/src/redis/node-redis.js b/src/redis/node-redis.js index 8118ce3..256187f 100644 --- a/src/redis/node-redis.js +++ b/src/redis/node-redis.js @@ -133,6 +133,19 @@ export class NodeRedisAdapter { return reclaimedTasks } + async getDeletedDocEntries() { + const deletedDocEntries = await this.redis.xRange('delete', '-', '+'); + + return deletedDocEntries + } + + /** + * @param {string} id + */ + async deleteDeleteDocEntry(id) { + this.redis.xDel('delete', id) + } + /** * @param {{ stream: import("ioredis").RedisKey; id: any; }} task */ From 7d48e08d18ec78c9ab90063a7d867ec7f191319c Mon Sep 17 00:00:00 2001 From: SevenWaysDP Date: Thu, 19 Sep 2024 14:28:52 +0200 Subject: [PATCH 2/3] BC-7851 - refactor redis connection logic and add remove doc from storage --- src/api.js | 13 +++++++------ src/redis/io-redis.js | 14 +++++++------- src/redis/node-redis.js | 16 ++++++++-------- src/storage.js | 11 ++++++++++- src/storage/s3.js | 22 +++++++++++++++++----- 5 files changed, 49 insertions(+), 27 deletions(-) diff --git a/src/api.js b/src/api.js index a198141..2cd7ba9 100644 --- a/src/api.js +++ b/src/api.js @@ -115,17 +115,15 @@ export class Api { * Minimum lifetime of y* update messages in redis streams. */ this.redisMinMessageLifetime = number.parseInt(env.getConf('redis-min-message-lifetime') || '60000') // default: 1 minute - this.redisWorkerStreamName = this.prefix + ':worker' - this.redisWorkerGroupName = this.prefix + ':worker' this._destroyed = false if (redisInstance instanceof IoRedis) { /** * @type {IoRedisAdapter | NodeRedisAdapter} */ - this.redis = new IoRedisAdapter(redisInstance, this.redisWorkerStreamName, this.redisWorkerGroupName) + this.redis = new IoRedisAdapter(redisInstance, this.prefix) } else if (redisInstance.constructor.name === 'Commander') { - this.redis = new NodeRedisAdapter(redisInstance, this.redisWorkerStreamName, this.redisWorkerGroupName) + this.redis = new NodeRedisAdapter(redisInstance, this.prefix) } else { throw new Error('Invalid redis instance'); } @@ -254,14 +252,17 @@ export class Api { logWorker('Accepted tasks ', { tasks }) await promise.all(tasks.map(async task => { const streamlen = await this.redis.tryClearTask(task) + const { room, docid } = decodeRedisRoomStreamName(task.stream, this.prefix) if (streamlen === 0) { logWorker('Stream still empty, removing recurring task from queue ', { stream: task.stream }) const deleteEntryId = deletedDocEntries.find(entry => entry.message.docName === task.stream)?.id.toString() - if (deleteEntryId) this.redis.deleteDeleteDocEntry(deleteEntryId) + if (deleteEntryId) { + this.redis.deleteDeleteDocEntry(deleteEntryId) + this.store.deleteDocument(room, docid) + } } else { - const { room, docid } = decodeRedisRoomStreamName(task.stream, this.prefix) // @todo, make sure that awareness by this.getDoc is eventually destroyed, or doesn't // register a timeout anymore logWorker('requesting doc from store') diff --git a/src/redis/io-redis.js b/src/redis/io-redis.js index ec5e5a1..e55bd9d 100644 --- a/src/redis/io-redis.js +++ b/src/redis/io-redis.js @@ -8,12 +8,12 @@ export class IoRedisAdapter { /** * * @param { Redis } redis - * @param { string } redisWorkerStreamName - * @param { string } redisWorkerGroupName + * @param { string } prefix */ - constructor(redis, redisWorkerStreamName, redisWorkerGroupName) { - this.redisWorkerStreamName = redisWorkerStreamName - this.redisWorkerGroupName = redisWorkerGroupName + constructor(redis, prefix) { + this.redisDeleteStreamName = prefix + ':delete' + this.redisWorkerStreamName = prefix + ':worker' + this.redisWorkerGroupName = prefix + ':worker' this.redis = redis this.redis.defineCommand('addMessage', { @@ -133,7 +133,7 @@ export class IoRedisAdapter { } async getDeletedDocEntries() { - const deletedDocEntries = await this.redis.xrange('delete', '-', '+') + const deletedDocEntries = await this.redis.xrange(this.redisDeleteStreamName, '-', '+') const transformedDeletedTasks = transformStreamMessagesReply(deletedDocEntries) return transformedDeletedTasks @@ -143,7 +143,7 @@ export class IoRedisAdapter { * @param {string} id */ async deleteDeleteDocEntry(id) { - this.redis.xdel('delete', id) + this.redis.xdel(this.redisDeleteStreamName, id) } /** diff --git a/src/redis/node-redis.js b/src/redis/node-redis.js index 256187f..6c80beb 100644 --- a/src/redis/node-redis.js +++ b/src/redis/node-redis.js @@ -6,13 +6,13 @@ export class NodeRedisAdapter { /** * - * @param { import('redis').RedisClientType } redis - * @param { string } redisWorkerStreamName - * @param { string } redisWorkerGroupName + * @param { import('redis').RedisClientType } redis + * @param { string } prefix */ - constructor(redis, redisWorkerStreamName, redisWorkerGroupName) { - this.redisWorkerStreamName = redisWorkerStreamName, - this.redisWorkerGroupName = redisWorkerGroupName, + constructor(redis, prefix) { + this.redisDeleteStreamName = prefix + ':delete' + this.redisWorkerStreamName = prefix + ':worker' + this.redisWorkerGroupName = prefix + ':worker' this.redis = redis this.addMessageScript = node_redis.defineScript({ NUMBER_OF_KEYS: 1, @@ -134,7 +134,7 @@ export class NodeRedisAdapter { } async getDeletedDocEntries() { - const deletedDocEntries = await this.redis.xRange('delete', '-', '+'); + const deletedDocEntries = await this.redis.xRange(this.redisDeleteStreamName, '-', '+'); return deletedDocEntries } @@ -143,7 +143,7 @@ export class NodeRedisAdapter { * @param {string} id */ async deleteDeleteDocEntry(id) { - this.redis.xDel('delete', id) + this.redis.xDel(this.redisDeleteStreamName, id) } /** diff --git a/src/storage.js b/src/storage.js index 37abc13..b7e0cdd 100644 --- a/src/storage.js +++ b/src/storage.js @@ -1,5 +1,5 @@ -import * as Y from 'yjs' import * as err from 'lib0/error' +import * as Y from 'yjs' export class AbstractStorage { /** @@ -44,6 +44,15 @@ export class AbstractStorage { err.methodUnimplemented() } + /** + * @param {string} room + * @param {string} docname + * @return {Promise} + */ + deleteDocument (room, docname) { + err.methodUnimplemented() + } + async destroy () { } } diff --git a/src/storage/s3.js b/src/storage/s3.js index 912d81f..8807f50 100644 --- a/src/storage/s3.js +++ b/src/storage/s3.js @@ -1,10 +1,10 @@ -import * as Y from 'yjs' -import * as random from 'lib0/random' -import * as promise from 'lib0/promise' -import * as minio from 'minio' import * as env from 'lib0/environment' -import * as number from 'lib0/number' import * as logging from 'lib0/logging' +import * as number from 'lib0/number' +import * as promise from 'lib0/promise' +import * as random from 'lib0/random' +import * as minio from 'minio' +import * as Y from 'yjs' const log = logging.createModuleLogger('@y/redis/s3') @@ -141,6 +141,18 @@ export class S3Storage { await this.client.removeObjects(this.bucketName, storeReferences) } + /** + * + * @param {string} room + * @param {string} docname + */ + async deleteDocument(room, docname) { + const objNames = await this.client.listObjectsV2(this.bucketName, encodeS3ObjectName(room, docname, ''), true).toArray() + const objectsList = objNames.map(obj => obj.name) + + await this.client.removeObjects(this.bucketName, objectsList); + } + async destroy () { } } From 2189c1bb3644d0e5d42331fbe956fb93bcea3fff Mon Sep 17 00:00:00 2001 From: Max Bischof Date: Mon, 23 Sep 2024 07:55:47 +0200 Subject: [PATCH 3/3] Fix indentation --- src/storage/s3.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storage/s3.js b/src/storage/s3.js index 8807f50..5c950ac 100644 --- a/src/storage/s3.js +++ b/src/storage/s3.js @@ -147,7 +147,7 @@ export class S3Storage { * @param {string} docname */ async deleteDocument(room, docname) { - const objNames = await this.client.listObjectsV2(this.bucketName, encodeS3ObjectName(room, docname, ''), true).toArray() + const objNames = await this.client.listObjectsV2(this.bucketName, encodeS3ObjectName(room, docname, ''), true).toArray() const objectsList = objNames.map(obj => obj.name) await this.client.removeObjects(this.bucketName, objectsList);