Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic passive watcher #1

Merged
merged 3 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,41 @@
# Passive Watcher
# Passive Core Watcher

Run conditional logic on a corestore's hypercores when they open. Useful for example when one hypercore being active implies other hypercores should be active too.

```
npm i passive-core-watcher
```

## API

#### `const watcher = new PassiveCoreWatcher(corestore, { watch, open })`

Create a new passive core watcher.

`corestore` is a Corestore

`watch` is a (possibly async) function, returning a boolean for a given `Hypercore.Core` object indicating whether the Hypercore should be watched.

`open` is a (possibly async) function containing the intended side effects for Hypercores which should be watched. Its input is a Hypercore session. It is a weak session, closing when no other sessions exist.

The session emits a `close` event when it is closing. If teardown of the side effects is needed, a listener needs to be defined in the `open` function:

`session.on('close', () => { /* run teardown logic for side effects */ } )`

#### `await watcher.ready()`

Setup the passive watcher, so it starts listening for new hypercores.

This also checks which of the already-opened hypercores need to be watched, and calls the `open` function for those.

#### `await watcher.close()`

Stops watching the corestore for new cores, and closes all weak sessions.

#### `await watcher.ensureTracked(key)`

`key` is a hypercore key (buffer, z32 or hex format).

If the hypercore is open in the corestore, it is added to the watcher lifecycle, and the `open` method is called (does nothing otherwise).

Useful when the `watch` condition might be false when the core first opens, but became true later.
68 changes: 68 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
const ReadyResource = require('ready-resource')
const Hypercore = require('hypercore')
const b4a = require('b4a')
const HypCrypto = require('hypercore-crypto')
const IdEnc = require('hypercore-id-encoding')

class PassiveCoreWatcher extends ReadyResource {
constructor (corestore, { watch, open }) {
super()

this.store = corestore
this.watch = watch
this.open = open

this._oncoreopenBound = this._oncoreopen.bind(this)
this._openCores = new Map()
}

async _open () {
this.store.watch(this._oncoreopenBound)
await Promise.all(
[...this.store.cores.map.values()].map(this._oncoreopenBound)
)
}

async _close () {
this.store.unwatch(this._oncoreopenBound)
await Promise.allSettled([...this._openCores.values()].map(c => c.close()))
}

async _oncoreopen (core) { // not allowed to throw
try {
if (await this.watch(core)) {
await core.ready()
await this.ensureTracked(core.key)
}
} catch (e) {
this.emit('oncoreopen-error', e)
}
}

async ensureTracked (key) {
key = IdEnc.decode(key)

const discKey = b4a.toString(HypCrypto.discoveryKey(key), 'hex')
const core = this.store.cores.get(discKey)
if (!core) return // not in corestore atm (will rerun when oncoreopen runs)
if (this._openCores.has(discKey)) return // Already processed

const session = new Hypercore({ core, weak: true })
this._openCores.set(discKey, session)
// Must be sync up till the line above, for the accounting

await session.ready()
if (session.closing) { // race condition (insta close)
this._openCores.delete(discKey)
return
}

session.on('close', () => {
this._openCores.delete(discKey)
})

await this.open(session)
}
}

module.exports = PassiveCoreWatcher
7 changes: 7 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,12 @@
"homepage": "https://github.com/holepunchto/passive-core-watcher#readme",
"devDependencies": {
"standard": "^17.1.2"
},
"dependencies": {
"b4a": "^1.6.7",
"hypercore": "holepunchto/hypercore#rocksdb",
"hypercore-crypto": "^3.4.2",
"hypercore-id-encoding": "^1.3.0",
"ready-resource": "^1.1.1"
}
}
Loading