Skip to content

Commit

Permalink
chore: use private methods instead of symbol keys (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
EvanHahn authored Jan 17, 2024
1 parent ebdbb49 commit e73630a
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 59 deletions.
24 changes: 10 additions & 14 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ const DEFAULT_BATCH_SIZE = 100
// The indexing rate (in entries per second) is calculated as an exponential
// moving average. A factor > 1 will put more weight on previous values.
const MOVING_AVG_FACTOR = 5
const kHandleEntries = Symbol('handleEntries')
const kEmitState = Symbol('emitState')
const kGetState = Symbol('getState')

/** @typedef {string | ((name: string) => import('random-access-storage'))} StorageParam */
/** @typedef {import('./lib/types').ValueEncoding} ValueEncoding */
Expand Down Expand Up @@ -66,15 +63,14 @@ class MultiCoreIndexer extends TypedEmitter {
this.#writeStream = /** @type {Writable<Entry<T>>} */ (
new Writable({
writev: (entries, cb) => {
// @ts-ignore - I don't know why TS does not like this
this[kHandleEntries](entries).then(() => cb(), cb)
this.#handleEntries(entries).then(() => cb(), cb)
},
highWaterMark: maxBatch,
byteLength: () => 1,
})
)
this.#indexStream.pipe(this.#writeStream)
this.#emitStateBound = this[kEmitState].bind(this)
this.#emitStateBound = this.#emitState.bind(this)
// This is needed because the source streams can start indexing before this
// stream starts reading data. This ensures that the indexing state is
// emitted when the source cores first append / download data
Expand All @@ -88,7 +84,7 @@ class MultiCoreIndexer extends TypedEmitter {
* @type {IndexState}
*/
get state() {
return this[kGetState]()
return this.#getState()
}

/**
Expand All @@ -104,7 +100,7 @@ class MultiCoreIndexer extends TypedEmitter {
* Resolves when indexing state is 'idle'
*/
async idle() {
if (this[kGetState]().current === 'idle') return
if (this.#getState().current === 'idle') return
if (!this.#pendingIdle) {
this.#pendingIdle = pDefer()
}
Expand All @@ -123,8 +119,8 @@ class MultiCoreIndexer extends TypedEmitter {
}

/** @param {Entry<T>[]} entries */
async [kHandleEntries](entries) {
this[kEmitState]()
async #handleEntries(entries) {
this.#emitState()
/* istanbul ignore if - not sure this is necessary, but better safe than sorry */
if (!entries.length) return
await this.#batch(entries)
Expand All @@ -140,11 +136,11 @@ class MultiCoreIndexer extends TypedEmitter {
// Set this at the end of batch rather than start so the timing also
// includes the reads from the index streams
this.#rateMeasurementStart = Date.now()
this[kEmitState]()
this.#emitState()
}

[kEmitState]() {
const state = this[kGetState]()
#emitState() {
const state = this.#getState()
if (state.current !== this.#prevEmittedState?.current) {
this.emit(state.current)
}
Expand All @@ -155,7 +151,7 @@ class MultiCoreIndexer extends TypedEmitter {
this.#prevEmittedState = state
}

[kGetState]() {
#getState() {
const remaining = this.#indexStream.remaining
const drained = this.#indexStream.drained
const prevState = this.#state
Expand Down
45 changes: 20 additions & 25 deletions lib/core-index-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,6 @@ const Bitfield = require('./bitfield')
const { pDefer } = require('./utils')
const { promisify } = require('node:util')

const kReadPromise = Symbol('readPromise')
const kOpenPromise = Symbol('openPromise')
const kDestroyPromise = Symbol('destroyPromise')
const kHandleAppend = Symbol('handleAppend')
const kHandleDownload = Symbol('handleDownload')
const kPushEntry = Symbol('pushEntry')

/** @typedef {import('./types').ValueEncoding} ValueEncoding */
/** @typedef {import('./types').JSONValue} JSONValue */
/**
Expand All @@ -30,6 +23,8 @@ const kPushEntry = Symbol('pushEntry')
* @extends {Readable<Entry<T>, Entry<T>, Entry<T>, true, false, import('./types').IndexStreamEvents<Entry<T>>>}
*/
class CoreIndexStream extends Readable {
#handleAppendBound
#handleDownloadBound
/** @type {Bitfield | undefined} */
#indexedBitfield
/** @type {Bitfield | undefined} */
Expand Down Expand Up @@ -61,8 +56,8 @@ class CoreIndexStream extends Readable {
})
this.#core = core
this.#createStorage = createStorage
this[kHandleAppend] = this[kHandleAppend].bind(this)
this[kHandleDownload] = this[kHandleDownload].bind(this)
this.#handleAppendBound = this.#handleAppend.bind(this)
this.#handleDownloadBound = this.#handleDownload.bind(this)
}

get remaining() {
Expand All @@ -81,12 +76,12 @@ class CoreIndexStream extends Readable {

/** @param {any} cb */
_open(cb) {
this[kOpenPromise]().then(cb, cb)
this.#open().then(cb, cb)
}

/** @param {any} cb */
_read(cb) {
this[kReadPromise]().then(cb, cb)
this.#read().then(cb, cb)
}

_predestroy() {
Expand All @@ -96,7 +91,7 @@ class CoreIndexStream extends Readable {

/** @param {any} cb */
_destroy(cb) {
this[kDestroyPromise]().then(cb, cb)
this.#destroy().then(cb, cb)
}

/**
Expand All @@ -110,14 +105,14 @@ class CoreIndexStream extends Readable {
this.#inProgressBitfield?.set(index, false)
}

async [kDestroyPromise]() {
this.#core.removeListener('append', this[kHandleAppend])
this.#core.removeListener('download', this[kHandleDownload])
async #destroy() {
this.#core.removeListener('append', this.#handleAppendBound)
this.#core.removeListener('download', this.#handleDownloadBound)
await this.#indexedBitfield?.flush()
if (this.#storage) await closeStorage(this.#storage)
}

async [kOpenPromise]() {
async #open() {
await this.#core.ready()
await this.#core.update({ wait: true })
const { discoveryKey } = this.#core
Expand All @@ -126,11 +121,11 @@ class CoreIndexStream extends Readable {
this.#storage = this.#createStorage(getStorageName(discoveryKey))
this.#indexedBitfield = await Bitfield.open(this.#storage)
this.#inProgressBitfield = await new Bitfield()
this.#core.on('append', this[kHandleAppend])
this.#core.on('download', this[kHandleDownload])
this.#core.on('append', this.#handleAppendBound)
this.#core.on('download', this.#handleDownloadBound)
}

async [kReadPromise]() {
async #read() {
if (this.#index >= this.#core.length && this.#downloaded.size === 0) {
this.#drained = true
this.emit('drained')
Expand All @@ -142,7 +137,7 @@ class CoreIndexStream extends Readable {
let didPush = false
this.#readBufferAvailable = true
while (this.#readBufferAvailable && this.#index < this.#core.length) {
didPush = (await this[kPushEntry](this.#index)) || didPush
didPush = (await this.#pushEntry(this.#index)) || didPush
// Don't increment this until after the async push above
this.#index++
}
Expand All @@ -151,7 +146,7 @@ class CoreIndexStream extends Readable {
for (const index of this.#downloaded) {
this.#downloaded.delete(index)
didPush =
(await this[kPushEntry](index)) ||
(await this.#pushEntry(index)) ||
/* istanbul ignore next - TODO: Test when hypercore-next supports a core.clear() method */
didPush
// This is for back-pressure, for which there is not a good test yet.
Expand All @@ -166,7 +161,7 @@ class CoreIndexStream extends Readable {
}
if (!didPush && !this.#destroying) {
// If nothing was pushed, queue up another read
await this[kReadPromise]()
await this.#read()
}
await this.#indexedBitfield?.flush()
}
Expand All @@ -177,7 +172,7 @@ class CoreIndexStream extends Readable {
* @param {number} index
* @returns {Promise<boolean>}
*/
async [kPushEntry](index) {
async #pushEntry(index) {
const isProcessed =
this.#indexedBitfield?.get(index) || this.#inProgressBitfield?.get(index)
if (isProcessed) return false
Expand All @@ -192,14 +187,14 @@ class CoreIndexStream extends Readable {
return true
}

async [kHandleAppend]() {
async #handleAppend() {
this.#pending.resolve()
}

/**
* @param {number} index
*/
async [kHandleDownload](index) {
async #handleDownload(index) {
this.#downloaded.add(index)
this.#pending.resolve()
}
Expand Down
37 changes: 17 additions & 20 deletions lib/multi-core-index-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,14 @@ const { once } = require('events')
* @template {ValueEncoding} [T='binary']
* @typedef {import('./core-index-stream').CoreIndexStream<T>} CoreIndexStream
*/
const kReadPromise = Symbol('readPromise')
const kHandleReadable = Symbol('handleReadable')
const kDestroyPromise = Symbol('destroyPromise')
const kHandleIndexing = Symbol('handleIndexing')
const kHandleDrained = Symbol('handleDrained')

/**
* @template {ValueEncoding} [T='binary']
* @extends {Readable<Entry<T>, Entry<T>, Entry<T>, true, false, import('./types').IndexStreamEvents<Entry<T>>>}
*/
class MultiCoreIndexStream extends Readable {
#handleIndexingBound
#handleDrainedBound
/** @type {Map<CoreIndexStream<T>, () => void>} */
#streams = new Map()
/** @type {Map<string, CoreIndexStream<T>>} */
Expand All @@ -49,8 +46,8 @@ class MultiCoreIndexStream extends Readable {
byteLength: () => 1,
})
this.#drained = streams.length === 0
this[kHandleIndexing] = this[kHandleIndexing].bind(this)
this[kHandleDrained] = this[kHandleDrained].bind(this)
this.#handleIndexingBound = this.#handleIndexing.bind(this)
this.#handleDrainedBound = this.#handleDrained.bind(this)
for (const s of streams) {
this.addStream(s)
}
Expand Down Expand Up @@ -89,7 +86,7 @@ class MultiCoreIndexStream extends Readable {
if (this.#streams.has(stream)) return
this.#drained = false
// Do this so that we can remove this listener when we destroy the stream
const handleReadableFn = this[kHandleReadable].bind(this, stream)
const handleReadableFn = this.#handleReadable.bind(this, stream)
this.#streams.set(stream, handleReadableFn)
stream.core
.ready()
Expand All @@ -102,8 +99,8 @@ class MultiCoreIndexStream extends Readable {
.catch(noop)
this.#readable.add(stream)
stream.on('readable', handleReadableFn)
stream.on('indexing', this[kHandleIndexing])
stream.on('drained', this[kHandleDrained])
stream.on('indexing', this.#handleIndexingBound)
stream.on('drained', this.#handleDrainedBound)
}

/** @param {any} cb */
Expand All @@ -113,7 +110,7 @@ class MultiCoreIndexStream extends Readable {

/** @param {any} cb */
_read(cb) {
this[kReadPromise]().then(cb, cb)
this.#read().then(cb, cb)
}

_predestroy() {
Expand All @@ -123,22 +120,22 @@ class MultiCoreIndexStream extends Readable {

/** @param {any} cb */
_destroy(cb) {
this[kDestroyPromise]().then(cb, cb)
this.#destroy().then(cb, cb)
}

async [kDestroyPromise]() {
async #destroy() {
const closePromises = []
for (const [stream, handleReadableFn] of this.#streams) {
stream.off('readable', handleReadableFn)
stream.off('indexing', this[kHandleIndexing])
stream.off('drained', this[kHandleDrained])
stream.off('indexing', this.#handleIndexingBound)
stream.off('drained', this.#handleDrainedBound)
stream.destroy()
closePromises.push(once(stream, 'close'))
}
await Promise.all(closePromises)
}

async [kReadPromise]() {
async #read() {
let didPush = false
if (!this.#readable.size && !this.#destroying) {
await (this.#pending = pDefer()).promise
Expand All @@ -156,12 +153,12 @@ class MultiCoreIndexStream extends Readable {
}
if (!didPush && !this.#destroying) {
// If nothing was pushed, queue up another read
await this[kReadPromise]()
await this.#read()
}
}

/** @param {CoreIndexStream<T>} stream */
[kHandleReadable](stream) {
#handleReadable(stream) {
this.#readable.add(stream)
this.#pending.resolve()
}
Expand All @@ -170,13 +167,13 @@ class MultiCoreIndexStream extends Readable {
// `indexing` event always fires at the start of indexing in the chain of
// streams (the `drained` event should happen at the end of the chain once
// everything is read)
[kHandleIndexing]() {
#handleIndexing() {
if (!this.#drained) return
this.#drained = false
this.emit('indexing')
}

[kHandleDrained]() {
#handleDrained() {
let drained = true
for (const stream of this.#streams.keys()) {
if (!stream.drained) drained = false
Expand Down

0 comments on commit e73630a

Please sign in to comment.