diff --git a/.github/workflows/node.js.yml b/.github/workflows/node.js.yml index b8c7752..41bd8e2 100644 --- a/.github/workflows/node.js.yml +++ b/.github/workflows/node.js.yml @@ -7,7 +7,6 @@ on: push: branches: [main] pull_request: - branches: [main] jobs: build: diff --git a/index.js b/index.js index 95d64ca..73ac91e 100644 --- a/index.js +++ b/index.js @@ -3,11 +3,9 @@ const { Writable } = require('streamx') const { TypedEmitter } = require('tiny-typed-emitter') const { once } = require('events') const raf = require('random-access-file') -const { discoveryKey } = require('hypercore-crypto') // const log = require('debug')('multi-core-indexer') const { CoreIndexStream } = require('./lib/core-index-stream') const { MultiCoreIndexStream } = require('./lib/multi-core-index-stream') -const { promisify } = require('util') const { pDefer } = require('./lib/utils.js') const DEFAULT_BATCH_SIZE = 100 @@ -43,15 +41,13 @@ class MultiCoreIndexer extends TypedEmitter { #createStorage /** @type {IndexState | undefined} */ #prevEmittedState - /** @type {Set} */ - #storages = new Set() #emitStateBound /** @type {import('./lib/utils.js').DeferredPromise | undefined} */ #pendingIdle /** * - * @param {import('hypercore')[]} cores + * @param {import('hypercore')[]} cores * @param {object} opts * @param {(entries: Entry[]) => Promise} opts.batch * @param {StorageParam} opts.storage @@ -61,9 +57,7 @@ class MultiCoreIndexer extends TypedEmitter { super() this.#createStorage = MultiCoreIndexer.defaultStorage(storage) const coreIndexStreams = cores.map((core) => { - const storage = this.#createStorage(getStorageName(core)) - this.#storages.add(storage) - return new CoreIndexStream(core, storage) + return new CoreIndexStream(core, this.#createStorage) }) this.#indexStream = new MultiCoreIndexStream(coreIndexStreams, { highWaterMark: maxBatch, @@ -99,12 +93,10 @@ class MultiCoreIndexer extends TypedEmitter { /** * Add a core to be indexed - * @param {import('hypercore')} core + * @param {import('hypercore')} core */ addCore(core) { - const storage = this.#createStorage(getStorageName(core)) - this.#storages.add(storage) - const coreIndexStream = new CoreIndexStream(core, storage) + const coreIndexStream = new CoreIndexStream(core, this.#createStorage) this.#indexStream.addStream(coreIndexStream) } @@ -128,13 +120,6 @@ class MultiCoreIndexer extends TypedEmitter { once(this.#indexStream, 'close'), once(this.#writeStream, 'close'), ]) - const storageClosePromises = [] - for (const storage of this.#storages) { - const promisifiedClose = promisify(storage.close.bind(storage)) - storageClosePromises.push(promisifiedClose()) - } - this.#storages.clear() - await Promise.all(storageClosePromises) } /** @param {Entry[]} entries */ @@ -204,9 +189,3 @@ class MultiCoreIndexer extends TypedEmitter { } module.exports = MultiCoreIndexer - -/** @param {{ key: Buffer }} core */ -function getStorageName(core) { - const id = discoveryKey(core.key).toString('hex') - return [id.slice(0, 2), id.slice(2, 4), id].join('/') -} diff --git a/lib/core-index-stream.js b/lib/core-index-stream.js index 323cf38..46cf262 100644 --- a/lib/core-index-stream.js +++ b/lib/core-index-stream.js @@ -2,6 +2,7 @@ const { Readable } = require('streamx') const Bitfield = require('./bitfield') const { pDefer } = require('./utils') +const { promisify } = require('node:util') const kReadPromise = Symbol('readPromise') const kOpenPromise = Symbol('openPromise') @@ -35,7 +36,9 @@ class CoreIndexStream extends Readable { #inProgressBitfield #inProgress = 0 #core + /** @type {import('random-access-storage') | undefined} */ #storage + #createStorage #index = 0 /** @type {Set} */ #downloaded = new Set() @@ -45,10 +48,10 @@ class CoreIndexStream extends Readable { #drained = false /** - * @param {import('hypercore')} core - * @param {import('random-access-storage')} storage + * @param {import('hypercore')} core + * @param {(name: string) => import('random-access-storage')} createStorage */ - constructor(core, storage) { + constructor(core, createStorage) { super({ // Treat as object stream, count each object as size `1` so that the // `remaining` property can use the stream buffer to calculate how many @@ -57,8 +60,7 @@ class CoreIndexStream extends Readable { byteLength: () => 1, }) this.#core = core - this.id = core.key.toString('hex') - this.#storage = storage + this.#createStorage = createStorage this[kHandleAppend] = this[kHandleAppend].bind(this) this[kHandleDownload] = this[kHandleDownload].bind(this) } @@ -73,8 +75,8 @@ class CoreIndexStream extends Readable { return this.#drained } - get key() { - return this.#core.key + get core() { + return this.#core } /** @param {any} cb */ @@ -112,12 +114,18 @@ class CoreIndexStream extends Readable { this.#core.removeListener('append', this[kHandleAppend]) this.#core.removeListener('download', this[kHandleDownload]) await this.#indexedBitfield?.flush() + if (this.#storage) await closeStorage(this.#storage) } async [kOpenPromise]() { + await this.#core.ready() + await this.#core.update({ wait: true }) + const { discoveryKey } = this.#core + /* istanbul ignore next: just to keep TS happy - after core.ready() this is set */ + if (!discoveryKey) throw new Error('Missing discovery key') + this.#storage = this.#createStorage(getStorageName(discoveryKey)) this.#indexedBitfield = await Bitfield.open(this.#storage) this.#inProgressBitfield = await new Bitfield() - await this.#core.update({ wait: true }) this.#core.on('append', this[kHandleAppend]) this.#core.on('download', this[kHandleDownload]) } @@ -177,6 +185,8 @@ class CoreIndexStream extends Readable { if (block === null) return false this.#inProgressBitfield?.set(index, true) this.#inProgress++ + /* istanbul ignore next: this should always be set at this point */ + if (!this.#core.key) throw new Error('Missing core key') const entry = { key: this.#core.key, block, index } this.#readBufferAvailable = this.push(entry) return true @@ -196,3 +206,14 @@ class CoreIndexStream extends Readable { } exports.CoreIndexStream = CoreIndexStream + +/** @param {Buffer} discoveryKey */ +function getStorageName(discoveryKey) { + const id = discoveryKey.toString('hex') + return [id.slice(0, 2), id.slice(2, 4), id].join('/') +} + +/** @param {import('random-access-storage')} storage*/ +function closeStorage(storage) { + return promisify(storage.close.bind(storage))() +} diff --git a/lib/multi-core-index-stream.js b/lib/multi-core-index-stream.js index 0c58d1f..638be0b 100644 --- a/lib/multi-core-index-stream.js +++ b/lib/multi-core-index-stream.js @@ -91,7 +91,15 @@ class MultiCoreIndexStream extends Readable { // Do this so that we can remove this listener when we destroy the stream const handleReadableFn = this[kHandleReadable].bind(this, stream) this.#streams.set(stream, handleReadableFn) - this.#streamsById.set(stream.id, stream) + stream.core + .ready() + .then(() => { + const coreKey = stream.core.key + /* istanbul ignore next: this is set after ready */ + if (!coreKey) return + this.#streamsById.set(coreKey.toString('hex'), stream) + }) + .catch(noop) this.#readable.add(stream) stream.on('readable', handleReadableFn) stream.on('indexing', this[kHandleIndexing]) @@ -180,3 +188,6 @@ class MultiCoreIndexStream extends Readable { } exports.MultiCoreIndexStream = MultiCoreIndexStream + +/* istanbul ignore next: TODO add test for adding broken cores */ +function noop() {} diff --git a/package-lock.json b/package-lock.json index dda4159..b64c65b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14,7 +14,6 @@ "b4a": "^1.6.4", "big-sparse-array": "^1.0.2", "debug": "^4.3.3", - "hypercore-crypto": "^3.4.0", "random-access-file": "^4.0.4", "streamx": "^2.15.0", "tiny-typed-emitter": "^2.1.0" @@ -991,6 +990,7 @@ "version": "2.1.4", "resolved": "https://registry.npmjs.org/blake2b/-/blake2b-2.1.4.tgz", "integrity": "sha512-AyBuuJNI64gIvwx13qiICz6H6hpmjvYS5DGkG6jbXMOT8Z3WUJ3V1X0FlhIoT1b/5JtHE3ki+xjtMvu1nn+t9A==", + "dev": true, "dependencies": { "blake2b-wasm": "^2.4.0", "nanoassert": "^2.0.0" @@ -1000,6 +1000,7 @@ "version": "2.4.0", "resolved": "https://registry.npmjs.org/blake2b-wasm/-/blake2b-wasm-2.4.0.tgz", "integrity": "sha512-S1kwmW2ZhZFFFOghcx73+ZajEfKBqhP82JMssxtLVMxlaPea1p9uoLiUZ5WYyHn0KddwbLc+0vh4wR0KBNoT5w==", + "dev": true, "dependencies": { "b4a": "^1.0.1", "nanoassert": "^2.0.0" @@ -1337,6 +1338,7 @@ "version": "1.0.4", "resolved": "https://registry.npmjs.org/chacha20-universal/-/chacha20-universal-1.0.4.tgz", "integrity": "sha512-/IOxdWWNa7nRabfe7+oF+jVkGjlr2xUL4J8l/OvzZhj+c9RpMqoo3Dq+5nU1j/BflRV4BKnaQ4+4oH1yBpQG1Q==", + "dev": true, "dependencies": { "nanoassert": "^2.0.0" } @@ -1552,6 +1554,7 @@ "version": "2.11.0", "resolved": "https://registry.npmjs.org/compact-encoding/-/compact-encoding-2.11.0.tgz", "integrity": "sha512-CRfTuyy9Tg7EwxNKvIq3yFIr2JnJLyVr9Yj234VsDCL59hdXcZH3TdzY/2kwbAqVogIoRBJjnNKCEnXbxTIEeg==", + "dev": true, "dependencies": { "b4a": "^1.3.0" } @@ -2901,6 +2904,7 @@ "version": "3.4.0", "resolved": "https://registry.npmjs.org/hypercore-crypto/-/hypercore-crypto-3.4.0.tgz", "integrity": "sha512-0cZA1B58p1J84TDbTh8DMMIj7Qr7Rzz8NyGIo+ykUhdwD21gtjiiLWoME92QvN+lgbfu0Zfr9vwxT8sRmyg+AA==", + "dev": true, "dependencies": { "b4a": "^1.1.0", "compact-encoding": "^2.5.1", @@ -2911,6 +2915,7 @@ "version": "4.0.0", "resolved": "https://registry.npmjs.org/sodium-universal/-/sodium-universal-4.0.0.tgz", "integrity": "sha512-iKHl8XnBV96k1c75gwwzANFdephw/MDWSjQAjPmBE+du0y3P23Q8uf7AcdcfFsYAMwLg7WVBfSAIBtV/JvRsjA==", + "dev": true, "dependencies": { "blake2b": "^2.1.1", "chacha20-universal": "^1.0.4", @@ -4401,7 +4406,8 @@ "node_modules/nanoassert": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/nanoassert/-/nanoassert-2.0.0.tgz", - "integrity": "sha512-7vO7n28+aYO4J+8w96AzhmU8G+Y/xpPDJz/se19ICsqj/momRbb9mh9ZUtkoJ5X3nTnPdhEJyc0qnM6yAsHBaA==" + "integrity": "sha512-7vO7n28+aYO4J+8w96AzhmU8G+Y/xpPDJz/se19ICsqj/momRbb9mh9ZUtkoJ5X3nTnPdhEJyc0qnM6yAsHBaA==", + "dev": true }, "node_modules/nanobench": { "version": "3.0.0", @@ -4467,6 +4473,7 @@ "version": "4.6.0", "resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-4.6.0.tgz", "integrity": "sha512-NTZVKn9IylLwUzaKjkas1e4u2DLNcV4rdYagA4PWdPwW87Bi7z+BznyKSRwS/761tV/lzCGXplWsiaMjLqP2zQ==", + "devOptional": true, "bin": { "node-gyp-build": "bin.js", "node-gyp-build-optional": "optional.js", @@ -5569,6 +5576,7 @@ "version": "1.2.1", "resolved": "https://registry.npmjs.org/sha256-universal/-/sha256-universal-1.2.1.tgz", "integrity": "sha512-ghn3muhdn1ailCQqqceNxRgkOeZSVfSE13RQWEg6njB+itsFzGVSJv+O//2hvNXZuxVIRyNzrgsZ37SPDdGJJw==", + "dev": true, "dependencies": { "b4a": "^1.0.1", "sha256-wasm": "^2.2.1" @@ -5578,6 +5586,7 @@ "version": "2.2.2", "resolved": "https://registry.npmjs.org/sha256-wasm/-/sha256-wasm-2.2.2.tgz", "integrity": "sha512-qKSGARvao+JQlFiA+sjJZhJ/61gmW/3aNLblB2rsgIxDlDxsJPHo8a1seXj12oKtuHVgJSJJ7QEGBUYQN741lQ==", + "dev": true, "dependencies": { "b4a": "^1.0.1", "nanoassert": "^2.0.0" @@ -5587,6 +5596,7 @@ "version": "1.2.1", "resolved": "https://registry.npmjs.org/sha512-universal/-/sha512-universal-1.2.1.tgz", "integrity": "sha512-kehYuigMoRkIngCv7rhgruLJNNHDnitGTBdkcYbCbooL8Cidj/bS78MDxByIjcc69M915WxcQTgZetZ1JbeQTQ==", + "dev": true, "dependencies": { "b4a": "^1.0.1", "sha512-wasm": "^2.3.1" @@ -5596,6 +5606,7 @@ "version": "2.3.3", "resolved": "https://registry.npmjs.org/sha512-wasm/-/sha512-wasm-2.3.3.tgz", "integrity": "sha512-x8McxarTu+Wv5hu5TQE4qgSSGQ3JOX/OmkhLGVty+ajcEN6zLPqTS6Ljt/N6Gl2jI+3y0wjj+RxDxy8Ca9Krfw==", + "dev": true, "dependencies": { "b4a": "^1.0.1", "nanoassert": "^2.0.0" @@ -5730,6 +5741,7 @@ "version": "1.3.0", "resolved": "https://registry.npmjs.org/siphash24/-/siphash24-1.3.0.tgz", "integrity": "sha512-2HvNPUYcmoFks2IX2wI5D80NspdlgSJlCmPencbBMWhrZnNW3sgiC7owaB2Jj/9IiV9bdN2A00LaM8IcojB8rw==", + "dev": true, "dependencies": { "nanoassert": "^2.0.0" } @@ -5778,6 +5790,7 @@ "version": "0.8.0", "resolved": "https://registry.npmjs.org/sodium-javascript/-/sodium-javascript-0.8.0.tgz", "integrity": "sha512-rEBzR5mPxPES+UjyMDvKPIXy9ImF17KOJ32nJNi9uIquWpS/nfj+h6m05J5yLJaGXjgM72LmQoUbWZVxh/rmGg==", + "dev": true, "dependencies": { "blake2b": "^2.1.1", "chacha20-universal": "^1.0.4", @@ -5792,6 +5805,7 @@ "version": "4.0.4", "resolved": "https://registry.npmjs.org/sodium-native/-/sodium-native-4.0.4.tgz", "integrity": "sha512-faqOKw4WQKK7r/ybn6Lqo1F9+L5T6NlBJJYvpxbZPetpWylUVqz449mvlwIBKBqxEHbWakWuOlUt8J3Qpc4sWw==", + "dev": true, "hasInstallScript": true, "dependencies": { "node-gyp-build": "^4.6.0" @@ -8668,7 +8682,8 @@ "node_modules/xsalsa20": { "version": "1.2.0", "resolved": "https://registry.npmjs.org/xsalsa20/-/xsalsa20-1.2.0.tgz", - "integrity": "sha512-FIr/DEeoHfj7ftfylnoFt3rAIRoWXpx2AoDfrT2qD2wtp7Dp+COajvs/Icb7uHqRW9m60f5iXZwdsJJO3kvb7w==" + "integrity": "sha512-FIr/DEeoHfj7ftfylnoFt3rAIRoWXpx2AoDfrT2qD2wtp7Dp+COajvs/Icb7uHqRW9m60f5iXZwdsJJO3kvb7w==", + "dev": true }, "node_modules/xsalsa20-universal": { "version": "1.0.0", @@ -9580,6 +9595,7 @@ "version": "2.1.4", "resolved": "https://registry.npmjs.org/blake2b/-/blake2b-2.1.4.tgz", "integrity": "sha512-AyBuuJNI64gIvwx13qiICz6H6hpmjvYS5DGkG6jbXMOT8Z3WUJ3V1X0FlhIoT1b/5JtHE3ki+xjtMvu1nn+t9A==", + "dev": true, "requires": { "blake2b-wasm": "^2.4.0", "nanoassert": "^2.0.0" @@ -9589,6 +9605,7 @@ "version": "2.4.0", "resolved": "https://registry.npmjs.org/blake2b-wasm/-/blake2b-wasm-2.4.0.tgz", "integrity": "sha512-S1kwmW2ZhZFFFOghcx73+ZajEfKBqhP82JMssxtLVMxlaPea1p9uoLiUZ5WYyHn0KddwbLc+0vh4wR0KBNoT5w==", + "dev": true, "requires": { "b4a": "^1.0.1", "nanoassert": "^2.0.0" @@ -9840,6 +9857,7 @@ "version": "1.0.4", "resolved": "https://registry.npmjs.org/chacha20-universal/-/chacha20-universal-1.0.4.tgz", "integrity": "sha512-/IOxdWWNa7nRabfe7+oF+jVkGjlr2xUL4J8l/OvzZhj+c9RpMqoo3Dq+5nU1j/BflRV4BKnaQ4+4oH1yBpQG1Q==", + "dev": true, "requires": { "nanoassert": "^2.0.0" } @@ -10001,6 +10019,7 @@ "version": "2.11.0", "resolved": "https://registry.npmjs.org/compact-encoding/-/compact-encoding-2.11.0.tgz", "integrity": "sha512-CRfTuyy9Tg7EwxNKvIq3yFIr2JnJLyVr9Yj234VsDCL59hdXcZH3TdzY/2kwbAqVogIoRBJjnNKCEnXbxTIEeg==", + "dev": true, "requires": { "b4a": "^1.3.0" } @@ -11043,6 +11062,7 @@ "version": "3.4.0", "resolved": "https://registry.npmjs.org/hypercore-crypto/-/hypercore-crypto-3.4.0.tgz", "integrity": "sha512-0cZA1B58p1J84TDbTh8DMMIj7Qr7Rzz8NyGIo+ykUhdwD21gtjiiLWoME92QvN+lgbfu0Zfr9vwxT8sRmyg+AA==", + "dev": true, "requires": { "b4a": "^1.1.0", "compact-encoding": "^2.5.1", @@ -11053,6 +11073,7 @@ "version": "4.0.0", "resolved": "https://registry.npmjs.org/sodium-universal/-/sodium-universal-4.0.0.tgz", "integrity": "sha512-iKHl8XnBV96k1c75gwwzANFdephw/MDWSjQAjPmBE+du0y3P23Q8uf7AcdcfFsYAMwLg7WVBfSAIBtV/JvRsjA==", + "dev": true, "requires": { "blake2b": "^2.1.1", "chacha20-universal": "^1.0.4", @@ -12185,7 +12206,8 @@ "nanoassert": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/nanoassert/-/nanoassert-2.0.0.tgz", - "integrity": "sha512-7vO7n28+aYO4J+8w96AzhmU8G+Y/xpPDJz/se19ICsqj/momRbb9mh9ZUtkoJ5X3nTnPdhEJyc0qnM6yAsHBaA==" + "integrity": "sha512-7vO7n28+aYO4J+8w96AzhmU8G+Y/xpPDJz/se19ICsqj/momRbb9mh9ZUtkoJ5X3nTnPdhEJyc0qnM6yAsHBaA==", + "dev": true }, "nanobench": { "version": "3.0.0", @@ -12242,7 +12264,8 @@ "node-gyp-build": { "version": "4.6.0", "resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-4.6.0.tgz", - "integrity": "sha512-NTZVKn9IylLwUzaKjkas1e4u2DLNcV4rdYagA4PWdPwW87Bi7z+BznyKSRwS/761tV/lzCGXplWsiaMjLqP2zQ==" + "integrity": "sha512-NTZVKn9IylLwUzaKjkas1e4u2DLNcV4rdYagA4PWdPwW87Bi7z+BznyKSRwS/761tV/lzCGXplWsiaMjLqP2zQ==", + "devOptional": true }, "node-preload": { "version": "0.2.1", @@ -13101,6 +13124,7 @@ "version": "1.2.1", "resolved": "https://registry.npmjs.org/sha256-universal/-/sha256-universal-1.2.1.tgz", "integrity": "sha512-ghn3muhdn1ailCQqqceNxRgkOeZSVfSE13RQWEg6njB+itsFzGVSJv+O//2hvNXZuxVIRyNzrgsZ37SPDdGJJw==", + "dev": true, "requires": { "b4a": "^1.0.1", "sha256-wasm": "^2.2.1" @@ -13110,6 +13134,7 @@ "version": "2.2.2", "resolved": "https://registry.npmjs.org/sha256-wasm/-/sha256-wasm-2.2.2.tgz", "integrity": "sha512-qKSGARvao+JQlFiA+sjJZhJ/61gmW/3aNLblB2rsgIxDlDxsJPHo8a1seXj12oKtuHVgJSJJ7QEGBUYQN741lQ==", + "dev": true, "requires": { "b4a": "^1.0.1", "nanoassert": "^2.0.0" @@ -13119,6 +13144,7 @@ "version": "1.2.1", "resolved": "https://registry.npmjs.org/sha512-universal/-/sha512-universal-1.2.1.tgz", "integrity": "sha512-kehYuigMoRkIngCv7rhgruLJNNHDnitGTBdkcYbCbooL8Cidj/bS78MDxByIjcc69M915WxcQTgZetZ1JbeQTQ==", + "dev": true, "requires": { "b4a": "^1.0.1", "sha512-wasm": "^2.3.1" @@ -13128,6 +13154,7 @@ "version": "2.3.3", "resolved": "https://registry.npmjs.org/sha512-wasm/-/sha512-wasm-2.3.3.tgz", "integrity": "sha512-x8McxarTu+Wv5hu5TQE4qgSSGQ3JOX/OmkhLGVty+ajcEN6zLPqTS6Ljt/N6Gl2jI+3y0wjj+RxDxy8Ca9Krfw==", + "dev": true, "requires": { "b4a": "^1.0.1", "nanoassert": "^2.0.0" @@ -13252,6 +13279,7 @@ "version": "1.3.0", "resolved": "https://registry.npmjs.org/siphash24/-/siphash24-1.3.0.tgz", "integrity": "sha512-2HvNPUYcmoFks2IX2wI5D80NspdlgSJlCmPencbBMWhrZnNW3sgiC7owaB2Jj/9IiV9bdN2A00LaM8IcojB8rw==", + "dev": true, "requires": { "nanoassert": "^2.0.0" } @@ -13284,6 +13312,7 @@ "version": "0.8.0", "resolved": "https://registry.npmjs.org/sodium-javascript/-/sodium-javascript-0.8.0.tgz", "integrity": "sha512-rEBzR5mPxPES+UjyMDvKPIXy9ImF17KOJ32nJNi9uIquWpS/nfj+h6m05J5yLJaGXjgM72LmQoUbWZVxh/rmGg==", + "dev": true, "requires": { "blake2b": "^2.1.1", "chacha20-universal": "^1.0.4", @@ -13298,6 +13327,7 @@ "version": "4.0.4", "resolved": "https://registry.npmjs.org/sodium-native/-/sodium-native-4.0.4.tgz", "integrity": "sha512-faqOKw4WQKK7r/ybn6Lqo1F9+L5T6NlBJJYvpxbZPetpWylUVqz449mvlwIBKBqxEHbWakWuOlUt8J3Qpc4sWw==", + "dev": true, "requires": { "node-gyp-build": "^4.6.0" } @@ -15287,7 +15317,8 @@ "xsalsa20": { "version": "1.2.0", "resolved": "https://registry.npmjs.org/xsalsa20/-/xsalsa20-1.2.0.tgz", - "integrity": "sha512-FIr/DEeoHfj7ftfylnoFt3rAIRoWXpx2AoDfrT2qD2wtp7Dp+COajvs/Icb7uHqRW9m60f5iXZwdsJJO3kvb7w==" + "integrity": "sha512-FIr/DEeoHfj7ftfylnoFt3rAIRoWXpx2AoDfrT2qD2wtp7Dp+COajvs/Icb7uHqRW9m60f5iXZwdsJJO3kvb7w==", + "dev": true }, "xsalsa20-universal": { "version": "1.0.0", diff --git a/package.json b/package.json index dc9ab30..6417e34 100644 --- a/package.json +++ b/package.json @@ -22,7 +22,6 @@ "b4a": "^1.6.4", "big-sparse-array": "^1.0.2", "debug": "^4.3.3", - "hypercore-crypto": "^3.4.0", "random-access-file": "^4.0.4", "streamx": "^2.15.0", "tiny-typed-emitter": "^2.1.0" diff --git a/test/multi-core-indexer.test.js b/test/multi-core-indexer.test.js index bd74ef6..62696dc 100644 --- a/test/multi-core-indexer.test.js +++ b/test/multi-core-indexer.test.js @@ -10,6 +10,7 @@ const { sortEntries, } = require('./helpers') const { testKeypairs, expectedStorageNames } = require('./fixtures.js') +const Hypercore = require('hypercore') /** @typedef {import('../lib/types').Entry<'binary'>} Entry */ @@ -539,5 +540,50 @@ test('Consistent storage folders', async (t) => { for (const keyPair of testKeypairs.slice(5)) { indexer.addCore(await create({ keyPair })) } + await indexer.idle() t.same(storageNames.sort(), expectedStorageNames) }) + +test('Works with non-ready cores', async (t) => { + /** @type {Hypercore[]} */ + const cores = [] + for (let i = 0; i < 5; i++) { + cores.push(new Hypercore(() => new ram())) + } + const indexer = new MultiCoreIndexer(cores, { + batch: async () => {}, + storage: () => new ram(), + }) + t.equal(indexer.state.current, 'indexing') + await indexer.idle() + t.pass('indexer.idle() resolves') + await indexer.close() +}) + +test('Indexes all items already in a core - cores not ready', async (t) => { + /** @type {Hypercore[]} */ + const cores = [] + /** @type {Array>} */ + const storages = [] + for (let i = 0; i < 5; i++) { + const storage = ram.reusable() + storages.push(storage) + cores.push(new Hypercore(storage)) + } + const expected = await generateFixtures(cores, 100) + await Promise.all(cores.map((core) => core.close())) + for (let i = 0; i < 5; i++) { + cores[i] = new Hypercore(storages[i]) + } + /** @type {Entry[]} */ + const entries = [] + const indexer = new MultiCoreIndexer(cores, { + batch: async (data) => { + entries.push(...data) + }, + storage: () => new ram(), + }) + await indexer.idle() + t.same(sortEntries(entries), sortEntries(expected)) + await indexer.close() +}) diff --git a/test/unit-tests/core-index-stream.test.js b/test/unit-tests/core-index-stream.test.js index af494bf..fa264de 100644 --- a/test/unit-tests/core-index-stream.test.js +++ b/test/unit-tests/core-index-stream.test.js @@ -9,11 +9,25 @@ const { generateFixture, throttledDrain: throttledIdle, } = require('../helpers') +const Hypercore = require('hypercore') -test('hypercore key', async (t) => { +test('stream.core', async (t) => { const a = await create() - const stream = new CoreIndexStream(a, new ram()) - t.same(stream.key, a.key) + const stream = new CoreIndexStream(a, () => new ram()) + t.same(stream.core, a) +}) + +test('destroy before open', async (t) => { + let storageCreated = false + function createStorage() { + storageCreated = true + return new ram() + } + const a = new Hypercore(() => new ram()) + const stream = new CoreIndexStream(a, createStorage) + stream.destroy() + await once(stream, 'close') + t.equal(storageCreated, false, 'storage never created') }) test('Indexes all items already in a core', async (t) => { @@ -23,7 +37,7 @@ test('Indexes all items already in a core', async (t) => { await a.append(blocks) /** @type {any[]} */ const entries = [] - const stream = new CoreIndexStream(a, new ram()) + const stream = new CoreIndexStream(a, () => new ram()) stream.on('data', (entry) => entries.push(entry)) await once(stream, 'drained') t.same(entries, expected) @@ -31,7 +45,7 @@ test('Indexes all items already in a core', async (t) => { test("Empty core emits 'drained' event", async (t) => { const a = await create() - const stream = new CoreIndexStream(a, new ram()) + const stream = new CoreIndexStream(a, () => new ram()) stream.resume() stream.on('indexing', t.fail) await once(stream, 'drained') @@ -46,7 +60,7 @@ test('.remaining property is accurate', async (t) => { await a.append(blocks) /** @type {any[]} */ const entries = [] - const stream = new CoreIndexStream(a, new ram()) + const stream = new CoreIndexStream(a, () => new ram()) t.equal(stream.remaining, totalBlocks) stream.on('data', (entry) => { entries.push(entry) @@ -63,7 +77,7 @@ test('Indexes items appended after initial index', async (t) => { const blocks = generateFixture(0, 10) /** @type {any[]} */ const entries = [] - const stream = new CoreIndexStream(a, new ram()) + const stream = new CoreIndexStream(a, () => new ram()) stream.on('data', (entry) => entries.push(entry)) await once(stream, 'drained') t.same(entries, [], 'no entries before append') @@ -84,7 +98,7 @@ test('Readable stream from sparse hypercore', async (t) => { const range = b.download({ start: 5, end: 20 }) await range.downloaded() - const stream = new CoreIndexStream(b, new ram()) + const stream = new CoreIndexStream(b, () => new ram()) /** @type {Buffer[]} */ const entries = [] stream.on('data', (entry) => entries.push(entry.block)) @@ -108,7 +122,7 @@ test("'indexing' and 'drained' events are paired", async (t) => { replicate(a, b, t) - const stream = new CoreIndexStream(b, new ram()) + const stream = new CoreIndexStream(b, () => new ram()) let indexingEvents = 0 let idleEvents = 0 stream.on('indexing', () => { @@ -140,7 +154,7 @@ test('Appends from a replicated core are indexed', async (t) => { const range1 = b.download({ start: 0, end: b.length }) await range1.downloaded() - const stream = new CoreIndexStream(b, new ram()) + const stream = new CoreIndexStream(b, () => new ram()) /** @type {Buffer[]} */ const entries = [] stream.on('data', (entry) => entries.push(entry.block)) @@ -160,7 +174,7 @@ test('Maintains index state', async (t) => { const a = await create() /** @type {any[]} */ const entries = [] - const storage = new ram() + const storage = ram.reusable() const stream1 = new CoreIndexStream(a, storage) stream1.on('data', (entry) => { entries.push(entry.block) diff --git a/test/unit-tests/multi-core-index-stream.test.js b/test/unit-tests/multi-core-index-stream.test.js index d3699ff..d06bcfe 100644 --- a/test/unit-tests/multi-core-index-stream.test.js +++ b/test/unit-tests/multi-core-index-stream.test.js @@ -1,3 +1,4 @@ +// @ts-check const { CoreIndexStream } = require('../../lib/core-index-stream') const { MultiCoreIndexStream } = require('../../lib/multi-core-index-stream') const { test } = require('tap') @@ -16,7 +17,9 @@ const { test('Indexes all items already in a core', async (t) => { const cores = await createMultiple(5) const expected = await generateFixtures(cores, 1000) - const indexStreams = cores.map((core) => new CoreIndexStream(core, new ram())) + const indexStreams = cores.map( + (core) => new CoreIndexStream(core, () => new ram()) + ) const entries = [] const stream = new MultiCoreIndexStream(indexStreams) const ws = new Writable({ @@ -40,7 +43,9 @@ test('Indexes all items already in a core', async (t) => { test('Adding index streams after initialization', async (t) => { const cores = await createMultiple(3) const expected = await generateFixtures(cores, 100) - const indexStreams = cores.map((core) => new CoreIndexStream(core, new ram())) + const indexStreams = cores.map( + (core) => new CoreIndexStream(core, () => new ram()) + ) const entries = [] const stream = new MultiCoreIndexStream(indexStreams.slice(0, 2)) stream.addStream(indexStreams[2]) @@ -69,7 +74,9 @@ test('.remaining is as expected', async (t) => { const blockCount = 100 const cores = await createMultiple(coreCount) const expected = await generateFixtures(cores, blockCount) - const indexStreams = cores.map((core) => new CoreIndexStream(core, new ram())) + const indexStreams = cores.map( + (core) => new CoreIndexStream(core, () => new ram()) + ) const entries = [] const stream = new MultiCoreIndexStream(indexStreams, { highWaterMark: 10 }) const ws = new Writable({ @@ -100,7 +107,9 @@ test('.remaining is as expected', async (t) => { test('Indexes items appended after initial index', async (t) => { const cores = await createMultiple(5) - const indexStreams = cores.map((core) => new CoreIndexStream(core, new ram())) + const indexStreams = cores.map( + (core) => new CoreIndexStream(core, () => new ram()) + ) const entries = [] const stream = new MultiCoreIndexStream(indexStreams, { highWaterMark: 10 }) stream.on('data', (entry) => entries.push(entry)) @@ -132,7 +141,7 @@ test('index sparse hypercores', async (t) => { for (const core of remoteCores) { const range = core.download({ start: 5, end: 20 }) await range.downloaded() - indexStreams.push(new CoreIndexStream(core, new ram())) + indexStreams.push(new CoreIndexStream(core, () => new ram())) } const entries = [] const stream = new MultiCoreIndexStream(indexStreams, { highWaterMark: 10 }) @@ -163,7 +172,7 @@ test('Appends from a replicated core are indexed', async (t) => { await remote.update({ wait: true }) const range = remote.download({ start: 0, end: remote.length }) await range.downloaded() - indexStreams.push(new CoreIndexStream(core, new ram())) + indexStreams.push(new CoreIndexStream(core, () => new ram())) } const entries = [] const stream = new MultiCoreIndexStream(indexStreams, { highWaterMark: 10 }) @@ -188,7 +197,7 @@ test('Maintains index state', async (t) => { const entries = [] for (const core of cores) { - const storage = new ram() + const storage = ram.reusable() storages.push(storage) const indexStream = new CoreIndexStream(core, storage) indexStream.on('data', ({ index }) => { diff --git a/vendor/types/hypercore-crypto.d.ts b/vendor/types/hypercore-crypto.d.ts deleted file mode 100644 index 75de99d..0000000 --- a/vendor/types/hypercore-crypto.d.ts +++ /dev/null @@ -1,3 +0,0 @@ -declare module 'hypercore-crypto' { - export function discoveryKey(key: Buffer): Buffer -}