Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(breaking) Sync lifecycle #2

Merged
merged 7 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ npm i passive-core-watcher

#### `const watcher = new PassiveCoreWatcher(corestore, { watch, open })`

Create a new passive core watcher.
Create a new passive core watcher, and start watching new cores. Existing cores are also processed (e.g. `open` will be called for all existing cores for which `watch` returns true).

`corestore` is a Corestore

Expand All @@ -22,13 +22,7 @@ The session emits a `close` event when it is closing. If teardown of the side ef

`session.on('close', () => { /* run teardown logic for side effects */ } )`

#### `await watcher.ready()`

Setup the passive watcher, so it starts listening for new hypercores.

This also checks which of the already-opened hypercores need to be watched, and calls the `open` function for those.

#### `await watcher.close()`
#### `watcher.destroy()`

Stops watching the corestore for new cores, and closes all weak sessions.

Expand Down
29 changes: 18 additions & 11 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,31 +1,38 @@
const ReadyResource = require('ready-resource')
const { EventEmitter } = require('events')
const Hypercore = require('hypercore')
const b4a = require('b4a')
const HypCrypto = require('hypercore-crypto')
const IdEnc = require('hypercore-id-encoding')
const safetyCatch = require('safety-catch')

class PassiveCoreWatcher extends ReadyResource {
class PassiveCoreWatcher extends EventEmitter {
constructor (corestore, { watch, open }) {
super()

this.store = corestore
this.watch = watch
this.open = open

this._oncoreopenBound = this._oncoreopen.bind(this)
this._openCores = new Map()
}

async _open () {
this.store.watch(this._oncoreopenBound)
await Promise.all(
[...this.store.cores.map.values()].map(this._oncoreopenBound)
)
this.destroyed = false

// Give time to add event handlers
queueMicrotask(() => {
if (this.destroyed) return
this.store.watch(this._oncoreopenBound)
for (const core of this.store.cores.map.values()) {
this._oncoreopenBound(core) // never rejects
}
})
}

async _close () {
destroy () {
this.destroyed = true
this.store.unwatch(this._oncoreopenBound)
await Promise.allSettled([...this._openCores.values()].map(c => c.close()))
for (const core of this._openCores.values()) {
core.close().catch(safetyCatch)
}
}

async _oncoreopen (core) { // not allowed to throw
Expand Down
16 changes: 13 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"description": "Run conditional logic on a corestore's hypercores",
"main": "index.js",
"scripts": {
"test": "standard"
"test": "standard && brittle test.js"
},
"repository": {
"type": "git",
Expand All @@ -13,6 +13,12 @@
"files": [
"index.js"
],
"imports": {
"events": {
"bare": "bare-events",
"default": "events"
}
},
"keywords": [
"Corestore",
"hypercore"
Expand All @@ -24,13 +30,17 @@
},
"homepage": "https://github.com/holepunchto/passive-core-watcher#readme",
"devDependencies": {
"standard": "^17.1.2"
"brittle": "^3.10.0",
"corestore": "^7.0.0",
"standard": "^17.1.2",
"test-tmp": "^1.4.0"
},
"dependencies": {
"b4a": "^1.6.7",
"bare-events": "^2.5.4",
"hypercore": "^11.0.0",
"hypercore-crypto": "^3.4.2",
"hypercore-id-encoding": "^1.3.0",
"ready-resource": "^1.1.1"
"safety-catch": "^1.0.2"
}
}
78 changes: 78 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
const test = require('brittle')
const Corestore = require('corestore')
const tmpDir = require('test-tmp')
const b4a = require('b4a')

const PassiveCoreWatcher = require('.')

test('basic watcher', async (t) => {
const store = new Corestore(await tmpDir())

const watching = []
const watch = () => true
const open = (weakSession) => {
watching.push(weakSession)
weakSession.on('close', () => {
watching.pop(weakSession)
})
}
const watcher = new PassiveCoreWatcher(store, { watch, open })

const core = store.get({ name: 'c1' })
const core2 = store.get({ name: 'c2' })
await Promise.all([core.ready(), core2.ready()])

t.is(watching.length, 2, 'watching 2 cores')

await core.close()

// Sync, but needs event loop to trigger
await new Promise(resolve => setImmediate(resolve))
t.is(watching.length, 1, 'core close triggered')

watcher.destroy()

await new Promise(resolve => setImmediate(resolve))
t.is(watching.length, 0, 'weakSessions close when watcher closes')
t.is(core2.closed, false, 'Core itself did not close')
})

test('processes already-opened cores', async (t) => {
const store = new Corestore(await tmpDir())
const core = store.get({ name: 'c1' })
const core2 = store.get({ name: 'c2' })
await Promise.all([core.ready(), core2.ready()])

const watching = []
const watch = () => true
const open = (weakSession) => {
watching.push(weakSession)
}
const watcher = new PassiveCoreWatcher(store, { watch, open })

await new Promise(resolve => setImmediate(resolve))
t.is(watching.length, 2)

watcher.destroy()
})

test('does not process cores for which watch returns false', async (t) => {
const store = new Corestore(await tmpDir())
const core = store.get({ name: 'c1' })
const core2 = store.get({ name: 'c2' })

await Promise.all([core.ready(), core2.ready()])

const watching = []
const watch = (c) => b4a.equals(c.key, core.key)

const open = (weakSession) => {
watching.push(weakSession)
}
const watcher = new PassiveCoreWatcher(store, { watch, open })

await new Promise(resolve => setImmediate(resolve))
t.alike(watching.map(c => c.key), [core.key])

watcher.destroy()
})
Loading