From 13cd6fdf7284d337ec0a08d1a7dc6bcfcdce7952 Mon Sep 17 00:00:00 2001 From: Andrew Osheroff Date: Fri, 19 Apr 2024 10:24:27 +0200 Subject: [PATCH] Working through experimental lite watcher --- index.js | 244 +++++++++++++++++++++++++++++++++++++++++++++----- package.json | 1 + test/watch.js | 27 ++++++ 3 files changed, 252 insertions(+), 20 deletions(-) diff --git a/index.js b/index.js index 82fad45..1e8cab4 100644 --- a/index.js +++ b/index.js @@ -4,6 +4,7 @@ const mutexify = require('mutexify/promise') const b4a = require('b4a') const safetyCatch = require('safety-catch') const ReadyResource = require('ready-resource') +const SignalPromise = require('signal-promise') const debounce = require('debounceify') const Xache = require('xache') @@ -376,9 +377,10 @@ class Hyperbee extends ReadyResource { this._sub = !!this.prefix this._checkout = opts.checkout || 0 this._view = !!opts._view + this._liteWatchers = !!opts.liteWatchers - this._onappendBound = this._view ? null : this._onappend.bind(this) - this._ontruncateBound = this._view ? null : this._ontruncate.bind(this) + this._onappendBound = this._view ? null : debounce(this._onappend.bind(this)) + this._ontruncateBound = this._view ? null : debounce(this._ontruncate.bind(this)) this._watchers = this._onappendBound ? [] : null this._entryWatchers = this._onappendBound ? [] : null this._sessions = opts.sessions !== false @@ -524,7 +526,7 @@ class Hyperbee extends ReadyResource { watch (range, opts) { if (!this._watchers) throw new Error('Can only watch the main bee instance') - return new Watcher(this, range, opts) + return this._liteWatchers ? new LiteWatcher(this, range, opts) : new DiffWatcher(this, range, opts) } async getAndWatch (key, opts) { @@ -541,26 +543,35 @@ class Hyperbee extends ReadyResource { return watcher } - _onappend () { - for (const watcher of this._watchers) { - watcher._onappend() - } - - for (const watcher of this._entryWatchers) { - watcher._onappend() + async _onappend () { + const b = this.batch() + try { + for (const watcher of this._watchers) { + watcher._onappend(b) + } + for (const watcher of this._entryWatchers) { + watcher._onappend(b) + } + await b.close() + } catch (err) { + safetyCatch(err) } } - _ontruncate () { - for (const watcher of this._watchers) { - watcher._ontruncate() - } - - for (const watcher of this._entryWatchers) { - watcher._ontruncate() + async _ontruncate () { + const b = this.batch() + try { + for (const watcher of this._watchers) { + watcher._ontruncate(b) + } + for (const watcher of this._entryWatchers) { + watcher._ontruncate(b) + } + this._keyCache.gc(this.core.length) + await b.close() + } catch (err) { + safetyCatch(err) } - - this._keyCache.gc(this.core.length) } _makeSnapshot () { @@ -1167,7 +1178,192 @@ class EntryWatcher extends ReadyResource { } } -class Watcher extends ReadyResource { +class LiteWatcher extends ReadyResource { + constructor (bee, range, opts = {}) { + super() + + this.keyEncoding = opts.keyEncoding || bee.keyEncoding + this.valueEncoding = opts.valueEncoding || bee.valueEncoding + this.index = bee._watchers.push(this) - 1 + this.bee = bee + this.core = bee.core + + this.range = range + this._encodedRange = null + if (this.range && this.keyEncoding) { + this._encodedRange = encRange(this.keyEncoding, this.range) + } else { + this._encodedRange = this.range + } + + this._currentBatch = null + this._currentLength = 1 + this._currentFork = -1 + this._lock = mutexify() + this._signal = new SignalPromise() + this._flowing = false + this._eager = !!opts.eager + this._updateOnce = !!opts.updateOnce + this._onchange = opts.onchange || null + + this.on('newListener', autoFlowOnUpdate) + + this.ready().catch(safetyCatch) + } + + async _consume () { + if (this._flowing) return + try { + for await (const _ of this) {} // eslint-disable-line + } catch {} + } + + async _open () { + await this.bee.ready() + + this._currentLength = this.core.length + this._currentFork = this.core.fork + + if (this._onchange) { + if (this._eager) await this._onchange() + this._consume() + } + } + + [Symbol.asyncIterator] () { + this._flowing = true + return this + } + + _ontruncate (batch) { + if (!this.core.isAutobase) return + this._currentBatch = batch + this._signal.notify() + } + + _onappend (batch) { + // TODO: this is a light hack / fix for non-sparse session reporting .length's inside batches + // the better solution is propably just to change non-sparse sessions to not report a fake length + if (!this.core.isAutobase && (!this.core.core || this.core.core.tree.length !== this.core.length)) return + this._currentBatch = batch + this._signal.notify() + } + + async next () { + try { + return await this._next() + } catch (err) { + if (this.closing) return { value: undefined, done: true } + await this.close() + throw err + } + } + + async _shouldTriggerRange (delta) { + const promises = [] + for (let i = this._currentLength - delta; i < this._currentLength; i++) { + promises.push(this._currentBatch.getKey(i)) + } + + const keys = await Promise.all(promises) + if (this.closing) return false + + for (const key of keys) { + if (rangeContains(this._encodedRange, key)) return true + } + return false + } + + async _next () { + const release = await this._lock() + + try { + if (this.closing) return { value: undefined, done: true } + + if (!this.opened) await this.ready() + + while (true) { + if (this.closing) return { value: undefined, done: true } + + if (this._updateOnce) { + this._updateOnce = false + await this.bee.update({ wait: true }) + } + + if (this.closing) return { value: undefined, done: true } + + const prevFork = this._currentFork + const prevLength = this._currentLength + + this._currentFork = this.core.fork + this._currentLength = this.core.length + + const delta = this._currentLength - prevLength + + if (this._currentFork !== prevFork) { + return await this._yield() + } + + if (delta !== 0) { + if (delta < 0) { + // Notify on all truncations + return await this._yield() + } else { + if (!this.range || (delta > 5)) { + // If the delta is large, or no range is specified, always notify + return await this._yield() + } else { + // Otherwise load the range and see if the nodes are in-range + const shouldTrigger = await this._shouldTriggerRange(delta) + if (this.closing) return { value: undefined, done: true } + if (shouldTrigger) return await this._yield() + } + } + } + + await this._signal.wait() + } + } finally { + release() + } + } + + async _yield () { + if (this._onchange) { + try { + await this._onchange() + } catch (err) { + safetyCatch(err) + } + } + this.emit('update') + return { done: false, value: null } + } + + async return () { + await this.close() + return { done: true } + } + + async _close () { + const top = this.bee._watchers.pop() + if (top !== this) { + top.index = this.index + this.bee._watchers[top.index] = top + } + + this._onappend() // Continue execution being closed + + const release = await this._lock() + release() + } + + destroy () { + return this.close() + } +} + +class DiffWatcher extends ReadyResource { constructor (bee, range, opts = {}) { super() @@ -1546,6 +1742,14 @@ function sameValue (a, b) { return a === b || (a !== null && b !== null && b4a.equals(a, b)) } +function rangeContains (range, key) { + if (range.gt && (b4a.compare(range.gt, key) >= 0)) return false + if (range.gte && (b4a.compare(range.gte, key) > 0)) return false + if (range.lt && (b4a.compare(range.lt, key) <= 0)) return false + if (range.lte && (b4a.compare(range.lte, key) < 0)) return false + return true +} + function noop () {} module.exports = Hyperbee diff --git a/package.json b/package.json index b25f002..f42aeaa 100644 --- a/package.json +++ b/package.json @@ -17,6 +17,7 @@ "protocol-buffers-encodings": "^1.2.0", "ready-resource": "^1.0.0", "safety-catch": "^1.0.2", + "signal-promise": "^1.0.3", "streamx": "^2.12.4", "xache": "^1.2.1" }, diff --git a/test/watch.js b/test/watch.js index 2cc72e1..3df532b 100644 --- a/test/watch.js +++ b/test/watch.js @@ -701,3 +701,30 @@ test('watch uses the bee`s encodings by default', async function (t) { break } }) + +test('lite watcher triggers on ranges only', async function (t) { + t.plan(1) + + const db = await createRange(50) + + const watcher = db.watch({ gte: '14' }, { lite: true }) + t.teardown(() => watcher.destroy()) + + // + could be simpler but could be a helper for other tests + let next = watcher.next() + let onchange = null + next.then(data => { + next = watcher.next() + onchange(data) + }) + + onchange = () => t.fail('should not trigger changes') + await db.put('13') + await eventFlush() + onchange = null + + onchange = () => t.pass('change') + await db.put('14') + await eventFlush() + onchange = null +})