From 88ca184ac7fe113159cf42921d631a7abc77bc7f Mon Sep 17 00:00:00 2001 From: Evan Hahn Date: Wed, 17 Jan 2024 17:48:55 +0000 Subject: [PATCH 1/6] CoreIndexStream.prototype.#read --- lib/core-index-stream.js | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/core-index-stream.js b/lib/core-index-stream.js index 3c4f35e..b4f2b9c 100644 --- a/lib/core-index-stream.js +++ b/lib/core-index-stream.js @@ -4,7 +4,6 @@ const Bitfield = require('./bitfield') const { pDefer } = require('./utils') const { promisify } = require('node:util') -const kReadPromise = Symbol('readPromise') const kOpenPromise = Symbol('openPromise') const kDestroyPromise = Symbol('destroyPromise') const kHandleAppend = Symbol('handleAppend') @@ -86,7 +85,7 @@ class CoreIndexStream extends Readable { /** @param {any} cb */ _read(cb) { - this[kReadPromise]().then(cb, cb) + this.#read().then(cb, cb) } _predestroy() { @@ -130,7 +129,7 @@ class CoreIndexStream extends Readable { this.#core.on('download', this[kHandleDownload]) } - async [kReadPromise]() { + async #read() { if (this.#index >= this.#core.length && this.#downloaded.size === 0) { this.#drained = true this.emit('drained') @@ -166,7 +165,7 @@ class CoreIndexStream extends Readable { } if (!didPush && !this.#destroying) { // If nothing was pushed, queue up another read - await this[kReadPromise]() + await this.#read() } await this.#indexedBitfield?.flush() } From 588cebff006460233b416240bf4ae219f3fe86df Mon Sep 17 00:00:00 2001 From: Evan Hahn Date: Wed, 17 Jan 2024 17:51:19 +0000 Subject: [PATCH 2/6] CoreIndexStream: use private methods for all methods --- lib/core-index-stream.js | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/lib/core-index-stream.js b/lib/core-index-stream.js index b4f2b9c..754e1b1 100644 --- a/lib/core-index-stream.js +++ b/lib/core-index-stream.js @@ -4,11 +4,8 @@ const Bitfield = require('./bitfield') const { pDefer } = require('./utils') const { promisify } = require('node:util') -const kOpenPromise = Symbol('openPromise') -const kDestroyPromise = Symbol('destroyPromise') const kHandleAppend = Symbol('handleAppend') const kHandleDownload = Symbol('handleDownload') -const kPushEntry = Symbol('pushEntry') /** @typedef {import('./types').ValueEncoding} ValueEncoding */ /** @typedef {import('./types').JSONValue} JSONValue */ @@ -80,7 +77,7 @@ class CoreIndexStream extends Readable { /** @param {any} cb */ _open(cb) { - this[kOpenPromise]().then(cb, cb) + this.#open().then(cb, cb) } /** @param {any} cb */ @@ -95,7 +92,7 @@ class CoreIndexStream extends Readable { /** @param {any} cb */ _destroy(cb) { - this[kDestroyPromise]().then(cb, cb) + this.#destroy().then(cb, cb) } /** @@ -109,14 +106,14 @@ class CoreIndexStream extends Readable { this.#inProgressBitfield?.set(index, false) } - async [kDestroyPromise]() { + async #destroy() { this.#core.removeListener('append', this[kHandleAppend]) this.#core.removeListener('download', this[kHandleDownload]) await this.#indexedBitfield?.flush() if (this.#storage) await closeStorage(this.#storage) } - async [kOpenPromise]() { + async #open() { await this.#core.ready() await this.#core.update({ wait: true }) const { discoveryKey } = this.#core @@ -141,7 +138,7 @@ class CoreIndexStream extends Readable { let didPush = false this.#readBufferAvailable = true while (this.#readBufferAvailable && this.#index < this.#core.length) { - didPush = (await this[kPushEntry](this.#index)) || didPush + didPush = (await this.#pushEntry(this.#index)) || didPush // Don't increment this until after the async push above this.#index++ } @@ -150,7 +147,7 @@ class CoreIndexStream extends Readable { for (const index of this.#downloaded) { this.#downloaded.delete(index) didPush = - (await this[kPushEntry](index)) || + (await this.#pushEntry(index)) || /* istanbul ignore next - TODO: Test when hypercore-next supports a core.clear() method */ didPush // This is for back-pressure, for which there is not a good test yet. @@ -176,7 +173,7 @@ class CoreIndexStream extends Readable { * @param {number} index * @returns {Promise} */ - async [kPushEntry](index) { + async #pushEntry(index) { const isProcessed = this.#indexedBitfield?.get(index) || this.#inProgressBitfield?.get(index) if (isProcessed) return false From da565f6a75a4199c1159fff6e705211790d5e3ca Mon Sep 17 00:00:00 2001 From: Evan Hahn Date: Wed, 17 Jan 2024 17:58:14 +0000 Subject: [PATCH 3/6] MultiCoreIndexStream: use private methods --- lib/multi-core-index-stream.js | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/lib/multi-core-index-stream.js b/lib/multi-core-index-stream.js index 638be0b..500a0ef 100644 --- a/lib/multi-core-index-stream.js +++ b/lib/multi-core-index-stream.js @@ -13,9 +13,6 @@ const { once } = require('events') * @template {ValueEncoding} [T='binary'] * @typedef {import('./core-index-stream').CoreIndexStream} CoreIndexStream */ -const kReadPromise = Symbol('readPromise') -const kHandleReadable = Symbol('handleReadable') -const kDestroyPromise = Symbol('destroyPromise') const kHandleIndexing = Symbol('handleIndexing') const kHandleDrained = Symbol('handleDrained') @@ -89,7 +86,7 @@ class MultiCoreIndexStream extends Readable { if (this.#streams.has(stream)) return this.#drained = false // Do this so that we can remove this listener when we destroy the stream - const handleReadableFn = this[kHandleReadable].bind(this, stream) + const handleReadableFn = this.#handleReadable.bind(this, stream) this.#streams.set(stream, handleReadableFn) stream.core .ready() @@ -113,7 +110,7 @@ class MultiCoreIndexStream extends Readable { /** @param {any} cb */ _read(cb) { - this[kReadPromise]().then(cb, cb) + this.#read().then(cb, cb) } _predestroy() { @@ -123,10 +120,10 @@ class MultiCoreIndexStream extends Readable { /** @param {any} cb */ _destroy(cb) { - this[kDestroyPromise]().then(cb, cb) + this.#destroy().then(cb, cb) } - async [kDestroyPromise]() { + async #destroy() { const closePromises = [] for (const [stream, handleReadableFn] of this.#streams) { stream.off('readable', handleReadableFn) @@ -138,7 +135,7 @@ class MultiCoreIndexStream extends Readable { await Promise.all(closePromises) } - async [kReadPromise]() { + async #read() { let didPush = false if (!this.#readable.size && !this.#destroying) { await (this.#pending = pDefer()).promise @@ -156,12 +153,12 @@ class MultiCoreIndexStream extends Readable { } if (!didPush && !this.#destroying) { // If nothing was pushed, queue up another read - await this[kReadPromise]() + await this.#read() } } /** @param {CoreIndexStream} stream */ - [kHandleReadable](stream) { + #handleReadable(stream) { this.#readable.add(stream) this.#pending.resolve() } From 940d375227dfd034503242a712440dad286d09df Mon Sep 17 00:00:00 2001 From: Evan Hahn Date: Wed, 17 Jan 2024 18:00:29 +0000 Subject: [PATCH 4/6] MultiCoreIndexer: use private methods --- index.js | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/index.js b/index.js index 73ac91e..d928100 100644 --- a/index.js +++ b/index.js @@ -12,9 +12,6 @@ const DEFAULT_BATCH_SIZE = 100 // The indexing rate (in entries per second) is calculated as an exponential // moving average. A factor > 1 will put more weight on previous values. const MOVING_AVG_FACTOR = 5 -const kHandleEntries = Symbol('handleEntries') -const kEmitState = Symbol('emitState') -const kGetState = Symbol('getState') /** @typedef {string | ((name: string) => import('random-access-storage'))} StorageParam */ /** @typedef {import('./lib/types').ValueEncoding} ValueEncoding */ @@ -66,15 +63,14 @@ class MultiCoreIndexer extends TypedEmitter { this.#writeStream = /** @type {Writable>} */ ( new Writable({ writev: (entries, cb) => { - // @ts-ignore - I don't know why TS does not like this - this[kHandleEntries](entries).then(() => cb(), cb) + this.#handleEntries(entries).then(() => cb(), cb) }, highWaterMark: maxBatch, byteLength: () => 1, }) ) this.#indexStream.pipe(this.#writeStream) - this.#emitStateBound = this[kEmitState].bind(this) + this.#emitStateBound = this.#emitState.bind(this) // This is needed because the source streams can start indexing before this // stream starts reading data. This ensures that the indexing state is // emitted when the source cores first append / download data @@ -88,7 +84,7 @@ class MultiCoreIndexer extends TypedEmitter { * @type {IndexState} */ get state() { - return this[kGetState]() + return this.#getState() } /** @@ -104,7 +100,7 @@ class MultiCoreIndexer extends TypedEmitter { * Resolves when indexing state is 'idle' */ async idle() { - if (this[kGetState]().current === 'idle') return + if (this.#getState().current === 'idle') return if (!this.#pendingIdle) { this.#pendingIdle = pDefer() } @@ -123,8 +119,8 @@ class MultiCoreIndexer extends TypedEmitter { } /** @param {Entry[]} entries */ - async [kHandleEntries](entries) { - this[kEmitState]() + async #handleEntries(entries) { + this.#emitState() /* istanbul ignore if - not sure this is necessary, but better safe than sorry */ if (!entries.length) return await this.#batch(entries) @@ -140,11 +136,11 @@ class MultiCoreIndexer extends TypedEmitter { // Set this at the end of batch rather than start so the timing also // includes the reads from the index streams this.#rateMeasurementStart = Date.now() - this[kEmitState]() + this.#emitState() } - [kEmitState]() { - const state = this[kGetState]() + #emitState() { + const state = this.#getState() if (state.current !== this.#prevEmittedState?.current) { this.emit(state.current) } @@ -155,7 +151,7 @@ class MultiCoreIndexer extends TypedEmitter { this.#prevEmittedState = state } - [kGetState]() { + #getState() { const remaining = this.#indexStream.remaining const drained = this.#indexStream.drained const prevState = this.#state From 141b463335b5f3b28c927b3a18e71c99954eb7aa Mon Sep 17 00:00:00 2001 From: Evan Hahn Date: Wed, 17 Jan 2024 18:16:30 +0000 Subject: [PATCH 5/6] CoreIndexStream: remove all symbol keys --- lib/core-index-stream.js | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/lib/core-index-stream.js b/lib/core-index-stream.js index 754e1b1..6ef9615 100644 --- a/lib/core-index-stream.js +++ b/lib/core-index-stream.js @@ -4,9 +4,6 @@ const Bitfield = require('./bitfield') const { pDefer } = require('./utils') const { promisify } = require('node:util') -const kHandleAppend = Symbol('handleAppend') -const kHandleDownload = Symbol('handleDownload') - /** @typedef {import('./types').ValueEncoding} ValueEncoding */ /** @typedef {import('./types').JSONValue} JSONValue */ /** @@ -26,6 +23,8 @@ const kHandleDownload = Symbol('handleDownload') * @extends {Readable, Entry, Entry, true, false, import('./types').IndexStreamEvents>>} */ class CoreIndexStream extends Readable { + #handleAppendBound + #handleDownloadBound /** @type {Bitfield | undefined} */ #indexedBitfield /** @type {Bitfield | undefined} */ @@ -57,8 +56,8 @@ class CoreIndexStream extends Readable { }) this.#core = core this.#createStorage = createStorage - this[kHandleAppend] = this[kHandleAppend].bind(this) - this[kHandleDownload] = this[kHandleDownload].bind(this) + this.#handleAppendBound = this.#handleAppend.bind(this) + this.#handleDownloadBound = this.#handleDownload.bind(this) } get remaining() { @@ -107,8 +106,8 @@ class CoreIndexStream extends Readable { } async #destroy() { - this.#core.removeListener('append', this[kHandleAppend]) - this.#core.removeListener('download', this[kHandleDownload]) + this.#core.removeListener('append', this.#handleAppendBound) + this.#core.removeListener('download', this.#handleDownloadBound) await this.#indexedBitfield?.flush() if (this.#storage) await closeStorage(this.#storage) } @@ -122,8 +121,8 @@ class CoreIndexStream extends Readable { this.#storage = this.#createStorage(getStorageName(discoveryKey)) this.#indexedBitfield = await Bitfield.open(this.#storage) this.#inProgressBitfield = await new Bitfield() - this.#core.on('append', this[kHandleAppend]) - this.#core.on('download', this[kHandleDownload]) + this.#core.on('append', this.#handleAppendBound) + this.#core.on('download', this.#handleDownloadBound) } async #read() { @@ -188,14 +187,14 @@ class CoreIndexStream extends Readable { return true } - async [kHandleAppend]() { + async #handleAppend() { this.#pending.resolve() } /** * @param {number} index */ - async [kHandleDownload](index) { + async #handleDownload(index) { this.#downloaded.add(index) this.#pending.resolve() } From 387c873a8577022c47029dcbff0ef64a96016645 Mon Sep 17 00:00:00 2001 From: Evan Hahn Date: Wed, 17 Jan 2024 18:18:53 +0000 Subject: [PATCH 6/6] MultiCoreIndexStream: remove all symbol keys --- lib/multi-core-index-stream.js | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/multi-core-index-stream.js b/lib/multi-core-index-stream.js index 500a0ef..51fd0cd 100644 --- a/lib/multi-core-index-stream.js +++ b/lib/multi-core-index-stream.js @@ -13,14 +13,14 @@ const { once } = require('events') * @template {ValueEncoding} [T='binary'] * @typedef {import('./core-index-stream').CoreIndexStream} CoreIndexStream */ -const kHandleIndexing = Symbol('handleIndexing') -const kHandleDrained = Symbol('handleDrained') /** * @template {ValueEncoding} [T='binary'] * @extends {Readable, Entry, Entry, true, false, import('./types').IndexStreamEvents>>} */ class MultiCoreIndexStream extends Readable { + #handleIndexingBound + #handleDrainedBound /** @type {Map, () => void>} */ #streams = new Map() /** @type {Map>} */ @@ -46,8 +46,8 @@ class MultiCoreIndexStream extends Readable { byteLength: () => 1, }) this.#drained = streams.length === 0 - this[kHandleIndexing] = this[kHandleIndexing].bind(this) - this[kHandleDrained] = this[kHandleDrained].bind(this) + this.#handleIndexingBound = this.#handleIndexing.bind(this) + this.#handleDrainedBound = this.#handleDrained.bind(this) for (const s of streams) { this.addStream(s) } @@ -99,8 +99,8 @@ class MultiCoreIndexStream extends Readable { .catch(noop) this.#readable.add(stream) stream.on('readable', handleReadableFn) - stream.on('indexing', this[kHandleIndexing]) - stream.on('drained', this[kHandleDrained]) + stream.on('indexing', this.#handleIndexingBound) + stream.on('drained', this.#handleDrainedBound) } /** @param {any} cb */ @@ -127,8 +127,8 @@ class MultiCoreIndexStream extends Readable { const closePromises = [] for (const [stream, handleReadableFn] of this.#streams) { stream.off('readable', handleReadableFn) - stream.off('indexing', this[kHandleIndexing]) - stream.off('drained', this[kHandleDrained]) + stream.off('indexing', this.#handleIndexingBound) + stream.off('drained', this.#handleDrainedBound) stream.destroy() closePromises.push(once(stream, 'close')) } @@ -167,13 +167,13 @@ class MultiCoreIndexStream extends Readable { // `indexing` event always fires at the start of indexing in the chain of // streams (the `drained` event should happen at the end of the chain once // everything is read) - [kHandleIndexing]() { + #handleIndexing() { if (!this.#drained) return this.#drained = false this.emit('indexing') } - [kHandleDrained]() { + #handleDrained() { let drained = true for (const stream of this.#streams.keys()) { if (!stream.drained) drained = false