diff --git a/index.js b/index.js index e4c8b7e..f88ccd7 100644 --- a/index.js +++ b/index.js @@ -14,12 +14,38 @@ class Updates { this.tick = tick // internal tie breaker clock for same key updates this.clock = clock // engine clock this.map = new Map(entries) + this.locks = new Map() } get size () { return this.map.size } + enter (collection) { + if (collection.trigger !== null) { + if (this.locks.has(collection)) return false + this.locks.set(collection, { resolve: null, promise: null }) + } + + this.mutating++ + return true + } + + exit (collection) { + this.mutating-- + if (collection.trigger === null) return + const { resolve } = this.locks.get(collection) + this.locks.delete(collection) + if (resolve) resolve() + } + + wait (collection) { + const state = this.locks.get(collection) + if (state.promise) return state.promise + state.promise = new Promise((resolve) => { state.resolve = resolve }) + return state.promise + } + ref () { this.refs++ return this @@ -202,6 +228,14 @@ class HyperDB { return this.rootInstance !== null && this.rootInstance !== this } + cork () { + this.engine.cork() + } + + uncork () { + this.engine.uncork() + } + ready () { return this.engine.ready() } @@ -352,34 +386,36 @@ class HyperDB { const collection = this.definition.resolveCollection(collectionName) if (collection === null) return + while (this.updates.enter(collection) === false) await this.updates.wait(collection) + const key = collection.encodeKey(doc) let prevValue = null - this.updates.mutating++ + try { - if (collection.trigger !== null) await this._runTrigger(collection, doc, null) prevValue = await this.engine.get(this.engineSnapshot, key) - } finally { - this.updates.mutating-- - } + if (collection.trigger !== null) await this._runTrigger(collection, doc, null) - if (prevValue === null) { - this.updates.delete(key) - return - } + if (prevValue === null) { + this.updates.delete(key) + return + } - const prevDoc = collection.reconstruct(this.version, key, prevValue) + const prevDoc = collection.reconstruct(this.version, key, prevValue) - const u = this.updates.update(collection, key, null) + const u = this.updates.update(collection, key, null) - for (let i = 0; i < collection.indexes.length; i++) { - const idx = collection.indexes[i] - const del = idx.encodeIndexKeys(prevDoc, this.context) - const ups = [] + for (let i = 0; i < collection.indexes.length; i++) { + const idx = collection.indexes[i] + const del = idx.encodeIndexKeys(prevDoc, this.context) + const ups = [] - u.indexes.push(ups) + u.indexes.push(ups) - for (let j = 0; j < del.length; j++) ups.push({ key: del[j], value: null }) + for (let j = 0; j < del.length; j++) ups.push({ key: del[j], value: null }) + } + } finally { + this.updates.exit(collection) } } @@ -391,39 +427,41 @@ class HyperDB { const collection = this.definition.resolveCollection(collectionName) if (collection === null) throw new Error('Unknown collection: ' + collectionName) + while (this.updates.enter(collection) === false) await this.updates.wait(collection) + const key = collection.encodeKey(doc) const value = collection.encodeValue(this.version, doc) let prevValue = null - this.updates.mutating++ + try { - if (collection.trigger !== null) await this._runTrigger(collection, doc, doc) prevValue = await this.engine.get(this.engineSnapshot, key) - } finally { - this.updates.mutating-- - } + if (collection.trigger !== null) await this._runTrigger(collection, doc, doc) - if (prevValue !== null && b4a.equals(value, prevValue)) return + if (prevValue !== null && b4a.equals(value, prevValue)) return - const prevDoc = prevValue === null ? null : collection.reconstruct(this.version, key, prevValue) + const prevDoc = prevValue === null ? null : collection.reconstruct(this.version, key, prevValue) - const u = this.updates.update(collection, key, value) + const u = this.updates.update(collection, key, value) - u.created = prevValue === null + u.created = prevValue === null - for (let i = 0; i < collection.indexes.length; i++) { - const idx = collection.indexes[i] - const prevKeys = prevDoc ? idx.encodeIndexKeys(prevDoc, this.context) : [] - const nextKeys = idx.encodeIndexKeys(doc, this.context) - const ups = [] + for (let i = 0; i < collection.indexes.length; i++) { + const idx = collection.indexes[i] + const prevKeys = prevDoc ? idx.encodeIndexKeys(prevDoc, this.context) : [] + const nextKeys = idx.encodeIndexKeys(doc, this.context) + const ups = [] - u.indexes.push(ups) + u.indexes.push(ups) - const [del, put] = diffKeys(prevKeys, nextKeys) - const value = put.length === 0 ? null : idx.encodeValue(doc) + const [del, put] = diffKeys(prevKeys, nextKeys) + const value = put.length === 0 ? null : idx.encodeValue(doc) - for (let j = 0; j < del.length; j++) ups.push({ key: del[j], value: null }) - for (let j = 0; j < put.length; j++) ups.push({ key: put[j], value }) + for (let j = 0; j < del.length; j++) ups.push({ key: del[j], value: null }) + for (let j = 0; j < put.length; j++) ups.push({ key: put[j], value }) + } + } finally { + this.updates.exit(collection) } } diff --git a/lib/engine/bee.js b/lib/engine/bee.js index 51b404d..a059f87 100644 --- a/lib/engine/bee.js +++ b/lib/engine/bee.js @@ -69,6 +69,10 @@ module.exports = class BeeEngine { return Promise.all(promises) } + cork () {} + + uncork () {} + get (snapshot, key) { const db = this._getDB(snapshot) return getValue(db, key) diff --git a/lib/engine/rocks.js b/lib/engine/rocks.js index 9b964f9..9d9c54f 100644 --- a/lib/engine/rocks.js +++ b/lib/engine/rocks.js @@ -24,6 +24,7 @@ module.exports = class RocksEngine { this.asap = false this.clock = 0 this.refs = 0 + this.batch = null this.db = typeof storage === 'object' ? storage : new RocksDB(storage) this.db.ready().catch(noop) } @@ -37,6 +38,7 @@ module.exports = class RocksEngine { } close () { + if (this.batch !== null) this.batch.destroy() return this.db.close() } @@ -69,8 +71,19 @@ module.exports = class RocksEngine { return Promise.all(promises) } + cork () { + if (this.batch !== null) return + this.batch = this.db.read() + } + + uncork () { + if (this.batch !== null) this.batch.tryFlush() + this.batch = null + } + get (snapshot, key) { - return this.db.get(key, { snapshot: getSnapshot(snapshot) }) + const options = { snapshot: getSnapshot(snapshot) } + return this.batch === null ? this.db.get(key, options) : this.batch.get(key, options) } createReadStream (snapshot, range, options) { diff --git a/test/basic.js b/test/basic.js index 52d482d..0f6b93c 100644 --- a/test/basic.js +++ b/test/basic.js @@ -247,6 +247,7 @@ test('watch', async function ({ create }, t) { test('basic reopen', async function ({ create }, t) { const storage = await tmp(t) + { const db = await create(definition, { storage }) await db.insert('members', { id: 'maf', age: 34 }) @@ -261,3 +262,18 @@ test('basic reopen', async function ({ create }, t) { await db.close() } }) + +test('cork/uncork', async function ({ create }, t) { + const db = await create() + + db.cork() + const all = [ + db.insert('@db/members', { id: 'maf', age: 34 }), + db.insert('@db/members', { id: 'andrew', age: 30 }) + ] + db.uncork() + + await Promise.all(all) + t.pass('did not crash') + await db.close() +})