Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use private methods instead of symbol keys #30

Merged
merged 6 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ const DEFAULT_BATCH_SIZE = 100
// The indexing rate (in entries per second) is calculated as an exponential
// moving average. A factor > 1 will put more weight on previous values.
const MOVING_AVG_FACTOR = 5
const kHandleEntries = Symbol('handleEntries')
const kEmitState = Symbol('emitState')
const kGetState = Symbol('getState')

/** @typedef {string | ((name: string) => import('random-access-storage'))} StorageParam */
/** @typedef {import('./lib/types').ValueEncoding} ValueEncoding */
Expand Down Expand Up @@ -66,15 +63,14 @@ class MultiCoreIndexer extends TypedEmitter {
this.#writeStream = /** @type {Writable<Entry<T>>} */ (
new Writable({
writev: (entries, cb) => {
// @ts-ignore - I don't know why TS does not like this
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why, but I was able to remove this @ts-ignore.

this[kHandleEntries](entries).then(() => cb(), cb)
this.#handleEntries(entries).then(() => cb(), cb)
},
highWaterMark: maxBatch,
byteLength: () => 1,
})
)
this.#indexStream.pipe(this.#writeStream)
this.#emitStateBound = this[kEmitState].bind(this)
this.#emitStateBound = this.#emitState.bind(this)
// This is needed because the source streams can start indexing before this
// stream starts reading data. This ensures that the indexing state is
// emitted when the source cores first append / download data
Expand All @@ -88,7 +84,7 @@ class MultiCoreIndexer extends TypedEmitter {
* @type {IndexState}
*/
get state() {
return this[kGetState]()
return this.#getState()
}

/**
Expand All @@ -104,7 +100,7 @@ class MultiCoreIndexer extends TypedEmitter {
* Resolves when indexing state is 'idle'
*/
async idle() {
if (this[kGetState]().current === 'idle') return
if (this.#getState().current === 'idle') return
if (!this.#pendingIdle) {
this.#pendingIdle = pDefer()
}
Expand All @@ -123,8 +119,8 @@ class MultiCoreIndexer extends TypedEmitter {
}

/** @param {Entry<T>[]} entries */
async [kHandleEntries](entries) {
this[kEmitState]()
async #handleEntries(entries) {
this.#emitState()
/* istanbul ignore if - not sure this is necessary, but better safe than sorry */
if (!entries.length) return
await this.#batch(entries)
Expand All @@ -140,11 +136,11 @@ class MultiCoreIndexer extends TypedEmitter {
// Set this at the end of batch rather than start so the timing also
// includes the reads from the index streams
this.#rateMeasurementStart = Date.now()
this[kEmitState]()
this.#emitState()
}

[kEmitState]() {
const state = this[kGetState]()
#emitState() {
const state = this.#getState()
if (state.current !== this.#prevEmittedState?.current) {
this.emit(state.current)
}
Expand All @@ -155,7 +151,7 @@ class MultiCoreIndexer extends TypedEmitter {
this.#prevEmittedState = state
}

[kGetState]() {
#getState() {
const remaining = this.#indexStream.remaining
const drained = this.#indexStream.drained
const prevState = this.#state
Expand Down
45 changes: 20 additions & 25 deletions lib/core-index-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,6 @@ const Bitfield = require('./bitfield')
const { pDefer } = require('./utils')
const { promisify } = require('node:util')

const kReadPromise = Symbol('readPromise')
const kOpenPromise = Symbol('openPromise')
const kDestroyPromise = Symbol('destroyPromise')
const kHandleAppend = Symbol('handleAppend')
const kHandleDownload = Symbol('handleDownload')
const kPushEntry = Symbol('pushEntry')

/** @typedef {import('./types').ValueEncoding} ValueEncoding */
/** @typedef {import('./types').JSONValue} JSONValue */
/**
Expand All @@ -30,6 +23,8 @@ const kPushEntry = Symbol('pushEntry')
* @extends {Readable<Entry<T>, Entry<T>, Entry<T>, true, false, import('./types').IndexStreamEvents<Entry<T>>>}
*/
class CoreIndexStream extends Readable {
#handleAppendBound
#handleDownloadBound
/** @type {Bitfield | undefined} */
#indexedBitfield
/** @type {Bitfield | undefined} */
Expand Down Expand Up @@ -61,8 +56,8 @@ class CoreIndexStream extends Readable {
})
this.#core = core
this.#createStorage = createStorage
this[kHandleAppend] = this[kHandleAppend].bind(this)
this[kHandleDownload] = this[kHandleDownload].bind(this)
this.#handleAppendBound = this.#handleAppend.bind(this)
this.#handleDownloadBound = this.#handleDownload.bind(this)
}

get remaining() {
Expand All @@ -81,12 +76,12 @@ class CoreIndexStream extends Readable {

/** @param {any} cb */
_open(cb) {
this[kOpenPromise]().then(cb, cb)
this.#open().then(cb, cb)
}

/** @param {any} cb */
_read(cb) {
this[kReadPromise]().then(cb, cb)
this.#read().then(cb, cb)
}

_predestroy() {
Expand All @@ -96,7 +91,7 @@ class CoreIndexStream extends Readable {

/** @param {any} cb */
_destroy(cb) {
this[kDestroyPromise]().then(cb, cb)
this.#destroy().then(cb, cb)
}

/**
Expand All @@ -110,14 +105,14 @@ class CoreIndexStream extends Readable {
this.#inProgressBitfield?.set(index, false)
}

async [kDestroyPromise]() {
this.#core.removeListener('append', this[kHandleAppend])
this.#core.removeListener('download', this[kHandleDownload])
async #destroy() {
this.#core.removeListener('append', this.#handleAppendBound)
this.#core.removeListener('download', this.#handleDownloadBound)
await this.#indexedBitfield?.flush()
if (this.#storage) await closeStorage(this.#storage)
}

async [kOpenPromise]() {
async #open() {
await this.#core.ready()
await this.#core.update({ wait: true })
const { discoveryKey } = this.#core
Expand All @@ -126,11 +121,11 @@ class CoreIndexStream extends Readable {
this.#storage = this.#createStorage(getStorageName(discoveryKey))
this.#indexedBitfield = await Bitfield.open(this.#storage)
this.#inProgressBitfield = await new Bitfield()
this.#core.on('append', this[kHandleAppend])
this.#core.on('download', this[kHandleDownload])
this.#core.on('append', this.#handleAppendBound)
this.#core.on('download', this.#handleDownloadBound)
}

async [kReadPromise]() {
async #read() {
if (this.#index >= this.#core.length && this.#downloaded.size === 0) {
this.#drained = true
this.emit('drained')
Expand All @@ -142,7 +137,7 @@ class CoreIndexStream extends Readable {
let didPush = false
this.#readBufferAvailable = true
while (this.#readBufferAvailable && this.#index < this.#core.length) {
didPush = (await this[kPushEntry](this.#index)) || didPush
didPush = (await this.#pushEntry(this.#index)) || didPush
// Don't increment this until after the async push above
this.#index++
}
Expand All @@ -151,7 +146,7 @@ class CoreIndexStream extends Readable {
for (const index of this.#downloaded) {
this.#downloaded.delete(index)
didPush =
(await this[kPushEntry](index)) ||
(await this.#pushEntry(index)) ||
/* istanbul ignore next - TODO: Test when hypercore-next supports a core.clear() method */
didPush
// This is for back-pressure, for which there is not a good test yet.
Expand All @@ -166,7 +161,7 @@ class CoreIndexStream extends Readable {
}
if (!didPush && !this.#destroying) {
// If nothing was pushed, queue up another read
await this[kReadPromise]()
await this.#read()
}
await this.#indexedBitfield?.flush()
}
Expand All @@ -177,7 +172,7 @@ class CoreIndexStream extends Readable {
* @param {number} index
* @returns {Promise<boolean>}
*/
async [kPushEntry](index) {
async #pushEntry(index) {
const isProcessed =
this.#indexedBitfield?.get(index) || this.#inProgressBitfield?.get(index)
if (isProcessed) return false
Expand All @@ -192,14 +187,14 @@ class CoreIndexStream extends Readable {
return true
}

async [kHandleAppend]() {
async #handleAppend() {
this.#pending.resolve()
}

/**
* @param {number} index
*/
async [kHandleDownload](index) {
async #handleDownload(index) {
this.#downloaded.add(index)
this.#pending.resolve()
}
Expand Down
37 changes: 17 additions & 20 deletions lib/multi-core-index-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,14 @@ const { once } = require('events')
* @template {ValueEncoding} [T='binary']
* @typedef {import('./core-index-stream').CoreIndexStream<T>} CoreIndexStream
*/
const kReadPromise = Symbol('readPromise')
const kHandleReadable = Symbol('handleReadable')
const kDestroyPromise = Symbol('destroyPromise')
const kHandleIndexing = Symbol('handleIndexing')
const kHandleDrained = Symbol('handleDrained')

/**
* @template {ValueEncoding} [T='binary']
* @extends {Readable<Entry<T>, Entry<T>, Entry<T>, true, false, import('./types').IndexStreamEvents<Entry<T>>>}
*/
class MultiCoreIndexStream extends Readable {
#handleIndexingBound
#handleDrainedBound
/** @type {Map<CoreIndexStream<T>, () => void>} */
#streams = new Map()
/** @type {Map<string, CoreIndexStream<T>>} */
Expand All @@ -49,8 +46,8 @@ class MultiCoreIndexStream extends Readable {
byteLength: () => 1,
})
this.#drained = streams.length === 0
this[kHandleIndexing] = this[kHandleIndexing].bind(this)
this[kHandleDrained] = this[kHandleDrained].bind(this)
this.#handleIndexingBound = this.#handleIndexing.bind(this)
this.#handleDrainedBound = this.#handleDrained.bind(this)
for (const s of streams) {
this.addStream(s)
}
Expand Down Expand Up @@ -89,7 +86,7 @@ class MultiCoreIndexStream extends Readable {
if (this.#streams.has(stream)) return
this.#drained = false
// Do this so that we can remove this listener when we destroy the stream
const handleReadableFn = this[kHandleReadable].bind(this, stream)
const handleReadableFn = this.#handleReadable.bind(this, stream)
this.#streams.set(stream, handleReadableFn)
stream.core
.ready()
Expand All @@ -102,8 +99,8 @@ class MultiCoreIndexStream extends Readable {
.catch(noop)
this.#readable.add(stream)
stream.on('readable', handleReadableFn)
stream.on('indexing', this[kHandleIndexing])
stream.on('drained', this[kHandleDrained])
stream.on('indexing', this.#handleIndexingBound)
stream.on('drained', this.#handleDrainedBound)
}

/** @param {any} cb */
Expand All @@ -113,7 +110,7 @@ class MultiCoreIndexStream extends Readable {

/** @param {any} cb */
_read(cb) {
this[kReadPromise]().then(cb, cb)
this.#read().then(cb, cb)
}

_predestroy() {
Expand All @@ -123,22 +120,22 @@ class MultiCoreIndexStream extends Readable {

/** @param {any} cb */
_destroy(cb) {
this[kDestroyPromise]().then(cb, cb)
this.#destroy().then(cb, cb)
}

async [kDestroyPromise]() {
async #destroy() {
const closePromises = []
for (const [stream, handleReadableFn] of this.#streams) {
stream.off('readable', handleReadableFn)
stream.off('indexing', this[kHandleIndexing])
stream.off('drained', this[kHandleDrained])
stream.off('indexing', this.#handleIndexingBound)
stream.off('drained', this.#handleDrainedBound)
stream.destroy()
closePromises.push(once(stream, 'close'))
}
await Promise.all(closePromises)
}

async [kReadPromise]() {
async #read() {
let didPush = false
if (!this.#readable.size && !this.#destroying) {
await (this.#pending = pDefer()).promise
Expand All @@ -156,12 +153,12 @@ class MultiCoreIndexStream extends Readable {
}
if (!didPush && !this.#destroying) {
// If nothing was pushed, queue up another read
await this[kReadPromise]()
await this.#read()
}
}

/** @param {CoreIndexStream<T>} stream */
[kHandleReadable](stream) {
#handleReadable(stream) {
this.#readable.add(stream)
this.#pending.resolve()
}
Expand All @@ -170,13 +167,13 @@ class MultiCoreIndexStream extends Readable {
// `indexing` event always fires at the start of indexing in the chain of
// streams (the `drained` event should happen at the end of the chain once
// everything is read)
[kHandleIndexing]() {
#handleIndexing() {
if (!this.#drained) return
this.#drained = false
this.emit('indexing')
}

[kHandleDrained]() {
#handleDrained() {
let drained = true
for (const stream of this.#streams.keys()) {
if (!stream.drained) drained = false
Expand Down
Loading