diff --git a/index.js b/index.js index 1fc2446..2d46c71 100644 --- a/index.js +++ b/index.js @@ -56,9 +56,20 @@ function createThreadInterceptor (opts) { ...opts, headers, } + delete newOpts.dispatcher - port.postMessage({ type: 'request', id, opts: newOpts, threadId }) + if (newOpts.body?.[Symbol.asyncIterator]) { + collectBodyAndDispatch(newOpts, handler).then(() => { + port.postMessage({ type: 'request', id, opts: newOpts, threadId }) + }, (err) => { + clearTimeout(handle) + + handler.onError(err) + }) + } else { + port.postMessage({ type: 'request', id, opts: newOpts, threadId }) + } const inflights = portInflights.get(port) let handle @@ -241,7 +252,7 @@ function wire ({ server: newServer, port, ...undiciOpts }) { url: opts.path, headers: opts.headers, query: opts.query, - body: opts.body, + body: opts.body instanceof Uint8Array ? Buffer.from(opts.body) : opts.body, } const onInject = (err, res) => { @@ -301,5 +312,21 @@ function wire ({ server: newServer, port, ...undiciOpts }) { return { interceptor, replaceServer } } +async function collectBodyAndDispatch (opts) { + const data = [] + + for await (const chunk of opts.body) { + data.push(chunk) + } + + if (typeof data[0] === 'string') { + opts.body = data.join('') + } else if (data[0] instanceof Buffer) { + opts.body = Buffer.concat(data) + } else { + throw new Error('Cannot not transfer streams of objects') + } +} + module.exports.createThreadInterceptor = createThreadInterceptor module.exports.wire = wire diff --git a/test/basic.test.js b/test/basic.test.js index 6ce4bca..28eeffb 100644 --- a/test/basic.test.js +++ b/test/basic.test.js @@ -1,13 +1,14 @@ 'use strict' const { test } = require('node:test') +const { Readable } = require('node:stream') const { deepStrictEqual, strictEqual, rejects, ifError } = require('node:assert') -const { join } = require('path') -const { Worker } = require('worker_threads') +const { join } = require('node:path') +const { Worker } = require('node:worker_threads') +const { once } = require('node:events') +const { setTimeout: sleep } = require('node:timers/promises') const { createThreadInterceptor } = require('../') const { Agent, request } = require('undici') -const { once } = require('events') -const { setTimeout: sleep } = require('timers/promises') const Fastify = require('fastify') test('basic', async (t) => { @@ -319,40 +320,50 @@ test('close', async (t) => { await Promise.all([once(worker1, 'exit'), once(worker2, 'exit')]) }) -test('timeout', async (t) => { - const empty = new Worker(join(__dirname, 'fixtures', 'empty.js')) +test('POST', async (t) => { + const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) + t.after(() => worker.terminate()) const interceptor = createThreadInterceptor({ domain: '.local', - timeout: 1000, }) - interceptor.route('myserver', empty) + interceptor.route('myserver', worker) const agent = new Agent().compose(interceptor) - await rejects(request('http://myserver.local', { + const { statusCode, body } = await request('http://myserver.local/echo-body', { dispatcher: agent, - }), new Error('Timeout while waiting from a response from myserver.local')) + method: 'POST', + headers: { + 'content-type': 'application/json', + }, + body: JSON.stringify({ hello: 'world' }), + }) - empty.postMessage('close') - await once(empty, 'exit') + strictEqual(statusCode, 200) + deepStrictEqual(await body.json(), { hello: 'world' }) }) -test('timeout set to a boolean', async (t) => { - const empty = new Worker(join(__dirname, 'fixtures', 'empty.js')) +test('POST with Stream', async (t) => { + const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) + t.after(() => worker.terminate()) const interceptor = createThreadInterceptor({ domain: '.local', - timeout: true, }) - interceptor.route('myserver', empty) + interceptor.route('myserver', worker) const agent = new Agent().compose(interceptor) - await rejects(request('http://myserver.local', { + const { statusCode, body } = await request('http://myserver.local/echo-body', { dispatcher: agent, - }), new Error('Timeout while waiting from a response from myserver.local')) + method: 'POST', + headers: { + 'content-type': 'application/json', + }, + body: Readable.from(JSON.stringify({ hello: 'world' })), + }) - empty.postMessage('close') - await once(empty, 'exit') + strictEqual(statusCode, 200) + deepStrictEqual(await body.json(), { hello: 'world' }) }) diff --git a/test/fixtures/worker1.js b/test/fixtures/worker1.js index ea26103..4491c0c 100644 --- a/test/fixtures/worker1.js +++ b/test/fixtures/worker1.js @@ -36,4 +36,8 @@ app.get('/no-headers', (req, reply) => { reply.send(Readable.from(['text'], { objectMode: false })) }) +app.post('/echo-body', (req, reply) => { + reply.send(req.body) +}) + wire({ server: app, port: parentPort }) diff --git a/test/post.test.js b/test/post.test.js new file mode 100644 index 0000000..bd1f3e5 --- /dev/null +++ b/test/post.test.js @@ -0,0 +1,127 @@ +'use strict' + +const { test } = require('node:test') +const { Readable } = require('node:stream') +const { deepStrictEqual, strictEqual, rejects } = require('node:assert') +const { join } = require('node:path') +const { Worker } = require('node:worker_threads') +const { createThreadInterceptor } = require('../') +const { Agent, request } = require('undici') + +test('POST', async (t) => { + const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) + t.after(() => worker.terminate()) + + const interceptor = createThreadInterceptor({ + domain: '.local', + }) + interceptor.route('myserver', worker) + + const agent = new Agent().compose(interceptor) + + const { statusCode, body } = await request('http://myserver.local/echo-body', { + dispatcher: agent, + method: 'POST', + headers: { + 'content-type': 'application/json', + }, + body: JSON.stringify({ hello: 'world' }), + }) + + strictEqual(statusCode, 200) + deepStrictEqual(await body.json(), { hello: 'world' }) +}) + +test('POST with Stream', async (t) => { + const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) + t.after(() => worker.terminate()) + + const interceptor = createThreadInterceptor({ + domain: '.local', + }) + interceptor.route('myserver', worker) + + const agent = new Agent().compose(interceptor) + + const { statusCode, body } = await request('http://myserver.local/echo-body', { + dispatcher: agent, + method: 'POST', + headers: { + 'content-type': 'application/json', + }, + body: Readable.from(JSON.stringify({ hello: 'world' })), + }) + + strictEqual(statusCode, 200) + deepStrictEqual(await body.json(), { hello: 'world' }) +}) + +test('POST with Stream that errors', async (t) => { + const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) + t.after(() => worker.terminate()) + + const interceptor = createThreadInterceptor({ + domain: '.local', + }) + interceptor.route('myserver', worker) + + const agent = new Agent().compose(interceptor) + + await rejects(request('http://myserver.local/echo-body', { + dispatcher: agent, + method: 'POST', + headers: { + 'content-type': 'application/json', + }, + body: new Readable({ + read () { + this.destroy(new Error('kaboom')) + }, + }), + })) +}) + +test('POST with buffer stream', async (t) => { + const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) + t.after(() => worker.terminate()) + + const interceptor = createThreadInterceptor({ + domain: '.local', + }) + interceptor.route('myserver', worker) + + const agent = new Agent().compose(interceptor) + + const { statusCode, body } = await request('http://myserver.local/echo-body', { + dispatcher: agent, + method: 'POST', + headers: { + 'content-type': 'application/json', + }, + body: Readable.from(Buffer.from(JSON.stringify({ hello: 'world' }))), + }) + + strictEqual(statusCode, 200) + deepStrictEqual(await body.json(), { hello: 'world' }) +}) + +test('POST errors with streams of objects', async (t) => { + const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js')) + t.after(() => worker.terminate()) + + const interceptor = createThreadInterceptor({ + domain: '.local', + }) + interceptor.route('myserver', worker) + + const agent = new Agent().compose(interceptor) + + await rejects(request('http://myserver.local/echo-body', { + dispatcher: agent, + method: 'POST', + headers: { + 'content-type': 'application/json', + }, + body: Readable.from([{ hello: 'world' }]), + })) +}) diff --git a/test/timeout.test.js b/test/timeout.test.js new file mode 100644 index 0000000..2a19615 --- /dev/null +++ b/test/timeout.test.js @@ -0,0 +1,47 @@ +'use strict' + +const { test } = require('node:test') +const { rejects } = require('node:assert') +const { join } = require('node:path') +const { Worker } = require('node:worker_threads') +const { once } = require('node:events') +const { createThreadInterceptor } = require('../') +const { Agent, request } = require('undici') + +test('timeout', async (t) => { + const empty = new Worker(join(__dirname, 'fixtures', 'empty.js')) + + const interceptor = createThreadInterceptor({ + domain: '.local', + timeout: 1000, + }) + interceptor.route('myserver', empty) + + const agent = new Agent().compose(interceptor) + + await rejects(request('http://myserver.local', { + dispatcher: agent, + }), new Error('Timeout while waiting from a response from myserver.local')) + + empty.postMessage('close') + await once(empty, 'exit') +}) + +test('timeout set to a boolean', async (t) => { + const empty = new Worker(join(__dirname, 'fixtures', 'empty.js')) + + const interceptor = createThreadInterceptor({ + domain: '.local', + timeout: true, + }) + interceptor.route('myserver', empty) + + const agent = new Agent().compose(interceptor) + + await rejects(request('http://myserver.local', { + dispatcher: agent, + }), new Error('Timeout while waiting from a response from myserver.local')) + + empty.postMessage('close') + await once(empty, 'exit') +})