Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bc 7851 #15

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions src/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,15 @@ export class Api {
* Minimum lifetime of y* update messages in redis streams.
*/
this.redisMinMessageLifetime = number.parseInt(env.getConf('redis-min-message-lifetime') || '60000') // default: 1 minute
this.redisWorkerStreamName = this.prefix + ':worker'
this.redisWorkerGroupName = this.prefix + ':worker'
this._destroyed = false

if (redisInstance instanceof IoRedis) {
/**
* @type {IoRedisAdapter | NodeRedisAdapter}
*/
this.redis = new IoRedisAdapter(redisInstance, this.redisWorkerStreamName, this.redisWorkerGroupName)
this.redis = new IoRedisAdapter(redisInstance, this.prefix)
} else if (redisInstance.constructor.name === 'Commander') {
this.redis = new NodeRedisAdapter(redisInstance, this.redisWorkerStreamName, this.redisWorkerGroupName)
this.redis = new NodeRedisAdapter(redisInstance, this.prefix)
} else {
throw new Error('Invalid redis instance');
}
Expand Down Expand Up @@ -237,6 +235,10 @@ export class Api {
const tasks = []

const reclaimedTasks = await this.redis.reclaimTasks(this.consumername, this.redisTaskDebounce, tryClaimCount)
const deletedDocEntries = await this.redis.getDeletedDocEntries()
const deletedDocNames = deletedDocEntries?.map(entry => {
return entry.message.docName
})

reclaimedTasks?.messages.forEach(m => {
const stream = m?.message.compact
Expand All @@ -250,10 +252,17 @@ export class Api {
logWorker('Accepted tasks ', { tasks })
await promise.all(tasks.map(async task => {
const streamlen = await this.redis.tryClearTask(task)
const { room, docid } = decodeRedisRoomStreamName(task.stream, this.prefix)
if (streamlen === 0) {
logWorker('Stream still empty, removing recurring task from queue ', { stream: task.stream })

const deleteEntryId = deletedDocEntries.find(entry => entry.message.docName === task.stream)?.id.toString()

if (deleteEntryId) {
this.redis.deleteDeleteDocEntry(deleteEntryId)
this.store.deleteDocument(room, docid)
}
} else {
const { room, docid } = decodeRedisRoomStreamName(task.stream, this.prefix)
// @todo, make sure that awareness by this.getDoc is eventually destroyed, or doesn't
// register a timeout anymore
logWorker('requesting doc from store')
Expand All @@ -270,8 +279,11 @@ export class Api {
} catch (e) {
console.error(e)
}

logWorker('persisting doc')
await this.store.persistDoc(room, docid, ydoc)
if(!deletedDocNames.includes(task.stream)) {
await this.store.persistDoc(room, docid, ydoc)
}
}
await promise.all([
storeReferences && docChanged ? this.store.deleteReferences(room, docid, storeReferences) : promise.resolve(),
Expand Down
24 changes: 19 additions & 5 deletions src/redis/io-redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ export class IoRedisAdapter {
/**
*
* @param { Redis } redis
* @param { string } redisWorkerStreamName
* @param { string } redisWorkerGroupName
* @param { string } prefix
*/
constructor(redis, redisWorkerStreamName, redisWorkerGroupName) {
this.redisWorkerStreamName = redisWorkerStreamName
this.redisWorkerGroupName = redisWorkerGroupName
constructor(redis, prefix) {
this.redisDeleteStreamName = prefix + ':delete'
this.redisWorkerStreamName = prefix + ':worker'
this.redisWorkerGroupName = prefix + ':worker'
this.redis = redis

this.redis.defineCommand('addMessage', {
Expand Down Expand Up @@ -132,6 +132,20 @@ export class IoRedisAdapter {
return reclaimedTasksRes
}

async getDeletedDocEntries() {
const deletedDocEntries = await this.redis.xrange(this.redisDeleteStreamName, '-', '+')
const transformedDeletedTasks = transformStreamMessagesReply(deletedDocEntries)

return transformedDeletedTasks
}

/**
* @param {string} id
*/
async deleteDeleteDocEntry(id) {
this.redis.xdel(this.redisDeleteStreamName, id)
}

/**
* @param {{ stream: import("ioredis").RedisKey; id: any; }} task
*/
Expand Down
25 changes: 19 additions & 6 deletions src/redis/node-redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ export class NodeRedisAdapter {

/**
*
* @param { import('redis').RedisClientType } redis
* @param { string } redisWorkerStreamName
* @param { string } redisWorkerGroupName
* @param { import('redis').RedisClientType } redis
* @param { string } prefix
*/
constructor(redis, redisWorkerStreamName, redisWorkerGroupName) {
this.redisWorkerStreamName = redisWorkerStreamName,
this.redisWorkerGroupName = redisWorkerGroupName,
constructor(redis, prefix) {
this.redisDeleteStreamName = prefix + ':delete'
this.redisWorkerStreamName = prefix + ':worker'
this.redisWorkerGroupName = prefix + ':worker'
this.redis = redis
this.addMessageScript = node_redis.defineScript({
NUMBER_OF_KEYS: 1,
Expand Down Expand Up @@ -133,6 +133,19 @@ export class NodeRedisAdapter {
return reclaimedTasks
}

async getDeletedDocEntries() {
const deletedDocEntries = await this.redis.xRange(this.redisDeleteStreamName, '-', '+');

return deletedDocEntries
}

/**
* @param {string} id
*/
async deleteDeleteDocEntry(id) {
this.redis.xDel(this.redisDeleteStreamName, id)
}

/**
* @param {{ stream: import("ioredis").RedisKey; id: any; }} task
*/
Expand Down
11 changes: 10 additions & 1 deletion src/storage.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as Y from 'yjs'
import * as err from 'lib0/error'
import * as Y from 'yjs'

export class AbstractStorage {
/**
Expand Down Expand Up @@ -44,6 +44,15 @@ export class AbstractStorage {
err.methodUnimplemented()
}

/**
* @param {string} room
* @param {string} docname
* @return {Promise<void>}
*/
deleteDocument (room, docname) {
err.methodUnimplemented()
}

async destroy () {
}
}
22 changes: 17 additions & 5 deletions src/storage/s3.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import * as Y from 'yjs'
import * as random from 'lib0/random'
import * as promise from 'lib0/promise'
import * as minio from 'minio'
import * as env from 'lib0/environment'
import * as number from 'lib0/number'
import * as logging from 'lib0/logging'
import * as number from 'lib0/number'
import * as promise from 'lib0/promise'
import * as random from 'lib0/random'
import * as minio from 'minio'
import * as Y from 'yjs'

const log = logging.createModuleLogger('@y/redis/s3')

Expand Down Expand Up @@ -141,6 +141,18 @@ export class S3Storage {
await this.client.removeObjects(this.bucketName, storeReferences)
}

/**
*
* @param {string} room
* @param {string} docname
*/
async deleteDocument(room, docname) {
const objNames = await this.client.listObjectsV2(this.bucketName, encodeS3ObjectName(room, docname, ''), true).toArray()
const objectsList = objNames.map(obj => obj.name)

await this.client.removeObjects(this.bucketName, objectsList);
}

async destroy () {
}
}