Skip to content

Commit

Permalink
upgrade to rocks 3.0 (#28)
Browse files Browse the repository at this point in the history
* upgrade to rocks 3.0

* unused prop
  • Loading branch information
mafintosh authored Dec 20, 2024
1 parent 745fe05 commit 267252c
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 137 deletions.
45 changes: 17 additions & 28 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,15 @@ class HyperDB {
}

cork () {
this.engine.cork()
this.engineSnapshot.cork()
}

uncork () {
this.engine.uncork()
this.engineSnapshot.uncork()
}

ready () {
return this.engine.ready()
return this.engineSnapshot.ready()
}

close () {
Expand All @@ -272,7 +272,7 @@ class HyperDB {
this.updates.unref()
this.updates = null

if (this.engineSnapshot) this.engineSnapshot.unref()
this.engineSnapshot.unref()
this.engineSnapshot = null

if (--this.engine.refs === 0) await this.engine.close()
Expand All @@ -282,9 +282,7 @@ class HyperDB {
}

_createSnapshot (rootInstance, writable, context) {
const snapshot = this.engineSnapshot === null
? this.engine.snapshot()
: this.engineSnapshot.ref()
const snapshot = this.engineSnapshot.ref()

return new HyperDB(this.engine, this.definition, {
version: this.version,
Expand Down Expand Up @@ -366,8 +364,7 @@ class HyperDB {
}

async get (collectionName, doc) {
const snap = this.engineSnapshot
if (snap !== null) snap.ref()
const snap = this.engineSnapshot.ref()

try {
const collection = this.definition.resolveCollection(collectionName)
Expand All @@ -390,7 +387,7 @@ class HyperDB {
const key = b4a.isBuffer(doc) ? doc : collection.encodeKey(doc)

const u = this.updates.get(key)
const value = u !== null ? u.value : await this.engine.get(snap, key)
const value = u !== null ? u.value : await snap.get(key)

return value === null ? null : collection.reconstruct(this.version, key, value)
}
Expand All @@ -404,7 +401,7 @@ class HyperDB {
const u = this.updates.getIndex(index, key)
if (u !== null) return index.collection.reconstruct(this.version, u.key, u.value)

const value = await this.engine.get(snap, key)
const value = await snap.get(key)
if (value === null) return null

return this._getCollection(index.collection, snap, index.reconstruct(key, value))
Expand All @@ -425,15 +422,13 @@ class HyperDB {

while (this.updates.enter(collection) === false) await this.updates.wait(collection)

const snap = this.engineSnapshot
const snap = this.engineSnapshot.ref()
const key = collection.encodeKey(doc)

if (snap !== null) snap.ref()

let prevValue = null

try {
prevValue = await this.engine.get(snap, key)
prevValue = await this.engineSnapshot.get(key)
if (collection.trigger !== null) await this._runTrigger(collection, doc, null)

if (prevValue === null) {
Expand All @@ -455,7 +450,7 @@ class HyperDB {
for (let j = 0; j < del.length; j++) ups.push({ key: del[j], value: null })
}
} finally {
if (snap !== null) snap.unref()
snap.unref()
this.updates.exit(collection)
}
}
Expand All @@ -470,16 +465,14 @@ class HyperDB {

while (this.updates.enter(collection) === false) await this.updates.wait(collection)

const snap = this.engineSnapshot
const snap = this.engineSnapshot.ref()
const key = collection.encodeKey(doc)
const value = collection.encodeValue(this.version, doc)

if (snap !== null) snap.ref()

let prevValue = null

try {
prevValue = await this.engine.get(snap, key)
prevValue = await this.engineSnapshot.get(key)
if (collection.trigger !== null) await this._runTrigger(collection, doc, doc)

if (prevValue !== null && b4a.equals(value, prevValue)) return
Expand All @@ -505,25 +498,21 @@ class HyperDB {
for (let j = 0; j < put.length; j++) ups.push({ key: put[j], value })
}
} finally {
if (snap !== null) snap.unref()
snap.unref()
this.updates.exit(collection)
}
}

update () {
maybeClosed(this)

if (this.engineSnapshot !== null && !this.engine.outdated(this.engineSnapshot)) {
return
}
if (!this.engine.outdated(this.engineSnapshot)) return

if (this.updates.refs > 1) this.updates = this.updates.detach()
this.updates.flush()

if (this.engineSnapshot !== null) {
this.engineSnapshot.unref()
this.engineSnapshot = this.engine.snapshot()
}
this.engineSnapshot.unref()
this.engineSnapshot = this.engine.snapshot()

if (this.watchers !== null) {
for (const fn of this.watchers) fn()
Expand Down
77 changes: 35 additions & 42 deletions lib/engine/bee.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,39 @@ class BeeSnapshot {
this.snapshot = null
}
}

cork () {}

uncork () {}

getIndirectRange (reconstruct, entries) {
const promises = new Array(entries.length)

for (let i = 0; i < promises.length; i++) {
const { key, value } = entries[i]
promises[i] = getWrapped(this.snapshot, key, reconstruct(key, value))
}

return promises
}

getBatch (keys) {
const promises = new Array(keys.length)

for (let i = 0; i < keys.length; i++) {
promises[i] = getValue(this.snapshot, keys[i])
}

return Promise.all(promises)
}

get (key) {
return getValue(this.snapshot, key)
}

createReadStream (range, options) {
return this.snapshot.createReadStream(range, options)
}
}

class ChangesStream extends Readable {
Expand Down Expand Up @@ -112,7 +145,8 @@ module.exports = class BeeEngine {
}

changes (snapshot, version, definition, range) {
return new ChangesStream(this._getDB(snapshot), version, definition, range)
const db = snapshot === null ? this.db : snapshot.snapshot
return new ChangesStream(db, version, definition, range)
}

snapshot () {
Expand All @@ -123,43 +157,6 @@ module.exports = class BeeEngine {
return snap === null || this.core.length !== snap.snapshot.core.length || this.core.fork !== snap.snapshot.core.fork
}

getIndirectRange (snapshot, reconstruct, entries) {
const db = this._getDB(snapshot)
const promises = new Array(entries.length)

for (let i = 0; i < promises.length; i++) {
const { key, value } = entries[i]
promises[i] = getWrapped(db, key, reconstruct(key, value))
}

return promises
}

getBatch (snapshot, keys) {
const db = this._getDB(snapshot)
const promises = new Array(keys.length)

for (let i = 0; i < keys.length; i++) {
promises[i] = getValue(db, keys[i])
}

return Promise.all(promises)
}

cork () {}

uncork () {}

get (snapshot, key) {
const db = this._getDB(snapshot)
return getValue(db, key)
}

createReadStream (snapshot, range, options) {
const db = this._getDB(snapshot)
return db.createReadStream(range, options)
}

async commit (updates) {
this.clock++

Expand All @@ -179,10 +176,6 @@ module.exports = class BeeEngine {

await batch.flush()
}

_getDB (snapshot) {
return snapshot === null ? this.db : snapshot.snapshot
}
}

async function getWrapped (db, key, value) {
Expand Down
Loading

0 comments on commit 267252c

Please sign in to comment.