diff --git a/index.js b/index.js index 483c564..ef196fa 100644 --- a/index.js +++ b/index.js @@ -163,9 +163,11 @@ function createThreadInterceptor (opts) { routes.set(url, new RoundRobin()) } - const roundRobinIndex = routes.get(url).add(port) + const roundRobin = routes.get(url) + roundRobin.add(port) function onClose () { + const roundRobinIndex = routes.get(url).findIndex(port) const roundRobin = routes.get(url) roundRobin.remove(port) for (const f of forwarded.get(port)) { @@ -199,12 +201,18 @@ function createThreadInterceptor (opts) { inflight(err, res) } } else if (msg.type === 'address') { + const roundRobinIndex = roundRobin.findIndex(port) res.setAddress(url, roundRobinIndex, msg.address, forward) } }) } res.setAddress = (url, index, address, forward = true) => { + /* + if (!address) { + throw new Error('address is required') + } + */ const port = routes.get(url)?.get(index) if (port) { @@ -217,6 +225,7 @@ function createThreadInterceptor (opts) { for (const [, roundRobin] of routes) { for (const otherPort of roundRobin) { + process._rawDebug('sending address', address, 'to', otherPort.threadId, 'for', url) otherPort.postMessage({ type: 'address', url, index, address }) } } diff --git a/lib/roundrobin.js b/lib/roundrobin.js index 741fa9b..5923ea8 100644 --- a/lib/roundrobin.js +++ b/lib/roundrobin.js @@ -17,6 +17,10 @@ class RoundRobin { return this.ports.length - 1 } + findIndex (port) { + return this.ports.indexOf(port) + } + remove (port) { const index = this.ports.indexOf(port) if (index === -1) { diff --git a/test/fixtures/network-crash.js b/test/fixtures/network-crash.js new file mode 100644 index 0000000..8742cad --- /dev/null +++ b/test/fixtures/network-crash.js @@ -0,0 +1,22 @@ +'use strict' + +const { parentPort } = require('worker_threads') +const fastify = require('fastify') +const { wire } = require('../../') + +const app = fastify() + +app.get('/example', async (request, reply) => { + return { hello: 'world' } +}) + +app.get('/crash', async (request, reply) => { + process.exit(1) +}) + +// TODO(mcollina): there is a race condition here +const { replaceServer } = wire({ port: parentPort }) +app.listen({ port: 0 }).then((url) => { + process._rawDebug('server started on', url) + replaceServer(url) +}) diff --git a/test/service-restarted.test.js b/test/service-restarted.test.js new file mode 100644 index 0000000..3f60d23 --- /dev/null +++ b/test/service-restarted.test.js @@ -0,0 +1,83 @@ +'use strict' + +const { test } = require('node:test') +const { strictEqual, rejects } = require('node:assert') +const { join } = require('node:path') +const { Worker } = require('node:worker_threads') +const { setTimeout: sleep } = require('node:timers/promises') +const { createThreadInterceptor } = require('../') +const { Agent, request } = require('undici') + +test('two service in a mesh, one is terminated with an inflight message', async (t) => { + const worker1 = new Worker(join(__dirname, 'fixtures', 'network-crash.js'), { + workerData: { message: 'mesh' }, + }) + t.after(() => worker1.terminate()) + const worker2 = new Worker(join(__dirname, 'fixtures', 'network-crash.js')) + t.after(() => worker2.terminate()) + + const interceptor = createThreadInterceptor({ + domain: '.local', + }) + interceptor.route('myserver', worker1) + interceptor.route('myserver2', worker2) + + const agent = new Agent().compose(interceptor) + + await sleep(1000) + + { + const res = await request('http://myserver.local/example', { + dispatcher: agent, + }) + + strictEqual(res.statusCode, 200) + await res.body.dump() + } + + { + const res = await request('http://myserver2.local/example', { + dispatcher: agent, + }) + + strictEqual(res.statusCode, 200) + await res.body.dump() + } + + console.log('all successful request completed') + + await rejects(request('http://myserver2.local/crash', { + dispatcher: agent, + })) + + console.log('service crashed') + + const worker2bis = new Worker(join(__dirname, 'fixtures', 'network-crash.js')) + t.after(() => worker2bis.terminate()) + + interceptor.route('myserver2', worker2bis) + + await sleep(2000) + + console.log('calling worker1') + + { + const res = await request('http://myserver.local/example', { + dispatcher: agent, + }) + + strictEqual(res.statusCode, 200) + await res.body.dump() + } + + console.log('calling worker2bis') + + { + const res = await request('http://myserver2.local/example', { + dispatcher: agent, + }) + + strictEqual(res.statusCode, 200) + await res.body.dump() + } +})