diff --git a/index.js b/index.js index 73ac91e..d928100 100644 --- a/index.js +++ b/index.js @@ -12,9 +12,6 @@ const DEFAULT_BATCH_SIZE = 100 // The indexing rate (in entries per second) is calculated as an exponential // moving average. A factor > 1 will put more weight on previous values. const MOVING_AVG_FACTOR = 5 -const kHandleEntries = Symbol('handleEntries') -const kEmitState = Symbol('emitState') -const kGetState = Symbol('getState') /** @typedef {string | ((name: string) => import('random-access-storage'))} StorageParam */ /** @typedef {import('./lib/types').ValueEncoding} ValueEncoding */ @@ -66,15 +63,14 @@ class MultiCoreIndexer extends TypedEmitter { this.#writeStream = /** @type {Writable>} */ ( new Writable({ writev: (entries, cb) => { - // @ts-ignore - I don't know why TS does not like this - this[kHandleEntries](entries).then(() => cb(), cb) + this.#handleEntries(entries).then(() => cb(), cb) }, highWaterMark: maxBatch, byteLength: () => 1, }) ) this.#indexStream.pipe(this.#writeStream) - this.#emitStateBound = this[kEmitState].bind(this) + this.#emitStateBound = this.#emitState.bind(this) // This is needed because the source streams can start indexing before this // stream starts reading data. This ensures that the indexing state is // emitted when the source cores first append / download data @@ -88,7 +84,7 @@ class MultiCoreIndexer extends TypedEmitter { * @type {IndexState} */ get state() { - return this[kGetState]() + return this.#getState() } /** @@ -104,7 +100,7 @@ class MultiCoreIndexer extends TypedEmitter { * Resolves when indexing state is 'idle' */ async idle() { - if (this[kGetState]().current === 'idle') return + if (this.#getState().current === 'idle') return if (!this.#pendingIdle) { this.#pendingIdle = pDefer() } @@ -123,8 +119,8 @@ class MultiCoreIndexer extends TypedEmitter { } /** @param {Entry[]} entries */ - async [kHandleEntries](entries) { - this[kEmitState]() + async #handleEntries(entries) { + this.#emitState() /* istanbul ignore if - not sure this is necessary, but better safe than sorry */ if (!entries.length) return await this.#batch(entries) @@ -140,11 +136,11 @@ class MultiCoreIndexer extends TypedEmitter { // Set this at the end of batch rather than start so the timing also // includes the reads from the index streams this.#rateMeasurementStart = Date.now() - this[kEmitState]() + this.#emitState() } - [kEmitState]() { - const state = this[kGetState]() + #emitState() { + const state = this.#getState() if (state.current !== this.#prevEmittedState?.current) { this.emit(state.current) } @@ -155,7 +151,7 @@ class MultiCoreIndexer extends TypedEmitter { this.#prevEmittedState = state } - [kGetState]() { + #getState() { const remaining = this.#indexStream.remaining const drained = this.#indexStream.drained const prevState = this.#state diff --git a/lib/core-index-stream.js b/lib/core-index-stream.js index 3c4f35e..6ef9615 100644 --- a/lib/core-index-stream.js +++ b/lib/core-index-stream.js @@ -4,13 +4,6 @@ const Bitfield = require('./bitfield') const { pDefer } = require('./utils') const { promisify } = require('node:util') -const kReadPromise = Symbol('readPromise') -const kOpenPromise = Symbol('openPromise') -const kDestroyPromise = Symbol('destroyPromise') -const kHandleAppend = Symbol('handleAppend') -const kHandleDownload = Symbol('handleDownload') -const kPushEntry = Symbol('pushEntry') - /** @typedef {import('./types').ValueEncoding} ValueEncoding */ /** @typedef {import('./types').JSONValue} JSONValue */ /** @@ -30,6 +23,8 @@ const kPushEntry = Symbol('pushEntry') * @extends {Readable, Entry, Entry, true, false, import('./types').IndexStreamEvents>>} */ class CoreIndexStream extends Readable { + #handleAppendBound + #handleDownloadBound /** @type {Bitfield | undefined} */ #indexedBitfield /** @type {Bitfield | undefined} */ @@ -61,8 +56,8 @@ class CoreIndexStream extends Readable { }) this.#core = core this.#createStorage = createStorage - this[kHandleAppend] = this[kHandleAppend].bind(this) - this[kHandleDownload] = this[kHandleDownload].bind(this) + this.#handleAppendBound = this.#handleAppend.bind(this) + this.#handleDownloadBound = this.#handleDownload.bind(this) } get remaining() { @@ -81,12 +76,12 @@ class CoreIndexStream extends Readable { /** @param {any} cb */ _open(cb) { - this[kOpenPromise]().then(cb, cb) + this.#open().then(cb, cb) } /** @param {any} cb */ _read(cb) { - this[kReadPromise]().then(cb, cb) + this.#read().then(cb, cb) } _predestroy() { @@ -96,7 +91,7 @@ class CoreIndexStream extends Readable { /** @param {any} cb */ _destroy(cb) { - this[kDestroyPromise]().then(cb, cb) + this.#destroy().then(cb, cb) } /** @@ -110,14 +105,14 @@ class CoreIndexStream extends Readable { this.#inProgressBitfield?.set(index, false) } - async [kDestroyPromise]() { - this.#core.removeListener('append', this[kHandleAppend]) - this.#core.removeListener('download', this[kHandleDownload]) + async #destroy() { + this.#core.removeListener('append', this.#handleAppendBound) + this.#core.removeListener('download', this.#handleDownloadBound) await this.#indexedBitfield?.flush() if (this.#storage) await closeStorage(this.#storage) } - async [kOpenPromise]() { + async #open() { await this.#core.ready() await this.#core.update({ wait: true }) const { discoveryKey } = this.#core @@ -126,11 +121,11 @@ class CoreIndexStream extends Readable { this.#storage = this.#createStorage(getStorageName(discoveryKey)) this.#indexedBitfield = await Bitfield.open(this.#storage) this.#inProgressBitfield = await new Bitfield() - this.#core.on('append', this[kHandleAppend]) - this.#core.on('download', this[kHandleDownload]) + this.#core.on('append', this.#handleAppendBound) + this.#core.on('download', this.#handleDownloadBound) } - async [kReadPromise]() { + async #read() { if (this.#index >= this.#core.length && this.#downloaded.size === 0) { this.#drained = true this.emit('drained') @@ -142,7 +137,7 @@ class CoreIndexStream extends Readable { let didPush = false this.#readBufferAvailable = true while (this.#readBufferAvailable && this.#index < this.#core.length) { - didPush = (await this[kPushEntry](this.#index)) || didPush + didPush = (await this.#pushEntry(this.#index)) || didPush // Don't increment this until after the async push above this.#index++ } @@ -151,7 +146,7 @@ class CoreIndexStream extends Readable { for (const index of this.#downloaded) { this.#downloaded.delete(index) didPush = - (await this[kPushEntry](index)) || + (await this.#pushEntry(index)) || /* istanbul ignore next - TODO: Test when hypercore-next supports a core.clear() method */ didPush // This is for back-pressure, for which there is not a good test yet. @@ -166,7 +161,7 @@ class CoreIndexStream extends Readable { } if (!didPush && !this.#destroying) { // If nothing was pushed, queue up another read - await this[kReadPromise]() + await this.#read() } await this.#indexedBitfield?.flush() } @@ -177,7 +172,7 @@ class CoreIndexStream extends Readable { * @param {number} index * @returns {Promise} */ - async [kPushEntry](index) { + async #pushEntry(index) { const isProcessed = this.#indexedBitfield?.get(index) || this.#inProgressBitfield?.get(index) if (isProcessed) return false @@ -192,14 +187,14 @@ class CoreIndexStream extends Readable { return true } - async [kHandleAppend]() { + async #handleAppend() { this.#pending.resolve() } /** * @param {number} index */ - async [kHandleDownload](index) { + async #handleDownload(index) { this.#downloaded.add(index) this.#pending.resolve() } diff --git a/lib/multi-core-index-stream.js b/lib/multi-core-index-stream.js index 638be0b..51fd0cd 100644 --- a/lib/multi-core-index-stream.js +++ b/lib/multi-core-index-stream.js @@ -13,17 +13,14 @@ const { once } = require('events') * @template {ValueEncoding} [T='binary'] * @typedef {import('./core-index-stream').CoreIndexStream} CoreIndexStream */ -const kReadPromise = Symbol('readPromise') -const kHandleReadable = Symbol('handleReadable') -const kDestroyPromise = Symbol('destroyPromise') -const kHandleIndexing = Symbol('handleIndexing') -const kHandleDrained = Symbol('handleDrained') /** * @template {ValueEncoding} [T='binary'] * @extends {Readable, Entry, Entry, true, false, import('./types').IndexStreamEvents>>} */ class MultiCoreIndexStream extends Readable { + #handleIndexingBound + #handleDrainedBound /** @type {Map, () => void>} */ #streams = new Map() /** @type {Map>} */ @@ -49,8 +46,8 @@ class MultiCoreIndexStream extends Readable { byteLength: () => 1, }) this.#drained = streams.length === 0 - this[kHandleIndexing] = this[kHandleIndexing].bind(this) - this[kHandleDrained] = this[kHandleDrained].bind(this) + this.#handleIndexingBound = this.#handleIndexing.bind(this) + this.#handleDrainedBound = this.#handleDrained.bind(this) for (const s of streams) { this.addStream(s) } @@ -89,7 +86,7 @@ class MultiCoreIndexStream extends Readable { if (this.#streams.has(stream)) return this.#drained = false // Do this so that we can remove this listener when we destroy the stream - const handleReadableFn = this[kHandleReadable].bind(this, stream) + const handleReadableFn = this.#handleReadable.bind(this, stream) this.#streams.set(stream, handleReadableFn) stream.core .ready() @@ -102,8 +99,8 @@ class MultiCoreIndexStream extends Readable { .catch(noop) this.#readable.add(stream) stream.on('readable', handleReadableFn) - stream.on('indexing', this[kHandleIndexing]) - stream.on('drained', this[kHandleDrained]) + stream.on('indexing', this.#handleIndexingBound) + stream.on('drained', this.#handleDrainedBound) } /** @param {any} cb */ @@ -113,7 +110,7 @@ class MultiCoreIndexStream extends Readable { /** @param {any} cb */ _read(cb) { - this[kReadPromise]().then(cb, cb) + this.#read().then(cb, cb) } _predestroy() { @@ -123,22 +120,22 @@ class MultiCoreIndexStream extends Readable { /** @param {any} cb */ _destroy(cb) { - this[kDestroyPromise]().then(cb, cb) + this.#destroy().then(cb, cb) } - async [kDestroyPromise]() { + async #destroy() { const closePromises = [] for (const [stream, handleReadableFn] of this.#streams) { stream.off('readable', handleReadableFn) - stream.off('indexing', this[kHandleIndexing]) - stream.off('drained', this[kHandleDrained]) + stream.off('indexing', this.#handleIndexingBound) + stream.off('drained', this.#handleDrainedBound) stream.destroy() closePromises.push(once(stream, 'close')) } await Promise.all(closePromises) } - async [kReadPromise]() { + async #read() { let didPush = false if (!this.#readable.size && !this.#destroying) { await (this.#pending = pDefer()).promise @@ -156,12 +153,12 @@ class MultiCoreIndexStream extends Readable { } if (!didPush && !this.#destroying) { // If nothing was pushed, queue up another read - await this[kReadPromise]() + await this.#read() } } /** @param {CoreIndexStream} stream */ - [kHandleReadable](stream) { + #handleReadable(stream) { this.#readable.add(stream) this.#pending.resolve() } @@ -170,13 +167,13 @@ class MultiCoreIndexStream extends Readable { // `indexing` event always fires at the start of indexing in the chain of // streams (the `drained` event should happen at the end of the chain once // everything is read) - [kHandleIndexing]() { + #handleIndexing() { if (!this.#drained) return this.#drained = false this.emit('indexing') } - [kHandleDrained]() { + #handleDrained() { let drained = true for (const stream of this.#streams.keys()) { if (!stream.drained) drained = false