From 833a4d698cbfe37da89910603f8c320087791357 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E3=81=AA=E3=81=A4=E3=81=8D?= Date: Mon, 4 Nov 2024 14:30:31 -0800 Subject: [PATCH] Switch to sync-child-process (#347) --- lib/src/compiler/sync.ts | 15 +- lib/src/sync-process/event.ts | 72 ------- lib/src/sync-process/index.test.ts | 157 -------------- lib/src/sync-process/index.ts | 192 ------------------ .../sync-process/sync-message-port.test.ts | 179 ---------------- lib/src/sync-process/sync-message-port.ts | 174 ---------------- lib/src/sync-process/worker.ts | 67 ------ package.json | 1 + 8 files changed, 9 insertions(+), 848 deletions(-) delete mode 100644 lib/src/sync-process/event.ts delete mode 100644 lib/src/sync-process/index.test.ts delete mode 100644 lib/src/sync-process/index.ts delete mode 100644 lib/src/sync-process/sync-message-port.test.ts delete mode 100644 lib/src/sync-process/sync-message-port.ts delete mode 100644 lib/src/sync-process/worker.ts diff --git a/lib/src/compiler/sync.ts b/lib/src/compiler/sync.ts index 9ca992ad..77a2668a 100644 --- a/lib/src/compiler/sync.ts +++ b/lib/src/compiler/sync.ts @@ -3,6 +3,7 @@ // https://opensource.org/licenses/MIT. import {Subject} from 'rxjs'; +import {SyncChildProcess} from 'sync-child-process'; import * as path from 'path'; import { @@ -20,7 +21,6 @@ import {FunctionRegistry} from '../function-registry'; import {ImporterRegistry} from '../importer-registry'; import {MessageTransformer} from '../message-transformer'; import {PacketTransformer} from '../packet-transformer'; -import {SyncProcess} from '../sync-process'; import * as utils from '../utils'; import * as proto from '../vendor/embedded_sass_pb'; import {CompileResult} from '../vendor/sass/compile'; @@ -36,7 +36,7 @@ const initFlag = Symbol(); /** A synchronous wrapper for the embedded Sass compiler */ export class Compiler { /** The underlying process that's being wrapped. */ - private readonly process = new SyncProcess( + private readonly process = new SyncChildProcess( compilerCommand[0], [...compilerCommand.slice(1), '--embedded'], { @@ -77,7 +77,12 @@ export class Compiler { /** Yields the next event from the underlying process. */ private yield(): boolean { - const event = this.process.yield(); + const result = this.process.next(); + if (result.done) { + this.disposed = true; + return false; + } + const event = result.value; switch (event.type) { case 'stdout': this.stdout$.next(event.data); @@ -86,10 +91,6 @@ export class Compiler { case 'stderr': this.stderr$.next(event.data); return true; - - case 'exit': - this.disposed = true; - return false; } } diff --git a/lib/src/sync-process/event.ts b/lib/src/sync-process/event.ts deleted file mode 100644 index 5fe1ac95..00000000 --- a/lib/src/sync-process/event.ts +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright 2021 Google LLC. Use of this source code is governed by an -// MIT-style license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. - -/** An event emitted by the child process. */ -export type Event = StdoutEvent | StderrEvent | ExitEvent; - -/** An event sent from the worker to the host. */ -export type InternalEvent = - | InternalStdoutEvent - | InternalStderrEvent - | ExitEvent - | ErrorEvent; - -/** An event indicating that data has been emitted over stdout. */ -export interface StdoutEvent { - type: 'stdout'; - data: Buffer; -} - -/** An event indicating that data has been emitted over stderr. */ -export interface StderrEvent { - type: 'stderr'; - data: Buffer; -} - -/** An event indicating that process has exited. */ -export interface ExitEvent { - type: 'exit'; - - /** - * The exit code. This will be `undefined` if the subprocess was killed via - * signal. - */ - code?: number; - - /** - * The signal that caused this process to exit. This will be `undefined` if - * the subprocess exited normally. - */ - signal?: NodeJS.Signals; -} - -/** - * The stdout event sent from the worker to the host. The structured clone - * algorithm automatically converts `Buffer`s sent through `MessagePort`s to - * `Uint8Array`s. - */ -export interface InternalStdoutEvent { - type: 'stdout'; - data: Buffer | Uint8Array; -} - -/** - * The stderr event sent from the worker to the host. The structured clone - * algorithm automatically converts `Buffer`s sent through `MessagePort`s to - * `Uint8Array`s. - */ -export interface InternalStderrEvent { - type: 'stderr'; - data: Buffer | Uint8Array; -} - -/** - * An error occurred when starting or closing the child process. This is only - * used internally; the host will throw the error rather than returning it to - * the caller. - */ -export interface ErrorEvent { - type: 'error'; - error: Error; -} diff --git a/lib/src/sync-process/index.test.ts b/lib/src/sync-process/index.test.ts deleted file mode 100644 index 99a2996f..00000000 --- a/lib/src/sync-process/index.test.ts +++ /dev/null @@ -1,157 +0,0 @@ -// Copyright 2021 Google LLC. Use of this source code is governed by an -// MIT-style license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. - -import * as fs from 'fs'; -import * as p from 'path'; -import * as del from 'del'; - -import {Event, StderrEvent, StdoutEvent, SyncProcess} from './index'; - -describe('SyncProcess', () => { - describe('stdio', () => { - it('emits stdout', () => { - withJSProcess('console.log("hello, world!");', node => { - expectStdout(node.yield(), 'hello, world!\n'); - }); - }); - - it('emits stderr', () => { - withJSProcess('console.error("hello, world!");', node => { - expectStderr(node.yield(), 'hello, world!\n'); - }); - }); - - it('receives stdin', () => { - withJSProcess( - 'process.stdin.on("data", (data) => process.stdout.write(data));', - node => { - node.stdin.write('hi there!\n'); - expectStdout(node.yield(), 'hi there!\n'); - node.stdin.write('fblthp\n'); - expectStdout(node.yield(), 'fblthp\n'); - }, - ); - }); - - it('closes stdin', () => { - withJSProcess( - ` - process.stdin.on("data", () => {}); - process.stdin.on("end", () => console.log("closed!")); - `, - node => { - node.stdin.end(); - expectStdout(node.yield(), 'closed!\n'); - }, - ); - }); - }); - - describe('emits exit', () => { - it('with code 0 by default', () => { - withJSProcess('', node => { - expectExit(node.yield(), 0); - }); - }); - - it('with a non-0 code', () => { - withJSProcess('process.exit(123);', node => { - expectExit(node.yield(), 123); - }); - }); - - it('with a signal code', () => { - withJSProcess('for (;;) {}', node => { - node.kill('SIGINT'); - expectExit(node.yield(), 'SIGINT'); - }); - }); - }); - - it('passes options to the subprocess', () => { - withJSFile('console.log(process.env.SYNC_PROCESS_TEST);', file => { - const node = new SyncProcess(process.argv0, [file], { - env: {...process.env, SYNC_PROCESS_TEST: 'abcdef'}, - }); - expectStdout(node.yield(), 'abcdef\n'); - node.kill(); - }); - }); -}); - -/** Asserts that `event` is a `StdoutEvent` with text `text`. */ -function expectStdout(event: Event, text: string): void { - if (event.type === 'stderr') { - throw `Expected stdout event, was stderr event: ${event.data.toString()}`; - } - - expect(event.type).toEqual('stdout'); - expect((event as StdoutEvent).data.toString()).toEqual(text); -} - -/** Asserts that `event` is a `StderrEvent` with text `text`. */ -function expectStderr(event: Event, text: string): void { - if (event.type === 'stdout') { - throw `Expected stderr event, was stdout event: ${event.data.toString()}`; - } - - expect(event.type).toEqual('stderr'); - expect((event as StderrEvent).data.toString()).toEqual(text); -} - -/** - * Asserts that `event` is an `ExitEvent` with either the given exit code (if - * `codeOrSignal` is a number) or signal (if `codeOrSignal` is a string). - */ -function expectExit(event: Event, codeOrSignal: number | NodeJS.Signals): void { - if (event.type !== 'exit') { - throw ( - `Expected exit event, was ${event.type} event: ` + event.data.toString() - ); - } - - expect(event).toEqual( - typeof codeOrSignal === 'number' - ? {type: 'exit', code: codeOrSignal} - : {type: 'exit', signal: codeOrSignal}, - ); -} - -/** - * Starts a `SyncProcess` running a JS file with the given `contents` and passes - * it to `callback`. - */ -function withJSProcess( - contents: string, - callback: (process: SyncProcess) => void, -): void { - return withJSFile(contents, file => { - const node = new SyncProcess(process.argv0, [file]); - - try { - callback(node); - } finally { - node.kill(); - } - }); -} - -/** - * Creates a JS file with the given `contents` for the duration of `callback`. - * - * The `callback` is passed the name of the created file. - */ -function withJSFile(contents: string, callback: (file: string) => void): void { - const testDir = p.join('spec', 'sandbox', `${Math.random()}`.slice(2)); - fs.mkdirSync(testDir, {recursive: true}); - const file = p.join(testDir, 'script.js'); - fs.writeFileSync(file, contents); - - try { - callback(file); - } finally { - // TODO(awjin): Change this to rmSync once we drop support for Node 12. - del.sync(testDir, {force: true}); - } -} diff --git a/lib/src/sync-process/index.ts b/lib/src/sync-process/index.ts deleted file mode 100644 index 37f0d26d..00000000 --- a/lib/src/sync-process/index.ts +++ /dev/null @@ -1,192 +0,0 @@ -// Copyright 2021 Google LLC. Use of this source code is governed by an -// MIT-style license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. - -import * as fs from 'fs'; -import * as p from 'path'; -import * as stream from 'stream'; -import {Worker, WorkerOptions} from 'worker_threads'; - -import {SyncMessagePort} from './sync-message-port'; -import {Event, InternalEvent} from './event'; - -export {Event, ExitEvent, StderrEvent, StdoutEvent} from './event'; - -// TODO(nex3): Factor this out into its own package. - -/** - * A child process that runs synchronously while also allowing the user to - * interact with it before it shuts down. - */ -export class SyncProcess { - /** The port that communicates with the worker thread. */ - private readonly port: SyncMessagePort; - - /** The worker in which the child process runs. */ - private readonly worker: Worker; - - /** The standard input stream to write to the process. */ - readonly stdin: stream.Writable; - - /** Creates a new synchronous process running `command` with `args`. */ - constructor(command: string, options?: Options); - constructor(command: string, args: string[], options?: Options); - - constructor( - command: string, - argsOrOptions?: string[] | Options, - options?: Options, - ) { - let args: string[]; - if (Array.isArray(argsOrOptions)) { - args = argsOrOptions; - } else { - args = []; - options = argsOrOptions; - } - - const {port1, port2} = SyncMessagePort.createChannel(); - this.port = new SyncMessagePort(port1); - - this.worker = spawnWorker(p.join(p.dirname(__filename), 'worker'), { - workerData: {port: port2, command, args, options}, - transferList: [port2], - }); - - // The worker shouldn't emit any errors unless it breaks in development. - this.worker.on('error', console.error); - - this.stdin = new stream.Writable({ - write: (chunk: Buffer, encoding, callback) => { - this.port.postMessage( - { - type: 'stdin', - data: chunk as Buffer, - }, - [chunk.buffer], - ); - callback(); - }, - }); - - // Unfortunately, there's no built-in event or callback that will reliably - // *synchronously* notify us that the stdin stream has been closed. (The - // `final` callback works in Node v16 but not v14.) Instead, we wrap the - // methods themselves that are used to close the stream. - const oldEnd = this.stdin.end.bind(this.stdin) as ( - a1?: unknown, - a2?: unknown, - a3?: unknown, - ) => void; - this.stdin.end = ((a1?: unknown, a2?: unknown, a3?: unknown) => { - oldEnd(a1, a2, a3); - this.port.postMessage({type: 'stdinClosed'}); - }) as typeof this.stdin.end; - - const oldDestroy = this.stdin.destroy.bind(this.stdin) as ( - a1?: unknown, - ) => void; - this.stdin.destroy = ((a1?: unknown) => { - oldDestroy(a1); - this.port.postMessage({type: 'stdinClosed'}); - }) as typeof this.stdin.destroy; - } - - /** - * Blocks until the child process is ready to emit another event, then returns - * that event. - * - * If there's an error running the child process, this will throw that error. - * This may not be called after it emits an `ExitEvent` or throws an error. - */ - yield(): Event { - if (this.stdin.destroyed) { - throw new Error( - "Can't call SyncProcess.yield() after the process has exited.", - ); - } - - const message = this.port.receiveMessage() as InternalEvent; - switch (message.type) { - case 'stdout': - return {type: 'stdout', data: Buffer.from(message.data.buffer)}; - - case 'stderr': - return {type: 'stderr', data: Buffer.from(message.data.buffer)}; - - case 'error': - this.close(); - throw message.error; - - case 'exit': - this.close(); - return message; - } - } - - // TODO(nex3): Add a non-blocking `yieldIfReady()` function that returns - // `null` if the worker hasn't queued up an event. - - // TODO(nex3): Add a `yieldAsync()` function that returns a `Promise`. - - /** - * Sends a signal (`SIGTERM` by default) to the child process. - * - * This has no effect if the process has already exited. - */ - kill(signal?: NodeJS.Signals | number): void { - this.port.postMessage({type: 'kill', signal}); - } - - /** Closes down the worker thread and the stdin stream. */ - private close(): void { - this.port.close(); - void this.worker.terminate(); - this.stdin.destroy(); - } -} - -/** - * Spawns a worker for the given `fileWithoutExtension` in either a JS or TS - * worker, depending on which file exists. - */ -function spawnWorker( - fileWithoutExtension: string, - options: WorkerOptions, -): Worker { - // The released version always spawns the JS worker. The TS worker is only - // used for development. - const jsFile = fileWithoutExtension + '.js'; - if (fs.existsSync(jsFile)) return new Worker(jsFile, options); - - const tsFile = fileWithoutExtension + '.ts'; - if (fs.existsSync(tsFile)) { - return new Worker( - ` - require('ts-node').register(); - require(${JSON.stringify(tsFile)}); - `, - {...options, eval: true}, - ); - } - - throw new Error(`Neither "${jsFile}" nor ".ts" exists.`); -} - -/** - * A subset of the options for [`child_process.spawn()`]. - * - * [`child_process.spawn()`]: https://nodejs.org/api/child_process.html#child_processspawncommand-args-options - */ -export interface Options { - cwd?: string; - env?: Record; - argv0?: string; - uid?: number; - gid?: number; - shell?: boolean | string; - windowsVerbatimArguments?: boolean; - windowsHide?: boolean; - timeout?: number; - killSignal?: string | number; -} diff --git a/lib/src/sync-process/sync-message-port.test.ts b/lib/src/sync-process/sync-message-port.test.ts deleted file mode 100644 index 29fd08b1..00000000 --- a/lib/src/sync-process/sync-message-port.test.ts +++ /dev/null @@ -1,179 +0,0 @@ -// Copyright 2021 Google LLC. Use of this source code is governed by an -// MIT-style license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. - -import * as fs from 'fs'; -import * as p from 'path'; -import {MessagePort, Worker} from 'worker_threads'; - -import {SyncMessagePort} from './sync-message-port'; - -describe('SyncMessagePort', () => { - describe('sends a message', () => { - it('before the other endpoint calls receiveMessage()', () => { - const channel = SyncMessagePort.createChannel(); - const port1 = new SyncMessagePort(channel.port1); - port1.postMessage('hi there!'); - - const port2 = new SyncMessagePort(channel.port2); - expect(port2.receiveMessage()).toEqual('hi there!'); - }); - - it('after the other endpoint calls receiveMessage()', () => { - const channel = SyncMessagePort.createChannel(); - const port = new SyncMessagePort(channel.port1); - - spawnWorker( - ` - // Wait a little bit just to make entirely sure that the parent thread - // is awaiting a message. - setTimeout(() => { - port.postMessage('done!'); - port.close(); - }, 100); - `, - channel.port2, - ); - - expect(port.receiveMessage()).toEqual('done!'); - expect(port.receiveMessage).toThrow(); - }); - - it('multiple times before the other endpoint starts reading', () => { - const channel = SyncMessagePort.createChannel(); - const port1 = new SyncMessagePort(channel.port1); - port1.postMessage('message1'); - port1.postMessage('message2'); - port1.postMessage('message3'); - port1.postMessage('message4'); - - const port2 = new SyncMessagePort(channel.port2); - expect(port2.receiveMessage()).toEqual('message1'); - expect(port2.receiveMessage()).toEqual('message2'); - expect(port2.receiveMessage()).toEqual('message3'); - expect(port2.receiveMessage()).toEqual('message4'); - }); - - it('multiple times and close', () => { - const channel = SyncMessagePort.createChannel(); - const port = new SyncMessagePort(channel.port1); - - spawnWorker( - ` - port.postMessage('message1'); - port.postMessage('done!'); - port.close(); - `, - channel.port2, - ); - - expect(port.receiveMessage()).toEqual('message1'); - expect(port.receiveMessage()).toEqual('done!'); - expect(port.receiveMessage).toThrow(); - }); - }); - - describe('with an asynchronous listener', () => { - it('receives a message sent before listening', async () => { - const channel = SyncMessagePort.createChannel(); - const port1 = new SyncMessagePort(channel.port1); - port1.postMessage('hi there!'); - - const port2 = new SyncMessagePort(channel.port2); - - // Wait a macrotask to make sure the message is as queued up as it's going - // to be. - await new Promise(process.nextTick); - - const promise = new Promise(resolve => port2.once('message', resolve)); - await expect(promise).resolves.toEqual('hi there!'); - port1.close(); - }); - - it('receives a message sent after listening', async () => { - const channel = SyncMessagePort.createChannel(); - const port1 = new SyncMessagePort(channel.port1); - const promise = new Promise(resolve => port1.once('message', resolve)); - - // Wait a macrotask to make sure the message is as queued up as it's going - // to be. - await new Promise(process.nextTick); - const port2 = new SyncMessagePort(channel.port2); - port2.postMessage('hi there!'); - - await expect(promise).resolves.toEqual('hi there!'); - port1.close(); - }); - - it('receiveMessage() throws an error after listening', async () => { - const channel = SyncMessagePort.createChannel(); - const port1 = new SyncMessagePort(channel.port1); - port1.on('message', () => {}); - - expect(port1.receiveMessage).toThrow(); - port1.close(); - }); - }); - - describe('close()', () => { - it('closing one port closes the other', async () => { - const channel = SyncMessagePort.createChannel(); - const port1 = new SyncMessagePort(channel.port1); - const port2 = new SyncMessagePort(channel.port2); - - port1.close(); - - // Should resolve. - await new Promise(resolve => port2.once('close', resolve)); - }); - - it('receiveMessage() throws an error for a closed port', () => { - const channel = SyncMessagePort.createChannel(); - const port1 = new SyncMessagePort(channel.port1); - const port2 = new SyncMessagePort(channel.port2); - - port1.close(); - expect(port1.receiveMessage).toThrow(); - expect(port2.receiveMessage).toThrow(); - }); - }); -}); - -/** - * Spawns a worker that executes the given TypeScript `source`. - * - * Automatically initializes a `SyncMessageChannel` named `port` connected to - * `port`. - */ -function spawnWorker(source: string, port: MessagePort): Worker { - fs.mkdirSync('spec/sandbox', {recursive: true}); - const file = p.join('spec/sandbox', `${Math.random()}.ts`.slice(2)); - fs.writeFileSync( - file, - ` - const {SyncMessagePort} = require(${JSON.stringify( - p.join(p.dirname(__filename), 'sync-message-port'), - )}); - const {workerData} = require('worker_threads'); - - const port = new SyncMessagePort(workerData); - - ${source} - `, - ); - - const worker = new Worker( - ` - require('ts-node').register(); - require(${JSON.stringify(p.resolve(file.substring(0, file.length - 3)))}); - `, - {eval: true, workerData: port, transferList: [port]}, - ); - - worker.on('error', error => { - throw error; - }); - worker.on('exit', () => fs.unlinkSync(file)); - - return worker; -} diff --git a/lib/src/sync-process/sync-message-port.ts b/lib/src/sync-process/sync-message-port.ts deleted file mode 100644 index efb5c40e..00000000 --- a/lib/src/sync-process/sync-message-port.ts +++ /dev/null @@ -1,174 +0,0 @@ -// Copyright 2021 Google LLC. Use of this source code is governed by an -// MIT-style license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. - -import {strict as assert} from 'assert'; -import {EventEmitter} from 'events'; -import { - MessageChannel, - MessagePort, - TransferListItem, - receiveMessageOnPort, -} from 'worker_threads'; - -// TODO(nex3): Make this its own package. - -/** - * An enum of possible states for the shared buffer that two `SyncMessagePort`s - * use to communicate. - */ -enum BufferState { - /** - * The initial state. When an endpoint is ready to receive messages, it'll set - * the buffer to this state so that it can use `Atomics.wait()` to be notified - * when it switches to `MessageSent`. - */ - AwaitingMessage = 0b00, - /** - * The state indicating that a message has been sent. Whenever an endpoint - * sends a message, it'll set the buffer to this state so that the other - * endpoint's `Atomics.wait()` call terminates. - */ - MessageSent = 0b01, - /** - * The bitmask indicating that the channel has been closed. This is masked on - * top of AwaitingMessage and MessageSent state. It never transitions to any - * other states once closed. - */ - Closed = 0b10, -} - -/** - * A communication port that can receive messages synchronously from another - * `SyncMessagePort`. - * - * This also emits the same asynchronous events as `MessagePort`. - */ -export class SyncMessagePort extends EventEmitter { - /** Creates a channel whose ports can be passed to `new SyncMessagePort()`. */ - static createChannel(): MessageChannel { - const channel = new MessageChannel(); - // Four bytes is the minimum necessary to use `Atomics.wait()`. - const buffer = new SharedArrayBuffer(4); - - // Queue up messages on each port so the caller doesn't have to explicitly - // pass the buffer around along with them. - channel.port1.postMessage(buffer); - channel.port2.postMessage(buffer); - return channel; - } - - /** - * An Int32 view of the shared buffer. - * - * Each port sets this to `BufferState.AwaitingMessage` before checking for - * new messages in `receiveMessage()`, and each port sets it to - * `BufferState.MessageSent` after sending a new message. It's set to - * `BufferState.Closed` when the channel is closed. - */ - private readonly buffer: Int32Array; - - /** - * Creates a new message port. The `port` must be created by - * `SyncMessagePort.createChannel()` and must connect to a port passed to - * another `SyncMessagePort` in another worker. - */ - constructor(private readonly port: MessagePort) { - super(); - - const buffer = receiveMessageOnPort(this.port)?.message; - if (!buffer) { - throw new Error( - 'new SyncMessagePort() must be passed a port from ' + - 'SyncMessagePort.createChannel().', - ); - } - this.buffer = new Int32Array(buffer as SharedArrayBuffer); - - this.on('newListener', (event, listener) => { - this.port.on(event, listener); - }); - this.on('removeListener', (event, listener) => - this.port.removeListener(event, listener), - ); - } - - /** See `MessagePort.postMesage()`. */ - postMessage(value: unknown, transferList?: TransferListItem[]): void { - this.port.postMessage(value, transferList); - - // If the other port is waiting for a new message, notify it that the - // message is ready. Use `Atomics.compareExchange` so that we don't - // overwrite the "closed" state. - if ( - Atomics.compareExchange( - this.buffer, - 0, - BufferState.AwaitingMessage, - BufferState.MessageSent, - ) === BufferState.AwaitingMessage - ) { - Atomics.notify(this.buffer, 0); - } - } - - // TODO(nex3): - // * Add a non-blocking `receiveMessage()` - // * Add a timeout option to `receiveMessage()` - // * Add an option to `receiveMessage()` to return a special value if the - // channel is closed. - - /** - * Blocks and returns the next message sent by the other port. - * - * This may not be called while this has a listener for the `'message'` event. - * Throws an error if the channel is closed, including if it closes while this - * is waiting for a message. - */ - receiveMessage(): unknown { - if (this.listenerCount('message')) { - throw new Error( - 'SyncMessageChannel.receiveMessage() may not be called while there ' + - 'are message listeners.', - ); - } - - // Set the "new message" indicator to zero before we check for new messages. - // That way if the other port sets it to 1 between the call to - // `receiveMessageOnPort` and the call to `Atomics.wait()`, we won't - // overwrite it. Use `Atomics.compareExchange` so that we don't overwrite - // the "closed" state. - if ( - Atomics.compareExchange( - this.buffer, - 0, - BufferState.MessageSent, - BufferState.AwaitingMessage, - ) === BufferState.Closed - ) { - throw new Error("The SyncMessagePort's channel is closed."); - } - - let message = receiveMessageOnPort(this.port); - if (message) return message.message; - - // If there's no new message, wait for the other port to flip the "new - // message" indicator to 1. If it's been set to 1 since we stored 0, this - // will terminate immediately. - Atomics.wait(this.buffer, 0, BufferState.AwaitingMessage); - message = receiveMessageOnPort(this.port); - if (message) return message.message; - - // Update the state to 0b10 after the last message is consumed. - const oldState = Atomics.and(this.buffer, 0, BufferState.Closed); - // Assert the old state was either 0b10 or 0b11. - assert.equal(oldState & BufferState.Closed, BufferState.Closed); - throw new Error("The SyncMessagePort's channel is closed."); - } - - /** See `MessagePort.close()`. */ - close(): void { - Atomics.or(this.buffer, 0, BufferState.Closed); - this.port.close(); - } -} diff --git a/lib/src/sync-process/worker.ts b/lib/src/sync-process/worker.ts deleted file mode 100644 index d141d554..00000000 --- a/lib/src/sync-process/worker.ts +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright 2021 Google LLC. Use of this source code is governed by an -// MIT-style license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. - -import { - MessagePort, - TransferListItem, - parentPort, - workerData, -} from 'worker_threads'; -import {SpawnOptionsWithoutStdio, spawn} from 'child_process'; -import {strict as assert} from 'assert'; - -import {SyncMessagePort} from './sync-message-port'; -import {InternalEvent} from './event'; - -const port = new SyncMessagePort(workerData.port as MessagePort); - -/** A more type-safe way to call `port.postMesage()` */ -function emit(event: InternalEvent, transferList?: TransferListItem[]): void { - port.postMessage(event, transferList); -} - -const process = spawn( - workerData.command as string, - workerData.args as string[], - workerData.options as SpawnOptionsWithoutStdio | undefined, -); - -port.on('message', message => { - if (message.type === 'stdin') { - process.stdin.write(message.data as Buffer); - } else if (message.type === 'stdinClosed') { - process.stdin.end(); - } else { - assert.equal(message.type, 'kill'); - process.kill(message.signal as number | NodeJS.Signals | undefined); - } -}); - -process.stdout.on('data', data => { - emit({type: 'stdout', data}, [data.buffer]); -}); - -process.stderr.on('data', data => { - emit({type: 'stderr', data}, [data.buffer]); -}); - -process.on('error', error => { - emit({type: 'error', error}); - - process.kill(); - parentPort!.close(); - port.close(); -}); - -process.on('exit', (code, signal) => { - if (code !== null) { - emit({type: 'exit', code}); - } else { - assert(signal); - emit({type: 'exit', signal}); - } - - parentPort!.close(); - port.close(); -}); diff --git a/package.json b/package.json index f26c2ded..f3e69263 100644 --- a/package.json +++ b/package.json @@ -66,6 +66,7 @@ "immutable": "^4.0.0", "rxjs": "^7.4.0", "supports-color": "^8.1.1", + "sync-child-process": "^1.0.2", "varint": "^6.0.0" }, "devDependencies": {