Skip to content

Commit

Permalink
Cork and uncork (#13)
Browse files Browse the repository at this point in the history
* add cork/uncork for perf batching and lock triggers on their collection

* add quick test

* fix standard
  • Loading branch information
mafintosh authored Oct 21, 2024
1 parent af4735f commit 838427c
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 37 deletions.
110 changes: 74 additions & 36 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,38 @@ class Updates {
this.tick = tick // internal tie breaker clock for same key updates
this.clock = clock // engine clock
this.map = new Map(entries)
this.locks = new Map()
}

get size () {
return this.map.size
}

enter (collection) {
if (collection.trigger !== null) {
if (this.locks.has(collection)) return false
this.locks.set(collection, { resolve: null, promise: null })
}

this.mutating++
return true
}

exit (collection) {
this.mutating--
if (collection.trigger === null) return
const { resolve } = this.locks.get(collection)
this.locks.delete(collection)
if (resolve) resolve()
}

wait (collection) {
const state = this.locks.get(collection)
if (state.promise) return state.promise
state.promise = new Promise((resolve) => { state.resolve = resolve })
return state.promise
}

ref () {
this.refs++
return this
Expand Down Expand Up @@ -202,6 +228,14 @@ class HyperDB {
return this.rootInstance !== null && this.rootInstance !== this
}

cork () {
this.engine.cork()
}

uncork () {
this.engine.uncork()
}

ready () {
return this.engine.ready()
}
Expand Down Expand Up @@ -352,34 +386,36 @@ class HyperDB {
const collection = this.definition.resolveCollection(collectionName)
if (collection === null) return

while (this.updates.enter(collection) === false) await this.updates.wait(collection)

const key = collection.encodeKey(doc)

let prevValue = null
this.updates.mutating++

try {
if (collection.trigger !== null) await this._runTrigger(collection, doc, null)
prevValue = await this.engine.get(this.engineSnapshot, key)
} finally {
this.updates.mutating--
}
if (collection.trigger !== null) await this._runTrigger(collection, doc, null)

if (prevValue === null) {
this.updates.delete(key)
return
}
if (prevValue === null) {
this.updates.delete(key)
return
}

const prevDoc = collection.reconstruct(this.version, key, prevValue)
const prevDoc = collection.reconstruct(this.version, key, prevValue)

const u = this.updates.update(collection, key, null)
const u = this.updates.update(collection, key, null)

for (let i = 0; i < collection.indexes.length; i++) {
const idx = collection.indexes[i]
const del = idx.encodeIndexKeys(prevDoc, this.context)
const ups = []
for (let i = 0; i < collection.indexes.length; i++) {
const idx = collection.indexes[i]
const del = idx.encodeIndexKeys(prevDoc, this.context)
const ups = []

u.indexes.push(ups)
u.indexes.push(ups)

for (let j = 0; j < del.length; j++) ups.push({ key: del[j], value: null })
for (let j = 0; j < del.length; j++) ups.push({ key: del[j], value: null })
}
} finally {
this.updates.exit(collection)
}
}

Expand All @@ -391,39 +427,41 @@ class HyperDB {
const collection = this.definition.resolveCollection(collectionName)
if (collection === null) throw new Error('Unknown collection: ' + collectionName)

while (this.updates.enter(collection) === false) await this.updates.wait(collection)

const key = collection.encodeKey(doc)
const value = collection.encodeValue(this.version, doc)

let prevValue = null
this.updates.mutating++

try {
if (collection.trigger !== null) await this._runTrigger(collection, doc, doc)
prevValue = await this.engine.get(this.engineSnapshot, key)
} finally {
this.updates.mutating--
}
if (collection.trigger !== null) await this._runTrigger(collection, doc, doc)

if (prevValue !== null && b4a.equals(value, prevValue)) return
if (prevValue !== null && b4a.equals(value, prevValue)) return

const prevDoc = prevValue === null ? null : collection.reconstruct(this.version, key, prevValue)
const prevDoc = prevValue === null ? null : collection.reconstruct(this.version, key, prevValue)

const u = this.updates.update(collection, key, value)
const u = this.updates.update(collection, key, value)

u.created = prevValue === null
u.created = prevValue === null

for (let i = 0; i < collection.indexes.length; i++) {
const idx = collection.indexes[i]
const prevKeys = prevDoc ? idx.encodeIndexKeys(prevDoc, this.context) : []
const nextKeys = idx.encodeIndexKeys(doc, this.context)
const ups = []
for (let i = 0; i < collection.indexes.length; i++) {
const idx = collection.indexes[i]
const prevKeys = prevDoc ? idx.encodeIndexKeys(prevDoc, this.context) : []
const nextKeys = idx.encodeIndexKeys(doc, this.context)
const ups = []

u.indexes.push(ups)
u.indexes.push(ups)

const [del, put] = diffKeys(prevKeys, nextKeys)
const value = put.length === 0 ? null : idx.encodeValue(doc)
const [del, put] = diffKeys(prevKeys, nextKeys)
const value = put.length === 0 ? null : idx.encodeValue(doc)

for (let j = 0; j < del.length; j++) ups.push({ key: del[j], value: null })
for (let j = 0; j < put.length; j++) ups.push({ key: put[j], value })
for (let j = 0; j < del.length; j++) ups.push({ key: del[j], value: null })
for (let j = 0; j < put.length; j++) ups.push({ key: put[j], value })
}
} finally {
this.updates.exit(collection)
}
}

Expand Down
4 changes: 4 additions & 0 deletions lib/engine/bee.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ module.exports = class BeeEngine {
return Promise.all(promises)
}

cork () {}

uncork () {}

get (snapshot, key) {
const db = this._getDB(snapshot)
return getValue(db, key)
Expand Down
15 changes: 14 additions & 1 deletion lib/engine/rocks.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ module.exports = class RocksEngine {
this.asap = false
this.clock = 0
this.refs = 0
this.batch = null
this.db = typeof storage === 'object' ? storage : new RocksDB(storage)
this.db.ready().catch(noop)
}
Expand All @@ -37,6 +38,7 @@ module.exports = class RocksEngine {
}

close () {
if (this.batch !== null) this.batch.destroy()
return this.db.close()
}

Expand Down Expand Up @@ -69,8 +71,19 @@ module.exports = class RocksEngine {
return Promise.all(promises)
}

cork () {
if (this.batch !== null) return
this.batch = this.db.read()
}

uncork () {
if (this.batch !== null) this.batch.tryFlush()
this.batch = null
}

get (snapshot, key) {
return this.db.get(key, { snapshot: getSnapshot(snapshot) })
const options = { snapshot: getSnapshot(snapshot) }
return this.batch === null ? this.db.get(key, options) : this.batch.get(key, options)
}

createReadStream (snapshot, range, options) {
Expand Down
16 changes: 16 additions & 0 deletions test/basic.js
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ test('watch', async function ({ create }, t) {

test('basic reopen', async function ({ create }, t) {
const storage = await tmp(t)

{
const db = await create(definition, { storage })
await db.insert('members', { id: 'maf', age: 34 })
Expand All @@ -261,3 +262,18 @@ test('basic reopen', async function ({ create }, t) {
await db.close()
}
})

test('cork/uncork', async function ({ create }, t) {
const db = await create()

db.cork()
const all = [
db.insert('@db/members', { id: 'maf', age: 34 }),
db.insert('@db/members', { id: 'andrew', age: 30 })
]
db.uncork()

await Promise.all(all)
t.pass('did not crash')
await db.close()
})

0 comments on commit 838427c

Please sign in to comment.