Skip to content

Commit

Permalink
Allow binary input/output
Browse files Browse the repository at this point in the history
  • Loading branch information
ehmicky committed Sep 2, 2024
1 parent 6e8cf4e commit 652cd20
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 20 deletions.
56 changes: 40 additions & 16 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import {Buffer} from 'node:buffer';
import {spawn} from 'node:child_process';
import {once, on} from 'node:events';
import {stripVTControlCharacters} from 'node:util';
Expand All @@ -15,11 +16,16 @@ export default function nanoSpawn(first, second = [], third = {}) {
const start = previous.start ?? process.hrtime.bigint();
const spawnOptions = getOptions(options);
const command = [previous.command, getCommand(file, commandArguments)].filter(Boolean).join(' | ');
const context = {start, command, state: initState()};
const context = {
start,
command,
state: initState(spawnOptions),
spawnOptions,
};
[file, commandArguments] = handleNode(file, commandArguments);
const input = getInput(spawnOptions);

const nodeChildProcess = getInstance(file, commandArguments, spawnOptions, context);
const nodeChildProcess = getInstance(file, commandArguments, context);
const resultPromise = Object.assign(getResult(nodeChildProcess, input, context), {nodeChildProcess});
const finalPromise = previous.resultPromise === undefined ? resultPromise : handlePipe(previous, resultPromise);

Expand Down Expand Up @@ -93,7 +99,8 @@ const getInput = ({stdio}) => {
return input;
};

const getInstance = async (file, commandArguments, spawnOptions, context) => {
const getInstance = async (file, commandArguments, context) => {
const {spawnOptions} = context;
try {
const forcedShell = await getForcedShell(file, spawnOptions);
spawnOptions.shell ||= forcedShell;
Expand All @@ -109,7 +116,7 @@ const getInstance = async (file, commandArguments, spawnOptions, context) => {
await once(instance, 'spawn');
return instance;
} catch (error) {
throw getResultError(error, initState(), context);
throw getResultError(error, initState(spawnOptions), context);
}
};

Expand All @@ -121,7 +128,7 @@ const getResult = async (nodeChildProcess, input, context) => {
try {
await Promise.race([onClose, ...onStreamErrors(instance)]);
checkFailure(context, getErrorOutput(instance));
return getOutput(context);
return getOutputs(context);
} catch (error) {
await Promise.allSettled([onClose]);
throw getResultError(error, instance, context);
Expand All @@ -134,21 +141,32 @@ const useInput = (instance, input) => {
}
};

const initState = () => ({stdout: '', stderr: ''});
// We use `Buffer` internally instead of `Uint8Array` because:
// - It pools the underlying `Uint8Arrays`
// - `Buffer.toString()` uses performant logic written in C++ (libuv)
// - It results in shorter code
const initState = ({binary}) => binary
? {stdout: Buffer.alloc(0), stderr: Buffer.alloc(0)}
: {stdout: '', stderr: ''};

const bufferOutput = (stream, {state}, streamName) => {
const bufferOutput = (stream, {state, spawnOptions: {binary}}, streamName) => {
if (!stream) {
return;
}

stream.setEncoding('utf8');
if (!binary) {
stream.setEncoding('utf8');
}

if (state.isIterating) {
return;
}

state.isIterating = false;
stream.on('data', chunk => {
state[streamName] += chunk;
state[streamName] = binary
? Buffer.concat([state[streamName], chunk])
: `${state[streamName]}${chunk}`;
});
};

Expand All @@ -168,7 +186,7 @@ const IGNORED_CODES = new Set(['ERR_STREAM_PREMATURE_CLOSE', 'EPIPE']);
const getResultError = (error, instance, context) => Object.assign(
getErrorInstance(error, context),
getErrorOutput(instance),
getOutput(context),
getOutputs(context),
);

const getErrorInstance = (error, {command}) => error?.message.startsWith('Command ')
Expand All @@ -181,16 +199,22 @@ const getErrorOutput = ({exitCode, signalCode}) => ({
...(signalCode === null ? {} : {signalName: signalCode}),
});

const getOutput = ({state: {stdout, stderr}, command, start}) => ({
stdout: stripNewline(stdout),
stderr: stripNewline(stderr),
const getOutputs = ({state: {stdout, stderr}, command, start, spawnOptions}) => ({
stdout: getOutput(stdout, spawnOptions),
stderr: getOutput(stderr, spawnOptions),
command,
durationMs: Number(process.hrtime.bigint() - start) / 1e6,
});

const stripNewline = input => input?.at(-1) === '\n'
? input.slice(0, input.at(-2) === '\r' ? -2 : -1)
: input;
const getOutput = (output, {binary}) => {
if (binary) {
return new Uint8Array(output);
}

return output.at(-1) === '\n'
? output.slice(0, output.at(-2) === '\r' ? -2 : -1)
: output;
};

const checkFailure = ({command}, {exitCode, signalName}) => {
if (signalName !== undefined) {
Expand Down
12 changes: 8 additions & 4 deletions iterable.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export const lineIterator = async function * (resultPromise, {state}, streamName) {
export const lineIterator = async function * (resultPromise, {state, spawnOptions: {binary}}, streamName) {
// Prevent buffering when iterating.
// This would defeat one of the main goals of iterating: low memory consumption.
if (state.isIterating === false) {
Expand All @@ -16,9 +16,13 @@ export const lineIterator = async function * (resultPromise, {state}, streamName
try {
let buffer = '';
for await (const chunk of stream.iterator({destroyOnReturn: false})) {
const lines = `${buffer}${chunk}`.split(/\r?\n/);
buffer = lines.pop(); // Keep last line in buffer as it may not be complete
yield * lines;
if (binary) {
yield new Uint8Array(chunk);
} else {
const lines = `${buffer}${chunk}`.split(/\r?\n/);
buffer = lines.pop(); // Keep last line in buffer as it may not be complete
yield * lines;
}
}

if (buffer) {
Expand Down

0 comments on commit 652cd20

Please sign in to comment.