Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(WIP) Experimental "Lite" Watcher #148

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 224 additions & 20 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const mutexify = require('mutexify/promise')
const b4a = require('b4a')
const safetyCatch = require('safety-catch')
const ReadyResource = require('ready-resource')
const SignalPromise = require('signal-promise')
const debounce = require('debounceify')
const Xache = require('xache')

Expand Down Expand Up @@ -376,9 +377,10 @@ class Hyperbee extends ReadyResource {
this._sub = !!this.prefix
this._checkout = opts.checkout || 0
this._view = !!opts._view
this._liteWatchers = !!opts.liteWatchers

this._onappendBound = this._view ? null : this._onappend.bind(this)
this._ontruncateBound = this._view ? null : this._ontruncate.bind(this)
this._onappendBound = this._view ? null : debounce(this._onappend.bind(this))
this._ontruncateBound = this._view ? null : debounce(this._ontruncate.bind(this))
this._watchers = this._onappendBound ? [] : null
this._entryWatchers = this._onappendBound ? [] : null
this._sessions = opts.sessions !== false
Expand Down Expand Up @@ -524,7 +526,7 @@ class Hyperbee extends ReadyResource {

watch (range, opts) {
if (!this._watchers) throw new Error('Can only watch the main bee instance')
return new Watcher(this, range, opts)
return this._liteWatchers ? new LiteWatcher(this, range, opts) : new DiffWatcher(this, range, opts)
}

async getAndWatch (key, opts) {
Expand All @@ -541,26 +543,35 @@ class Hyperbee extends ReadyResource {
return watcher
}

_onappend () {
for (const watcher of this._watchers) {
watcher._onappend()
}

for (const watcher of this._entryWatchers) {
watcher._onappend()
async _onappend () {
const b = this.batch()
try {
for (const watcher of this._watchers) {
watcher._onappend(b)
}
for (const watcher of this._entryWatchers) {
watcher._onappend(b)
}
await b.close()
} catch (err) {
safetyCatch(err)
}
}

_ontruncate () {
for (const watcher of this._watchers) {
watcher._ontruncate()
}

for (const watcher of this._entryWatchers) {
watcher._ontruncate()
async _ontruncate () {
const b = this.batch()
try {
for (const watcher of this._watchers) {
watcher._ontruncate(b)
}
for (const watcher of this._entryWatchers) {
watcher._ontruncate(b)
}
this._keyCache.gc(this.core.length)
await b.close()
} catch (err) {
safetyCatch(err)
}

this._keyCache.gc(this.core.length)
}

_makeSnapshot () {
Expand Down Expand Up @@ -1167,7 +1178,192 @@ class EntryWatcher extends ReadyResource {
}
}

class Watcher extends ReadyResource {
class LiteWatcher extends ReadyResource {
constructor (bee, range, opts = {}) {
super()

this.keyEncoding = opts.keyEncoding || bee.keyEncoding
this.valueEncoding = opts.valueEncoding || bee.valueEncoding
this.index = bee._watchers.push(this) - 1
this.bee = bee
this.core = bee.core

this.range = range
this._encodedRange = null
if (this.range && this.keyEncoding) {
this._encodedRange = encRange(this.keyEncoding, this.range)
} else {
this._encodedRange = this.range
}

this._currentBatch = null
this._currentLength = 1
this._currentFork = -1
this._lock = mutexify()
this._signal = new SignalPromise()
this._flowing = false
this._eager = !!opts.eager
this._updateOnce = !!opts.updateOnce
this._onchange = opts.onchange || null

this.on('newListener', autoFlowOnUpdate)

this.ready().catch(safetyCatch)
}

async _consume () {
if (this._flowing) return
try {
for await (const _ of this) {} // eslint-disable-line
} catch {}
}

async _open () {
await this.bee.ready()

this._currentLength = this.core.length
this._currentFork = this.core.fork

if (this._onchange) {
if (this._eager) await this._onchange()
this._consume()
}
}

[Symbol.asyncIterator] () {
this._flowing = true
return this
}

_ontruncate (batch) {
if (!this.core.isAutobase) return
this._currentBatch = batch
this._signal.notify()
}

_onappend (batch) {
// TODO: this is a light hack / fix for non-sparse session reporting .length's inside batches
// the better solution is propably just to change non-sparse sessions to not report a fake length
if (!this.core.isAutobase && (!this.core.core || this.core.core.tree.length !== this.core.length)) return
this._currentBatch = batch
this._signal.notify()
}

async next () {
try {
return await this._next()
} catch (err) {
if (this.closing) return { value: undefined, done: true }
await this.close()
throw err
}
}

async _shouldTriggerRange (delta) {
const promises = []
for (let i = this._currentLength - delta; i < this._currentLength; i++) {
promises.push(this._currentBatch.getKey(i))
}

const keys = await Promise.all(promises)
if (this.closing) return false

for (const key of keys) {
if (rangeContains(this._encodedRange, key)) return true
}
return false
}

async _next () {
const release = await this._lock()

try {
if (this.closing) return { value: undefined, done: true }

if (!this.opened) await this.ready()

while (true) {
if (this.closing) return { value: undefined, done: true }

if (this._updateOnce) {
this._updateOnce = false
await this.bee.update({ wait: true })
}

if (this.closing) return { value: undefined, done: true }

const prevFork = this._currentFork
const prevLength = this._currentLength

this._currentFork = this.core.fork
this._currentLength = this.core.length

const delta = this._currentLength - prevLength

if (this._currentFork !== prevFork) {
return await this._yield()
}

if (delta !== 0) {
if (delta < 0) {
// Notify on all truncations
return await this._yield()
} else {
if (!this.range || (delta > 5)) {
// If the delta is large, or no range is specified, always notify
return await this._yield()
} else {
// Otherwise load the range and see if the nodes are in-range
const shouldTrigger = await this._shouldTriggerRange(delta)
if (this.closing) return { value: undefined, done: true }
if (shouldTrigger) return await this._yield()
}
}
}

await this._signal.wait()
}
} finally {
release()
}
}

async _yield () {
if (this._onchange) {
try {
await this._onchange()
} catch (err) {
safetyCatch(err)
}
}
this.emit('update')
return { done: false, value: null }
}

async return () {
await this.close()
return { done: true }
}

async _close () {
const top = this.bee._watchers.pop()
if (top !== this) {
top.index = this.index
this.bee._watchers[top.index] = top
}

this._onappend() // Continue execution being closed

const release = await this._lock()
release()
}

destroy () {
return this.close()
}
}

class DiffWatcher extends ReadyResource {
constructor (bee, range, opts = {}) {
super()

Expand Down Expand Up @@ -1546,6 +1742,14 @@ function sameValue (a, b) {
return a === b || (a !== null && b !== null && b4a.equals(a, b))
}

function rangeContains (range, key) {
if (range.gt && (b4a.compare(range.gt, key) >= 0)) return false
if (range.gte && (b4a.compare(range.gte, key) > 0)) return false
if (range.lt && (b4a.compare(range.lt, key) <= 0)) return false
if (range.lte && (b4a.compare(range.lte, key) < 0)) return false
return true
}

function noop () {}

module.exports = Hyperbee
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"protocol-buffers-encodings": "^1.2.0",
"ready-resource": "^1.0.0",
"safety-catch": "^1.0.2",
"signal-promise": "^1.0.3",
"streamx": "^2.12.4",
"xache": "^1.2.1"
},
Expand Down
27 changes: 27 additions & 0 deletions test/watch.js
Original file line number Diff line number Diff line change
Expand Up @@ -701,3 +701,30 @@ test('watch uses the bee`s encodings by default', async function (t) {
break
}
})

test('lite watcher triggers on ranges only', async function (t) {
t.plan(1)

const db = await createRange(50)

const watcher = db.watch({ gte: '14' }, { lite: true })
t.teardown(() => watcher.destroy())

// + could be simpler but could be a helper for other tests
let next = watcher.next()
let onchange = null
next.then(data => {
next = watcher.next()
onchange(data)
})

onchange = () => t.fail('should not trigger changes')
await db.put('13')
await eventFlush()
onchange = null

onchange = () => t.pass('change')
await db.put('14')
await eventFlush()
onchange = null
})
Loading