From 905e2fc45c3229139ba40900f23b368cbcc5a2d3 Mon Sep 17 00:00:00 2001 From: Gregor MacLennan Date: Mon, 27 Nov 2023 12:33:51 +0900 Subject: [PATCH] feat!: discoveryId instead of key for entries BREAKING CHANGE: this changes the type of `Entry` to have the core discoveryId (the discovery key as a hex string) instead of the core key. Since our mapeo-core code changed to using the discovery key for version ids, we need to calculate the discovery key for each entry, which has a performance cost (it's a hash operation). This change avoids the need for any hashing when processing entries. --- index.js | 4 +-- lib/core-index-stream.js | 14 +++++----- lib/multi-core-index-stream.js | 6 ++--- lib/types.ts | 2 +- test/helpers/index.js | 18 +++++++------ test/multi-core-indexer.test.js | 4 +++ test/tsconfig.json | 8 ++++++ test/unit-tests/core-index-stream.test.js | 26 +++++++++++-------- .../multi-core-index-stream.test.js | 26 +++++++++++++------ 9 files changed, 69 insertions(+), 39 deletions(-) create mode 100644 test/tsconfig.json diff --git a/index.js b/index.js index 73ac91e..902056f 100644 --- a/index.js +++ b/index.js @@ -128,8 +128,8 @@ class MultiCoreIndexer extends TypedEmitter { /* istanbul ignore if - not sure this is necessary, but better safe than sorry */ if (!entries.length) return await this.#batch(entries) - for (const { key, index } of entries) { - this.#indexStream.setIndexed(key.toString('hex'), index) + for (const { discoveryId, index } of entries) { + this.#indexStream.setIndexed(discoveryId, index) } const batchTime = Date.now() - this.#rateMeasurementStart // Current rate entries per second diff --git a/lib/core-index-stream.js b/lib/core-index-stream.js index 46cf262..a9deeaf 100644 --- a/lib/core-index-stream.js +++ b/lib/core-index-stream.js @@ -36,6 +36,8 @@ class CoreIndexStream extends Readable { #inProgressBitfield #inProgress = 0 #core + /** @type {string | undefined} */ + #discoveryId /** @type {import('random-access-storage') | undefined} */ #storage #createStorage @@ -123,7 +125,8 @@ class CoreIndexStream extends Readable { const { discoveryKey } = this.#core /* istanbul ignore next: just to keep TS happy - after core.ready() this is set */ if (!discoveryKey) throw new Error('Missing discovery key') - this.#storage = this.#createStorage(getStorageName(discoveryKey)) + this.#discoveryId = discoveryKey.toString('hex') + this.#storage = this.#createStorage(getStorageName(this.#discoveryId)) this.#indexedBitfield = await Bitfield.open(this.#storage) this.#inProgressBitfield = await new Bitfield() this.#core.on('append', this[kHandleAppend]) @@ -186,8 +189,8 @@ class CoreIndexStream extends Readable { this.#inProgressBitfield?.set(index, true) this.#inProgress++ /* istanbul ignore next: this should always be set at this point */ - if (!this.#core.key) throw new Error('Missing core key') - const entry = { key: this.#core.key, block, index } + if (!this.#discoveryId) throw new Error('Missing core key') + const entry = { discoveryId: this.#discoveryId, block, index } this.#readBufferAvailable = this.push(entry) return true } @@ -207,9 +210,8 @@ class CoreIndexStream extends Readable { exports.CoreIndexStream = CoreIndexStream -/** @param {Buffer} discoveryKey */ -function getStorageName(discoveryKey) { - const id = discoveryKey.toString('hex') +/** @param {string} id */ +function getStorageName(id) { return [id.slice(0, 2), id.slice(2, 4), id].join('/') } diff --git a/lib/multi-core-index-stream.js b/lib/multi-core-index-stream.js index 638be0b..6afab9a 100644 --- a/lib/multi-core-index-stream.js +++ b/lib/multi-core-index-stream.js @@ -94,10 +94,10 @@ class MultiCoreIndexStream extends Readable { stream.core .ready() .then(() => { - const coreKey = stream.core.key + const discoveryKey = stream.core.discoveryKey /* istanbul ignore next: this is set after ready */ - if (!coreKey) return - this.#streamsById.set(coreKey.toString('hex'), stream) + if (!discoveryKey) return + this.#streamsById.set(discoveryKey.toString('hex'), stream) }) .catch(noop) this.#readable.add(stream) diff --git a/lib/types.ts b/lib/types.ts index a6d7e18..545308b 100644 --- a/lib/types.ts +++ b/lib/types.ts @@ -28,7 +28,7 @@ export type ValueEncoding = 'binary' | 'utf-8' | 'json' export interface Entry { index: number - key: Buffer + discoveryId: string block: T extends 'binary' ? Buffer : T extends 'utf-8' ? string : JSONValue } diff --git a/test/helpers/index.js b/test/helpers/index.js index b75f395..27a895b 100644 --- a/test/helpers/index.js +++ b/test/helpers/index.js @@ -78,7 +78,9 @@ async function generateFixtures(cores, count) { const offset = core.length const blocks = generateFixture(offset, offset + count) await core.append(blocks) - entries.push.apply(entries, blocksToExpected(blocks, core.key, offset)) + // @ts-expect-error - we know discoveryKey is set here + const discoveryId = core.discoveryKey.toString('hex') + entries.push.apply(entries, blocksToExpected(blocks, discoveryId, offset)) } return entries } @@ -119,8 +121,8 @@ async function throttledDrain(emitter) { * @returns number */ function sort(a, b) { - const aKey = a.key.toString('hex') + a.block.toString() - const bKey = b.key.toString('hex') + b.block.toString() + const aKey = a.discoveryId + a.block.toString() + const bKey = b.discoveryId + b.block.toString() return aKey < bKey ? -1 : aKey > bKey ? 1 : 0 } @@ -132,12 +134,12 @@ function sortEntries(e) { /** * * @param {Buffer[]} blocks - * @param {Buffer} key - * @returns + * @param {string} discoveryId + * @returns {Entry[]} */ -function blocksToExpected(blocks, key, offset = 0) { +function blocksToExpected(blocks, discoveryId, offset = 0) { return blocks.map((block, i) => ({ - key, + discoveryId, block, index: i + offset, })) @@ -159,7 +161,7 @@ async function createMultiple(n) { function logEntries(e) { console.log( sortEntries(e).map((e) => ({ - key: e.key.toString('hex'), + discoveryId: e.discoveryId.slice(0, 7), block: e.block.toString(), index: e.index, })) diff --git a/test/multi-core-indexer.test.js b/test/multi-core-indexer.test.js index 62696dc..b8ff1a3 100644 --- a/test/multi-core-indexer.test.js +++ b/test/multi-core-indexer.test.js @@ -206,6 +206,7 @@ test('Appends from a replicated core are indexed', async (t) => { const remote = (remoteCores[i] = await create(core.key)) replicate(core, remoteCores[i], t) await remote.update({ wait: true }) + // @ts-ignore - Hypercore typings are missing await remote.download({ start: 0, end: remote.length }).downloaded() } /** @type {Entry[]} */ @@ -495,6 +496,7 @@ test('Closing before batch complete should resume on next start', async (t) => { await /** @type {Promise} */ ( new Promise((res) => { indexer1.on('index-state', onIndexState) + /** @param {import('../lib/types').IndexState} state */ function onIndexState(state) { if (state.remaining > 2500) return indexer1.off('index-state', onIndexState) @@ -524,11 +526,13 @@ test('Closing before batch complete should resume on next start', async (t) => { // This checks that storage names do not change between versions, which would be a breaking change test('Consistent storage folders', async (t) => { + /** @type {string[]} */ const storageNames = [] const cores = [] for (const keyPair of testKeypairs.slice(0, 5)) { cores.push(await create({ keyPair })) } + /** @param {string} name */ function createStorage(name) { storageNames.push(name) return new ram() diff --git a/test/tsconfig.json b/test/tsconfig.json new file mode 100644 index 0000000..b89072a --- /dev/null +++ b/test/tsconfig.json @@ -0,0 +1,8 @@ +{ + "compilerOptions": { + "noEmit": true, + "emitDeclarationOnly": false + }, + "extends": "../tsconfig.json", + "include": ["**/*.js"] +} diff --git a/test/unit-tests/core-index-stream.test.js b/test/unit-tests/core-index-stream.test.js index fa264de..9a9fac8 100644 --- a/test/unit-tests/core-index-stream.test.js +++ b/test/unit-tests/core-index-stream.test.js @@ -33,7 +33,8 @@ test('destroy before open', async (t) => { test('Indexes all items already in a core', async (t) => { const a = await create() const blocks = generateFixture(0, 10) - const expected = blocksToExpected(blocks, a.key) + // @ts-ignore + const expected = blocksToExpected(blocks, a.discoveryKey.toString('hex')) await a.append(blocks) /** @type {any[]} */ const entries = [] @@ -56,7 +57,8 @@ test('.remaining property is accurate', async (t) => { const totalBlocks = 100 const a = await create() const blocks = generateFixture(0, totalBlocks) - const expected = blocksToExpected(blocks, a.key) + // @ts-ignore + const expected = blocksToExpected(blocks, a.discoveryKey.toString('hex')) await a.append(blocks) /** @type {any[]} */ const entries = [] @@ -81,7 +83,8 @@ test('Indexes items appended after initial index', async (t) => { stream.on('data', (entry) => entries.push(entry)) await once(stream, 'drained') t.same(entries, [], 'no entries before append') - const expected = blocksToExpected(blocks, a.key) + // @ts-ignore + const expected = blocksToExpected(blocks, a.discoveryKey.toString('hex')) await a.append(blocks) await once(stream, 'drained') t.same(entries, expected) @@ -96,7 +99,7 @@ test('Readable stream from sparse hypercore', async (t) => { replicate(a, b, t) const range = b.download({ start: 5, end: 20 }) - await range.downloaded() + await range.done() const stream = new CoreIndexStream(b, () => new ram()) /** @type {Buffer[]} */ @@ -106,7 +109,7 @@ test('Readable stream from sparse hypercore', async (t) => { t.same(entries, blocks.slice(5, 20)) const range2 = b.download({ start: 50, end: 60 }) - await Promise.all([range2.downloaded(), throttledIdle(stream)]) + await Promise.all([range2.done(), throttledIdle(stream)]) t.same( entries.sort(), @@ -118,6 +121,7 @@ test("'indexing' and 'drained' events are paired", async (t) => { const a = await create() const blocks = generateFixture(0, 100) await a.append(blocks) + // @ts-ignore const b = await create(a.key) replicate(a, b, t) @@ -136,7 +140,7 @@ test("'indexing' and 'drained' events are paired", async (t) => { stream.resume() const range = b.download({ start: 0, end: a.length }) - await Promise.all([range.downloaded(), throttledIdle(stream)]) + await Promise.all([range.done(), throttledIdle(stream)]) t.equal(indexingEvents, idleEvents) // This is just to check that we're actually testing something @@ -152,7 +156,7 @@ test('Appends from a replicated core are indexed', async (t) => { replicate(a, b, t) await b.update({ wait: true }) const range1 = b.download({ start: 0, end: b.length }) - await range1.downloaded() + await range1.done() const stream = new CoreIndexStream(b, () => new ram()) /** @type {Buffer[]} */ @@ -200,12 +204,12 @@ test('Maintains index state', async (t) => { /** * * @param {Buffer[]} blocks - * @param {Buffer} key - * @returns + * @param {string} discoveryId + * @returns {import('../../types/index.js').Entry[]} */ -function blocksToExpected(blocks, key) { +function blocksToExpected(blocks, discoveryId) { return blocks.map((block, i) => ({ - key, + discoveryId, block, index: i, })) diff --git a/test/unit-tests/multi-core-index-stream.test.js b/test/unit-tests/multi-core-index-stream.test.js index d06bcfe..6b539ff 100644 --- a/test/unit-tests/multi-core-index-stream.test.js +++ b/test/unit-tests/multi-core-index-stream.test.js @@ -14,12 +14,15 @@ const { sortEntries, } = require('../helpers') +/** @typedef {import('../../types/index.js').Entry} Entry */ + test('Indexes all items already in a core', async (t) => { const cores = await createMultiple(5) const expected = await generateFixtures(cores, 1000) const indexStreams = cores.map( (core) => new CoreIndexStream(core, () => new ram()) ) + /** @type {Entry[]} */ const entries = [] const stream = new MultiCoreIndexStream(indexStreams) const ws = new Writable({ @@ -46,6 +49,7 @@ test('Adding index streams after initialization', async (t) => { const indexStreams = cores.map( (core) => new CoreIndexStream(core, () => new ram()) ) + /** @type {Entry[]} */ const entries = [] const stream = new MultiCoreIndexStream(indexStreams.slice(0, 2)) stream.addStream(indexStreams[2]) @@ -77,13 +81,14 @@ test('.remaining is as expected', async (t) => { const indexStreams = cores.map( (core) => new CoreIndexStream(core, () => new ram()) ) + /** @type {Entry[]} */ const entries = [] const stream = new MultiCoreIndexStream(indexStreams, { highWaterMark: 10 }) const ws = new Writable({ writev: (data, cb) => { entries.push(...data) - for (const { key, index } of data) { - stream.setIndexed(key.toString('hex'), index) + for (const { discoveryId, index } of data) { + stream.setIndexed(discoveryId, index) } t.equal( stream.remaining, @@ -110,6 +115,7 @@ test('Indexes items appended after initial index', async (t) => { const indexStreams = cores.map( (core) => new CoreIndexStream(core, () => new ram()) ) + /** @type {Entry[]} */ const entries = [] const stream = new MultiCoreIndexStream(indexStreams, { highWaterMark: 10 }) stream.on('data', (entry) => entries.push(entry)) @@ -126,7 +132,9 @@ test('Indexes items appended after initial index', async (t) => { test('index sparse hypercores', async (t) => { const coreCount = 5 const localCores = await createMultiple(coreCount) + /** @type {Entry[]} */ const expected = [] + /** @type {Entry[]} */ const expected2 = [] const indexStreams = [] const remoteCores = Array(coreCount) @@ -140,9 +148,10 @@ test('index sparse hypercores', async (t) => { for (const core of remoteCores) { const range = core.download({ start: 5, end: 20 }) - await range.downloaded() + await range.done() indexStreams.push(new CoreIndexStream(core, () => new ram())) } + /** @type {Entry[]} */ const entries = [] const stream = new MultiCoreIndexStream(indexStreams, { highWaterMark: 10 }) stream.on('data', (entry) => entries.push(entry)) @@ -152,9 +161,7 @@ test('index sparse hypercores', async (t) => { await Promise.all([ throttledDrain(stream), - ...remoteCores.map((core) => - core.download({ start: 50, end: 60 }).downloaded() - ), + ...remoteCores.map((core) => core.download({ start: 50, end: 60 }).done()), ]) t.same(sortEntries(entries), sortEntries([...expected, ...expected2])) @@ -171,9 +178,10 @@ test('Appends from a replicated core are indexed', async (t) => { replicate(core, remoteCores[i], t) await remote.update({ wait: true }) const range = remote.download({ start: 0, end: remote.length }) - await range.downloaded() + await range.done() indexStreams.push(new CoreIndexStream(core, () => new ram())) } + /** @type {Entry[]} */ const entries = [] const stream = new MultiCoreIndexStream(indexStreams, { highWaterMark: 10 }) stream.on('data', (entry) => entries.push(entry)) @@ -192,8 +200,10 @@ test('Appends from a replicated core are indexed', async (t) => { test('Maintains index state', async (t) => { const cores = await createMultiple(5) + /** @type {Array<(name: string) => import('random-access-storage')>} */ const storages = [] await generateFixtures(cores, 1000) + /** @type {Entry[]} */ const entries = [] for (const core of cores) { @@ -213,7 +223,7 @@ test('Maintains index state', async (t) => { const stream = new MultiCoreIndexStream(indexStreams) stream.on('data', (entry) => { entries.push(entry) - stream.setIndexed(entry.key.toString('hex'), entry.index) + stream.setIndexed(entry.discoveryId, entry.index) }) const expectedPromise = generateFixtures(cores, 1000)