From c527116022799cae65bd357346e13a88c03f557a Mon Sep 17 00:00:00 2001 From: HDegroote <75906619+HDegroote@users.noreply.github.com> Date: Fri, 24 Jan 2025 12:27:59 +0100 Subject: [PATCH 1/7] Sync lifecycle (simplify) --- index.js | 20 +++++++++++--------- package.json | 1 - 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/index.js b/index.js index 818df91..c971bdd 100644 --- a/index.js +++ b/index.js @@ -1,13 +1,13 @@ -const ReadyResource = require('ready-resource') +const { EventEmitter } = require('events') const Hypercore = require('hypercore') const b4a = require('b4a') const HypCrypto = require('hypercore-crypto') const IdEnc = require('hypercore-id-encoding') +const safetyCatch = require('safety-catch') -class PassiveCoreWatcher extends ReadyResource { +class PassiveCoreWatcher extends EventEmitter { constructor (corestore, { watch, open }) { super() - this.store = corestore this.watch = watch this.open = open @@ -16,16 +16,18 @@ class PassiveCoreWatcher extends ReadyResource { this._openCores = new Map() } - async _open () { + ready () { this.store.watch(this._oncoreopenBound) - await Promise.all( - [...this.store.cores.map.values()].map(this._oncoreopenBound) - ) + for (const core of this.store.cores.map.values()) { + this._oncoreopenBound(core) // never rejects + } } - async _close () { + close () { this.store.unwatch(this._oncoreopenBound) - await Promise.allSettled([...this._openCores.values()].map(c => c.close())) + for (const core of this._openCores.values()) { + core.close().catch(safetyCatch) + } } async _oncoreopen (core) { // not allowed to throw diff --git a/package.json b/package.json index 6213b3e..9629f1b 100644 --- a/package.json +++ b/package.json @@ -31,6 +31,5 @@ "hypercore": "^11.0.0", "hypercore-crypto": "^3.4.2", "hypercore-id-encoding": "^1.3.0", - "ready-resource": "^1.1.1" } } From cc19e79f0d01a318a18f70452c300fc4403d08e1 Mon Sep 17 00:00:00 2001 From: HDegroote <75906619+HDegroote@users.noreply.github.com> Date: Fri, 24 Jan 2025 12:35:10 +0100 Subject: [PATCH 2/7] Add safety catch dep --- package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/package.json b/package.json index 9629f1b..afc3941 100644 --- a/package.json +++ b/package.json @@ -31,5 +31,6 @@ "hypercore": "^11.0.0", "hypercore-crypto": "^3.4.2", "hypercore-id-encoding": "^1.3.0", + "safety-catch": "^1.0.2" } } From 228f09d8f5319368fb783ae4fdcee39caf815387 Mon Sep 17 00:00:00 2001 From: HDegroote <75906619+HDegroote@users.noreply.github.com> Date: Fri, 24 Jan 2025 13:55:35 +0100 Subject: [PATCH 3/7] Setup in constructor --- README.md | 10 ++-------- index.js | 4 +--- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 2768c53..4fe19c7 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ npm i passive-core-watcher #### `const watcher = new PassiveCoreWatcher(corestore, { watch, open })` -Create a new passive core watcher. +Create a new passive core watcher, and start watching new cores. Existing cores are also processed (e.g. `open` will be called for all existing cores for which `watch` returns true). `corestore` is a Corestore @@ -22,13 +22,7 @@ The session emits a `close` event when it is closing. If teardown of the side ef `session.on('close', () => { /* run teardown logic for side effects */ } )` -#### `await watcher.ready()` - -Setup the passive watcher, so it starts listening for new hypercores. - -This also checks which of the already-opened hypercores need to be watched, and calls the `open` function for those. - -#### `await watcher.close()` +#### `watcher.destroy()` Stops watching the corestore for new cores, and closes all weak sessions. diff --git a/index.js b/index.js index c971bdd..c2f4757 100644 --- a/index.js +++ b/index.js @@ -14,16 +14,14 @@ class PassiveCoreWatcher extends EventEmitter { this._oncoreopenBound = this._oncoreopen.bind(this) this._openCores = new Map() - } - ready () { this.store.watch(this._oncoreopenBound) for (const core of this.store.cores.map.values()) { this._oncoreopenBound(core) // never rejects } } - close () { + destroy () { this.store.unwatch(this._oncoreopenBound) for (const core of this._openCores.values()) { core.close().catch(safetyCatch) From bbc3db744382525d716e2ca4ed28604c90c22bc3 Mon Sep 17 00:00:00 2001 From: HDegroote <75906619+HDegroote@users.noreply.github.com> Date: Fri, 24 Jan 2025 14:50:56 +0100 Subject: [PATCH 4/7] Add basic tests --- package.json | 7 +++-- test.js | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 2 deletions(-) create mode 100644 test.js diff --git a/package.json b/package.json index afc3941..4fe4445 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "description": "Run conditional logic on a corestore's hypercores", "main": "index.js", "scripts": { - "test": "standard" + "test": "standard && brittle test.js" }, "repository": { "type": "git", @@ -24,7 +24,10 @@ }, "homepage": "https://github.com/holepunchto/passive-core-watcher#readme", "devDependencies": { - "standard": "^17.1.2" + "brittle": "^3.10.0", + "corestore": "^7.0.0", + "standard": "^17.1.2", + "test-tmp": "^1.4.0" }, "dependencies": { "b4a": "^1.6.7", diff --git a/test.js b/test.js new file mode 100644 index 0000000..f299a5e --- /dev/null +++ b/test.js @@ -0,0 +1,78 @@ +const test = require('brittle') +const Corestore = require('corestore') +const tmpDir = require('test-tmp') +const b4a = require('b4a') + +const PassiveCoreWatcher = require('.') + +test('basic watcher', async (t) => { + const store = new Corestore(await tmpDir()) + + const watching = [] + const watch = () => true + const open = (weakSession) => { + watching.push(weakSession) + weakSession.on('close', () => { + watching.pop(weakSession) + }) + } + const watcher = new PassiveCoreWatcher(store, { watch, open }) + + const core = store.get({ name: 'c1' }) + const core2 = store.get({ name: 'c2' }) + await Promise.all([core.ready(), core2.ready()]) + + t.is(watching.length, 2, 'watching 2 cores') + + await core.close() + + // Sync, but needs event loop to trigger + await new Promise(resolve => setImmediate(resolve)) + t.is(watching.length, 1, 'core close triggered') + + watcher.destroy() + + await new Promise(resolve => setImmediate(resolve)) + t.is(watching.length, 0, 'weakSessions close when watcher closes') + t.is(core2.closed, false, 'Core itself did not close') +}) + +test('processes already-opened cores', async (t) => { + const store = new Corestore(await tmpDir()) + const core = store.get({ name: 'c1' }) + const core2 = store.get({ name: 'c2' }) + await Promise.all([core.ready(), core2.ready()]) + + const watching = [] + const watch = () => true + const open = (weakSession) => { + watching.push(weakSession) + } + const watcher = new PassiveCoreWatcher(store, { watch, open }) + + await new Promise(resolve => setImmediate(resolve)) + t.is(watching.length, 2) + + watcher.destroy() +}) + +test('does not process cores for which watch returns false', async (t) => { + const store = new Corestore(await tmpDir()) + const core = store.get({ name: 'c1' }) + const core2 = store.get({ name: 'c2' }) + + await Promise.all([core.ready(), core2.ready()]) + + const watching = [] + const watch = (c) => b4a.equals(c.key, core.key) + + const open = (weakSession) => { + watching.push(weakSession) + } + const watcher = new PassiveCoreWatcher(store, { watch, open }) + + await new Promise(resolve => setImmediate(resolve)) + t.alike(watching.map(c => c.key), [core.key]) + + watcher.destroy() +}) From d8a0559542ba4695b3fe9bd38251dbd61d5f704e Mon Sep 17 00:00:00 2001 From: HDegroote <75906619+HDegroote@users.noreply.github.com> Date: Fri, 24 Jan 2025 14:55:32 +0100 Subject: [PATCH 5/7] Wait a tick to process existing cores --- index.js | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/index.js b/index.js index c2f4757..d332ea3 100644 --- a/index.js +++ b/index.js @@ -15,10 +15,13 @@ class PassiveCoreWatcher extends EventEmitter { this._oncoreopenBound = this._oncoreopen.bind(this) this._openCores = new Map() - this.store.watch(this._oncoreopenBound) - for (const core of this.store.cores.map.values()) { - this._oncoreopenBound(core) // never rejects - } + // Give time to add event handlers + setImmediate(() => { + this.store.watch(this._oncoreopenBound) + for (const core of this.store.cores.map.values()) { + this._oncoreopenBound(core) // never rejects + } + }) } destroy () { From 14e79d76b038bbd614339f6c6360a9a82f703264 Mon Sep 17 00:00:00 2001 From: HDegroote <75906619+HDegroote@users.noreply.github.com> Date: Sun, 26 Jan 2025 14:04:20 +0100 Subject: [PATCH 6/7] Fix state: use queueMicroTask and check if destroyed --- index.js | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/index.js b/index.js index d332ea3..308bf86 100644 --- a/index.js +++ b/index.js @@ -15,8 +15,11 @@ class PassiveCoreWatcher extends EventEmitter { this._oncoreopenBound = this._oncoreopen.bind(this) this._openCores = new Map() + this.destroyed = false + // Give time to add event handlers - setImmediate(() => { + queueMicrotask(() => { + if (this.destroyed) return this.store.watch(this._oncoreopenBound) for (const core of this.store.cores.map.values()) { this._oncoreopenBound(core) // never rejects @@ -25,6 +28,7 @@ class PassiveCoreWatcher extends EventEmitter { } destroy () { + this.destroyed = true this.store.unwatch(this._oncoreopenBound) for (const core of this._openCores.values()) { core.close().catch(safetyCatch) From 45b44b23a119f286ddffff623f4df907bdef07e5 Mon Sep 17 00:00:00 2001 From: HDegroote <75906619+HDegroote@users.noreply.github.com> Date: Mon, 27 Jan 2025 11:12:05 +0100 Subject: [PATCH 7/7] Add bare-events import map --- package.json | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/package.json b/package.json index 4fe4445..917cfde 100644 --- a/package.json +++ b/package.json @@ -13,6 +13,12 @@ "files": [ "index.js" ], + "imports": { + "events": { + "bare": "bare-events", + "default": "events" + } + }, "keywords": [ "Corestore", "hypercore" @@ -31,6 +37,7 @@ }, "dependencies": { "b4a": "^1.6.7", + "bare-events": "^2.5.4", "hypercore": "^11.0.0", "hypercore-crypto": "^3.4.2", "hypercore-id-encoding": "^1.3.0",