From c1cfe024b826ba2b1457d594dafab3f916cb9812 Mon Sep 17 00:00:00 2001 From: Graham Fisher Date: Sat, 18 May 2024 20:17:55 -0700 Subject: [PATCH 1/6] add pMapIterable preserveOrder option, tests, and docs --- index.d.ts | 33 +++++++++ index.js | 166 ++++++++++++++++++++++++++++++++++---------- readme.md | 33 +++++++++ test.js | 197 +++++++++++++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 388 insertions(+), 41 deletions(-) diff --git a/index.d.ts b/index.d.ts index 075093a..10dbf17 100644 --- a/index.d.ts +++ b/index.d.ts @@ -53,6 +53,39 @@ export type IterableOptions = BaseOptions & { Default: `options.concurrency` */ readonly backpressure?: number; + + /** + Whether the output iterable should produce the results of the `mapper` on elements of the `input` iterable in the same order as the elements were produced. + + If `false`, `mapper` results will be produced in the order they are available, which may not match the order the `mapper` inputs were produced by the `input` iterable, but may improve throughput. 
+ + @example + ``` + import {pMapIterable} from 'p-map'; + import delay from 'delay'; + + const orderPreservingIterator = pMapIterable([[1, 100], [2, 10], [3, 5]], async ([value, delayMs]) => { + await delay(delayMs); + return value; + }, {concurrency: 2, preserveOrder: true}); + // t=0ms + await orderPreservingIterator.next(); // 1 produced at t=100ms + await orderPreservingIterator.next(); // 2 produced at t=100ms + await orderPreservingIterator.next(); // 3 produced at t=105ms + + const throughputOptimizingIterator = pMapIterable([[1, 100], [2, 10], [3, 5]], async ([value, delayMs]) => { + await delay(delayMs); + return value; + }, {concurrency: 2, preserveOrder: false}); + // t=0ms + await throughputOptimizingIterator.next(); // 2 produced at t=10ms + await throughputOptimizingIterator.next(); // 3 produced at t=15ms + await throughputOptimizingIterator.next(); // 1 produced at t=100ms + ``` + + @default `true` + */ + readonly preserveOrder?: boolean; }; type MaybePromise = T | Promise; diff --git a/index.js b/index.js index 2f7d91c..38fb136 100644 --- a/index.js +++ b/index.js @@ -168,6 +168,7 @@ export function pMapIterable( { concurrency = Number.POSITIVE_INFINITY, backpressure = concurrency, + preserveOrder = true, } = {}, ) { if (iterable[Symbol.iterator] === undefined && iterable[Symbol.asyncIterator] === undefined) { @@ -186,84 +187,177 @@ export function pMapIterable( throw new TypeError(`Expected \`backpressure\` to be an integer from \`concurrency\` (${concurrency}) and up or \`Infinity\`, got \`${backpressure}\` (${typeof backpressure})`); } + if (typeof preserveOrder !== 'boolean') { + throw new TypeError(`Expected \`preserveOrder\` to be a boolean, got \`${preserveOrder}\` (${typeof preserveOrder})`); + } + return { async * [Symbol.asyncIterator]() { const iterator = iterable[Symbol.asyncIterator] === undefined ? 
iterable[Symbol.iterator]() : iterable[Symbol.asyncIterator](); const promises = []; + const promisesIndexFromInputIndex = {}; + const inputIndexFromPromisesIndex = []; let runningMappersCount = 0; let isDone = false; - let index = 0; + let inputIndex = 0; + let outputIndex = 0; // Only used when `preserveOrder: false` + + const nextPromise = preserveOrder + // Treat `promises` as a queue + ? () => { + // May be undefined bc of `pMapSkip`s + while (promisesIndexFromInputIndex[outputIndex] === undefined) { + outputIndex += 1; + } - function trySpawn() { - if (isDone || !(runningMappersCount < concurrency && promises.length < backpressure)) { - return; + return promises[promisesIndexFromInputIndex[outputIndex++]]; } + // Treat `promises` as a pool (order doesn't matter) + : () => Promise.race(promises); + + function popPromise(inputIndex) { + // Swap the fulfilled promise with the last element to avoid an O(n) shift to the `promises` array + const tail = promises.pop(); + const tailInputIndex = inputIndexFromPromisesIndex.pop(); + const promisesIndex = promisesIndexFromInputIndex[inputIndex]; + delete promisesIndexFromInputIndex[inputIndex]; + + if (promisesIndex !== promises.length) { + promises[promisesIndex] = tail; + inputIndexFromPromisesIndex[promisesIndex] = tailInputIndex; + promisesIndexFromInputIndex[tailInputIndex] = promisesIndex; + } + } - const promise = (async () => { - const {done, value} = await iterator.next(); + async function mapNext(promisesIndex) { + let next = iterator.next(); + + const myInputIndex = inputIndex++; // Save this promise's index before `trySpawn`ing others + runningMappersCount++; + promisesIndexFromInputIndex[myInputIndex] = promisesIndex; + inputIndexFromPromisesIndex[promisesIndex] = myInputIndex; + + if (isPromiseLike(next)) { + // Optimization: if our concurrency and/or backpressure is bounded (so that we won't infinitely recurse), + // and we need to `await` the next `iterator` element, we first eagerly spawn more `mapNext` 
promises, + // so that these promises can begin `await`ing their respective `iterator` elements (if needed) and `mapper` results in parallel. + // This may entail memory usage closer to `options.backpressure` than necessary, but this space was already allocated to `pMapIterable` via + // `options.concurrency` and `options.backpressure`. + // This may also cause iteration well past the end of the `iterator`: we don't inspect the `iterator`'s response before `trySpawn`ing + // (because we are `trySpawn`ing before `await`ing the response), which will request the next `iterator` element, so we may end up spawning many promises which resolve to `done`. + // However, the time needed to `await` and ignore these `done` promises is presumed to be small relative to the time needed to perform common + // `async` operations like disk reads, network requests, etc. + // Overall, this can reduce the total time taken to process all elements. + if (backpressure !== Number.POSITIVE_INFINITY) { + // Spawn if still below concurrency and backpressure limit + trySpawn(); + } - if (done) { - return {done: true}; + try { + next = await next; + } catch (error) { + isDone = true; + return {result: {error}, inputIndex: myInputIndex}; } + } - runningMappersCount++; + let {done, value} = next; - // Spawn if still below concurrency and backpressure limit - trySpawn(); + if (done) { + isDone = true; + return {result: {done: true}, inputIndex: myInputIndex}; + } - try { - const returnValue = await mapper(await value, index++); + // Spawn if still below concurrency and backpressure limit + trySpawn(); - runningMappersCount--; + let returnValue; + try { + if (isPromiseLike(value)) { + value = await value; + } - if (returnValue === pMapSkip) { - const index = promises.indexOf(promise); + returnValue = mapper(value, myInputIndex); + if (isPromiseLike(returnValue)) { + returnValue = await returnValue; + } + } catch (error) { + isDone = true; + return {result: {error}, inputIndex: myInputIndex}; + } - 
if (index > 0) { - promises.splice(index, 1); - } - } + runningMappersCount--; + + if (returnValue === pMapSkip) { + // If `preserveOrder: true`, resolve to the next inputIndex's promise, in case we are already being `await`ed + // NOTE: no chance that `myInputIndex + 1`-spawning code is waiting to be executed in another part of the event loop, + // but currently `promisesIndexFromInputIndex[myInputIndex + 1] === undefined` (so that we incorrectly `mapNext` and + // this potentially-currently-awaited promise resolves to the result of mapping a later element than a different member of + // `promises`, i.e. `promises` resolve out of order), because all `trySpawn`/`mapNext` calls execute the bookkeeping synchronously, + // before any `await`s. + if (preserveOrder && promisesIndexFromInputIndex[myInputIndex + 1] !== undefined) { + popPromise(myInputIndex); + return promises[promisesIndexFromInputIndex[myInputIndex + 1]]; + } - // Spawn if still below backpressure limit and just dropped below concurrency limit - trySpawn(); + // Otherwise, start mapping the next input element + delete promisesIndexFromInputIndex[myInputIndex]; + // Not necessary to `delete inputIndexFromPromisesIndex[promisesIndex]` since `inputIndexFromPromisesIndex[promisesIndex]` is only used + // when this promise resolves, but by that point this recursive `mapNext(promisesIndex)` call will have necessarily overwritten it. 
+ return mapNext(promisesIndex); + } - return {done: false, value: returnValue}; - } catch (error) { - isDone = true; - return {error}; - } - })(); + // Spawn if still below backpressure limit and just dropped below concurrency limit + trySpawn(); + + return {result: {value: returnValue}, inputIndex: myInputIndex}; + } + + function trySpawn() { + if (isDone || !(runningMappersCount < concurrency && promises.length < backpressure)) { + return; + } - promises.push(promise); + // Reserve index in `promises` array: we don't actually have the promise to save yet, + // but we don't want recursive `trySpawn` calls to use this same index. + // This is safe (i.e., the empty slot won't be `await`ed) because we replace the value immediately, + // without yielding to the event loop, so no consumers (namely `getAndRemoveFromPoolNextPromise`) + // can observe the intermediate state. + const promisesIndex = promises.length++; + promises[promisesIndex] = mapNext(promisesIndex); } trySpawn(); while (promises.length > 0) { - const {error, done, value} = await promises[0]; // eslint-disable-line no-await-in-loop - - promises.shift(); + const {result: {error, done, value}, inputIndex} = await nextPromise();// eslint-disable-line no-await-in-loop + popPromise(inputIndex); if (error) { throw error; } if (done) { + // When `preserveOrder: false`, ignore to consume any remaining pending promises in the pool + if (!preserveOrder) { + continue; + } + return; } // Spawn if just dropped below backpressure limit and below the concurrency limit trySpawn(); - if (value === pMapSkip) { - continue; - } - yield value; } }, }; } +function isPromiseLike(p) { + return typeof p === 'object' && p !== null && 'then' in p && typeof p.then === 'function'; +} + export const pMapSkip = Symbol('skip'); diff --git a/readme.md b/readme.md index bdc3f36..1ad0a7d 100644 --- a/readme.md +++ b/readme.md @@ -92,6 +92,39 @@ Maximum number of promises returned by `mapper` that have resolved but not yet c Useful 
whenever you are consuming the iterable slower than what the mapper function can produce concurrently. For example, to avoid making an overwhelming number of HTTP requests if you are saving each of the results to a database. +##### preserveOrder + +**Only for `pMapIterable`** + +Type: `boolean`\ +Default: `true` + +Whether the output iterable should produce the results of the `mapper` on elements of the `input` iterable in the same order as the elements were produced. +If `false`, `mapper` results will be produced in the order they are available, which may not match the order the `mapper` inputs were produced by the `input` iterable, but may improve throughput. + +```js +import {pMapIterable} from 'p-map'; +import delay from 'delay'; + +const orderPreservingIterator = pMapIterable([[1, 100], [2, 10], [3, 5]], async ([value, delayMs]) => { + await delay(delayMs); + return value; +}, {concurrency: 2, preserveOrder: true}); +// t=0ms +await orderPreservingIterator.next(); // 1 produced at t=100ms +await orderPreservingIterator.next(); // 2 produced at t=100ms +await orderPreservingIterator.next(); // 3 produced at t=105ms + +const throughputOptimizingIterator = pMapIterable([[1, 100], [2, 10], [3, 5]], async ([value, delayMs]) => { + await delay(delayMs); + return value; +}, {concurrency: 2, preserveOrder: false}); +// t=0ms +await throughputOptimizingIterator.next(); // 2 produced at t=10ms +await throughputOptimizingIterator.next(); // 3 produced at t=15ms +await throughputOptimizingIterator.next(); // 1 produced at t=100ms +``` + ##### stopOnError **Only for `pMap`** diff --git a/test.js b/test.js index 9db3013..4f61cff 100644 --- a/test.js +++ b/test.js @@ -607,9 +607,9 @@ test('pMapIterable - concurrency: 2', async t => { assertInRange(t, times.get(10), {start: 0, end: 50}); assertInRange(t, times.get(20), {start: 0, end: 50}); - assertInRange(t, times.get(30), {start: 200, end: 250}); - assertInRange(t, times.get(40), {start: 300, end: 350}); - assertInRange(t, 
times.get(50), {start: 300, end: 350}); + assertInRange(t, times.get(30), {start: 195, end: 250}); + assertInRange(t, times.get(40), {start: 295, end: 350}); + assertInRange(t, times.get(50), {start: 295, end: 350}); }); test('pMapIterable - backpressure', async t => { @@ -637,10 +637,197 @@ test('pMapIterable - backpressure', async t => { t.is(currentValue, 40); }); -test('pMapIterable - pMapSkip', async t => { +test('pMapIterable - complex pMapSkip pattern - concurrency 1', async t => { t.deepEqual(await collectAsyncIterable(pMapIterable([ + pMapSkip, 1, + 2, + 3, + pMapSkip, + 4, + 5, + pMapSkip, + pMapSkip, + 6, + 7, + 8, + pMapSkip, + ], async value => value)), [1, 2, 3, 4, 5, 6, 7, 8]); +}); + +test('pMapIterable - complex pMapSkip pattern - concurrency 2', async t => { + t.deepEqual(await collectAsyncIterable(pMapIterable([ + pMapSkip, + 1, + 2, + 3, + pMapSkip, + 4, + 5, + pMapSkip, + pMapSkip, + 6, + 7, + 8, + pMapSkip, + ], async value => value, {concurrency: 2})), [1, 2, 3, 4, 5, 6, 7, 8]); +}); + +test('pMapIterable - complex pMapSkip pattern - concurrency 2 - preserveOrder: false', async t => { + const result = await collectAsyncIterable(pMapIterable([ pMapSkip, + 1, 2, - ], async value => value)), [1, 2]); + 3, + pMapSkip, + 4, + 5, + pMapSkip, + pMapSkip, + 6, + 7, + 8, + pMapSkip, + ], async value => value, {concurrency: 2, preserveOrder: false})); + const resultSet = new Set(result); + t.assert(resultSet.has(1)); + t.assert(resultSet.has(2)); + t.assert(resultSet.has(3)); + t.assert(resultSet.has(4)); + t.assert(resultSet.has(5)); + t.assert(resultSet.has(6)); + t.assert(resultSet.has(7)); + t.assert(resultSet.has(8)); + t.assert(result.length === 8); +}); + +test('pMapIterable - async iterable input', async t => { + const result = await collectAsyncIterable(pMapIterable(new AsyncTestData(sharedInput), mapper)); + t.deepEqual(result, [10, 20, 30]); +}); + +test('pMapIterable - pMapSkip + preserveOrder: true - preserves order, even when next input 
needs to be awaited', async t => { + const result = await collectAsyncIterable(pMapIterable([1, 2, 3], (_value, index) => { + switch (index) { + case 0: { + return pMapSkip; + } + + case 1: { + return delay(100, {value: 2}); + } + + case 2: { + return 3; + } + + default: { + return undefined; + } + } + }, {concurrency: 2, preserveOrder: true})); + t.deepEqual(result, [2, 3]); +}); + +function * promiseGenerator() { + yield (async () => { + await delay(100); + return 1; + })(); + yield (async () => { + await delay(100); + return 2; + })(); + yield (async () => { + await delay(100); + return 3; + })(); +} + +test('pMapIterable - eager spawn when input iterable returns promise', async t => { + const end = timeSpan(); + await collectAsyncIterable(pMapIterable(promiseGenerator(), value => delay(100, {value}), {concurrency: 3})); + assertInRange(t, end(), {start: 195, end: 250}); +}); + +test('pMapIterable - eager spawn when input iterable returns promise incurs little overhead', async t => { + const end = timeSpan(); + await collectAsyncIterable(pMapIterable(promiseGenerator(), value => delay(100, {value}), {concurrency: 100})); + assertInRange(t, end(), {start: 195, end: 250}); +}); + +test('pMapIterable - preserveOrder: false - yields mappings as they resolve', async t => { + const end = timeSpan(); + const result = await collectAsyncIterable(pMapIterable(sharedInput, mapper, {preserveOrder: false})); + t.deepEqual(result, [30, 20, 10]); + assertInRange(t, end(), {start: 295, end: 350}); +}); + +test('pMapIterable - preserveOrder: false - more complex example', async t => { + t.deepEqual(await collectAsyncIterable(pMapIterable([ + [1, 200], + [2, 100], + [3, 150], + [4, 200], + [5, 100], + [6, 75], + ], mapper, {concurrency: 3, preserveOrder: false})), [2, 3, 1, 5, 6, 4]); +}); + +test('pMapIterable - preserveOrder: false - concurrency: 2', async t => { + const input = [100, 200, 10, 36, 13, 45]; + const times = new Map(); + const end = timeSpan(); + + 
t.deepEqual(await collectAsyncIterable(pMapIterable(input, value => { + times.set(value, end()); + return delay(value, {value}); + }, {concurrency: 2, backpressure: Number.POSITIVE_INFINITY, preserveOrder: false})), [100, 10, 36, 13, 200, 45]); + + assertInRange(t, times.get(100), {start: 0, end: 50}); + assertInRange(t, times.get(200), {start: 0, end: 50}); + assertInRange(t, times.get(10), {start: times.get(100) + 100 - 5, end: times.get(100) + 100 + 50}); + assertInRange(t, times.get(36), {start: times.get(10) + 10 - 5, end: times.get(10) + 10 + 50}); + assertInRange(t, times.get(13), {start: times.get(36) + 36 - 5, end: times.get(36) + 36 + 50}); + assertInRange(t, times.get(45), {start: times.get(13) + 13 - 5, end: times.get(13) + 13 + 50}); +}); + +test('pMapIterable - preserveOrder: false - backpressure', async t => { + // Adjust from 300 to 250 so timings don't align, to deflake + const adjustedLongerSharedInput = [...longerSharedInput]; + adjustedLongerSharedInput[0] = [longerSharedInput[0][0], 250]; + + let currentValue; + + // Concurrency option is forced by an early check + const asyncIterator = pMapIterable(adjustedLongerSharedInput, async value => { + currentValue = await mapper(value); + return currentValue; + }, {backpressure: 2, concurrency: 2, preserveOrder: false})[Symbol.asyncIterator](); + + const {value: value1} = await asyncIterator.next(); + t.is(value1, 20); + + // If backpressure is not respected, than all items will be evaluated in this time + await delay(600); + + t.is(currentValue, 30); + + const {value: value2} = await asyncIterator.next(); + t.is(value2, 10); + + await delay(100); + + t.is(currentValue, 40); +}); + +test('pMapIterable - preserveOrder: false - throws first error to settle', async t => { + await t.throwsAsync(collectAsyncIterable(pMapIterable([ + [async () => { + throw new Error('foo'); + }, 30], + [() => { + throw new Error('bar'); + }, 10], + ], mapper, {preserveOrder: false, concurrency: 2})), {message: 'bar'}); }); 
From ba50244cc6fe348e27a4ea4619310415f53c53e6 Mon Sep 17 00:00:00 2001 From: Graham Fisher Date: Wed, 12 Jun 2024 18:21:48 -0700 Subject: [PATCH 2/6] pMapIterable: test sync iterables/mappers throwing (and fix the former) --- index.js | 43 +++++++++++++++++++++---------------------- test.js | 19 +++++++++++++++++-- 2 files changed, 38 insertions(+), 24 deletions(-) diff --git a/index.js b/index.js index 38fb136..741f088 100644 --- a/index.js +++ b/index.js @@ -231,39 +231,38 @@ export function pMapIterable( } async function mapNext(promisesIndex) { - let next = iterator.next(); - const myInputIndex = inputIndex++; // Save this promise's index before `trySpawn`ing others runningMappersCount++; promisesIndexFromInputIndex[myInputIndex] = promisesIndex; inputIndexFromPromisesIndex[promisesIndex] = myInputIndex; - if (isPromiseLike(next)) { - // Optimization: if our concurrency and/or backpressure is bounded (so that we won't infinitely recurse), - // and we need to `await` the next `iterator` element, we first eagerly spawn more `mapNext` promises, - // so that these promises can begin `await`ing their respective `iterator` elements (if needed) and `mapper` results in parallel. - // This may entail memory usage closer to `options.backpressure` than necessary, but this space was already allocated to `pMapIterable` via - // `options.concurrency` and `options.backpressure`. - // This may also cause iteration well past the end of the `iterator`: we don't inspect the `iterator`'s response before `trySpawn`ing - // (because we are `trySpawn`ing before `await`ing the response), which will request the next `iterator` element, so we may end up spawning many promises which resolve to `done`. - // However, the time needed to `await` and ignore these `done` promises is presumed to be small relative to the time needed to perform common - // `async` operations like disk reads, network requests, etc. - // Overall, this can reduce the total time taken to process all elements. 
- if (backpressure !== Number.POSITIVE_INFINITY) { - // Spawn if still below concurrency and backpressure limit - trySpawn(); - } + let next; + try { + next = iterator.next(); + if (isPromiseLike(next)) { + // Optimization: if our concurrency and/or backpressure is bounded (so that we won't infinitely recurse), + // and we need to `await` the next `iterator` element, we first eagerly spawn more `mapNext` promises, + // so that these promises can begin `await`ing their respective `iterator` elements (if needed) and `mapper` results in parallel. + // This may entail memory usage closer to `options.backpressure` than necessary, but this space was already allocated to `pMapIterable` via + // `options.concurrency` and `options.backpressure`. + // This may also cause iteration well past the end of the `iterator`: we don't inspect the `iterator`'s response before `trySpawn`ing + // (because we are `trySpawn`ing before `await`ing the response), which will request the next `iterator` element, so we may end up spawning many promises which resolve to `done`. + // However, the time needed to `await` and ignore these `done` promises is presumed to be small relative to the time needed to perform common + // `async` operations like disk reads, network requests, etc. + // Overall, this can reduce the total time taken to process all elements. 
+ if (backpressure !== Number.POSITIVE_INFINITY) { + // Spawn if still below concurrency and backpressure limit + trySpawn(); + } - try { next = await next; - } catch (error) { - isDone = true; - return {result: {error}, inputIndex: myInputIndex}; } + } catch (error) { + isDone = true; + return {result: {error}, inputIndex: myInputIndex}; } let {done, value} = next; - if (done) { isDone = true; return {result: {done: true}, inputIndex: myInputIndex}; diff --git a/test.js b/test.js index 4f61cff..40e7fb2 100644 --- a/test.js +++ b/test.js @@ -545,7 +545,7 @@ test('pMapIterable - empty', async t => { t.deepEqual(await collectAsyncIterable(pMapIterable([], mapper)), []); }); -test('pMapIterable - iterable that throws', async t => { +test('pMapIterable - async iterable that throws', async t => { let isFirstNextCall = true; const iterable = { @@ -568,12 +568,27 @@ test('pMapIterable - iterable that throws', async t => { await t.throwsAsync(iterator.next(), {message: 'foo'}); }); -test('pMapIterable - mapper that throws', async t => { +test('pMapIterable - sync iterable that throws', async t => { + function * throwingGenerator() { // eslint-disable-line require-yield + throw new Error('foo'); + } + + const iterator = pMapIterable(throwingGenerator(), mapper)[Symbol.asyncIterator](); + await t.throwsAsync(() => iterator.next(), {message: 'foo'}); +}); + +test('pMapIterable - async mapper that throws', async t => { await t.throwsAsync(collectAsyncIterable(pMapIterable(sharedInput, async () => { throw new Error('foo'); })), {message: 'foo'}); }); +test('pMapIterable - sync mapper that throws', async t => { + await t.throwsAsync(collectAsyncIterable(pMapIterable(sharedInput, () => { + throw new Error('foo'); + })), {message: 'foo'}); +}); + test('pMapIterable - stop on error', async t => { const output = []; From 4b8f367d2b776d490816a9a7bebc636608bfa9cb Mon Sep 17 00:00:00 2001 From: Graham Fisher Date: Sun, 25 Aug 2024 04:31:44 -0400 Subject: [PATCH 3/6] 1. 
add promiseEmitter to preserveOrder: false Promise.race to address case of: infinite concurrency + async iterable producing >1 element 2. use `!isSyncIterator` as shortcut for `isPromiseLike(next)` (`next` is promise iff iterator is async) 3. add `trySpawn` to the `returnValue === pMapSkip && preserveOrder && (promise mapping next input iterable element is pending)` branch 4. add tests for changes (1) and (3) 5. add `rangeAround` test helper 6. extra `pMapSkip` tests 7. test for #76 --- index.js | 63 +++++++++++++++++++++++------ test.js | 121 ++++++++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 158 insertions(+), 26 deletions(-) diff --git a/index.js b/index.js index 741f088..3c6c41f 100644 --- a/index.js +++ b/index.js @@ -193,7 +193,8 @@ export function pMapIterable( return { async * [Symbol.asyncIterator]() { - const iterator = iterable[Symbol.asyncIterator] === undefined ? iterable[Symbol.iterator]() : iterable[Symbol.asyncIterator](); + const isSyncIterator = iterable[Symbol.asyncIterator] === undefined; + const iterator = isSyncIterator ? 
iterable[Symbol.iterator]() : iterable[Symbol.asyncIterator](); const promises = []; const promisesIndexFromInputIndex = {}; @@ -201,12 +202,32 @@ export function pMapIterable( let runningMappersCount = 0; let isDone = false; let inputIndex = 0; - let outputIndex = 0; // Only used when `preserveOrder: false` + let outputIndex = 0; // Only used when `preserveOrder: true` + + // This event emitter prevents the race conditions that arises when: + // - `preserveOrder: false` + // - `promises` are added after `Promise.race` is invoked, since `Promise.race` only races the promises that existed in its input array at call time + // More specifically, this occurs when (in addition to `preserveOrder: false`): + // - `concurrency === Number.PositiveInfinity && Number.PositiveInfinity === backpressure` + // - this forces us to forgo eagerly filling the `promises` pool to avoid infinite recursion + // - IMO this is the root of this problem, and a problem in and of itself: we should consider requiring a finite concurrency & backpressure + // - given the inability to eagerly filing the `promises` pool with infinite concurrency & backpressure, there are some situations in which specifying + // a finite concurrency & backpressure will be faster than specifying the otherwise faster-sounding infinite concurrency & backpressure + // - an async iterator input iterable + // - `mapNext` can't `trySpawn` until it `await`s its `next`, since the input iterable might be done + // - the initial `trySpawn` thus ends when the execution of `mapNext` is suspended to `await next` + // - the input iterable produces more than one element + // - the (single) running `mapNext`'s `trySpawn` _will necessarily_ (since concurrency and backpressure are infinite) + // start another `mapNext` promise that `trySpawn` adds to `promises` + // - this additional promise does not partake in the already-running `nextPromise`, because its underlying `Promise.race` began without it, + // when the initial `trySpawn` 
returned and `nextPromise` was invoked from the main loop + const promiseEmitter = new EventTarget(); // Only used when `preserveOrder: false` + const promiseEmitterEvent = 'promiseFulfilled'; const nextPromise = preserveOrder // Treat `promises` as a queue ? () => { - // May be undefined bc of `pMapSkip`s + // May be `undefined` bc of `pMapSkip`s while (promisesIndexFromInputIndex[outputIndex] === undefined) { outputIndex += 1; } @@ -214,7 +235,19 @@ export function pMapIterable( return promises[promisesIndexFromInputIndex[outputIndex++]]; } // Treat `promises` as a pool (order doesn't matter) - : () => Promise.race(promises); + : () => Promise.race([ + // Ensures correctness in the case that mappers resolve between the time that one `await nextPromise()` resolves and the next `nextPromise` call is made + // (these promises would otherwise be lost if an event emitter is not listening - the `promises` pool buffers resolved promises to be processed) + // (I wonder if it may be actually be possible to convert the `preserveOrder: false` case to _exclusively_ event-based, + // but such a solution may get messy since we'd want to `yield` from a callback, likely requiring a resolved promises buffer anyway...) 
+ Promise.race(promises), + // Ensures correctness in the case that more promises are added to `promises` after the initial `nextPromise` call is made + // (these additional promises are not be included in the above `Promise.race`) + // (see comment above `promiseEmitter` declaration for details on when this can occur) + new Promise(resolve => { + promiseEmitter.addEventListener(promiseEmitterEvent, r => resolve(r.detail), {once: true}); + }), + ]); function popPromise(inputIndex) { // Swap the fulfilled promise with the last element to avoid an O(n) shift to the `promises` array @@ -239,7 +272,7 @@ export function pMapIterable( let next; try { next = iterator.next(); - if (isPromiseLike(next)) { + if (!isSyncIterator) { // `!isSyncIterator` iff `isPromiseLike(next)`, but former is already computed // Optimization: if our concurrency and/or backpressure is bounded (so that we won't infinitely recurse), // and we need to `await` the next `iterator` element, we first eagerly spawn more `mapNext` promises, // so that these promises can begin `await`ing their respective `iterator` elements (if needed) and `mapper` results in parallel. @@ -250,6 +283,7 @@ export function pMapIterable( // However, the time needed to `await` and ignore these `done` promises is presumed to be small relative to the time needed to perform common // `async` operations like disk reads, network requests, etc. // Overall, this can reduce the total time taken to process all elements. + // Potential TODO: in the `concurrency === Number.POSITIVE_INFINITY` case, we could potentially still optimize here by eagerly spawning some # of promises. 
if (backpressure !== Number.POSITIVE_INFINITY) { // Spawn if still below concurrency and backpressure limit trySpawn(); @@ -291,12 +325,15 @@ export function pMapIterable( if (returnValue === pMapSkip) { // If `preserveOrder: true`, resolve to the next inputIndex's promise, in case we are already being `await`ed // NOTE: no chance that `myInputIndex + 1`-spawning code is waiting to be executed in another part of the event loop, - // but currently `promisesIndexFromInputIndex[myInputIndex + 1] === undefined` (so that we incorrectly `mapNext` and - // this potentially-currently-awaited promise resolves to the result of mapping a later element than a different member of - // `promises`, i.e. `promises` resolve out of order), because all `trySpawn`/`mapNext` calls execute the bookkeeping synchronously, - // before any `await`s. + // but currently `promisesIndexFromInputIndex[myInputIndex + 1] === undefined` (so that we incorrectly skip this `if` condition and + // instead call `mapNext`, causing this potentially-currently-awaited promise to resolve to the result of mapping an element + // of the input iterable that was produced later `myInputIndex + 1`, i.e., no chance `promises` resolve out of order, because: + // all `trySpawn`/`mapNext` calls execute their bookkeeping synchronously, before any `await`s, so we cannot observe an intermediate + // state in which input the promise mapping iterable element `myInputIndex + 1` has not been recorded in the `promisesIndexFromInputIndex` ledger. if (preserveOrder && promisesIndexFromInputIndex[myInputIndex + 1] !== undefined) { popPromise(myInputIndex); + // Spawn if still below backpressure limit and just dropped below concurrency limit + trySpawn(); return promises[promisesIndexFromInputIndex[myInputIndex + 1]]; } @@ -321,16 +358,18 @@ export function pMapIterable( // Reserve index in `promises` array: we don't actually have the promise to save yet, // but we don't want recursive `trySpawn` calls to use this same index. 
// This is safe (i.e., the empty slot won't be `await`ed) because we replace the value immediately, - // without yielding to the event loop, so no consumers (namely `getAndRemoveFromPoolNextPromise`) + // without yielding to the event loop, so no consumers (namely `nextPromise`) // can observe the intermediate state. const promisesIndex = promises.length++; promises[promisesIndex] = mapNext(promisesIndex); + promises[promisesIndex].then(p => promiseEmitter.dispatchEvent(new CustomEvent(promiseEmitterEvent, {detail: p}))); } + // Bootstrap `promises` trySpawn(); while (promises.length > 0) { - const {result: {error, done, value}, inputIndex} = await nextPromise();// eslint-disable-line no-await-in-loop + const {result: {error, done, value}, inputIndex} = await nextPromise(); // eslint-disable-line no-await-in-loop popPromise(inputIndex); if (error) { @@ -338,7 +377,7 @@ export function pMapIterable( } if (done) { - // When `preserveOrder: false`, ignore to consume any remaining pending promises in the pool + // When `preserveOrder: false`, `continue` to consume any remaining pending promises in the pool if (!preserveOrder) { continue; } diff --git a/test.js b/test.js index 40e7fb2..7285d5d 100644 --- a/test.js +++ b/test.js @@ -102,6 +102,10 @@ class ThrowingIterator { } } +function rangeAround(expected) { + return {start: expected - 5, end: expected + 50}; +} + test('main', async t => { const end = timeSpan(); t.deepEqual(await pMap(sharedInput, mapper), [10, 20, 30]); @@ -622,9 +626,9 @@ test('pMapIterable - concurrency: 2', async t => { assertInRange(t, times.get(10), {start: 0, end: 50}); assertInRange(t, times.get(20), {start: 0, end: 50}); - assertInRange(t, times.get(30), {start: 195, end: 250}); - assertInRange(t, times.get(40), {start: 295, end: 350}); - assertInRange(t, times.get(50), {start: 295, end: 350}); + assertInRange(t, times.get(30), rangeAround(200)); + assertInRange(t, times.get(40), rangeAround(300)); + assertInRange(t, times.get(50), 
rangeAround(300)); }); test('pMapIterable - backpressure', async t => { @@ -716,6 +720,18 @@ test('pMapIterable - complex pMapSkip pattern - concurrency 2 - preserveOrder: f t.assert(result.length === 8); }); +test('pMapIterable - pMapSkip + preserveOrder: true + next input mapping promise pending - eagerly spawns next promise', async t => { + const end = timeSpan(); + const testData = [ + [pMapSkip, 100], + [2, 200], + [3, 100], // Ensure 3 is spawned when pMapSkip ends (otherwise, overall runtime will be 300 ms) + ]; + const result = await collectAsyncIterable(pMapIterable(testData, mapper, {preserveOrder: true, concurrency: 2})); + assertInRange(t, end(), rangeAround(200)); + t.deepEqual(result, [2, 3]); +}); + test('pMapIterable - async iterable input', async t => { const result = await collectAsyncIterable(pMapIterable(new AsyncTestData(sharedInput), mapper)); t.deepEqual(result, [10, 20, 30]); @@ -759,26 +775,47 @@ function * promiseGenerator() { })(); } +// Differs from `AsyncTestData` because it is not a generator: +// each `next` yields a promise that does not wait for previous `next` promises to finish. 
+const asyncIterableDoingWorkOnEachNext = (start, stop) => { + let i = start; + return { + [Symbol.asyncIterator]() { + return { + async next() { + const me = i++; + if (me > stop) { + return {done: true}; + } + + await delay(100); + return {done: false, value: me}; + }, + }; + }, + }; +}; + test('pMapIterable - eager spawn when input iterable returns promise', async t => { const end = timeSpan(); - await collectAsyncIterable(pMapIterable(promiseGenerator(), value => delay(100, {value}), {concurrency: 3})); - assertInRange(t, end(), {start: 195, end: 250}); + await collectAsyncIterable(pMapIterable(asyncIterableDoingWorkOnEachNext(1, 3), value => value, /* value => delay(100, {value}), */{concurrency: 5})); + assertInRange(t, end(), rangeAround(100)); }); test('pMapIterable - eager spawn when input iterable returns promise incurs little overhead', async t => { const end = timeSpan(); await collectAsyncIterable(pMapIterable(promiseGenerator(), value => delay(100, {value}), {concurrency: 100})); - assertInRange(t, end(), {start: 195, end: 250}); + assertInRange(t, end(), rangeAround(200)); }); test('pMapIterable - preserveOrder: false - yields mappings as they resolve', async t => { const end = timeSpan(); const result = await collectAsyncIterable(pMapIterable(sharedInput, mapper, {preserveOrder: false})); t.deepEqual(result, [30, 20, 10]); - assertInRange(t, end(), {start: 295, end: 350}); + assertInRange(t, end(), rangeAround(300)); }); -test('pMapIterable - preserveOrder: false - more complex example', async t => { +test('pMapIterable - preserveOrder: false - more complex example - sync iterable and bounded concurrency', async t => { t.deepEqual(await collectAsyncIterable(pMapIterable([ [1, 200], [2, 100], @@ -789,6 +826,38 @@ test('pMapIterable - preserveOrder: false - more complex example', async t => { ], mapper, {concurrency: 3, preserveOrder: false})), [2, 3, 1, 5, 6, 4]); }); +test('pMapIterable - preserveOrder: false - more complex example - async iterable 
and unbounded concurrency', async t => { + const testData = [ + [1, 200], + [2, 125], + [3, 150], + [4, 200], + [5, 100], + [6, 75], + ]; + async function * asyncIterable() { + yield * testData; + } + + t.deepEqual(await collectAsyncIterable(pMapIterable(asyncIterable(), mapper, {concurrency: Number.POSITIVE_INFINITY, preserveOrder: false})), testData.toSorted(([_aId, aMs], [_bId, bMs]) => aMs - bMs).map(([id, _ms]) => id)); +}); + +test('pMapIterable - preserveOrder: false - more complex example - sync promise-returning iterable and unbounded concurrency', async t => { + const testData = [ + [1, 200], + [2, 125], + [3, 150], + [4, 225], + [5, 100], + [6, 75], + ]; + function * syncPromiseReturningIterable() { + yield * testData.map(d => Promise.resolve(d)); + } + + t.deepEqual(await collectAsyncIterable(pMapIterable(syncPromiseReturningIterable(), mapper, {concurrency: Number.POSITIVE_INFINITY, preserveOrder: false})), testData.toSorted(([_aId, aMs], [_bId, bMs]) => aMs - bMs).map(([id, _ms]) => id)); +}); + test('pMapIterable - preserveOrder: false - concurrency: 2', async t => { const input = [100, 200, 10, 36, 13, 45]; const times = new Map(); @@ -799,12 +868,12 @@ test('pMapIterable - preserveOrder: false - concurrency: 2', async t => { return delay(value, {value}); }, {concurrency: 2, backpressure: Number.POSITIVE_INFINITY, preserveOrder: false})), [100, 10, 36, 13, 200, 45]); - assertInRange(t, times.get(100), {start: 0, end: 50}); - assertInRange(t, times.get(200), {start: 0, end: 50}); - assertInRange(t, times.get(10), {start: times.get(100) + 100 - 5, end: times.get(100) + 100 + 50}); - assertInRange(t, times.get(36), {start: times.get(10) + 10 - 5, end: times.get(10) + 10 + 50}); - assertInRange(t, times.get(13), {start: times.get(36) + 36 - 5, end: times.get(36) + 36 + 50}); - assertInRange(t, times.get(45), {start: times.get(13) + 13 - 5, end: times.get(13) + 13 + 50}); + assertInRange(t, times.get(100), rangeAround(0)); + assertInRange(t, 
times.get(200), rangeAround(0)); + assertInRange(t, times.get(10), rangeAround(times.get(100) + 100)); + assertInRange(t, times.get(36), rangeAround(times.get(10) + 10)); + assertInRange(t, times.get(13), rangeAround(times.get(36) + 36)); + assertInRange(t, times.get(45), rangeAround(times.get(13) + 13)); }); test('pMapIterable - preserveOrder: false - backpressure', async t => { @@ -846,3 +915,27 @@ test('pMapIterable - preserveOrder: false - throws first error to settle', async }, 10], ], mapper, {preserveOrder: false, concurrency: 2})), {message: 'bar'}); }); + +test('pMapIterable - {concurrency: 1, backpressure: 2} => no concurrent mappers (#76)', async t => { + const theLog = []; + const log = message => theLog.push(message); + const startLog = n => `${n}: mapper start`; + const endLog = n => `${n}: mapper end`; + + async function * source() { + yield 1; + yield 2; + yield 3; + } + + await collectAsyncIterable(pMapIterable(source(), async n => { + log(startLog(n)); + await delay(100); + log(endLog(n)); + }, { + concurrency: 1, + backpressure: 2, + })); + t.deepEqual(theLog, [startLog(1), endLog(1), startLog(2), endLog(2), startLog(3), endLog(3)]); +}); + From 2d50f7153e614bb226c33aa4b94b9057b75884e7 Mon Sep 17 00:00:00 2001 From: Graham Fisher Date: Sun, 25 Aug 2024 08:57:14 -0400 Subject: [PATCH 4/6] return 'pMapSkip' from the promise once again, after `popPromise`ing to avoid artificial backpressure --- index.js | 44 ++++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/index.js b/index.js index 3c6c41f..01fa2a2 100644 --- a/index.js +++ b/index.js @@ -227,7 +227,7 @@ export function pMapIterable( const nextPromise = preserveOrder // Treat `promises` as a queue ? 
() => { - // May be `undefined` bc of `pMapSkip`s + // May be `undefined` bc of `pMapSkip`s (which `popPromise` and remove their `promisesIndexFromInputIndex` entries) while (promisesIndexFromInputIndex[outputIndex] === undefined) { outputIndex += 1; } @@ -323,25 +323,9 @@ export function pMapIterable( runningMappersCount--; if (returnValue === pMapSkip) { - // If `preserveOrder: true`, resolve to the next inputIndex's promise, in case we are already being `await`ed - // NOTE: no chance that `myInputIndex + 1`-spawning code is waiting to be executed in another part of the event loop, - // but currently `promisesIndexFromInputIndex[myInputIndex + 1] === undefined` (so that we incorrectly skip this `if` condition and - // instead call `mapNext`, causing this potentially-currently-awaited promise to resolve to the result of mapping an element - // of the input iterable that was produced later `myInputIndex + 1`, i.e., no chance `promises` resolve out of order, because: - // all `trySpawn`/`mapNext` calls execute their bookkeeping synchronously, before any `await`s, so we cannot observe an intermediate - // state in which input the promise mapping iterable element `myInputIndex + 1` has not been recorded in the `promisesIndexFromInputIndex` ledger. - if (preserveOrder && promisesIndexFromInputIndex[myInputIndex + 1] !== undefined) { - popPromise(myInputIndex); - // Spawn if still below backpressure limit and just dropped below concurrency limit - trySpawn(); - return promises[promisesIndexFromInputIndex[myInputIndex + 1]]; - } - - // Otherwise, start mapping the next input element - delete promisesIndexFromInputIndex[myInputIndex]; - // Not necessary to `delete inputIndexFromPromisesIndex[promisesIndex]` since `inputIndexFromPromisesIndex[promisesIndex]` is only used - // when this promise resolves, but by that point this recursive `mapNext(promisesIndex)` call will have necessarily overwritten it. 
- return mapNext(promisesIndex); + // We `popPromise` ourselves so that we don't eat into the backpressure if we don't get `await`ed/cleaned up by the main loop for a while. + // This is safe because the main loop will forgo `popPromise` when `pMapSkip` is produced. + popPromise(myInputIndex); } // Spawn if still below backpressure limit and just dropped below concurrency limit @@ -361,8 +345,18 @@ export function pMapIterable( // without yielding to the event loop, so no consumers (namely `nextPromise`) // can observe the intermediate state. const promisesIndex = promises.length++; - promises[promisesIndex] = mapNext(promisesIndex); - promises[promisesIndex].then(p => promiseEmitter.dispatchEvent(new CustomEvent(promiseEmitterEvent, {detail: p}))); + + const promise = mapNext(promisesIndex); + promise.then(p => { + promiseEmitter.dispatchEvent(new CustomEvent(promiseEmitterEvent, {detail: p})); + }); + // If the input iterable is sync, produces a non-promise, and maps to a non-promise-wrapped `pMapSkip` (no Promises anywhere), + // then `mapNext` may execute `popPromise` synchronously, which removes the empty array entry we created above and + // deletes information pertaining to `promisesIndex` from the ledgers, only to have us confound the effort by writing + // back into `promises` again here. 
+ if (promises[promisesIndex] === undefined) { + promises[promisesIndex] = promise; + } } // Bootstrap `promises` @@ -370,6 +364,12 @@ export function pMapIterable( while (promises.length > 0) { const {result: {error, done, value}, inputIndex} = await nextPromise(); // eslint-disable-line no-await-in-loop + + if (value === pMapSkip) { + // `mapNext` already called `popPromise` and `trySpawn` upon observing `pMapSkip` + continue; + } + popPromise(inputIndex); if (error) { From 5a8bb4ce1edc5af45194c96dc0e58a2203222d3a Mon Sep 17 00:00:00 2001 From: Graham Fisher Date: Sun, 25 Aug 2024 09:29:48 -0400 Subject: [PATCH 5/6] CustomEvent -> Event (former not available in node18) --- index.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/index.js b/index.js index 01fa2a2..c317be3 100644 --- a/index.js +++ b/index.js @@ -348,7 +348,9 @@ export function pMapIterable( const promise = mapNext(promisesIndex); promise.then(p => { - promiseEmitter.dispatchEvent(new CustomEvent(promiseEmitterEvent, {detail: p})); + const event = new Event(promiseEmitterEvent); + event.detail = p; + promiseEmitter.dispatchEvent(event); }); // If the input iterable is sync, produces a non-promise, and maps to a non-promise-wrapped `pMapSkip` (no Promises anywhere), // then `mapNext` may execute `popPromise` synchronously, which removes the empty array entry we created above and From 6cac3dc20da3d5bf9682b1897e1deada5faf93d6 Mon Sep 17 00:00:00 2001 From: Graham Fisher Date: Sun, 25 Aug 2024 09:38:51 -0400 Subject: [PATCH 6/6] a.toSorted(f) -> structuredClone(a).sort(f) (former not available in node18) --- test.js | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/test.js b/test.js index 7285d5d..7530da1 100644 --- a/test.js +++ b/test.js @@ -839,7 +839,9 @@ test('pMapIterable - preserveOrder: false - more complex example - async iterabl yield * testData; } - t.deepEqual(await collectAsyncIterable(pMapIterable(asyncIterable(), mapper, {concurrency: 
Number.POSITIVE_INFINITY, preserveOrder: false})), testData.toSorted(([_aId, aMs], [_bId, bMs]) => aMs - bMs).map(([id, _ms]) => id)); + const sortedTestData = structuredClone(testData).sort(([_aId, aMs], [_bId, bMs]) => aMs - bMs); + + t.deepEqual(await collectAsyncIterable(pMapIterable(asyncIterable(), mapper, {concurrency: Number.POSITIVE_INFINITY, preserveOrder: false})), sortedTestData.map(([id, _ms]) => id)); }); test('pMapIterable - preserveOrder: false - more complex example - sync promise-returning iterable and unbounded concurrency', async t => { @@ -855,7 +857,9 @@ test('pMapIterable - preserveOrder: false - more complex example - sync promise- yield * testData.map(d => Promise.resolve(d)); } - t.deepEqual(await collectAsyncIterable(pMapIterable(syncPromiseReturningIterable(), mapper, {concurrency: Number.POSITIVE_INFINITY, preserveOrder: false})), testData.toSorted(([_aId, aMs], [_bId, bMs]) => aMs - bMs).map(([id, _ms]) => id)); + const sortedTestData = structuredClone(testData).sort(([_aId, aMs], [_bId, bMs]) => aMs - bMs); + + t.deepEqual(await collectAsyncIterable(pMapIterable(syncPromiseReturningIterable(), mapper, {concurrency: Number.POSITIVE_INFINITY, preserveOrder: false})), sortedTestData.map(([id, _ms]) => id)); }); test('pMapIterable - preserveOrder: false - concurrency: 2', async t => {