Skip to content

Commit

Permalink
stream: prevent dead lock when Duplex generator is "thrown"
Browse files Browse the repository at this point in the history
  • Loading branch information
matthieusieben committed Dec 17, 2024
1 parent 5ad2ca9 commit d8d8b42
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 11 deletions.
46 changes: 36 additions & 10 deletions lib/internal/streams/duplexify.js
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ module.exports = function duplexify(body, name) {
};

function fromAsyncGen(fn, destructor) {
let { promise, resolve } = PromiseWithResolvers();
let { promise, resolve, reject } = PromiseWithResolvers();
const ac = new AbortController();
const signal = ac.signal;

Expand All @@ -231,7 +231,7 @@ function fromAsyncGen(fn, destructor) {
if (done) return;
if (signal.aborted)
throw new AbortError(undefined, { cause: signal.reason });
({ promise, resolve } = PromiseWithResolvers());
({ promise, resolve, reject } = PromiseWithResolvers());
// Next line will "break" the loop if the generator is returned/thrown.
yield chunk;
}
Expand All @@ -242,6 +242,13 @@ function fromAsyncGen(fn, destructor) {
try {
return await originalReturn.call(this, value);
} finally {
if (resolve) {
const _resolve = resolve;
resolve = null;
reject = null;
_resolve({ done: true, cb: () => {} });
}

if (promise) {
const _promise = promise;
promise = null;
Expand All @@ -258,13 +265,22 @@ function fromAsyncGen(fn, destructor) {
try {
return await originalThrow.call(this, err);
} finally {
// asyncGenerator.throw(undefined) should cause a callback error
const error = err || new AbortError();

if (reject) {
const _reject = reject;
reject = null;
resolve = null;
_reject(error);
}

if (promise) {
const _promise = promise;
promise = null;
const { cb } = await _promise;

// asyncGenerator.throw(undefined) should cause a callback error
process.nextTick(cb, err ?? new AbortError());
process.nextTick(cb, error);
}
}
};
Expand All @@ -274,14 +290,24 @@ function fromAsyncGen(fn, destructor) {
return {
value,
write(chunk, encoding, cb) {
const _resolve = resolve;
resolve = null;
_resolve({ chunk, done: false, cb });
if (resolve) {
const _resolve = resolve;
resolve = null;
reject = null;
_resolve({ chunk, done: false, cb });
} else {
cb(new AbortError());
}
},
final(cb) {
const _resolve = resolve;
resolve = null;
_resolve({ done: true, cb });
if (resolve) {
const _resolve = resolve;
resolve = null;
reject = null;
_resolve({ done: true, cb });
} else {
cb(new AbortError());
}
},
destroy(err, cb) {
ac.abort();
Expand Down
29 changes: 28 additions & 1 deletion test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -1682,7 +1682,7 @@ tmpdir.refresh();
}

{
// See https://github.com/nodejs/node/issues/51540 for the following 2 tests
// See https://github.com/nodemakejs/node/issues/51540 for the following 2 tests
const src = new Readable();
const dst = new Writable({
destroy(error, cb) {
Expand Down Expand Up @@ -1723,3 +1723,30 @@ tmpdir.refresh();
});
src.destroy(new Error('problem'));
}

{
async function* myAsyncGenerator(ag) {
for await (const data of ag) {
yield data;
}
}

const duplexStream = Duplex.from(myAsyncGenerator);

const r = new Readable({
read() {
this.push('data1\n');
throw new Error('booom');
},
});

const w = new Writable({
write(chunk, encoding, callback) {
callback();
},
});

pipeline(r, duplexStream, w, common.mustCall((err) => {
assert.deepStrictEqual(err, new Error('booom'));
}));
}

0 comments on commit d8d8b42

Please sign in to comment.