Skip to content

Commit

Permalink
Correctly support POSTs with streams (#20)
Browse files Browse the repository at this point in the history
* add post test

Signed-off-by: Matteo Collina <[email protected]>

* Correctly support POSTs with streams

Signed-off-by: Matteo Collina <[email protected]>

---------

Signed-off-by: Matteo Collina <[email protected]>
  • Loading branch information
mcollina authored Oct 7, 2024
1 parent 4a89b5a commit 12691cf
Show file tree
Hide file tree
Showing 5 changed files with 238 additions and 22 deletions.
31 changes: 29 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,20 @@ function createThreadInterceptor (opts) {
...opts,
headers,
}

delete newOpts.dispatcher

port.postMessage({ type: 'request', id, opts: newOpts, threadId })
if (newOpts.body?.[Symbol.asyncIterator]) {
collectBodyAndDispatch(newOpts, handler).then(() => {
port.postMessage({ type: 'request', id, opts: newOpts, threadId })
}, (err) => {
clearTimeout(handle)

handler.onError(err)
})
} else {
port.postMessage({ type: 'request', id, opts: newOpts, threadId })
}
const inflights = portInflights.get(port)

let handle
Expand Down Expand Up @@ -241,7 +252,7 @@ function wire ({ server: newServer, port, ...undiciOpts }) {
url: opts.path,
headers: opts.headers,
query: opts.query,
body: opts.body,
body: opts.body instanceof Uint8Array ? Buffer.from(opts.body) : opts.body,
}

const onInject = (err, res) => {
Expand Down Expand Up @@ -301,5 +312,21 @@ function wire ({ server: newServer, port, ...undiciOpts }) {
return { interceptor, replaceServer }
}

async function collectBodyAndDispatch (opts) {
const data = []

for await (const chunk of opts.body) {
data.push(chunk)
}

if (typeof data[0] === 'string') {
opts.body = data.join('')
} else if (data[0] instanceof Buffer) {
opts.body = Buffer.concat(data)
} else {
throw new Error('Cannot not transfer streams of objects')
}
}

module.exports.createThreadInterceptor = createThreadInterceptor
module.exports.wire = wire
51 changes: 31 additions & 20 deletions test/basic.test.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
'use strict'

const { test } = require('node:test')
const { Readable } = require('node:stream')
const { deepStrictEqual, strictEqual, rejects, ifError } = require('node:assert')
const { join } = require('path')
const { Worker } = require('worker_threads')
const { join } = require('node:path')
const { Worker } = require('node:worker_threads')
const { once } = require('node:events')
const { setTimeout: sleep } = require('node:timers/promises')
const { createThreadInterceptor } = require('../')
const { Agent, request } = require('undici')
const { once } = require('events')
const { setTimeout: sleep } = require('timers/promises')
const Fastify = require('fastify')

test('basic', async (t) => {
Expand Down Expand Up @@ -319,40 +320,50 @@ test('close', async (t) => {
await Promise.all([once(worker1, 'exit'), once(worker2, 'exit')])
})

test('timeout', async (t) => {
const empty = new Worker(join(__dirname, 'fixtures', 'empty.js'))
test('POST', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
timeout: 1000,
})
interceptor.route('myserver', empty)
interceptor.route('myserver', worker)

const agent = new Agent().compose(interceptor)

await rejects(request('http://myserver.local', {
const { statusCode, body } = await request('http://myserver.local/echo-body', {
dispatcher: agent,
}), new Error('Timeout while waiting from a response from myserver.local'))
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: JSON.stringify({ hello: 'world' }),
})

empty.postMessage('close')
await once(empty, 'exit')
strictEqual(statusCode, 200)
deepStrictEqual(await body.json(), { hello: 'world' })
})

test('timeout set to a boolean', async (t) => {
const empty = new Worker(join(__dirname, 'fixtures', 'empty.js'))
test('POST with Stream', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
timeout: true,
})
interceptor.route('myserver', empty)
interceptor.route('myserver', worker)

const agent = new Agent().compose(interceptor)

await rejects(request('http://myserver.local', {
const { statusCode, body } = await request('http://myserver.local/echo-body', {
dispatcher: agent,
}), new Error('Timeout while waiting from a response from myserver.local'))
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: Readable.from(JSON.stringify({ hello: 'world' })),
})

empty.postMessage('close')
await once(empty, 'exit')
strictEqual(statusCode, 200)
deepStrictEqual(await body.json(), { hello: 'world' })
})
4 changes: 4 additions & 0 deletions test/fixtures/worker1.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,8 @@ app.get('/no-headers', (req, reply) => {
reply.send(Readable.from(['text'], { objectMode: false }))
})

app.post('/echo-body', (req, reply) => {
reply.send(req.body)
})

wire({ server: app, port: parentPort })
127 changes: 127 additions & 0 deletions test/post.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
'use strict'

const { test } = require('node:test')
const { Readable } = require('node:stream')
const { deepStrictEqual, strictEqual, rejects } = require('node:assert')
const { join } = require('node:path')
const { Worker } = require('node:worker_threads')
const { createThreadInterceptor } = require('../')
const { Agent, request } = require('undici')

test('POST', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker)

const agent = new Agent().compose(interceptor)

const { statusCode, body } = await request('http://myserver.local/echo-body', {
dispatcher: agent,
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: JSON.stringify({ hello: 'world' }),
})

strictEqual(statusCode, 200)
deepStrictEqual(await body.json(), { hello: 'world' })
})

test('POST with Stream', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker)

const agent = new Agent().compose(interceptor)

const { statusCode, body } = await request('http://myserver.local/echo-body', {
dispatcher: agent,
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: Readable.from(JSON.stringify({ hello: 'world' })),
})

strictEqual(statusCode, 200)
deepStrictEqual(await body.json(), { hello: 'world' })
})

test('POST with Stream that errors', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker)

const agent = new Agent().compose(interceptor)

await rejects(request('http://myserver.local/echo-body', {
dispatcher: agent,
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: new Readable({
read () {
this.destroy(new Error('kaboom'))
},
}),
}))
})

test('POST with buffer stream', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker)

const agent = new Agent().compose(interceptor)

const { statusCode, body } = await request('http://myserver.local/echo-body', {
dispatcher: agent,
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: Readable.from(Buffer.from(JSON.stringify({ hello: 'world' }))),
})

strictEqual(statusCode, 200)
deepStrictEqual(await body.json(), { hello: 'world' })
})

test('POST errors with streams of objects', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker)

const agent = new Agent().compose(interceptor)

await rejects(request('http://myserver.local/echo-body', {
dispatcher: agent,
method: 'POST',
headers: {
'content-type': 'application/json',
},
body: Readable.from([{ hello: 'world' }]),
}))
})
47 changes: 47 additions & 0 deletions test/timeout.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
'use strict'

const { test } = require('node:test')
const { rejects } = require('node:assert')
const { join } = require('node:path')
const { Worker } = require('node:worker_threads')
const { once } = require('node:events')
const { createThreadInterceptor } = require('../')
const { Agent, request } = require('undici')

test('timeout', async (t) => {
const empty = new Worker(join(__dirname, 'fixtures', 'empty.js'))

const interceptor = createThreadInterceptor({
domain: '.local',
timeout: 1000,
})
interceptor.route('myserver', empty)

const agent = new Agent().compose(interceptor)

await rejects(request('http://myserver.local', {
dispatcher: agent,
}), new Error('Timeout while waiting from a response from myserver.local'))

empty.postMessage('close')
await once(empty, 'exit')
})

test('timeout set to a boolean', async (t) => {
const empty = new Worker(join(__dirname, 'fixtures', 'empty.js'))

const interceptor = createThreadInterceptor({
domain: '.local',
timeout: true,
})
interceptor.route('myserver', empty)

const agent = new Agent().compose(interceptor)

await rejects(request('http://myserver.local', {
dispatcher: agent,
}), new Error('Timeout while waiting from a response from myserver.local'))

empty.postMessage('close')
await once(empty, 'exit')
})

0 comments on commit 12691cf

Please sign in to comment.