Skip to content

Commit

Permalink
Support retry from main
Browse files Browse the repository at this point in the history
Signed-off-by: Matteo Collina <[email protected]>
  • Loading branch information
mcollina committed Sep 19, 2024
1 parent 7568f28 commit 052a4fd
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 4 deletions.
14 changes: 12 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,19 @@ function createThreadInterceptor (opts) {
headers.push(value)
}
}

let aborted = false
handler.onConnect((err) => {
if (err) {
handler.onError(err)
}
aborted = true
}, {})
handler.onHeaders(res.statusCode, headers, () => {}, res.statusMessage)
handler.onData(res.rawPayload)
handler.onComplete([])
if (!aborted) {
handler.onData(res.rawPayload)
handler.onComplete([])
}
})

return true
Expand Down
3 changes: 3 additions & 0 deletions test/fixtures/worker1.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ app.get('/', (req, reply) => {
})

app.get('/whoami', (req, reply) => {
if (workerData?.whoamiReturn503) {
return reply.code(503).send({ threadId })
}
reply.send({ threadId })
})

Expand Down
41 changes: 39 additions & 2 deletions test/round-robin.test.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
'use strict'

const { test } = require('node:test')
const { deepStrictEqual, rejects } = require('node:assert')
const { deepStrictEqual, rejects, strictEqual } = require('node:assert')
const { join } = require('path')
const { Worker } = require('worker_threads')
const { createThreadInterceptor } = require('../')
const { Agent, request } = require('undici')
const { Agent, request, interceptors } = require('undici')
const { once } = require('events')
const RoundRobin = require('../lib/roundrobin')

Expand Down Expand Up @@ -177,3 +177,40 @@ test('RoundRobin remove unknown port', () => {
const rr = new RoundRobin()
rr.remove({})
})

test('503 status code re tries it', async (t) => {
const worker1 = new Worker(join(__dirname, 'fixtures', 'worker1.js'), {
workerData: {
message: 'mesh',
whoamiReturn503: true,
},
})
t.after(() => worker1.terminate())
const worker2 = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker2.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', [worker1, worker2])

const agent = new Agent().compose(interceptor, interceptors.retry())

{
const { body, statusCode } = await request('http://myserver.local/whoami', {
dispatcher: agent,
})

strictEqual(statusCode, 200)
deepStrictEqual(await body.json(), { threadId: worker2.threadId })
}

{
const { body, statusCode } = await request('http://myserver.local/whoami', {
dispatcher: agent,
})

strictEqual(statusCode, 200)
deepStrictEqual(await body.json(), { threadId: worker2.threadId })
}
})

0 comments on commit 052a4fd

Please sign in to comment.