Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correctly support POSTs with streams #20

Merged
merged 2 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,20 @@ function createThreadInterceptor (opts) {
...opts,
headers,
}

delete newOpts.dispatcher

port.postMessage({ type: 'request', id, opts: newOpts, threadId })
if (newOpts.body?.[Symbol.asyncIterator]) {
collectBodyAndDispatch(newOpts, handler).then(() => {
port.postMessage({ type: 'request', id, opts: newOpts, threadId })
}, (err) => {
clearTimeout(handle)

handler.onError(err)
})
} else {
port.postMessage({ type: 'request', id, opts: newOpts, threadId })
}
const inflights = portInflights.get(port)

let handle
Expand Down Expand Up @@ -231,7 +242,7 @@ function wire ({ server: newServer, port, ...undiciOpts }) {
url: opts.path,
headers: opts.headers,
query: opts.query,
body: opts.body,
body: opts.body instanceof Uint8Array ? Buffer.from(opts.body) : opts.body,
}

const onInject = (err, res) => {
Expand Down Expand Up @@ -291,5 +302,21 @@ function wire ({ server: newServer, port, ...undiciOpts }) {
return { interceptor, replaceServer }
}

async function collectBodyAndDispatch (opts) {
const data = []

for await (const chunk of opts.body) {
data.push(chunk)
}

if (typeof data[0] === 'string') {
opts.body = data.join('')
} else if (data[0] instanceof Buffer) {
opts.body = Buffer.concat(data)
} else {
throw new Error('Cannot not transfer streams of objects')
}
}

module.exports.createThreadInterceptor = createThreadInterceptor
module.exports.wire = wire
51 changes: 31 additions & 20 deletions test/basic.test.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
'use strict'

const { test } = require('node:test')
const { Readable } = require('node:stream')
const { deepStrictEqual, strictEqual, rejects, ifError } = require('node:assert')
const { join } = require('path')
const { Worker } = require('worker_threads')
const { join } = require('node:path')
const { Worker } = require('node:worker_threads')
const { once } = require('node:events')
const { setTimeout: sleep } = require('node:timers/promises')
const { createThreadInterceptor } = require('../')
const { Agent, request } = require('undici')
const { once } = require('events')
const { setTimeout: sleep } = require('timers/promises')
const Fastify = require('fastify')

test('basic', async (t) => {
Expand Down Expand Up @@ -319,40 +320,50 @@ test('close', async (t) => {
await Promise.all([once(worker1, 'exit'), once(worker2, 'exit')])
})

test('timeout', async (t) => {
const empty = new Worker(join(__dirname, 'fixtures', 'empty.js'))
test('POST', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
timeout: 1000,
})
interceptor.route('myserver', empty)
interceptor.route('myserver', worker)

const agent = new Agent().compose(interceptor)

await rejects(request('http://myserver.local', {
const { statusCode, body } = await request('http://myserver.local/echo-body', {
dispatcher: agent,
}), new Error('Timeout while waiting from a response from myserver.local'))
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: JSON.stringify({ hello: 'world' }),
})

empty.postMessage('close')
await once(empty, 'exit')
strictEqual(statusCode, 200)
deepStrictEqual(await body.json(), { hello: 'world' })
})

test('timeout set to a boolean', async (t) => {
const empty = new Worker(join(__dirname, 'fixtures', 'empty.js'))
test('POST with Stream', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
timeout: true,
})
interceptor.route('myserver', empty)
interceptor.route('myserver', worker)

const agent = new Agent().compose(interceptor)

await rejects(request('http://myserver.local', {
const { statusCode, body } = await request('http://myserver.local/echo-body', {
dispatcher: agent,
}), new Error('Timeout while waiting from a response from myserver.local'))
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: Readable.from(JSON.stringify({ hello: 'world' })),
})

empty.postMessage('close')
await once(empty, 'exit')
strictEqual(statusCode, 200)
deepStrictEqual(await body.json(), { hello: 'world' })
})
4 changes: 4 additions & 0 deletions test/fixtures/worker1.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,8 @@ app.get('/no-headers', (req, reply) => {
reply.send(Readable.from(['text'], { objectMode: false }))
})

app.post('/echo-body', (req, reply) => {
reply.send(req.body)
})

wire({ server: app, port: parentPort })
127 changes: 127 additions & 0 deletions test/post.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
'use strict'

const { test } = require('node:test')
const { Readable } = require('node:stream')
const { deepStrictEqual, strictEqual, rejects } = require('node:assert')
const { join } = require('node:path')
const { Worker } = require('node:worker_threads')
const { createThreadInterceptor } = require('../')
const { Agent, request } = require('undici')

test('POST', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker)

const agent = new Agent().compose(interceptor)

const { statusCode, body } = await request('http://myserver.local/echo-body', {
dispatcher: agent,
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: JSON.stringify({ hello: 'world' }),
})

strictEqual(statusCode, 200)
deepStrictEqual(await body.json(), { hello: 'world' })
})

test('POST with Stream', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker)

const agent = new Agent().compose(interceptor)

const { statusCode, body } = await request('http://myserver.local/echo-body', {
dispatcher: agent,
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: Readable.from(JSON.stringify({ hello: 'world' })),
})

strictEqual(statusCode, 200)
deepStrictEqual(await body.json(), { hello: 'world' })
})

test('POST with Stream that errors', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker)

const agent = new Agent().compose(interceptor)

await rejects(request('http://myserver.local/echo-body', {
dispatcher: agent,
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: new Readable({
read () {
this.destroy(new Error('kaboom'))
},
}),
}))
})

test('POST with buffer stream', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker)

const agent = new Agent().compose(interceptor)

const { statusCode, body } = await request('http://myserver.local/echo-body', {
dispatcher: agent,
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: Readable.from(Buffer.from(JSON.stringify({ hello: 'world' }))),
})

strictEqual(statusCode, 200)
deepStrictEqual(await body.json(), { hello: 'world' })
})

test('POST errors with streams of objects', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker)

const agent = new Agent().compose(interceptor)

await rejects(request('http://myserver.local/echo-body', {
dispatcher: agent,
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: Readable.from([{ hello: 'world' }]),
}))
})
47 changes: 47 additions & 0 deletions test/timeout.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
'use strict'

const { test } = require('node:test')
const { rejects } = require('node:assert')
const { join } = require('node:path')
const { Worker } = require('node:worker_threads')
const { once } = require('node:events')
const { createThreadInterceptor } = require('../')
const { Agent, request } = require('undici')

test('timeout', async (t) => {
const empty = new Worker(join(__dirname, 'fixtures', 'empty.js'))

const interceptor = createThreadInterceptor({
domain: '.local',
timeout: 1000,
})
interceptor.route('myserver', empty)

const agent = new Agent().compose(interceptor)

await rejects(request('http://myserver.local', {
dispatcher: agent,
}), new Error('Timeout while waiting from a response from myserver.local'))

empty.postMessage('close')
await once(empty, 'exit')
})

test('timeout set to a boolean', async (t) => {
const empty = new Worker(join(__dirname, 'fixtures', 'empty.js'))

const interceptor = createThreadInterceptor({
domain: '.local',
timeout: true,
})
interceptor.route('myserver', empty)

const agent = new Agent().compose(interceptor)

await rejects(request('http://myserver.local', {
dispatcher: agent,
}), new Error('Timeout while waiting from a response from myserver.local'))

empty.postMessage('close')
await once(empty, 'exit')
})
Loading