Skip to content

Commit

Permalink
fix: AsyncLocalStorage context (#47)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan-tymoshenko authored Dec 19, 2024
1 parent 980334b commit de3328c
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 2 deletions.
5 changes: 3 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict'

const { AsyncResource } = require('node:async_hooks')
const RoundRobin = require('./lib/roundrobin')
const hyperid = require('hyperid')
const { getGlobalDispatcher, setGlobalDispatcher } = require('undici')
Expand Down Expand Up @@ -94,7 +95,7 @@ function createThreadInterceptor (opts) {
}, timeout)
}

inflights.set(id, (err, res) => {
inflights.set(id, AsyncResource.bind((err, res) => {
clearTimeout(handle)

if (err) {
Expand Down Expand Up @@ -130,7 +131,7 @@ function createThreadInterceptor (opts) {

handler.onResponseData(controller, res.rawPayload)
handler.onResponseEnd(controller, [])
})
}))

return true
}
Expand Down
70 changes: 70 additions & 0 deletions test/async-local-storage.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
'use strict'

const { strictEqual } = require('node:assert')
const { AsyncLocalStorage } = require('node:async_hooks')
const { test } = require('node:test')
const { join } = require('node:path')
const { Worker } = require('node:worker_threads')
const { Agent, request } = require('undici')
const { createThreadInterceptor } = require('../')

const createTestInterceptor = (interceptorOpts) => {
return dispatch => {
return function InterceptedDispatch (opts, handler) {
const onRequestStart = handler.onRequestStart.bind(handler)
const onResponseStart = handler.onResponseStart.bind(handler)

handler.onRequestStart = (...args) => {
interceptorOpts.onRequestStart()
return onRequestStart(...args)
}

handler.onResponseStart = (...args) => {
interceptorOpts.onResponseStart()
return onResponseStart(...args)
}

return dispatch(opts, handler)
}
}
}

test('basic', async (t) => {
const worker = new Worker(join(__dirname, 'fixtures', 'worker1.js'))
t.after(() => worker.terminate())

const interceptor = createThreadInterceptor({
domain: '.local',
})
interceptor.route('myserver', worker)

const as = new AsyncLocalStorage()

let onRequestContextId = null
let onResponseContextId = null

const testInterceptor = createTestInterceptor({
onRequestStart () {
onRequestContextId = as.getStore()
},
onResponseStart () {
onResponseContextId = as.getStore()
},
})

const agent = new Agent().compose([
interceptor,
testInterceptor,
])

const contextId = 42
await as.run(contextId, async () => {
const { statusCode } = await request('http://myserver.local', {
dispatcher: agent,
})
strictEqual(statusCode, 200)
})

strictEqual(onRequestContextId, contextId)
strictEqual(onResponseContextId, contextId)
})

0 comments on commit de3328c

Please sign in to comment.