diff --git a/lib/internal/streams/duplexify.js b/lib/internal/streams/duplexify.js index 76347cf5579448..aa2da92e1d80af 100644 --- a/lib/internal/streams/duplexify.js +++ b/lib/internal/streams/duplexify.js @@ -218,7 +218,7 @@ module.exports = function duplexify(body, name) { }; function fromAsyncGen(fn, destructor) { - let { promise, resolve } = PromiseWithResolvers(); + let { promise, resolve, reject } = PromiseWithResolvers(); const ac = new AbortController(); const signal = ac.signal; @@ -231,7 +231,7 @@ function fromAsyncGen(fn, destructor) { if (done) return; if (signal.aborted) throw new AbortError(undefined, { cause: signal.reason }); - ({ promise, resolve } = PromiseWithResolvers()); + ({ promise, resolve, reject } = PromiseWithResolvers()); // Next line will "break" the loop if the generator is returned/thrown. yield chunk; } @@ -242,6 +242,13 @@ function fromAsyncGen(fn, destructor) { try { return await originalReturn.call(this, value); } finally { + if (resolve) { + const _resolve = resolve; + resolve = null; + reject = null; + _resolve({ done: true, cb: () => {} }); + } + if (promise) { const _promise = promise; promise = null; @@ -258,13 +265,22 @@ function fromAsyncGen(fn, destructor) { try { return await originalThrow.call(this, err); } finally { + // asyncGenerator.throw(undefined) should cause a callback error + const error = err || new AbortError(); + + if (reject) { + const _reject = reject; + reject = null; + resolve = null; + _reject(error); + } + if (promise) { const _promise = promise; promise = null; const { cb } = await _promise; - // asyncGenerator.throw(undefined) should cause a callback error - process.nextTick(cb, err ?? new AbortError()); + process.nextTick(cb, error); } } }; @@ -274,14 +290,24 @@ function fromAsyncGen(fn, destructor) { return { value, write(chunk, encoding, cb) { - const _resolve = resolve; - resolve = null; - _resolve({ chunk, done: false, cb }); + if (resolve) { + const _resolve = resolve; + resolve = null; + reject = null; + _resolve({ chunk, done: false, cb }); + } else { + cb(new AbortError()); + } }, final(cb) { - const _resolve = resolve; - resolve = null; - _resolve({ done: true, cb }); + if (resolve) { + const _resolve = resolve; + resolve = null; + reject = null; + _resolve({ done: true, cb }); + } else { + cb(new AbortError()); + } }, destroy(err, cb) { ac.abort(); diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 2ee323a934606e..1cbff5bd70a819 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1682,7 +1682,7 @@ tmpdir.refresh(); } { - // See https://github.com/nodejs/node/issues/51540 for the following 2 tests + // See https://github.com/nodemakejs/node/issues/51540 for the following 2 tests const src = new Readable(); const dst = new Writable({ destroy(error, cb) { @@ -1723,3 +1723,30 @@ tmpdir.refresh(); }); src.destroy(new Error('problem')); } + +{ + async function* myAsyncGenerator(ag) { + for await (const data of ag) { + yield data; + } + } + + const duplexStream = Duplex.from(myAsyncGenerator); + + const r = new Readable({ + read() { + this.push('data1\n'); + throw new Error('booom'); + }, + }); + + const w = new Writable({ + write(chunk, encoding, callback) { + callback(); + }, + }); + + pipeline(r, duplexStream, w, common.mustCall((err) => { + assert.deepStrictEqual(err, new Error('booom')); + })); +}