diff --git a/README.md b/README.md index a7acfb4..e7ea184 100644 --- a/README.md +++ b/README.md @@ -106,6 +106,14 @@ string, that should return a is used to store the index state of each hypercore. (Index state is stored as a bitfield). +#### opts.reindex + +_Optional_\ +Type: `boolean` + +If `true`, the cores, and any new ones that are added, will be reindexed from +scratch. + #### opts.maxBatch _Optional_\ diff --git a/index.js b/index.js index 3eb0ed3..2ad59ff 100644 --- a/index.js +++ b/index.js @@ -34,6 +34,7 @@ class MultiCoreIndexer extends TypedEmitter { #rateMeasurementStart = Date.now() #rate = 0 #createStorage + #reindex /** @type {IndexState | undefined} */ #prevEmittedState #emitStateBound @@ -46,13 +47,18 @@ class MultiCoreIndexer extends TypedEmitter { * @param {object} opts * @param {(entries: Entry[]) => Promise} opts.batch * @param {StorageParam} opts.storage + * @param {boolean} [opts.reindex] * @param {number} [opts.maxBatch=100] */ - constructor(cores, { batch, maxBatch = DEFAULT_BATCH_SIZE, storage }) { + constructor( + cores, + { batch, maxBatch = DEFAULT_BATCH_SIZE, storage, reindex = false } + ) { super() this.#createStorage = MultiCoreIndexer.defaultStorage(storage) + this.#reindex = reindex const coreIndexStreams = cores.map((core) => { - return new CoreIndexStream(core, this.#createStorage) + return new CoreIndexStream(core, this.#createStorage, reindex) }) this.#indexStream = new MultiCoreIndexStream(coreIndexStreams, { highWaterMark: maxBatch, @@ -95,7 +101,11 @@ class MultiCoreIndexer extends TypedEmitter { */ addCore(core) { this.#assertOpen('Cannot add core after closing') - const coreIndexStream = new CoreIndexStream(core, this.#createStorage) + const coreIndexStream = new CoreIndexStream( + core, + this.#createStorage, + this.#reindex + ) this.#indexStream.addStream(coreIndexStream) } diff --git a/lib/core-index-stream.js b/lib/core-index-stream.js index 1e09322..6b2884c 100644 --- a/lib/core-index-stream.js +++ b/lib/core-index-stream.js @@ -45,8 +45,9 @@ class CoreIndexStream extends Readable { /** * @param {import('hypercore')} core * @param {(name: string) => import('random-access-storage')} createStorage + * @param {boolean} reindex */ - constructor(core, createStorage) { + constructor(core, createStorage, reindex) { super({ // Treat as object stream, count each object as size `1` so that the // `remaining` property can use the stream buffer to calculate how many @@ -59,10 +60,15 @@ class CoreIndexStream extends Readable { this.#handleDownloadBound = this.#handleDownload.bind(this) this.#createStorage = async () => { await this.#core.ready() + const { discoveryKey } = this.#core /* c8 ignore next: just to keep TS happy - after core.ready() this is set */ if (!discoveryKey) throw new Error('Missing discovery key') - return createStorage(getStorageName(discoveryKey)) + const storageName = getStorageName(discoveryKey) + + if (reindex) await unlinkStorage(createStorage(storageName)) + + return createStorage(storageName) } } diff --git a/test/multi-core-indexer.test.js b/test/multi-core-indexer.test.js index fad72f5..a7a5b48 100644 --- a/test/multi-core-indexer.test.js +++ b/test/multi-core-indexer.test.js @@ -399,6 +399,48 @@ test('Entries are re-indexed if index storage unlinked', async () => { await indexer2.close() }) +test('Entries can be explicitly reindexed with a startup option', async (t) => { + const cores = await createMultiple(3) + const [core1, core2, core3] = cores + const expectedIn1And2 = new Set(await generateFixtures([core1, core2], 3)) 
+ const expectedIn3 = new Set(await generateFixtures([core3], 3)) + const allExpected = new Set([...expectedIn1And2, ...expectedIn3]) + + const storage = ram.reusable() + + /** @type {Set} */ const entriesBeforeReindex = new Set() + const indexer1 = new MultiCoreIndexer(cores, { + batch: async (entries) => { + for (const entry of entries) entriesBeforeReindex.add(entry) + }, + storage, + }) + await indexer1.idle() + await indexer1.close() + assert.deepEqual( + entriesBeforeReindex, + allExpected, + 'test setup: entries are indexed once' + ) + + /** @type {Set} */ const entriesAfterReindex = new Set() + const indexer2 = new MultiCoreIndexer([core1, core2], { + batch: async (entries) => { + for (const entry of entries) entriesAfterReindex.add(entry) + }, + storage, + reindex: true, + }) + t.after(() => indexer2.close()) + + await indexer2.idle() + assert.deepEqual(entriesAfterReindex, expectedIn1And2) + + indexer2.addCore(core3) + await indexer2.idle() + assert.deepEqual(entriesAfterReindex, allExpected) +}) + test('Entries are batched to batchMax when indexing is slower than Hypercore reads', async () => { const cores = await createMultiple(5) await generateFixtures(cores, 500) diff --git a/test/unit-tests/core-index-stream.test.js b/test/unit-tests/core-index-stream.test.js index 58b2efa..2c24f34 100644 --- a/test/unit-tests/core-index-stream.test.js +++ b/test/unit-tests/core-index-stream.test.js @@ -14,7 +14,7 @@ const Hypercore = require('hypercore') test('stream.core', async () => { const a = await create() - const stream = new CoreIndexStream(a, () => new ram()) + const stream = new CoreIndexStream(a, () => new ram(), false) assert.deepEqual(stream.core, a) }) @@ -25,7 +25,7 @@ test('destroy before open', async () => { return new ram() } const a = new Hypercore(() => new ram()) - const stream = new CoreIndexStream(a, createStorage) + const stream = new CoreIndexStream(a, createStorage, false) stream.destroy() await once(stream, 'close') assert.equal(storageCreated, false, 'storage never created') @@ -38,7 +38,7 @@ test('unlink before open', async () => { return new ram() } const core = new Hypercore(() => new ram()) - const stream = new CoreIndexStream(core, createStorage) + const stream = new CoreIndexStream(core, createStorage, false) await stream.unlink() assert.equal(storageCreated, true, 'storage was created') }) @@ -50,15 +50,41 @@ test('Indexes all items already in a core', async () => { await a.append(blocks) /** @type {any[]} */ const entries = [] - const stream = new CoreIndexStream(a, () => new ram()) + const stream = new CoreIndexStream(a, () => new ram(), false) stream.on('data', (entry) => entries.push(entry)) await once(stream, 'drained') assert.deepEqual(entries, expected) }) +test('Re-indexing all items in a core', async () => { + const core = await create() + const blocks = generateFixture(0, 10) + const expected = blocksToExpected(blocks, core.key) + await core.append(blocks) + + const storage = ram.reusable() + + const stream1 = new CoreIndexStream(core, storage, false) + stream1.on('data', (entry) => { + stream1.setIndexed(entry.index) + }) + await once(stream1, 'drained') + await stream1.destroy() + + /** @type {any[]} */ + const entries = [] + const stream2 = new CoreIndexStream(core, storage, true) + stream2.on('data', (entry) => { + entries.push(entry) + }) + await once(stream2, 'drained') + + assert.deepEqual(entries, expected) +}) + test("Empty core emits 'drained' event", async () => { const a = await create() - const stream = new CoreIndexStream(a, () => 
new ram()) + const stream = new CoreIndexStream(a, () => new ram(), false) stream.resume() stream.on('indexing', assert.fail) await once(stream, 'drained') @@ -72,7 +98,7 @@ test('.remaining property is accurate', async () => { await a.append(blocks) /** @type {any[]} */ const entries = [] - const stream = new CoreIndexStream(a, () => new ram()) + const stream = new CoreIndexStream(a, () => new ram(), false) assert.equal(stream.remaining, totalBlocks) stream.on('data', (entry) => { entries.push(entry) @@ -89,7 +115,7 @@ test('Indexes items appended after initial index', async () => { const blocks = generateFixture(0, 10) /** @type {any[]} */ const entries = [] - const stream = new CoreIndexStream(a, () => new ram()) + const stream = new CoreIndexStream(a, () => new ram(), false) stream.on('data', (entry) => entries.push(entry)) await once(stream, 'drained') assert.deepEqual(entries, [], 'no entries before append') @@ -110,7 +136,7 @@ test('Readable stream from sparse hypercore', async () => { const range = b.download({ start: 5, end: 20 }) await range.downloaded() - const stream = new CoreIndexStream(b, () => new ram()) + const stream = new CoreIndexStream(b, () => new ram(), false) /** @type {Buffer[]} */ const entries = [] stream.on('data', (entry) => entries.push(entry.block)) @@ -134,7 +160,7 @@ test("'indexing' and 'drained' events are paired", async () => { replicate(a, b) - const stream = new CoreIndexStream(b, () => new ram()) + const stream = new CoreIndexStream(b, () => new ram(), false) let indexingEvents = 0 let idleEvents = 0 stream.on('indexing', () => { @@ -166,7 +192,7 @@ test('Appends from a replicated core are indexed', async () => { const range1 = b.download({ start: 0, end: b.length }) await range1.downloaded() - const stream = new CoreIndexStream(b, () => new ram()) + const stream = new CoreIndexStream(b, () => new ram(), false) /** @type {Buffer[]} */ const entries = [] stream.on('data', (entry) => entries.push(entry.block)) @@ -187,7 +213,7 @@ test('Maintains index state', async () => { /** @type {any[]} */ const entries = [] const storage = ram.reusable() - const stream1 = new CoreIndexStream(a, storage) + const stream1 = new CoreIndexStream(a, storage, false) stream1.on('data', (entry) => { entries.push(entry.block) stream1.setIndexed(entry.index) @@ -200,7 +226,7 @@ test('Maintains index state', async () => { stream1.destroy() await once(stream1, 'close') await a.append(blocks.slice(500, 1000)) - const stream2 = new CoreIndexStream(a, storage) + const stream2 = new CoreIndexStream(a, storage, false) stream2.on('data', (entry) => { entries.push(entry.block) stream2.setIndexed(entry.index) diff --git a/test/unit-tests/multi-core-index-stream.test.js b/test/unit-tests/multi-core-index-stream.test.js index a9c1859..9b1e187 100644 --- a/test/unit-tests/multi-core-index-stream.test.js +++ b/test/unit-tests/multi-core-index-stream.test.js @@ -19,7 +19,7 @@ test('Indexes all items already in a core', async () => { const cores = await createMultiple(5) const expected = await generateFixtures(cores, 1000) const indexStreams = cores.map( - (core) => new CoreIndexStream(core, () => new ram()) + (core) => new CoreIndexStream(core, () => new ram(), false) ) const entries = [] const stream = new MultiCoreIndexStream(indexStreams) @@ -44,7 +44,7 @@ test('Adding index streams after initialization', async () => { const cores = await createMultiple(3) const expected = await generateFixtures(cores, 100) const indexStreams = cores.map( - (core) => new CoreIndexStream(core, () => new 
ram()) + (core) => new CoreIndexStream(core, () => new ram(), false) ) const entries = [] const stream = new MultiCoreIndexStream(indexStreams.slice(0, 2)) @@ -74,7 +74,7 @@ test('.remaining is as expected', async () => { const cores = await createMultiple(coreCount) const expected = await generateFixtures(cores, blockCount) const indexStreams = cores.map( - (core) => new CoreIndexStream(core, () => new ram()) + (core) => new CoreIndexStream(core, () => new ram(), false) ) const entries = [] const stream = new MultiCoreIndexStream(indexStreams, { highWaterMark: 10 }) @@ -106,7 +106,7 @@ test('.remaining is as expected', async () => { test('Indexes items appended after initial index', async () => { const cores = await createMultiple(5) const indexStreams = cores.map( - (core) => new CoreIndexStream(core, () => new ram()) + (core) => new CoreIndexStream(core, () => new ram(), false) ) const entries = [] const stream = new MultiCoreIndexStream(indexStreams, { highWaterMark: 10 }) @@ -138,7 +138,7 @@ test('index sparse hypercores', async () => { for (const core of remoteCores) { const range = core.download({ start: 5, end: 20 }) await range.downloaded() - indexStreams.push(new CoreIndexStream(core, () => new ram())) + indexStreams.push(new CoreIndexStream(core, () => new ram(), false)) } const entries = [] const stream = new MultiCoreIndexStream(indexStreams, { highWaterMark: 10 }) @@ -172,7 +172,7 @@ test('Appends from a replicated core are indexed', async () => { await remote.update({ wait: true }) const range = remote.download({ start: 0, end: remote.length }) await range.downloaded() - indexStreams.push(new CoreIndexStream(core, () => new ram())) + indexStreams.push(new CoreIndexStream(core, () => new ram(), false)) } const entries = [] const stream = new MultiCoreIndexStream(indexStreams, { highWaterMark: 10 }) @@ -202,7 +202,7 @@ test('Maintains index state', async () => { for (const core of cores) { const storage = ram.reusable() storages.push(storage) - const indexStream = new CoreIndexStream(core, storage) + const indexStream = new CoreIndexStream(core, storage, false) indexStream.on('data', ({ index }) => { indexStream.setIndexed(index) }) @@ -211,7 +211,7 @@ test('Maintains index state', async () => { } const indexStreams = cores.map( - (core, i) => new CoreIndexStream(core, storages[i]) + (core, i) => new CoreIndexStream(core, storages[i], false) ) const stream = new MultiCoreIndexStream(indexStreams) stream.on('data', (entry) => {
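
A usage sketch of the `reindex` option added in this diff, for reference (assuming the package is required as `multi-core-indexer`; `cores`, `anotherCore`, and `handleEntry` are placeholders, not part of this change):

```js
const MultiCoreIndexer = require('multi-core-indexer')
const ram = require('random-access-memory')

// Reusable index storage carried over from a previous indexing run.
const storage = ram.reusable()

const indexer = new MultiCoreIndexer(cores, {
  batch: async (entries) => {
    // With `reindex: true`, every entry is delivered again, because each
    // core's stored index state (the bitfield) is unlinked before indexing.
    for (const entry of entries) handleEntry(entry)
  },
  storage,
  reindex: true,
})

await indexer.idle() // all existing entries have been re-indexed

// Cores added later are also indexed from scratch while `reindex` is set.
indexer.addCore(anotherCore)
await indexer.idle()
await indexer.close()
```

Under the hood, `CoreIndexStream` now receives the `reindex` flag and, when it is `true`, unlinks the per-core bitfield storage (named from the core's discovery key) before recreating it, so indexing for that core starts again from the first entry.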