From 16e7e45e972866cda44ed07ac3c1163c28a48ed3 Mon Sep 17 00:00:00 2001 From: ehmicky Date: Sun, 1 Sep 2024 17:41:36 +0100 Subject: [PATCH] Add `result.pipedFrom` --- source/context.js | 9 ++-- source/index.d.ts | 2 + source/index.js | 12 +++-- source/index.test-d.ts | 6 +++ source/pipe.js | 18 +++++--- test/pipe.js | 99 ++++++++++++++++++++++++++---------------- 6 files changed, 89 insertions(+), 57 deletions(-) diff --git a/source/context.js b/source/context.js index 0e69cb7..75c9a50 100644 --- a/source/context.js +++ b/source/context.js @@ -1,12 +1,9 @@ import process from 'node:process'; import {stripVTControlCharacters} from 'node:util'; -export const getContext = ({start, command}, raw) => ({ - start: start ?? process.hrtime.bigint(), - command: [ - command, - raw.map(part => getCommandPart(stripVTControlCharacters(part))).join(' '), - ].filter(Boolean).join(' | '), +export const getContext = raw => ({ + start: process.hrtime.bigint(), + command: raw.map(part => getCommandPart(stripVTControlCharacters(part))).join(' '), state: {stdout: '', stderr: '', output: ''}, }); diff --git a/source/index.d.ts b/source/index.d.ts index 7a1f382..15cb4c8 100644 --- a/source/index.d.ts +++ b/source/index.d.ts @@ -29,6 +29,8 @@ export type Result = { command: string; durationMs: number; + + pipedFrom?: Result | SubprocessError; }; export type SubprocessError = Error & Result & { diff --git a/source/index.js b/source/index.js index c37f00e..ddb0536 100644 --- a/source/index.js +++ b/source/index.js @@ -5,16 +5,14 @@ import {getResult} from './result.js'; import {handlePipe} from './pipe.js'; import {lineIterator, combineAsyncIterators} from './iterable.js'; -export default function nanoSpawn(first, second = [], third = {}) { - const [file, previous] = Array.isArray(first) ? first : [first, {}]; - const [commandArguments, options] = Array.isArray(second) ? [second, third] : [[], second]; - - const context = getContext(previous, [file, ...commandArguments]); +export default function nanoSpawn(file, second, third, previous) { + const [commandArguments = [], options = {}] = Array.isArray(second) ? [second, third] : [[], second]; + const context = getContext([file, ...commandArguments]); const spawnOptions = getOptions(options); const nodeChildProcess = spawnSubprocess(file, commandArguments, spawnOptions, context); let subprocess = getResult(nodeChildProcess, spawnOptions, context); Object.assign(subprocess, {nodeChildProcess}); - subprocess = previous.subprocess === undefined ? subprocess : handlePipe(previous, subprocess); + subprocess = previous ? handlePipe(previous, subprocess) : subprocess; const stdout = lineIterator(subprocess, context, 'stdout'); const stderr = lineIterator(subprocess, context, 'stderr'); @@ -22,6 +20,6 @@ export default function nanoSpawn(first, second = [], third = {}) { stdout, stderr, [Symbol.asyncIterator]: () => combineAsyncIterators(stdout, stderr), - pipe: (file, second, third) => nanoSpawn([file, {...context, subprocess}], second, third), + pipe: (file, second, third) => nanoSpawn(file, second, third, subprocess), }); } diff --git a/source/index.test-d.ts b/source/index.test-d.ts index b142536..676378a 100644 --- a/source/index.test-d.ts +++ b/source/index.test-d.ts @@ -20,6 +20,9 @@ try { expectType(result.output); expectType(result.command); expectType(result.durationMs); + expectType(result.pipedFrom); + expectType(result.pipedFrom?.pipedFrom); + expectType(result.pipedFrom?.durationMs); expectNotAssignable(result); expectError(result.exitCode); expectError(result.signalName); @@ -31,6 +34,9 @@ try { expectType(subprocessError.output); expectType(subprocessError.command); expectType(subprocessError.durationMs); + expectType(subprocessError.pipedFrom); + expectType(subprocessError.pipedFrom?.pipedFrom); + expectType(subprocessError.pipedFrom?.durationMs); expectAssignable(subprocessError); expectType(subprocessError.exitCode); expectType(subprocessError.signalName); diff --git a/source/pipe.js b/source/pipe.js index b99f7ed..fa477e4 100644 --- a/source/pipe.js +++ b/source/pipe.js @@ -1,18 +1,22 @@ import {pipeline} from 'node:stream/promises'; -export const handlePipe = (previous, subprocess) => Object.assign(runProcesses([previous.subprocess, subprocess]), subprocess); +export const handlePipe = (previous, subprocess) => Object.assign(runProcesses([previous, subprocess]), subprocess); const runProcesses = async subprocesses => { // Ensure both subprocesses have exited before resolving, and that we handle errors from both - const returns = await Promise.allSettled([pipeStreams(subprocesses), ...subprocesses]); + const [[from, to]] = await Promise.all([Promise.allSettled(subprocesses), pipeStreams(subprocesses)]); - // If both subprocesses fail, throw source error to use a predictable order and avoid race conditions - const error = returns.map(({reason}) => reason).find(Boolean); - if (error) { - throw error; + // If both subprocesses fail, throw destination error to use a predictable order and avoid race conditions + if (to.reason) { + to.reason.pipedFrom = from.reason ?? from.value; + throw to.reason; + } + + if (from.reason) { + throw from.reason; } - return returns[2].value; + return {...to.value, pipedFrom: from.value}; }; const pipeStreams = async subprocesses => { diff --git a/test/pipe.js b/test/pipe.js index 2ec5bb1..5202f06 100644 --- a/test/pipe.js +++ b/test/pipe.js @@ -39,28 +39,35 @@ import { const testFixtureUrl = new URL('test.txt', FIXTURES_URL); -const getPipeSize = command => command.split(' | ').length; - test('.pipe() success', async t => { - const {stdout, output, command, durationMs} = await nanoSpawn(...nodePrintStdout).pipe(...nodeToUpperCase); + const first = nanoSpawn(...nodePrintStdout); + const {stdout, output, durationMs, pipedFrom} = await first.pipe(...nodeToUpperCase); + const firstResult = await first; + t.is(firstResult.pipedFrom, undefined); + t.is(pipedFrom, firstResult); t.is(stdout, testUpperCase); t.is(output, stdout); - t.is(getPipeSize(command), 2); assertDurationMs(t, durationMs); }); test('.pipe() source fails', async t => { - const error = await t.throwsAsync(nanoSpawn(...nodePrintFail).pipe(...nodeToUpperCase)); - assertFail(t, error); - t.is(error.stdout, testString); - t.is(error.output, error.stdout); - t.is(getPipeSize(error.command), 1); + const first = nanoSpawn(...nodePrintFail); + const secondError = await t.throwsAsync(first.pipe(...nodeToUpperCase)); + const firstError = await t.throwsAsync(first); + t.is(firstError, secondError); + t.is(secondError.pipedFrom, undefined); + assertFail(t, secondError); + t.is(secondError.stdout, testString); + t.is(secondError.output, secondError.stdout); }); test('.pipe() source fails due to child_process invalid option', async t => { - const error = await t.throwsAsync(nanoSpawn(...nodePrintStdout, earlyErrorOptions).pipe(...nodeToUpperCase)); - assertEarlyError(t, error); - t.is(getPipeSize(error.command), 1); + const first = nanoSpawn(...nodePrintStdout, earlyErrorOptions); + const secondError = await t.throwsAsync(first.pipe(...nodeToUpperCase)); + const firstError = await t.throwsAsync(first); + assertEarlyError(t, secondError); + t.is(firstError, secondError); + t.is(secondError.pipedFrom, undefined); }); test('.pipe() source fails due to stream error', async t => { @@ -69,21 +76,33 @@ test('.pipe() source fails due to stream error', async t => { const cause = new Error(testString); const nodeChildProcess = await first.nodeChildProcess; nodeChildProcess.stdout.destroy(cause); - const error = await t.throwsAsync(second); - assertErrorEvent(t, error, cause); + const secondError = await t.throwsAsync(second); + const firstError = await t.throwsAsync(first); + assertErrorEvent(t, secondError, cause); + assertErrorEvent(t, firstError, cause); + t.is(firstError.pipedFrom, undefined); + t.is(secondError.pipedFrom, firstError); }); test('.pipe() destination fails', async t => { - const error = await t.throwsAsync(nanoSpawn(...nodePrintStdout).pipe(...nodeToUpperCaseFail)); - assertFail(t, error); - t.is(error.stdout, testUpperCase); - t.is(getPipeSize(error.command), 2); + const first = nanoSpawn(...nodePrintStdout); + const secondError = await t.throwsAsync(first.pipe(...nodeToUpperCaseFail)); + const firstResult = await first; + assertFail(t, secondError); + t.is(firstResult.pipedFrom, undefined); + t.is(secondError.pipedFrom, firstResult); + t.is(firstResult.stdout, testString); + t.is(secondError.stdout, testUpperCase); }); test('.pipe() destination fails due to child_process invalid option', async t => { - const error = await t.throwsAsync(nanoSpawn(...nodePrintStdout).pipe(...nodeToUpperCase, earlyErrorOptions)); - assertEarlyError(t, error); - t.is(getPipeSize(error.command), 2); + const first = nanoSpawn(...nodePrintStdout); + const secondError = await t.throwsAsync(first.pipe(...nodeToUpperCase, earlyErrorOptions)); + const firstResult = await first; + assertEarlyError(t, secondError); + t.is(firstResult.pipedFrom, undefined); + t.is(secondError.pipedFrom, undefined); + t.is(firstResult.stdout, testString); }); test('.pipe() destination fails due to stream error', async t => { @@ -92,15 +111,26 @@ test('.pipe() destination fails due to stream error', async t => { const cause = new Error(testString); const nodeChildProcess = await second.nodeChildProcess; nodeChildProcess.stdin.destroy(cause); - const error = await t.throwsAsync(second); - assertErrorEvent(t, error, cause); + const secondError = await t.throwsAsync(second); + const firstError = await t.throwsAsync(first); + assertErrorEvent(t, secondError, cause); + assertErrorEvent(t, firstError, cause); + t.is(firstError.pipedFrom, undefined); + t.is(secondError.pipedFrom, firstError); }); test('.pipe() source and destination fail', async t => { - const error = await t.throwsAsync(nanoSpawn(...nodePrintFail).pipe(...nodeToUpperCaseFail)); - assertFail(t, error); - t.is(error.stdout, testString); - t.is(getPipeSize(error.command), 1); + const first = nanoSpawn(...nodePrintFail); + const secondError = await t.throwsAsync(first.pipe(...nodeToUpperCaseFail)); + const firstError = await t.throwsAsync(first); + assertFail(t, firstError); + assertFail(t, secondError); + t.is(firstError.pipedFrom, undefined); + t.is(secondError.pipedFrom, firstError); + t.is(firstError.stdout, testString); + t.is(firstError.output, firstError.stdout); + t.is(secondError.stdout, testUpperCase); + t.is(secondError.output, secondError.stdout); }); test('.pipe().pipe() success', async t => { @@ -111,10 +141,7 @@ test('.pipe().pipe() success', async t => { t.is(firstResult.output, firstResult.stdout); t.is(secondResult.stdout, testDoubleUpperCase); t.is(secondResult.output, secondResult.stdout); - t.is(getPipeSize(firstResult.command), 2); - t.is(getPipeSize(secondResult.command), 3); assertDurationMs(t, firstResult.durationMs); - t.true(secondResult.durationMs > firstResult.durationMs); }); test('.pipe().pipe() first source fail', async t => { @@ -125,7 +152,6 @@ test('.pipe().pipe() first source fail', async t => { t.is(firstError, secondError); t.is(firstError.stdout, testString); t.is(firstError.output, firstError.stdout); - t.is(getPipeSize(firstError.command), 1); }); test('.pipe().pipe() second source fail', async t => { @@ -136,7 +162,6 @@ test('.pipe().pipe() second source fail', async t => { t.is(firstError, secondError); t.is(firstError.stdout, testUpperCase); t.is(firstError.output, firstError.stdout); - t.is(getPipeSize(firstError.command), 2); }); test('.pipe().pipe() destination fail', async t => { @@ -148,8 +173,6 @@ test('.pipe().pipe() destination fail', async t => { t.is(firstResult.output, firstResult.stdout); t.is(secondError.stdout, testDoubleUpperCase); t.is(secondError.output, secondError.stdout); - t.is(getPipeSize(firstResult.command), 2); - t.is(getPipeSize(secondError.command), 3); assertDurationMs(t, firstResult.durationMs); }); @@ -158,10 +181,12 @@ test('.pipe().pipe() all fail', async t => { const secondError = await t.throwsAsync(first.pipe(...nodeDoubleFail)); const firstError = await t.throwsAsync(first); assertFail(t, firstError); - t.is(firstError, secondError); - t.is(firstError.stdout, testString); + assertFail(t, secondError); + t.is(secondError.pipedFrom, firstError); + t.is(firstError.stdout, testUpperCase); t.is(firstError.output, firstError.stdout); - t.is(getPipeSize(firstError.command), 1); + t.is(secondError.stdout, testDoubleUpperCase); + t.is(secondError.output, secondError.stdout); }); // Cannot guarantee that `cat` exists on Windows