From 1181d57b97473bea3aaa4c77b8edd798ab364838 Mon Sep 17 00:00:00 2001 From: ehmicky Date: Sun, 1 Sep 2024 20:13:09 +0100 Subject: [PATCH] Separate smaller core --- source/context.js | 1 - source/index.js | 19 ++++++++-- source/iterable.js | 6 +-- source/pico-spawn.js | 90 ++++++++++++++++++++++++++++++++++++++++++++ source/result.js | 67 +++++++-------------------------- source/spawn.js | 49 ++++++------------------ 6 files changed, 134 insertions(+), 98 deletions(-) create mode 100644 source/pico-spawn.js diff --git a/source/context.js b/source/context.js index 75c9a50..a462d60 100644 --- a/source/context.js +++ b/source/context.js @@ -4,7 +4,6 @@ import {stripVTControlCharacters} from 'node:util'; export const getContext = raw => ({ start: process.hrtime.bigint(), command: raw.map(part => getCommandPart(stripVTControlCharacters(part))).join(' '), - state: {stdout: '', stderr: '', output: ''}, }); const getCommandPart = part => /[^\w./-]/.test(part) diff --git a/source/index.js b/source/index.js index 0654c8a..e8db2dc 100644 --- a/source/index.js +++ b/source/index.js @@ -1,6 +1,7 @@ import {getContext} from './context.js'; import {getOptions} from './options.js'; -import {spawnSubprocess} from './spawn.js'; +import {handleArguments} from './spawn.js'; +import picoSpawn from './pico-spawn.js'; import {getResult} from './result.js'; import {handlePipe} from './pipe.js'; import {lineIterator, combineAsyncIterators} from './iterable.js'; @@ -9,8 +10,9 @@ export default function spawn(file, second, third, previous) { const [commandArguments = [], options = {}] = Array.isArray(second) ? [second, third] : [[], second]; const context = getContext([file, ...commandArguments]); const spawnOptions = getOptions(options); - const nodeChildProcess = spawnSubprocess(file, commandArguments, spawnOptions, context); - let subprocess = getResult(nodeChildProcess, spawnOptions, context); + const picoPromise = getPicoSubprocess(file, commandArguments, spawnOptions, context); + const nodeChildProcess = getNodeChildProcess(picoPromise); + let subprocess = getResult(picoPromise, nodeChildProcess, context, spawnOptions); Object.assign(subprocess, {nodeChildProcess}); subprocess = previous ? handlePipe([previous, subprocess]) : subprocess; @@ -24,3 +26,14 @@ export default function spawn(file, second, third, previous) { pipe: (file, second, third) => spawn(file, second, third, subprocess), }); } + +const getPicoSubprocess = async (file, commandArguments, spawnOptions, context) => { + const spawnArguments = await handleArguments(file, commandArguments, spawnOptions, context); + const picoSubprocess = picoSpawn(...spawnArguments); + return {picoSubprocess}; +}; + +const getNodeChildProcess = async picoPromise => { + const {picoSubprocess} = await picoPromise; + return picoSubprocess.nodeChildProcess; +}; diff --git a/source/iterable.js b/source/iterable.js index db0b256..339c6d1 100644 --- a/source/iterable.js +++ b/source/iterable.js @@ -1,12 +1,12 @@ -export const lineIterator = async function * (subprocess, {state}, streamName) { +export const lineIterator = async function * (subprocess, context, streamName) { // Prevent buffering when iterating. // This would defeat one of the main goals of iterating: low memory consumption. - if (state.isIterating === false) { + if (context.isIterating === false) { throw new Error(`The subprocess must be iterated right away, for example: for await (const line of spawn(...)) { ... }`); } - state.isIterating = true; + context.isIterating = true; try { const {[streamName]: stream} = await subprocess.nodeChildProcess; diff --git a/source/pico-spawn.js b/source/pico-spawn.js new file mode 100644 index 0000000..7ba11c4 --- /dev/null +++ b/source/pico-spawn.js @@ -0,0 +1,90 @@ +import {spawn} from 'node:child_process'; +import {once, on} from 'node:events'; + +export default function picoSpawn(file, second, third) { + const [commandArguments = [], options = {}] = Array.isArray(second) ? [second, third] : [[], second]; + const state = { + stdout: '', + stderr: '', + output: '', + command: [file, ...commandArguments].join(' '), + }; + const nodeChildProcess = spawnSubprocess(file, commandArguments, options, state); + return Object.assign(getResult(nodeChildProcess, state), {nodeChildProcess}); +} + +const spawnSubprocess = async (file, commandArguments, options, state) => { + try { + const instance = spawn(file, commandArguments, options); + bufferOutput(instance.stdout, 'stdout', options, state); + bufferOutput(instance.stderr, 'stderr', options, state); + + // The `error` event is caught by `once(instance, 'spawn')` and `once(instance, 'close')`. + // But it creates an uncaught exception if it happens exactly one tick after 'spawn'. + // This prevents that. + instance.once('error', () => {}); + + await once(instance, 'spawn'); + return instance; + } catch (error) { + throw getSubprocessError(error, {}, state); + } +}; + +const bufferOutput = (stream, streamName, {buffer = true}, state) => { + if (stream) { + stream.setEncoding('utf8'); + if (buffer) { + stream.on('data', chunk => { + state[streamName] += chunk; + state.output += chunk; + }); + } + } +}; + +const getResult = async (nodeChildProcess, state) => { + const instance = await nodeChildProcess; + const onClose = once(instance, 'close'); + + try { + await Promise.race([ + onClose, + ...instance.stdio.filter(Boolean).map(stream => onStreamError(stream)), + ]); + checkFailure(instance, state); + return state; + } catch (error) { + await Promise.allSettled([onClose]); + throw getSubprocessError(error, instance, state); + } +}; + +const onStreamError = async stream => { + for await (const [error] of on(stream, 'error')) { + // Ignore errors that are due to closing errors when the subprocesses exit normally, or due to piping + if (!['ERR_STREAM_PREMATURE_CLOSE', 'EPIPE'].includes(error?.code)) { + throw error; + } + } +}; + +const checkFailure = ({exitCode, signalCode}, {command}) => { + if (signalCode) { + throw new Error(`Command was terminated with ${signalCode}: ${command}`); + } + + if (exitCode >= 1) { + throw new Error(`Command failed with exit code ${exitCode}: ${command}`); + } +}; + +const getSubprocessError = (error, {exitCode, signalCode}, state) => Object.assign( + error?.message?.includes('Command ') + ? error + : new Error(`Command failed: ${state.command}`, {cause: error}), + // `exitCode` can be a negative number (`errno`) when the `error` event is emitted on the `instance` + exitCode >= 1 ? {exitCode} : {}, + signalCode ? {signalName: signalCode} : {}, + state, +); diff --git a/source/result.js b/source/result.js index 9716418..c804f84 100644 --- a/source/result.js +++ b/source/result.js @@ -1,66 +1,27 @@ -import {once, on} from 'node:events'; import process from 'node:process'; -export const getResult = async (nodeChildProcess, {input}, context) => { - const instance = await nodeChildProcess; - if (input !== undefined) { - instance.stdin.end(input); - } - - const onClose = once(instance, 'close'); - +export const getResult = async (picoPromise, nodeChildProcess, context, options) => { try { - await Promise.race([ - onClose, - ...instance.stdio.filter(Boolean).map(stream => onStreamError(stream)), - ]); - checkFailure(context, getErrorOutput(instance)); - return getOutputs(context); + const {picoSubprocess} = await picoPromise; + const [result] = await Promise.all([picoSubprocess, handleInput(nodeChildProcess, options)]); + return updateResult(result, context); } catch (error) { - await Promise.allSettled([onClose]); - throw getResultError(error, instance, context); - } -}; - -const onStreamError = async stream => { - for await (const [error] of on(stream, 'error')) { - // Ignore errors that are due to closing errors when the subprocesses exit normally, or due to piping - if (!['ERR_STREAM_PREMATURE_CLOSE', 'EPIPE'].includes(error?.code)) { - throw error; - } + error.message = error.message.replaceAll(error.command, context.command); + throw updateResult(error, context); } }; -const checkFailure = ({command}, {exitCode, signalName}) => { - if (signalName !== undefined) { - throw new Error(`Command was terminated with ${signalName}: ${command}`); - } - - if (exitCode !== undefined) { - throw new Error(`Command failed with exit code ${exitCode}: ${command}`); +const handleInput = async (nodeChildProcess, {input}) => { + const {stdin} = await nodeChildProcess; + if (input !== undefined) { + stdin.end(input); } }; -export const getResultError = (error, instance, context) => Object.assign( - getErrorInstance(error, context), - getErrorOutput(instance), - getOutputs(context), -); - -const getErrorInstance = (error, {command}) => error?.message.startsWith('Command ') - ? error - : new Error(`Command failed: ${command}`, {cause: error}); - -const getErrorOutput = ({exitCode, signalCode}) => ({ - // `exitCode` can be a negative number (`errno`) when the `error` event is emitted on the `instance` - ...(exitCode < 1 ? {} : {exitCode}), - ...(signalCode === null ? {} : {signalName: signalCode}), -}); - -const getOutputs = ({state: {stdout, stderr, output}, command, start}) => ({ - stdout: getOutput(stdout), - stderr: getOutput(stderr), - output: getOutput(output), +const updateResult = (result, {command, start}) => Object.assign(result, { + stdout: getOutput(result.stdout), + stderr: getOutput(result.stderr), + output: getOutput(result.output), command, durationMs: Number(process.hrtime.bigint() - start) / 1e6, }); diff --git a/source/spawn.js b/source/spawn.js index d31eedd..58cdc5c 100644 --- a/source/spawn.js +++ b/source/spawn.js @@ -1,43 +1,16 @@ -import {spawn} from 'node:child_process'; -import {once} from 'node:events'; import process from 'node:process'; import {applyForceShell} from './windows.js'; -import {getResultError} from './result.js'; -export const spawnSubprocess = async (file, commandArguments, options, context) => { - try { - // When running `node`, keep the current Node version and CLI flags. - // Not applied with file paths to `.../node` since those indicate a clear intent to use a specific Node version. - // Does not work with shebangs, but those don't work cross-platform anyway. - [file, commandArguments] = ['node', 'node.exe'].includes(file.toLowerCase()) - ? [process.execPath, [...process.execArgv.filter(flag => !flag.startsWith('--inspect')), ...commandArguments]] - : [file, commandArguments]; +export const handleArguments = async (file, commandArguments, options, context) => { + // When running `node`, keep the current Node version and CLI flags. + // Not applied with file paths to `.../node` since those indicate a clear intent to use a specific Node version. + // This also provides a way to opting out, e.g. using `process.execPath` instead of `node` to discard current CLI flags. + // Does not work with shebangs, but those don't work cross-platform anyway. + [file, commandArguments] = ['node', 'node.exe'].includes(file.toLowerCase()) + ? [process.execPath, [...process.execArgv.filter(flag => !flag.startsWith('--inspect')), ...commandArguments]] + : [file, commandArguments]; - const instance = spawn(...await applyForceShell(file, commandArguments, options)); - bufferOutput(instance.stdout, context, 'stdout'); - bufferOutput(instance.stderr, context, 'stderr'); - - // The `error` event is caught by `once(instance, 'spawn')` and `once(instance, 'close')`. - // But it creates an uncaught exception if it happens exactly one tick after 'spawn'. - // This prevents that. - instance.once('error', () => {}); - - await once(instance, 'spawn'); - return instance; - } catch (error) { - throw getResultError(error, {}, context); - } -}; - -const bufferOutput = (stream, {state}, streamName) => { - if (stream) { - stream.setEncoding('utf8'); - if (!state.isIterating) { - state.isIterating = false; - stream.on('data', chunk => { - state[streamName] += chunk; - state.output += chunk; - }); - } - } + [file, commandArguments, options] = await applyForceShell(file, commandArguments, options); + context.isIterating ??= false; + return [file, commandArguments, {...options, buffer: !context.isIterating}]; };