Skip to content

Commit

Permalink
Crash and errors (#34)
Browse files Browse the repository at this point in the history
* removed useless file

Signed-off-by: Matteo Collina <[email protected]>

* first part of a fix

Signed-off-by: Matteo Collina <[email protected]>

* working but ugly

Signed-off-by: Matteo Collina <[email protected]>

* pretty

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* concurrency to 1

Signed-off-by: Matteo Collina <[email protected]>

* fixup

Signed-off-by: Matteo Collina <[email protected]>

* Apply suggestions from code review

Co-authored-by: Ivan Tymoshenko <[email protected]>

* Update index.js

* concurrent testing

Signed-off-by: Matteo Collina <[email protected]>

---------

Signed-off-by: Matteo Collina <[email protected]>
Co-authored-by: Ivan Tymoshenko <[email protected]>
  • Loading branch information
mcollina and ivan-tymoshenko authored Nov 25, 2024
1 parent ee7ac18 commit f311a85
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 15 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:

strategy:
matrix:
node-version: [18.x, 20.x, 20.x]
node-version: [18.x, 20.x, 22.x, 23.x]
steps:
- uses: actions/checkout@v3

Expand Down
38 changes: 28 additions & 10 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ function createThreadInterceptor (opts) {
const { port1, port2 } = new MessageChannel()
forwarded.get(otherPort).add(port2)
forwarded.get(port).add(port1)
otherPort.postMessage({ type: 'route', url, port: port2 }, [port2])
port.postMessage({ type: 'route', url: key, port: port1 }, [port1])
otherPort.postMessage({ type: 'route', url, port: port2, threadId: port.threadId }, [port2])
port.postMessage({ type: 'route', url: key, port: port1, threadId: otherPort.threadId }, [port1])
}
}
}
Expand All @@ -163,7 +163,12 @@ function createThreadInterceptor (opts) {
routes.set(url, new RoundRobin())
}

const roundRobinIndex = routes.get(url).add(port)
const roundRobin = routes.get(url)
roundRobin.add(port)

// We must copy the threadId outsise because it can be nulled
// by Node.js
const threadId = port.threadId

function onClose () {
const roundRobin = routes.get(url)
Expand All @@ -180,7 +185,7 @@ function createThreadInterceptor (opts) {
}

// Notify other threads that any eventual network address for this route is no longer valid
res.setAddress(url, roundRobinIndex)
res.setAddress(url, threadId)
}

// If port is a worker, we need to remove it from the routes
Expand All @@ -199,13 +204,22 @@ function createThreadInterceptor (opts) {
inflight(err, res)
}
} else if (msg.type === 'address') {
res.setAddress(url, roundRobinIndex, msg.address, forward)
if (!msg.url) {
res.setAddress(url, port.threadId, msg.address, forward)
} else {
const roundRobin = routes.get(msg.url)
if (!roundRobin) {
return
}

res.setAddress(msg.url, msg.threadId, msg.address, false)
}
}
})
}

res.setAddress = (url, index, address, forward = true) => {
const port = routes.get(url)?.get(index)
res.setAddress = (url, threadId, address, forward = true) => {
const port = routes.get(url)?.findByThreadId(threadId)

if (port) {
port[kAddress] = address
Expand All @@ -217,7 +231,10 @@ function createThreadInterceptor (opts) {

for (const [, roundRobin] of routes) {
for (const otherPort of roundRobin) {
otherPort.postMessage({ type: 'address', url, index, address })
// Avoid loops, do not send the message to the source
if (otherPort.threadId !== threadId) {
otherPort.postMessage({ type: 'address', url, address, threadId })
}
}
}
}
Expand Down Expand Up @@ -246,7 +263,7 @@ function wire ({ server: newServer, port, ...undiciOpts }) {
server = newServer

if (typeof server === 'string') {
parentPort.postMessage({ type: 'address', address: server })
parentPort.postMessage({ type: 'address', address: server, threadId })
} else {
hasInject = typeof server?.inject === 'function'
}
Expand Down Expand Up @@ -315,10 +332,11 @@ function wire ({ server: newServer, port, ...undiciOpts }) {
}
})
} else if (msg.type === 'route') {
msg.port.threadId = msg.threadId
interceptor.route(msg.url, msg.port, false)
msg.port.on('message', onMessage)
} else if (msg.type === 'address') {
interceptor.setAddress(msg.url, msg.index, msg.address, false)
interceptor.setAddress(msg.url, msg.threadId, msg.address, false)
}
}

Expand Down
8 changes: 4 additions & 4 deletions lib/roundrobin.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ class RoundRobin {
return this.ports.length - 1
}

findByThreadId (threadId) {
return this.ports.find((p) => p.threadId === threadId)
}

remove (port) {
const index = this.ports.indexOf(port)
if (index === -1) {
Expand All @@ -30,10 +34,6 @@ class RoundRobin {
this.index = this.index % this.ports.length
}

get (index) {
return this.ports[index]
}

get length () {
return this.ports.length
}
Expand Down
Binary file removed test.ttf
Binary file not shown.
30 changes: 30 additions & 0 deletions test/fixtures/composer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
'use strict'

const { parentPort } = require('worker_threads')
const fastify = require('fastify')
const { wire } = require('../../')
const { request } = require('undici')

const app = fastify()

wire({ server: app, port: parentPort, domain: '.local' })

app.get('/s1/example', async (req, reply) => {
const { body } = await request('http://myserver.local/example')
return await body.json()
})

app.get('/s2/example', async (req, reply) => {
const { body } = await request('http://myserver2.local/example')
return await body.json()
})

app.get('/s1/crash', async (req, reply) => {
const { body } = await request('http://myserver.local/crash')
return await body.json()
})

app.get('/s2/crash', async (req, reply) => {
const { body } = await request('http://myserver2.local/crash')
return await body.json()
})
21 changes: 21 additions & 0 deletions test/fixtures/network-crash.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
'use strict'

const { parentPort } = require('worker_threads')
const fastify = require('fastify')
const { wire } = require('../../')

const app = fastify()

app.get('/example', async (request, reply) => {
return { hello: 'world' }
})

app.get('/crash', async (request, reply) => {
process.exit(1)
})

// TODO(mcollina): there is a race condition here
const { replaceServer } = wire({ port: parentPort })
app.listen({ port: 0 }).then((url) => {
replaceServer(url)
})
154 changes: 154 additions & 0 deletions test/service-restarted.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
'use strict'

const { test } = require('node:test')
const { strictEqual, rejects } = require('node:assert')
const { join } = require('node:path')
const { Worker } = require('node:worker_threads')
const { setTimeout: sleep } = require('node:timers/promises')
const { createThreadInterceptor } = require('../')
const { Agent, request } = require('undici')

test('service restart with network / 1', async (t) => {
const worker1 = new Worker(join(__dirname, 'fixtures', 'network-crash.js'), {
workerData: { message: 'mesh' },
})
t.after(() => worker1.terminate())
const worker2 = new Worker(join(__dirname, 'fixtures', 'network-crash.js'))
t.after(() => worker2.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker1)
interceptor.route('myserver2', worker2)

const agent = new Agent().compose(interceptor)

await sleep(1000)

{
const res = await request('http://myserver.local/example', {
dispatcher: agent,
})

strictEqual(res.statusCode, 200)
await res.body.dump()
}

{
const res = await request('http://myserver2.local/example', {
dispatcher: agent,
})

strictEqual(res.statusCode, 200)
await res.body.dump()
}

await rejects(request('http://myserver2.local/crash', {
dispatcher: agent,
}))

const worker2bis = new Worker(join(__dirname, 'fixtures', 'network-crash.js'))
t.after(() => worker2bis.terminate())

interceptor.route('myserver2', worker2bis)

await sleep(2000)

{
const res = await request('http://myserver.local/example', {
dispatcher: agent,
})

strictEqual(res.statusCode, 200)
await res.body.dump()
}

{
const res = await request('http://myserver2.local/example', {
dispatcher: agent,
})

strictEqual(res.statusCode, 200)
await res.body.dump()
}
})

test('service restart with network / 2', async (t) => {
const composer = new Worker(join(__dirname, 'fixtures', 'composer.js'), {
workerData: { name: 'composer' },
})
t.after(() => composer.terminate())
const worker1 = new Worker(join(__dirname, 'fixtures', 'network-crash.js'), {
workerData: { name: 'worker1' },
})
t.after(() => worker1.terminate())
const worker2 = new Worker(join(__dirname, 'fixtures', 'network-crash.js'), {
workerData: { name: 'worker2' },
})
t.after(() => worker2.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('composer', composer)
interceptor.route('myserver', worker1)
interceptor.route('myserver2', worker2)

const agent = new Agent().compose(interceptor)

await sleep(1000)

{
const res = await request('http://composer.local/s1/example', {
dispatcher: agent,
})

strictEqual(res.statusCode, 200)
await res.body.dump()
}

{
const res = await request('http://composer.local/s2/example', {
dispatcher: agent,
})

strictEqual(res.statusCode, 200)
await res.body.dump()
}

{
const res = await request('http://composer.local/s2/crash', {
dispatcher: agent,
})
strictEqual(res.statusCode, 500)
await res.body.dump()
}

const worker2bis = new Worker(join(__dirname, 'fixtures', 'network-crash.js'), {
workerData: { name: 'worker2bis' },
})
t.after(() => worker2bis.terminate())

interceptor.route('myserver2', worker2bis)

await sleep(2000)

{
const res = await request('http://composer.local/s1/example', {
dispatcher: agent,
})

strictEqual(res.statusCode, 200)
await res.body.dump()
}

{
const res = await request('http://composer.local/s2/example', {
dispatcher: agent,
})

strictEqual(res.statusCode, 200)
await res.body.dump()
}
})

0 comments on commit f311a85

Please sign in to comment.