Skip to content

Commit

Permalink
Correctly support POSTs with streams
Browse files Browse the repository at this point in the history
Signed-off-by: Matteo Collina <[email protected]>
  • Loading branch information
mcollina committed Oct 7, 2024
1 parent b1176ca commit 8b2437d
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 34 deletions.
31 changes: 29 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,20 @@ function createThreadInterceptor (opts) {
...opts,
headers,
}

delete newOpts.dispatcher

port.postMessage({ type: 'request', id, opts: newOpts, threadId })
if (newOpts.body?.[Symbol.asyncIterator]) {
collectBodyAndDispatch(newOpts, handler).then(() => {
port.postMessage({ type: 'request', id, opts: newOpts, threadId })
}, (err) => {
clearTimeout(handle)

handler.onError(err)
})
} else {
port.postMessage({ type: 'request', id, opts: newOpts, threadId })
}
const inflights = portInflights.get(port)

let handle
Expand Down Expand Up @@ -231,7 +242,7 @@ function wire ({ server: newServer, port, ...undiciOpts }) {
url: opts.path,
headers: opts.headers,
query: opts.query,
body: opts.body,
body: opts.body instanceof Uint8Array ? Buffer.from(opts.body) : opts.body,
}

const onInject = (err, res) => {
Expand Down Expand Up @@ -291,5 +302,21 @@ function wire ({ server: newServer, port, ...undiciOpts }) {
return { interceptor, replaceServer }
}

async function collectBodyAndDispatch (opts) {
const data = []

for await (const chunk of opts.body) {
data.push(chunk)
}

if (typeof data[0] === 'string') {
opts.body = data.join('')
} else if (data[0] instanceof Buffer) {
opts.body = Buffer.concat(data)
} else {
throw new Error('Cannot not transfer streams of objects')
}
}

module.exports.createThreadInterceptor = createThreadInterceptor
module.exports.wire = wire
51 changes: 19 additions & 32 deletions test/basic.test.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
'use strict'

const { test } = require('node:test')
const { Readable } = require('node:stream')
const { deepStrictEqual, strictEqual, rejects, ifError } = require('node:assert')
const { join } = require('path')
const { Worker } = require('worker_threads')
const { join } = require('node:path')
const { Worker } = require('node:worker_threads')
const { once } = require('node:events')
const { setTimeout: sleep } = require('node:timers/promises')
const { createThreadInterceptor } = require('../')
const { Agent, request } = require('undici')
const { once } = require('events')
const { setTimeout: sleep } = require('timers/promises')
const Fastify = require('fastify')

test('basic', async (t) => {
Expand Down Expand Up @@ -319,45 +320,31 @@ test('close', async (t) => {
await Promise.all([once(worker1, 'exit'), once(worker2, 'exit')])
})

test('timeout', async (t) => {
const empty = new Worker(join(__dirname, 'fixtures', 'empty.js'))
test('POST', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
timeout: 1000,
})
interceptor.route('myserver', empty)
interceptor.route('myserver', worker)

const agent = new Agent().compose(interceptor)

await rejects(request('http://myserver.local', {
const { statusCode, body } = await request('http://myserver.local/echo-body', {
dispatcher: agent,
}), new Error('Timeout while waiting from a response from myserver.local'))

empty.postMessage('close')
await once(empty, 'exit')
})

test('timeout set to a boolean', async (t) => {
const empty = new Worker(join(__dirname, 'fixtures', 'empty.js'))

const interceptor = createThreadInterceptor({
domain: '.local',
timeout: true,
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: JSON.stringify({ hello: 'world' }),
})
interceptor.route('myserver', empty)

const agent = new Agent().compose(interceptor)

await rejects(request('http://myserver.local', {
dispatcher: agent,
}), new Error('Timeout while waiting from a response from myserver.local'))

empty.postMessage('close')
await once(empty, 'exit')
strictEqual(statusCode, 200)
deepStrictEqual(await body.json(), { hello: 'world' })
})

test('POST', async (t) => {
test('POST with Stream', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())

Expand All @@ -374,7 +361,7 @@ test('POST', async (t) => {
headers: {
'content-type': 'application/json',
},
body: JSON.stringify({ hello: 'world' }),
body: Readable.from(JSON.stringify({ hello: 'world' })),
})

strictEqual(statusCode, 200)
Expand Down
127 changes: 127 additions & 0 deletions test/post.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
'use strict'

const { test } = require('node:test')
const { Readable } = require('node:stream')
const { deepStrictEqual, strictEqual, rejects } = require('node:assert')
const { join } = require('node:path')
const { Worker } = require('node:worker_threads')
const { createThreadInterceptor } = require('../')
const { Agent, request } = require('undici')

test('POST', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker)

const agent = new Agent().compose(interceptor)

const { statusCode, body } = await request('http://myserver.local/echo-body', {
dispatcher: agent,
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: JSON.stringify({ hello: 'world' }),
})

strictEqual(statusCode, 200)
deepStrictEqual(await body.json(), { hello: 'world' })
})

test('POST with Stream', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker)

const agent = new Agent().compose(interceptor)

const { statusCode, body } = await request('http://myserver.local/echo-body', {
dispatcher: agent,
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: Readable.from(JSON.stringify({ hello: 'world' })),
})

strictEqual(statusCode, 200)
deepStrictEqual(await body.json(), { hello: 'world' })
})

test('POST with Stream that errors', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker)

const agent = new Agent().compose(interceptor)

await rejects(request('http://myserver.local/echo-body', {
dispatcher: agent,
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: new Readable({
read () {
this.destroy(new Error('kaboom'))
},
}),
}))
})

test('POST with buffer stream', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker)

const agent = new Agent().compose(interceptor)

const { statusCode, body } = await request('http://myserver.local/echo-body', {
dispatcher: agent,
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: Readable.from(Buffer.from(JSON.stringify({ hello: 'world' }))),
})

strictEqual(statusCode, 200)
deepStrictEqual(await body.json(), { hello: 'world' })
})

test('POST errors with streams of objects', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker)

const agent = new Agent().compose(interceptor)

await rejects(request('http://myserver.local/echo-body', {
dispatcher: agent,
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: Readable.from([{ hello: 'world' }]),
}))
})
47 changes: 47 additions & 0 deletions test/timeout.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
'use strict'

const { test } = require('node:test')
const { rejects } = require('node:assert')
const { join } = require('node:path')
const { Worker } = require('node:worker_threads')
const { once } = require('node:events')
const { createThreadInterceptor } = require('../')
const { Agent, request } = require('undici')

test('timeout', async (t) => {
const empty = new Worker(join(__dirname, 'fixtures', 'empty.js'))

const interceptor = createThreadInterceptor({
domain: '.local',
timeout: 1000,
})
interceptor.route('myserver', empty)

const agent = new Agent().compose(interceptor)

await rejects(request('http://myserver.local', {
dispatcher: agent,
}), new Error('Timeout while waiting from a response from myserver.local'))

empty.postMessage('close')
await once(empty, 'exit')
})

test('timeout set to a boolean', async (t) => {
const empty = new Worker(join(__dirname, 'fixtures', 'empty.js'))

const interceptor = createThreadInterceptor({
domain: '.local',
timeout: true,
})
interceptor.route('myserver', empty)

const agent = new Agent().compose(interceptor)

await rejects(request('http://myserver.local', {
dispatcher: agent,
}), new Error('Timeout while waiting from a response from myserver.local'))

empty.postMessage('close')
await once(empty, 'exit')
})

0 comments on commit 8b2437d

Please sign in to comment.