Skip to content

Commit

Permalink
first part of a fix
Browse files Browse the repository at this point in the history
Signed-off-by: Matteo Collina <[email protected]>
  • Loading branch information
mcollina committed Nov 25, 2024
1 parent c784506 commit 6eb3ecb
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 1 deletion.
11 changes: 10 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,11 @@ function createThreadInterceptor (opts) {
routes.set(url, new RoundRobin())
}

const roundRobinIndex = routes.get(url).add(port)
const roundRobin = routes.get(url)
roundRobin.add(port)

function onClose () {
const roundRobinIndex = routes.get(url).findIndex(port)
const roundRobin = routes.get(url)
roundRobin.remove(port)
for (const f of forwarded.get(port)) {
Expand Down Expand Up @@ -199,12 +201,18 @@ function createThreadInterceptor (opts) {
inflight(err, res)
}
} else if (msg.type === 'address') {
const roundRobinIndex = roundRobin.findIndex(port)
res.setAddress(url, roundRobinIndex, msg.address, forward)
}
})
}

res.setAddress = (url, index, address, forward = true) => {
/*
if (!address) {
throw new Error('address is required')
}
*/
const port = routes.get(url)?.get(index)

if (port) {
Expand All @@ -217,6 +225,7 @@ function createThreadInterceptor (opts) {

for (const [, roundRobin] of routes) {
for (const otherPort of roundRobin) {
process._rawDebug('sending address', address, 'to', otherPort.threadId, 'for', url)
otherPort.postMessage({ type: 'address', url, index, address })
}
}
Expand Down
4 changes: 4 additions & 0 deletions lib/roundrobin.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ class RoundRobin {
return this.ports.length - 1
}

findIndex (port) {
return this.ports.indexOf(port)
}

remove (port) {
const index = this.ports.indexOf(port)
if (index === -1) {
Expand Down
22 changes: 22 additions & 0 deletions test/fixtures/network-crash.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
'use strict'

const { parentPort } = require('worker_threads')
const fastify = require('fastify')
const { wire } = require('../../')

const app = fastify()

app.get('/example', async (request, reply) => {
return { hello: 'world' }
})

app.get('/crash', async (request, reply) => {
process.exit(1)
})

// TODO(mcollina): there is a race condition here
const { replaceServer } = wire({ port: parentPort })
app.listen({ port: 0 }).then((url) => {
process._rawDebug('server started on', url)
replaceServer(url)
})
83 changes: 83 additions & 0 deletions test/service-restarted.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
'use strict'

const { test } = require('node:test')
const { strictEqual, rejects } = require('node:assert')
const { join } = require('node:path')
const { Worker } = require('node:worker_threads')
const { setTimeout: sleep } = require('node:timers/promises')
const { createThreadInterceptor } = require('../')
const { Agent, request } = require('undici')

test('two service in a mesh, one is terminated with an inflight message', async (t) => {
const worker1 = new Worker(join(__dirname, 'fixtures', 'network-crash.js'), {
workerData: { message: 'mesh' },
})
t.after(() => worker1.terminate())
const worker2 = new Worker(join(__dirname, 'fixtures', 'network-crash.js'))
t.after(() => worker2.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker1)
interceptor.route('myserver2', worker2)

const agent = new Agent().compose(interceptor)

await sleep(1000)

{
const res = await request('http://myserver.local/example', {
dispatcher: agent,
})

strictEqual(res.statusCode, 200)
await res.body.dump()
}

{
const res = await request('http://myserver2.local/example', {
dispatcher: agent,
})

strictEqual(res.statusCode, 200)
await res.body.dump()
}

console.log('all successful request completed')

await rejects(request('http://myserver2.local/crash', {
dispatcher: agent,
}))

console.log('service crashed')

const worker2bis = new Worker(join(__dirname, 'fixtures', 'network-crash.js'))
t.after(() => worker2bis.terminate())

interceptor.route('myserver2', worker2bis)

await sleep(2000)

console.log('calling worker1')

{
const res = await request('http://myserver.local/example', {
dispatcher: agent,
})

strictEqual(res.statusCode, 200)
await res.body.dump()
}

console.log('calling worker2bis')

{
const res = await request('http://myserver2.local/example', {
dispatcher: agent,
})

strictEqual(res.statusCode, 200)
await res.body.dump()
}
})

0 comments on commit 6eb3ecb

Please sign in to comment.