From c7845064c9a62f93eed2794232d54fea9e6727df Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 25 Nov 2024 11:52:51 +0100 Subject: [PATCH 01/13] removed useless file Signed-off-by: Matteo Collina --- test.ttf | Bin 1024 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 test.ttf diff --git a/test.ttf b/test.ttf deleted file mode 100644 index b20ed066d6ebc47b0fc28096b48e4eff6c9035e8..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1024 zcmV+b1poWw*M~+4Uh1#)WhJF1(M+LP0(6$B2jZ9TE?6(;9K3I5Zozz~J|%v=!B(-q zhuiOaRDz4+kSNR(d5TAchxHe(xf#aXPX9Wu1;R6<}t873x=!?7B z7JKfM%kTj6L8{SCMdR$>_CxoZ;yiwlMP=YzLyg8FT5TuCntzPeR!f}b5PWOTG1EOY zfkgk*$(#ND{4(Eh+|zZ8HM*iuiUpSZ&evr);u z{>K4?0wg^^}Fjs&_9o@ z-oxs5rfPrDg6oyb!Kxd2+o#QjWta*lea1D54kBkn>U9~qyuWn_}Z(&Uz z&u=zUdIjU#&%Tv^CzVuLzx1a{_&SR)7nZKq09NeU1sglOZRm^W|DR1dE=*U8%X*qH zvAXF+4f% z=~8@K?6-;^%eEKLoKhv7^Bo zJE;?a*f#2*$Sr0rg9Vocs*QHS`+Htjn`PE(=^Q#gLT4Zy_J-3{HOFW0a>t(Sgt(h2 zCOEvif?)pehdh=Zxov&I7T0RTtA9)%mFcMrCJQrB7qw{g^$}Kq&rBo`Eix$dlZmr0 zfQPtr_2rJY3m=1L4L79pYNrvGhu#gt0pU>1HnD(wi#?b*z?(%Qc#^CdD(`U4j=)3e zCSYmLh>jN-t-zQ7UP?(H^D;Qr@}6K=(a&rpEPeqclv*b?8|-nkrFxlK*20DK`4V=Q z>C@Qs*f?uJa02#7mK<{%aB0pRNe6mZ`lzT4!`r?AONX*v8LMrp>XvtV8jOX^y#cQH u-9u>MM!FdgTr-0OrE^$B;cMxwRC!wUh-gm!`Nh0`R|^(KA-L&0mRc|Zyabv6 From 6eb3ecbb62918dc5763df5339294c3ad81f4ab2f Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 25 Nov 2024 12:29:28 +0100 Subject: [PATCH 02/13] first part of a fix Signed-off-by: Matteo Collina --- index.js | 11 ++++- lib/roundrobin.js | 4 ++ test/fixtures/network-crash.js | 22 +++++++++ test/service-restarted.test.js | 83 ++++++++++++++++++++++++++++++++++ 4 files changed, 119 insertions(+), 1 deletion(-) create mode 100644 test/fixtures/network-crash.js create mode 100644 test/service-restarted.test.js diff --git a/index.js b/index.js index 483c564..ef196fa 100644 --- a/index.js +++ b/index.js @@ -163,9 +163,11 @@ function createThreadInterceptor (opts) { routes.set(url, new RoundRobin()) } - const roundRobinIndex = routes.get(url).add(port) + const roundRobin = routes.get(url) + roundRobin.add(port) function onClose () { + const roundRobinIndex = routes.get(url).findIndex(port) const roundRobin = routes.get(url) roundRobin.remove(port) for (const f of forwarded.get(port)) { @@ -199,12 +201,18 @@ function createThreadInterceptor (opts) { inflight(err, res) } } else if (msg.type === 'address') { + const roundRobinIndex = roundRobin.findIndex(port) res.setAddress(url, roundRobinIndex, msg.address, forward) } }) } res.setAddress = (url, index, address, forward = true) => { + /* + if (!address) { + throw new Error('address is required') + } + */ const port = routes.get(url)?.get(index) if (port) { @@ -217,6 +225,7 @@ function createThreadInterceptor (opts) { for (const [, roundRobin] of routes) { for (const otherPort of roundRobin) { + process._rawDebug('sending address', address, 'to', otherPort.threadId, 'for', url) otherPort.postMessage({ type: 'address', url, index, address }) } } diff --git a/lib/roundrobin.js b/lib/roundrobin.js index 741fa9b..5923ea8 100644 --- a/lib/roundrobin.js +++ b/lib/roundrobin.js @@ -17,6 +17,10 @@ class RoundRobin { return this.ports.length - 1 } + findIndex (port) { + return this.ports.indexOf(port) + } + remove (port) { const index = this.ports.indexOf(port) if (index === -1) { diff --git a/test/fixtures/network-crash.js b/test/fixtures/network-crash.js new file mode 100644 index 0000000..8742cad --- /dev/null +++ b/test/fixtures/network-crash.js @@ -0,0 +1,22 @@ +'use strict' + +const { parentPort } = require('worker_threads') +const fastify = require('fastify') +const { wire } = require('../../') + +const app = fastify() + +app.get('/example', async (request, reply) => { + return { hello: 'world' } +}) + +app.get('/crash', async (request, reply) => { + process.exit(1) +}) + +// TODO(mcollina): there is a race condition here +const { replaceServer } = wire({ port: parentPort }) +app.listen({ port: 0 }).then((url) => { + process._rawDebug('server started on', url) + replaceServer(url) +}) diff --git a/test/service-restarted.test.js b/test/service-restarted.test.js new file mode 100644 index 0000000..3f60d23 --- /dev/null +++ b/test/service-restarted.test.js @@ -0,0 +1,83 @@ +'use strict' + +const { test } = require('node:test') +const { strictEqual, rejects } = require('node:assert') +const { join } = require('node:path') +const { Worker } = require('node:worker_threads') +const { setTimeout: sleep } = require('node:timers/promises') +const { createThreadInterceptor } = require('../') +const { Agent, request } = require('undici') + +test('two service in a mesh, one is terminated with an inflight message', async (t) => { + const worker1 = new Worker(join(__dirname, 'fixtures', 'network-crash.js'), { + workerData: { message: 'mesh' }, + }) + t.after(() => worker1.terminate()) + const worker2 = new Worker(join(__dirname, 'fixtures', 'network-crash.js')) + t.after(() => worker2.terminate()) + + const interceptor = createThreadInterceptor({ + domain: '.local', + }) + interceptor.route('myserver', worker1) + interceptor.route('myserver2', worker2) + + const agent = new Agent().compose(interceptor) + + await sleep(1000) + + { + const res = await request('http://myserver.local/example', { + dispatcher: agent, + }) + + strictEqual(res.statusCode, 200) + await res.body.dump() + } + + { + const res = await request('http://myserver2.local/example', { + dispatcher: agent, + }) + + strictEqual(res.statusCode, 200) + await res.body.dump() + } + + console.log('all successful request completed') + + await rejects(request('http://myserver2.local/crash', { + dispatcher: agent, + })) + + console.log('service crashed') + + const worker2bis = new Worker(join(__dirname, 'fixtures', 'network-crash.js')) + t.after(() => worker2bis.terminate()) + + interceptor.route('myserver2', worker2bis) + + await sleep(2000) + + console.log('calling worker1') + + { + const res = await request('http://myserver.local/example', { + dispatcher: agent, + }) + + strictEqual(res.statusCode, 200) + await res.body.dump() + } + + console.log('calling worker2bis') + + { + const res = await request('http://myserver2.local/example', { + dispatcher: agent, + }) + + strictEqual(res.statusCode, 200) + await res.body.dump() + } +}) From d669205662e608af367e04c793b4d45f37a122fa Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 25 Nov 2024 13:29:45 +0100 Subject: [PATCH 03/13] working but ugly Signed-off-by: Matteo Collina --- index.js | 38 +++++++++++--- test/fixtures/composer.js | 30 +++++++++++ test/service-restarted.test.js | 91 +++++++++++++++++++++++++++++++++- 3 files changed, 151 insertions(+), 8 deletions(-) create mode 100644 test/fixtures/composer.js diff --git a/index.js b/index.js index ef196fa..cf00857 100644 --- a/index.js +++ b/index.js @@ -2,6 +2,7 @@ const RoundRobin = require('./lib/roundrobin') const hyperid = require('hyperid') +const { workerData } = require('worker_threads') const { getGlobalDispatcher, setGlobalDispatcher } = require('undici') const { threadId, MessageChannel, parentPort } = require('worker_threads') const inject = require('light-my-request') @@ -42,6 +43,7 @@ function createThreadInterceptor (opts) { const port = roundRobin.next() if (port[kAddress]) { + return dispatch({ ...opts, origin: port[kAddress] }, handler) } @@ -193,6 +195,8 @@ function createThreadInterceptor (opts) { const inflights = new Map() portInflights.set(port, inflights) port.on('message', (msg) => { + process._rawDebug('==== start') + process._rawDebug(workerData?.name || 'main', 'otherMessage', msg) if (msg.type === 'response') { const { id, res, err } = msg const inflight = inflights.get(id) @@ -201,13 +205,24 @@ function createThreadInterceptor (opts) { inflight(err, res) } } else if (msg.type === 'address') { - const roundRobinIndex = roundRobin.findIndex(port) - res.setAddress(url, roundRobinIndex, msg.address, forward) + // TODO(mcollina): verify the else clause + if (!msg.url) { + const roundRobinIndex = roundRobin.findIndex(port) + res.setAddress(url, roundRobinIndex, msg.address, forward) + } } + process._rawDebug('==== end') }) } res.setAddress = (url, index, address, forward = true) => { + process._rawDebug(workerData?.name || 'main', 'setAddress', url, index, address, forward, new Error().stack) + process._rawDebug(workerData?.name || 'main', 'routes', Array.from(routes.entries()).map((e) => { + const key = e[0] + const value = e[1] + return { key, addresses: value.ports.map(p => p[kAddress]) } + })); + /* if (!address) { throw new Error('address is required') @@ -225,7 +240,7 @@ function createThreadInterceptor (opts) { for (const [, roundRobin] of routes) { for (const otherPort of roundRobin) { - process._rawDebug('sending address', address, 'to', otherPort.threadId, 'for', url) + process._rawDebug(workerData?.name || 'main', 'sending address', address, 'to', otherPort.threadId, 'for', url) otherPort.postMessage({ type: 'address', url, index, address }) } } @@ -252,6 +267,7 @@ function wire ({ server: newServer, port, ...undiciOpts }) { replaceServer(newServer) function replaceServer (newServer) { + process._rawDebug(workerData?.name || 'main', 'replacing server', server, 'with', newServer) server = newServer if (typeof server === 'string') { @@ -262,6 +278,8 @@ function wire ({ server: newServer, port, ...undiciOpts }) { } function onMessage (msg) { + process._rawDebug('-----') + process._rawDebug(workerData?.name, 'received', msg) if (msg.type === 'request') { const { id, opts } = msg @@ -317,10 +335,15 @@ function wire ({ server: newServer, port, ...undiciOpts }) { return } - if (hasInject) { - server.inject(injectOpts, onInject) - } else { - inject(server, injectOpts, onInject) + try { + if (hasInject) { + server.inject(injectOpts, onInject) + } else { + inject(server, injectOpts, onInject) + } + } catch (err) { + process._rawDebug(workerData?.name || 'main', 'error', err) + throw err } }) } else if (msg.type === 'route') { @@ -329,6 +352,7 @@ function wire ({ server: newServer, port, ...undiciOpts }) { } else if (msg.type === 'address') { interceptor.setAddress(msg.url, msg.index, msg.address, false) } + process._rawDebug('-----') } port.on('message', onMessage) diff --git a/test/fixtures/composer.js b/test/fixtures/composer.js new file mode 100644 index 0000000..02bf02d --- /dev/null +++ b/test/fixtures/composer.js @@ -0,0 +1,30 @@ +'use strict' + +const { parentPort } = require('worker_threads') +const fastify = require('fastify') +const { wire } = require('../../') +const { request } = require('undici') + +const app = fastify() + +wire({ server: app, port: parentPort, domain: '.local' }) + +app.get('/s1/example', async (req, reply) => { + const { body } = await request('http://myserver.local/example') + return await body.json() +}) + +app.get('/s2/example', async (req, reply) => { + const { body } = await request('http://myserver2.local/example') + return await body.json() +}) + +app.get('/s1/crash', async (req, reply) => { + const { body } = await request('http://myserver.local/crash') + return await body.json() +}) + +app.get('/s2/crash', async (req, reply) => { + const { body } = await request('http://myserver2.local/crash') + return await body.json() +}) diff --git a/test/service-restarted.test.js b/test/service-restarted.test.js index 3f60d23..cadce55 100644 --- a/test/service-restarted.test.js +++ b/test/service-restarted.test.js @@ -8,7 +8,7 @@ const { setTimeout: sleep } = require('node:timers/promises') const { createThreadInterceptor } = require('../') const { Agent, request } = require('undici') -test('two service in a mesh, one is terminated with an inflight message', async (t) => { +test('service restart with network / 1', async (t) => { const worker1 = new Worker(join(__dirname, 'fixtures', 'network-crash.js'), { workerData: { message: 'mesh' }, }) @@ -81,3 +81,92 @@ test('two service in a mesh, one is terminated with an inflight message', async await res.body.dump() } }) + +test.only('service restart with network / 2', async (t) => { + const composer = new Worker(join(__dirname, 'fixtures', 'composer.js'), { + workerData: { name: 'composer'}, + }) + t.after(() => composer.terminate()) + const worker1 = new Worker(join(__dirname, 'fixtures', 'network-crash.js'), { + workerData: { name: 'worker1'}, + }) + t.after(() => worker1.terminate()) + const worker2 = new Worker(join(__dirname, 'fixtures', 'network-crash.js'), { + workerData: { name: 'worker2'}, + }) + t.after(() => worker2.terminate()) + + const interceptor = createThreadInterceptor({ + domain: '.local', + }) + interceptor.route('composer', composer) + interceptor.route('myserver', worker1) + interceptor.route('myserver2', worker2) + + const agent = new Agent().compose(interceptor) + + await sleep(1000) + + /* + { + const res = await request('http://composer.local/s1/example', { + dispatcher: agent, + }) + + strictEqual(res.statusCode, 200) + await res.body.dump() + } + + { + const res = await request('http://composer.local/s2/example', { + dispatcher: agent, + }) + + strictEqual(res.statusCode, 200) + await res.body.dump() + } + + console.log('===> all successful request completed') + */ + + { + const res = await request('http://composer.local/s2/crash', { + dispatcher: agent, + }) + strictEqual(res.statusCode, 500) + await res.body.dump() + } + + console.log('===> service crashed') + + const worker2bis = new Worker(join(__dirname, 'fixtures', 'network-crash.js'), { + workerData: { name: 'worker2bis'}, + }) + t.after(() => worker2bis.terminate()) + + interceptor.route('myserver2', worker2bis) + + await sleep(2000) + + console.log('calling worker1') + + { + const res = await request('http://composer.local/s1/example', { + dispatcher: agent, + }) + + strictEqual(res.statusCode, 200) + await res.body.dump() + } + + console.log('calling worker2bis') + + { + const res = await request('http://composer.local/s2/example', { + dispatcher: agent, + }) + + strictEqual(res.statusCode, 200) + await res.body.dump() + } +}) From c4b46a70358c6ad74118d391a667256ad17cdcc6 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 25 Nov 2024 13:31:31 +0100 Subject: [PATCH 04/13] pretty Signed-off-by: Matteo Collina --- index.js | 35 ++++------------------------------ test/service-restarted.test.js | 28 +++++---------------------- 2 files changed, 9 insertions(+), 54 deletions(-) diff --git a/index.js b/index.js index cf00857..7c6258d 100644 --- a/index.js +++ b/index.js @@ -2,7 +2,6 @@ const RoundRobin = require('./lib/roundrobin') const hyperid = require('hyperid') -const { workerData } = require('worker_threads') const { getGlobalDispatcher, setGlobalDispatcher } = require('undici') const { threadId, MessageChannel, parentPort } = require('worker_threads') const inject = require('light-my-request') @@ -43,7 +42,6 @@ function createThreadInterceptor (opts) { const port = roundRobin.next() if (port[kAddress]) { - return dispatch({ ...opts, origin: port[kAddress] }, handler) } @@ -195,8 +193,6 @@ function createThreadInterceptor (opts) { const inflights = new Map() portInflights.set(port, inflights) port.on('message', (msg) => { - process._rawDebug('==== start') - process._rawDebug(workerData?.name || 'main', 'otherMessage', msg) if (msg.type === 'response') { const { id, res, err } = msg const inflight = inflights.get(id) @@ -211,23 +207,10 @@ function createThreadInterceptor (opts) { res.setAddress(url, roundRobinIndex, msg.address, forward) } } - process._rawDebug('==== end') }) } res.setAddress = (url, index, address, forward = true) => { - process._rawDebug(workerData?.name || 'main', 'setAddress', url, index, address, forward, new Error().stack) - process._rawDebug(workerData?.name || 'main', 'routes', Array.from(routes.entries()).map((e) => { - const key = e[0] - const value = e[1] - return { key, addresses: value.ports.map(p => p[kAddress]) } - })); - - /* - if (!address) { - throw new Error('address is required') - } - */ const port = routes.get(url)?.get(index) if (port) { @@ -240,7 +223,6 @@ function createThreadInterceptor (opts) { for (const [, roundRobin] of routes) { for (const otherPort of roundRobin) { - process._rawDebug(workerData?.name || 'main', 'sending address', address, 'to', otherPort.threadId, 'for', url) otherPort.postMessage({ type: 'address', url, index, address }) } } @@ -267,7 +249,6 @@ function wire ({ server: newServer, port, ...undiciOpts }) { replaceServer(newServer) function replaceServer (newServer) { - process._rawDebug(workerData?.name || 'main', 'replacing server', server, 'with', newServer) server = newServer if (typeof server === 'string') { @@ -278,8 +259,6 @@ function wire ({ server: newServer, port, ...undiciOpts }) { } function onMessage (msg) { - process._rawDebug('-----') - process._rawDebug(workerData?.name, 'received', msg) if (msg.type === 'request') { const { id, opts } = msg @@ -335,15 +314,10 @@ function wire ({ server: newServer, port, ...undiciOpts }) { return } - try { - if (hasInject) { - server.inject(injectOpts, onInject) - } else { - inject(server, injectOpts, onInject) - } - } catch (err) { - process._rawDebug(workerData?.name || 'main', 'error', err) - throw err + if (hasInject) { + server.inject(injectOpts, onInject) + } else { + inject(server, injectOpts, onInject) } }) } else if (msg.type === 'route') { @@ -352,7 +326,6 @@ function wire ({ server: newServer, port, ...undiciOpts }) { } else if (msg.type === 'address') { interceptor.setAddress(msg.url, msg.index, msg.address, false) } - process._rawDebug('-----') } port.on('message', onMessage) diff --git a/test/service-restarted.test.js b/test/service-restarted.test.js index cadce55..16169e0 100644 --- a/test/service-restarted.test.js +++ b/test/service-restarted.test.js @@ -44,14 +44,10 @@ test('service restart with network / 1', async (t) => { await res.body.dump() } - console.log('all successful request completed') - await rejects(request('http://myserver2.local/crash', { dispatcher: agent, })) - console.log('service crashed') - const worker2bis = new Worker(join(__dirname, 'fixtures', 'network-crash.js')) t.after(() => worker2bis.terminate()) @@ -59,8 +55,6 @@ test('service restart with network / 1', async (t) => { await sleep(2000) - console.log('calling worker1') - { const res = await request('http://myserver.local/example', { dispatcher: agent, @@ -70,8 +64,6 @@ test('service restart with network / 1', async (t) => { await res.body.dump() } - console.log('calling worker2bis') - { const res = await request('http://myserver2.local/example', { dispatcher: agent, @@ -84,15 +76,15 @@ test('service restart with network / 1', async (t) => { test.only('service restart with network / 2', async (t) => { const composer = new Worker(join(__dirname, 'fixtures', 'composer.js'), { - workerData: { name: 'composer'}, + workerData: { name: 'composer' }, }) t.after(() => composer.terminate()) const worker1 = new Worker(join(__dirname, 'fixtures', 'network-crash.js'), { - workerData: { name: 'worker1'}, + workerData: { name: 'worker1' }, }) t.after(() => worker1.terminate()) const worker2 = new Worker(join(__dirname, 'fixtures', 'network-crash.js'), { - workerData: { name: 'worker2'}, + workerData: { name: 'worker2' }, }) t.after(() => worker2.terminate()) @@ -107,7 +99,6 @@ test.only('service restart with network / 2', async (t) => { await sleep(1000) - /* { const res = await request('http://composer.local/s1/example', { dispatcher: agent, @@ -126,10 +117,7 @@ test.only('service restart with network / 2', async (t) => { await res.body.dump() } - console.log('===> all successful request completed') - */ - - { + { const res = await request('http://composer.local/s2/crash', { dispatcher: agent, }) @@ -137,10 +125,8 @@ test.only('service restart with network / 2', async (t) => { await res.body.dump() } - console.log('===> service crashed') - const worker2bis = new Worker(join(__dirname, 'fixtures', 'network-crash.js'), { - workerData: { name: 'worker2bis'}, + workerData: { name: 'worker2bis' }, }) t.after(() => worker2bis.terminate()) @@ -148,8 +134,6 @@ test.only('service restart with network / 2', async (t) => { await sleep(2000) - console.log('calling worker1') - { const res = await request('http://composer.local/s1/example', { dispatcher: agent, @@ -159,8 +143,6 @@ test.only('service restart with network / 2', async (t) => { await res.body.dump() } - console.log('calling worker2bis') - { const res = await request('http://composer.local/s2/example', { dispatcher: agent, From f0055c79f25dc44d0ddd66b2379005773ee83840 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 25 Nov 2024 13:35:49 +0100 Subject: [PATCH 05/13] fixup Signed-off-by: Matteo Collina --- index.js | 10 +++++++++- test/fixtures/network-crash.js | 1 - 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/index.js b/index.js index 7c6258d..87e8944 100644 --- a/index.js +++ b/index.js @@ -42,6 +42,7 @@ function createThreadInterceptor (opts) { const port = roundRobin.next() if (port[kAddress]) { + return dispatch({ ...opts, origin: port[kAddress] }, handler) } @@ -201,10 +202,17 @@ function createThreadInterceptor (opts) { inflight(err, res) } } else if (msg.type === 'address') { - // TODO(mcollina): verify the else clause if (!msg.url) { const roundRobinIndex = roundRobin.findIndex(port) res.setAddress(url, roundRobinIndex, msg.address, forward) + } else { + const roundRobin = routes.get(msg.url) + if (!roundRobin) { + return + } + + const roundRobinIndex = roundRobin.findIndex(port) + res.setAddress(msg.url, roundRobinIndex, msg.address, false) } } }) diff --git a/test/fixtures/network-crash.js b/test/fixtures/network-crash.js index 8742cad..55d2792 100644 --- a/test/fixtures/network-crash.js +++ b/test/fixtures/network-crash.js @@ -17,6 +17,5 @@ app.get('/crash', async (request, reply) => { // TODO(mcollina): there is a race condition here const { replaceServer } = wire({ port: parentPort }) app.listen({ port: 0 }).then((url) => { - process._rawDebug('server started on', url) replaceServer(url) }) From c5701b0b8bf8e864e1c8abe20211eb7f8cf01f13 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 25 Nov 2024 13:36:30 +0100 Subject: [PATCH 06/13] fixup Signed-off-by: Matteo Collina --- index.js | 1 - 1 file changed, 1 deletion(-) diff --git a/index.js b/index.js index 87e8944..05ad20f 100644 --- a/index.js +++ b/index.js @@ -42,7 +42,6 @@ function createThreadInterceptor (opts) { const port = roundRobin.next() if (port[kAddress]) { - return dispatch({ ...opts, origin: port[kAddress] }, handler) } From cae0b856fcb8bc26cc321449a20f01a84c701eda Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 25 Nov 2024 13:36:53 +0100 Subject: [PATCH 07/13] fixup Signed-off-by: Matteo Collina --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f85d363..5a8ce70 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,7 +16,7 @@ jobs: strategy: matrix: - node-version: [18.x, 20.x, 20.x] + node-version: [18.x, 20.x, 22.x, 23.x] steps: - uses: actions/checkout@v3 From 22e4841c7f8462ea6cb715b2c4ba7c5d196c86ce Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 25 Nov 2024 13:39:39 +0100 Subject: [PATCH 08/13] fixup Signed-off-by: Matteo Collina --- index.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/index.js b/index.js index 05ad20f..ab9b76a 100644 --- a/index.js +++ b/index.js @@ -210,8 +210,7 @@ function createThreadInterceptor (opts) { return } - const roundRobinIndex = roundRobin.findIndex(port) - res.setAddress(msg.url, roundRobinIndex, msg.address, false) + res.setAddress(msg.url, msg.index, msg.address, false) } } }) From 14d818fcd99a22abca91272dca0b528b175f46a6 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 25 Nov 2024 13:43:49 +0100 Subject: [PATCH 09/13] concurrency to 1 Signed-off-by: Matteo Collina --- package.json | 2 +- test/service-restarted.test.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index e344a3a..2643346 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "description": "An Undici interceptor that routes requests over a worker thread", "main": "index.js", "scripts": { - "test": "eslint && borp --coverage --check-coverage" + "test": "eslint && borp --coverage --check-coverage --concurrency=1" }, "keywords": [ "undici", diff --git a/test/service-restarted.test.js b/test/service-restarted.test.js index 16169e0..b311ffd 100644 --- a/test/service-restarted.test.js +++ b/test/service-restarted.test.js @@ -74,7 +74,7 @@ test('service restart with network / 1', async (t) => { } }) -test.only('service restart with network / 2', async (t) => { +test('service restart with network / 2', async (t) => { const composer = new Worker(join(__dirname, 'fixtures', 'composer.js'), { workerData: { name: 'composer' }, }) From 4f8527eef6f0293374cba69c79949e5d770f1ec1 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 25 Nov 2024 15:24:32 +0100 Subject: [PATCH 10/13] fixup Signed-off-by: Matteo Collina --- index.js | 34 ++++++++++++++++++++++------------ lib/roundrobin.js | 8 ++------ 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/index.js b/index.js index ab9b76a..50f13fa 100644 --- a/index.js +++ b/index.js @@ -1,6 +1,7 @@ 'use strict' const RoundRobin = require('./lib/roundrobin') +const { workerData } = require('worker_threads') const hyperid = require('hyperid') const { getGlobalDispatcher, setGlobalDispatcher } = require('undici') const { threadId, MessageChannel, parentPort } = require('worker_threads') @@ -153,8 +154,8 @@ function createThreadInterceptor (opts) { const { port1, port2 } = new MessageChannel() forwarded.get(otherPort).add(port2) forwarded.get(port).add(port1) - otherPort.postMessage({ type: 'route', url, port: port2 }, [port2]) - port.postMessage({ type: 'route', url: key, port: port1 }, [port1]) + otherPort.postMessage({ type: 'route', url, port: port2, threadId: port.threadId }, [port2]) + port.postMessage({ type: 'route', url: key, port: port1, threadId: otherPort.threadId }, [port1]) } } } @@ -166,8 +167,11 @@ function createThreadInterceptor (opts) { const roundRobin = routes.get(url) roundRobin.add(port) + // We must copy the threadId outsise because it can be nulled + // by Node.js + const threadId = port.threadId + function onClose () { - const roundRobinIndex = routes.get(url).findIndex(port) const roundRobin = routes.get(url) roundRobin.remove(port) for (const f of forwarded.get(port)) { @@ -182,7 +186,7 @@ function createThreadInterceptor (opts) { } // Notify other threads that any eventual network address for this route is no longer valid - res.setAddress(url, roundRobinIndex) + res.setAddress(url, threadId) } // If port is a worker, we need to remove it from the routes @@ -193,6 +197,7 @@ function createThreadInterceptor (opts) { const inflights = new Map() portInflights.set(port, inflights) port.on('message', (msg) => { + process._rawDebug(workerData?.main || 'main', 'portOnMessage', url, msg) if (msg.type === 'response') { const { id, res, err } = msg const inflight = inflights.get(id) @@ -202,22 +207,21 @@ function createThreadInterceptor (opts) { } } else if (msg.type === 'address') { if (!msg.url) { - const roundRobinIndex = roundRobin.findIndex(port) - res.setAddress(url, roundRobinIndex, msg.address, forward) + res.setAddress(url, port.threadId, msg.address, forward) } else { const roundRobin = routes.get(msg.url) if (!roundRobin) { return } - res.setAddress(msg.url, msg.index, msg.address, false) + res.setAddress(msg.url, msg.threadId, msg.address, false) } } }) } - res.setAddress = (url, index, address, forward = true) => { - const port = routes.get(url)?.get(index) + res.setAddress = (url, threadId, address, forward = true) => { + const port = routes.get(url)?.findByThreadId(threadId) if (port) { port[kAddress] = address @@ -229,7 +233,10 @@ function createThreadInterceptor (opts) { for (const [, roundRobin] of routes) { for (const otherPort of roundRobin) { - otherPort.postMessage({ type: 'address', url, index, address }) + // Avoid loops, do not send the message to the source + if (otherPort.threadId !== threadId) { + otherPort.postMessage({ type: 'address', url, address, threadId }) + } } } } @@ -258,13 +265,15 @@ function wire ({ server: newServer, port, ...undiciOpts }) { server = newServer if (typeof server === 'string') { - parentPort.postMessage({ type: 'address', address: server }) + process._rawDebug(workerData?.name || 'main', 'replaceServer', server) + parentPort.postMessage({ type: 'address', address: server, threadId }) } else { hasInject = typeof server?.inject === 'function' } } function onMessage (msg) { + process._rawDebug(workerData?.name || 'main', msg) if (msg.type === 'request') { const { id, opts } = msg @@ -327,10 +336,11 @@ function wire ({ server: newServer, port, ...undiciOpts }) { } }) } else if (msg.type === 'route') { + msg.port.threadId = msg.threadId interceptor.route(msg.url, msg.port, false) msg.port.on('message', onMessage) } else if (msg.type === 'address') { - interceptor.setAddress(msg.url, msg.index, msg.address, false) + interceptor.setAddress(msg.url, msg.threadId, msg.address, false) } } diff --git a/lib/roundrobin.js b/lib/roundrobin.js index 5923ea8..87dc482 100644 --- a/lib/roundrobin.js +++ b/lib/roundrobin.js @@ -17,8 +17,8 @@ class RoundRobin { return this.ports.length - 1 } - findIndex (port) { - return this.ports.indexOf(port) + findByThreadId (threadId) { + return this.ports.find((p) => p.threadId === threadId) } remove (port) { @@ -34,10 +34,6 @@ class RoundRobin { this.index = this.index % this.ports.length } - get (index) { - return this.ports[index] - } - get length () { return this.ports.length } From ff470366e2d963528852665bec8310e7bc982058 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 25 Nov 2024 15:29:27 +0100 Subject: [PATCH 11/13] Apply suggestions from code review Co-authored-by: Ivan Tymoshenko --- index.js | 3 --- 1 file changed, 3 deletions(-) diff --git a/index.js b/index.js index 50f13fa..2a67666 100644 --- a/index.js +++ b/index.js @@ -197,7 +197,6 @@ function createThreadInterceptor (opts) { const inflights = new Map() portInflights.set(port, inflights) port.on('message', (msg) => { - process._rawDebug(workerData?.main || 'main', 'portOnMessage', url, msg) if (msg.type === 'response') { const { id, res, err } = msg const inflight = inflights.get(id) @@ -265,7 +264,6 @@ function wire ({ server: newServer, port, ...undiciOpts }) { server = newServer if (typeof server === 'string') { - process._rawDebug(workerData?.name || 'main', 'replaceServer', server) parentPort.postMessage({ type: 'address', address: server, threadId }) } else { hasInject = typeof server?.inject === 'function' @@ -273,7 +271,6 @@ function wire ({ server: newServer, port, ...undiciOpts }) { } function onMessage (msg) { - process._rawDebug(workerData?.name || 'main', msg) if (msg.type === 'request') { const { id, opts } = msg From ee908ad67628e06a84ac970c9d563737af9943d9 Mon Sep 17 00:00:00 2001 From: Ivan Tymoshenko Date: Mon, 25 Nov 2024 15:31:19 +0100 Subject: [PATCH 12/13] Update index.js --- index.js | 1 - 1 file changed, 1 deletion(-) diff --git a/index.js b/index.js index 2a67666..0c9b825 100644 --- a/index.js +++ b/index.js @@ -1,7 +1,6 @@ 'use strict' const RoundRobin = require('./lib/roundrobin') -const { workerData } = require('worker_threads') const hyperid = require('hyperid') const { getGlobalDispatcher, setGlobalDispatcher } = require('undici') const { threadId, MessageChannel, parentPort } = require('worker_threads') From 20b3cb078944b5d2422ba5bd28e8820b8a88ea79 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 25 Nov 2024 15:35:44 +0100 Subject: [PATCH 13/13] concurrent testing Signed-off-by: Matteo Collina --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 2643346..e344a3a 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "description": "An Undici interceptor that routes requests over a worker thread", "main": "index.js", "scripts": { - "test": "eslint && borp --coverage --check-coverage --concurrency=1" + "test": "eslint && borp --coverage --check-coverage" }, "keywords": [ "undici",