Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support stdio #10

Merged
merged 3 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions fixtures/asynchronous-modules/stdio.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
export const console = {...globalThis.console}
export const logs = async (messages) => {
for (const {type, message} of messages) {
console[type](message)
}
}
export const writes = async (messages) => {
for (const {type, message} of messages) {
process[type].write(message)
}
}
export const printObject = async () => {
console.log(printObject)
}
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
"npm-run-all": "4.1.5",
"prettier": "3.2.4",
"sort-package-json": "2.6.0",
"strip-ansi": "7.1.0",
"tempy": "3.1.0"
},
"packageManager": "[email protected]",
Expand Down
18 changes: 16 additions & 2 deletions source/response.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,40 @@ class Response {

#actionHandlers

#stdio = []

constructor(actionHandlers) {
this.#actionHandlers = actionHandlers

process.exit = () => {
this.#terminate()
processExit()
}

// https://github.com/nodejs/node/blob/66556f53a7b36384bce305865c30ca43eaa0874b/lib/internal/worker/io.js#L369
for (const type of ['stdout', 'stderr']) {
process[type]._writev = (chunks, callback) => {
for (const {chunk} of chunks) {
this.#stdio.push({type, chunk})
}

callback()
}
}
}

#send(response) {
const responsePort = this.#responsePort
const signal = this.#signal

try {
responsePort.postMessage(response)
responsePort.postMessage({...response, stdio: this.#stdio})
} catch {
const error = new Error(
`Cannot serialize worker response:\n${util.inspect(response.result)}`,
)

responsePort.postMessage({error})
responsePort.postMessage({error, stdio: this.#stdio})
} finally {
responsePort.close()

Expand Down Expand Up @@ -68,6 +81,7 @@ class Response {
async ({signal, port, action, payload}) => {
this.#signal = signal
this.#responsePort = port
this.#stdio.length = 0

try {
this.#sendResult(await this.#processAction(action, payload))
Expand Down
6 changes: 5 additions & 1 deletion source/threads-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,17 @@ class ThreadsWorker {
@param {number} [timeout]
*/
#sendActionToWorker(worker, action, payload, timeout) {
const {terminated, result, error, errorData} = request(
const {terminated, result, error, errorData, stdio} = request(
worker,
action,
payload,
timeout,
)

for (const {chunk, type} of stdio) {
process[type].write(chunk)
}

if (terminated && this.#worker) {
this.#worker.terminate()
this.#worker = undefined
Expand Down
136 changes: 136 additions & 0 deletions tests/stdio.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import test from 'node:test'
import * as assert from 'node:assert/strict'
import process from 'node:process'
import stripAnsi from 'strip-ansi'
import loadModuleForTests from '../scripts/load-module-for-tests.js'

const {makeSynchronized} = await loadModuleForTests()

const stdio = makeSynchronized(
new URL('../fixtures/asynchronous-modules/stdio.js', import.meta.url),
)

const getResult = (run) => {
const original = {}
const result = []

for (const type of ['stdout', 'stderr']) {
original[type] = process[type].write

process[type].write = (message) => {
message = stripAnsi(message)
result.push({type, message})
}
}

try {
run()
} finally {
for (const type of ['stdout', 'stderr']) {
process[type].write = original[type]
}
}

return result
}

test('stdio', () => {
{
const result = getResult(() => {
stdio.console.log('console.log called')
})
assert.deepEqual(result, [
{type: 'stdout', message: 'console.log called\n'},
])
}

{
const result = getResult(() => {
stdio.console.warn('console.warn called')
})
assert.deepEqual(result, [
{type: 'stderr', message: 'console.warn called\n'},
])
}

{
const result = getResult(() => {
stdio.console.error('console.error called')
})
assert.deepEqual(result, [
{type: 'stderr', message: 'console.error called\n'},
])
}

{
const result = getResult(() => {
stdio.console.table([{fisker: 'jerk'}])
})
const table = /* Indent */ `
┌─────────┬────────┐
│ (index) │ fisker │
├─────────┼────────┤
│ 0 │ 'jerk' │
└─────────┴────────┘
`
assert.deepEqual(result, [
{
type: 'stdout',
message: `${table
.trim()
.split('\n')
.map((line) => line.trim())
.join('\n')}\n`,
},
])
}

{
const result = getResult(() => {
stdio.logs([
{type: 'log', message: '1'},
{type: 'error', message: '2'},
{type: 'log', message: '3'},
{type: 'warn', message: '4'},
])
})
assert.deepEqual(result, [
{type: 'stdout', message: '1\n'},
{type: 'stderr', message: '2\n'},
{type: 'stdout', message: '3\n'},
{type: 'stderr', message: '4\n'},
])
}

{
const [{message}] = getResult(() => {
stdio.logs([{type: 'time'}, {type: 'timeEnd'}])
})

assert.ok(/default: [\d.]+ms\n/.test(message), message)
}

{
const messages = [
{type: 'stdout', message: '1'},
{type: 'stderr', message: '2'},
{type: 'stdout', message: '3'},
{type: 'stderr', message: '4'},
]

const result = getResult(() => {
stdio.writes(messages)
})
assert.deepEqual(result, messages)
}

{
const result = getResult(() => {
stdio.printObject()
})

assert.deepEqual(result, [
{type: 'stdout', message: '[AsyncFunction: printObject]\n'},
])
}
})