From 10bd05ff434361d0cd06bd3487ac5e7daeb6e6db Mon Sep 17 00:00:00 2001 From: Crystal Magloire Date: Mon, 5 Feb 2024 15:02:01 -0500 Subject: [PATCH 1/7] Diagnostics channel support --- docs/docs/api/DiagnosticsChannel.md | 57 +++++++ lib/core/diagnostics.js | 79 ++++++++- lib/web/fetch/index.js | 179 +++++++++++++++++++- test/node-test/debug.js | 5 +- test/node-test/diagnostics-channel/fetch.js | 106 ++++++++++++ 5 files changed, 420 insertions(+), 6 deletions(-) create mode 100644 test/node-test/diagnostics-channel/fetch.js diff --git a/docs/docs/api/DiagnosticsChannel.md b/docs/docs/api/DiagnosticsChannel.md index 099c072f6c6..9fae2c40c77 100644 --- a/docs/docs/api/DiagnosticsChannel.md +++ b/docs/docs/api/DiagnosticsChannel.md @@ -202,3 +202,60 @@ diagnosticsChannel.channel('undici:websocket:pong').subscribe(({ payload }) => { console.log(payload) }) ``` +The below channels collectively act as [`tracingChannel.tracePromise`](https://nodejs.org/api/diagnostics_channel.html#tracingchanneltracepromisefn-context-thisarg-args) on `fetch`. So all of them will publish the arguments passed to `fetch`. + +## `tracing:undici:fetch:start` + +This message is published when `fetch` is called, and will publish the arguments passed to `fetch`. + +```js +import diagnosticsChannel from 'diagnostics_channel' +diagnosticsChannel.channel('tracing:undici:fetch:start').subscribe(({ input, init }) => { + console.log('input', input) + console.log('init', init) +}) +``` + +## `tracing:undici:fetch:end` + +This message is published at the end of `fetch`'s execution, and will publish any `error` from the synchronous part of `fetch`. Since `fetch` is asynchronous, this should be empty. This channel will publish the same values as `undici:fetch:start`, but we are including it to track when `fetch` finishes execution and to be consistent with [`TracingChannel`](https://nodejs.org/api/diagnostics_channel.html#class-tracingchannel). +```js +import diagnosticsChannel from 'diagnostics_channel' +diagnosticsChannel.channel('tracing:undici:fetch:end').subscribe(({ input, init, error }) => { + console.log('input', input) + console.log('init', init) + console.log('error', error) // should be empty +}) +``` +## `tracing:undici:fetch:asyncStart` +This message is published after `fetch` resolves or rejects. If `fetch` resolves, it publishes the response in `result`. If it rejects, it publishes the error in `error`. +```js +import diagnosticsChannel from 'diagnostics_channel' +diagnosticsChannel.channel('tracing:undici:fetch:asyncStart').subscribe(({ input, init, result, error }) => { + console.log('input', input) + console.log('init', init) + console.log('response', result) + console.log('error', error) +}) +``` +## `tracing:undici:fetch:asyncEnd` +This channel gets published the same values as and at the same time as `tracing:undici:fetch:asyncStart` in the case of [`tracingChannel.tracePromise`](https://nodejs.org/api/diagnostics_channel.html#tracingchanneltracepromisefn-context-thisarg-args) +```js +import diagnosticsChannel from 'diagnostics_channel' +diagnosticsChannel.channel('tracing:undici:fetch:asyncEnd').subscribe(({ input, init, result, error }) => { + console.log('input', input) + console.log('init', init) + console.log('response', result) + console.log('error', error) +}) +``` +## `tracing:undici:fetch:error` +This message is published when an error is thrown or promise rejects while calling `fetch`. +```js +import diagnosticsChannel from 'diagnostics_channel' +diagnosticsChannel.channel('tracing:undici:fetch:error').subscribe(({ input, init, error }) => { + console.log('input', input) + console.log('init', init) + console.log('error', error) +}) +``` \ No newline at end of file diff --git a/lib/core/diagnostics.js b/lib/core/diagnostics.js index e1af3db6112..1f7ff8e8995 100644 --- a/lib/core/diagnostics.js +++ b/lib/core/diagnostics.js @@ -6,6 +6,12 @@ const undiciDebugLog = util.debuglog('undici') const fetchDebuglog = util.debuglog('fetch') const websocketDebuglog = util.debuglog('websocket') let isClientSet = false +let tracingChannel + +if (diagnosticsChannel.tracingChannel) { + tracingChannel = diagnosticsChannel.tracingChannel('undici:fetch') +} + const channels = { // Client beforeConnect: diagnosticsChannel.channel('undici:client:beforeConnect'), @@ -23,7 +29,9 @@ const channels = { close: diagnosticsChannel.channel('undici:websocket:close'), socketError: diagnosticsChannel.channel('undici:websocket:socket_error'), ping: diagnosticsChannel.channel('undici:websocket:ping'), - pong: diagnosticsChannel.channel('undici:websocket:pong') + pong: diagnosticsChannel.channel('undici:websocket:pong'), + // Fetch channels + tracingChannel } if (undiciDebugLog.enabled || fetchDebuglog.enabled) { @@ -114,6 +122,75 @@ if (undiciDebugLog.enabled || fetchDebuglog.enabled) { isClientSet = true } +// Track fetch requests +if (fetchDebuglog.enabled && diagnosticsChannel.tracingChannel) { + const debuglog = fetchDebuglog + + tracingChannel.start.subscribe(evt => { + const { + input + } = evt + debuglog( + 'fetch has started request to %s', + input + ) + }) + + tracingChannel.end.subscribe(evt => { + const { + input + } = evt + debuglog( + 'fetch has received response from %s', + input + ) + }) + + tracingChannel.asyncStart.subscribe(evt => { + const { + input, + result, + error + } = evt + if (result && error) { + debuglog( + 'fetch has received response for %s - HTTP %d, error is %s', + input, + result.status, + error.message + ) + } else if (result) { + debuglog( + 'fetch has received response for %s - HTTP %d', + input, + result.status + ) + } else if (error) { + debuglog( + 'fetch has errored for %s - %s', + input, + error.message + ) + } else { + debuglog( + 'fetch has started request to %s', + input + ) + } + }) + + tracingChannel.error.subscribe(evt => { + const { + error + } = evt + debuglog( + 'fetch error event received response %s', + error.message + ) + }) + isClientSet = true +} + if (websocketDebuglog.enabled) { if (!isClientSet) { const debuglog = undiciDebugLog.enabled ? undiciDebugLog : websocketDebuglog diff --git a/lib/web/fetch/index.js b/lib/web/fetch/index.js index 5fbef495ad6..8de57461b7c 100644 --- a/lib/web/fetch/index.js +++ b/lib/web/fetch/index.js @@ -70,6 +70,8 @@ const defaultUserAgent = typeof __UNDICI_IS_NODE__ !== 'undefined' || typeof esb ? 'node' : 'undici' +const channels = require('../../core/diagnostics.js').channels.tracingChannel + /** @type {import('buffer').resolveObjectURL} */ let resolveObjectURL @@ -120,6 +122,7 @@ class Fetch extends EE { } } +<<<<<<< HEAD function handleFetchDone (response) { finalizeAndReportTiming(response, 'fetch') } @@ -130,6 +133,68 @@ function fetch (input, init = undefined) { // 1. Let p be a new promise. let p = createDeferredPromise() +======= +// This will publish all diagnostic events only when we have subscribers. +function ifSubscribersRunStores (request, input, init, callback) { + const hasSubscribers = subscribersCheck() + + if (hasSubscribers) { + const context = { request, input, init, result: null, error: null } + + return channels.start.runStores(context, () => { + try { + return callback(createInstrumentedDeferredPromise(context)) + } catch (e) { + context.error = e + channels.error.publish(context) + throw e + } finally { + channels.end.publish(context) + } + }) + } else { + return callback(createDeferredPromise()) + } +} + +// subscribersCheck will be called at the beginning of the fetch call +// and will check if we have subscribers +function subscribersCheck () { + return channels && (channels.start.hasSubscribers || + channels.end.hasSubscribers || + channels.asyncStart.hasSubscribers || + channels.asyncEnd.hasSubscribers || + channels.error.hasSubscribers) +} + +function createInstrumentedDeferredPromise (context) { + let res + let rej + const promise = new Promise((resolve, reject) => { + res = function (result) { + context.result = result + channels.asyncStart.runStores(context, () => { + resolve.apply(this, arguments) + channels.asyncEnd.publish(context) + }) + } + rej = function (error) { + context.error = error + channels.error.publish(context) + channels.asyncStart.runStores(context, () => { + reject.apply(this, arguments) + channels.asyncEnd.publish(context) + }) + } + }) + + return { promise, resolve: res, reject: rej } +} + +// https://fetch.spec.whatwg.org/#fetch-method +function fetch (input, init = {}) { + webidl.argumentLengthCheck(arguments, 1, { header: 'globalThis.fetch' }) +>>>>>>> 01cd0f6e (Diagnostics channel support) // 2. Let requestObject be the result of invoking the initial value of // Request as constructor with input and init as arguments. If this throws @@ -139,8 +204,115 @@ function fetch (input, init = undefined) { try { requestObject = new Request(input, init) } catch (e) { - p.reject(e) + return Promise.reject(e) + } + + return ifSubscribersRunStores(requestObject, input, init, p => { + // 3. Let request be requestObject’s request. + const request = requestObject[kState] + + // 4. If requestObject’s signal’s aborted flag is set, then: + if (requestObject.signal.aborted) { + // 1. Abort the fetch() call with p, request, null, and + // requestObject’s signal’s abort reason. + abortFetch(p, request, null, requestObject.signal.reason) + + // 2. Return p. + return p.promise + } + + // 5. Let globalObject be request’s client’s global object. + const globalObject = request.client.globalObject + + // 6. If globalObject is a ServiceWorkerGlobalScope object, then set + // request’s service-workers mode to "none". + if (globalObject?.constructor?.name === 'ServiceWorkerGlobalScope') { + request.serviceWorkers = 'none' + } + + // 7. Let responseObject be null. + let responseObject = null + + // 8. Let relevantRealm be this’s relevant Realm. + const relevantRealm = null + + // 9. Let locallyAborted be false. + let locallyAborted = false + + // 10. Let controller be null. + let controller = null + + // 11. Add the following abort steps to requestObject’s signal: + addAbortListener( + requestObject.signal, + () => { + // 1. Set locallyAborted to true. + locallyAborted = true + + // 2. Assert: controller is non-null. + assert(controller != null) + + // 3. Abort controller with requestObject’s signal’s abort reason. + controller.abort(requestObject.signal.reason) + + // 4. Abort the fetch() call with p, request, responseObject, + // and requestObject’s signal’s abort reason. + abortFetch(p, request, responseObject, requestObject.signal.reason) + } + ) + + // 12. Let handleFetchDone given response response be to finalize and + // report timing with response, globalObject, and "fetch". + const handleFetchDone = (response) => + finalizeAndReportTiming(response, 'fetch') + + // 13. Set controller to the result of calling fetch given request, + // with processResponseEndOfBody set to handleFetchDone, and processResponse + // given response being these substeps: + + const processResponse = (response) => { + // 1. If locallyAborted is true, terminate these substeps. + if (locallyAborted) { + return + } + + // 2. If response’s aborted flag is set, then: + if (response.aborted) { + // 1. Let deserializedError be the result of deserialize a serialized + // abort reason given controller’s serialized abort reason and + // relevantRealm. + + // 2. Abort the fetch() call with p, request, responseObject, and + // deserializedError. + + abortFetch(p, request, responseObject, controller.serializedAbortReason) + return + } + + // 3. If response is a network error, then reject p with a TypeError + // and terminate these substeps. + if (response.type === 'error') { + p.reject(new TypeError('fetch failed', { cause: response.error })) + return + } + + // 4. Set responseObject to the result of creating a Response object, + // given response, "immutable", and relevantRealm. + responseObject = fromInnerResponse(response, 'immutable', relevantRealm) + + // 5. Resolve p with responseObject. + p.resolve(responseObject) + } + + controller = fetching({ + request, + processResponseEndOfBody: handleFetchDone, + processResponse, + dispatcher: requestObject[kDispatcher] // undici + }) + // 14. Return p. return p.promise +<<<<<<< HEAD } // 3. Let request be requestObject’s request. @@ -245,10 +417,9 @@ function fetch (input, init = undefined) { processResponseEndOfBody: handleFetchDone, processResponse, dispatcher: requestObject[kDispatcher] // undici +======= +>>>>>>> 01cd0f6e (Diagnostics channel support) }) - - // 14. Return p. - return p.promise } // https://fetch.spec.whatwg.org/#finalize-and-report-timing diff --git a/test/node-test/debug.js b/test/node-test/debug.js index d7c462f57ae..f26d1cb9213 100644 --- a/test/node-test/debug.js +++ b/test/node-test/debug.js @@ -48,7 +48,7 @@ test('debug#websocket', { skip: !process.versions.icu }, async t => { }) test('debug#fetch', async t => { - const assert = tspl(t, { plan: 7 }) + const assert = tspl(t, { plan: 10 }) const child = spawn( process.execPath, [join(__dirname, '../fixtures/fetch.js')], @@ -58,11 +58,14 @@ test('debug#fetch', async t => { ) const chunks = [] const assertions = [ + /(FETCH [0-9]+:) (fetch has started)/, /(FETCH [0-9]+:) (connecting to)/, + /(FETCH [0-9]+:) (fetch has received)/, /(FETCH [0-9]+:) (connected to)/, /(FETCH [0-9]+:) (sending request)/, /(FETCH [0-9]+:) (received response)/, /(FETCH [0-9]+:) (trailers received)/, + /(FETCH [0-9]+:) (fetch has received)/, /^$/ ] diff --git a/test/node-test/diagnostics-channel/fetch.js b/test/node-test/diagnostics-channel/fetch.js new file mode 100644 index 00000000000..4987f15bfac --- /dev/null +++ b/test/node-test/diagnostics-channel/fetch.js @@ -0,0 +1,106 @@ +'use strict' + +const { tspl } = require('@matteo.collina/tspl') +const { describe, test, before, after } = require('node:test') +const { fetch } = require('../../..') + +let diagnosticsChannel +let skip = false +try { + diagnosticsChannel = require('node:diagnostics_channel') +} catch { + skip = true +} + +const { createServer } = require('http') + +describe('diagnosticsChannel for fetch', { skip }, () => { + let server + before(() => { + server = createServer((req, res) => { + res.setHeader('Content-Type', 'text/plain') + res.setHeader('trailer', 'foo') + res.write('hello') + res.addTrailers({ + foo: 'oof' + }) + res.end() + }) + }) + + after(() => { server.close() }) + + test('fetch', async t => { + t = tspl(t, { plan: 17 }) + + let startCalled = 0 + diagnosticsChannel.channel('tracing:undici:fetch:start').subscribe(({ input, init, result, error }) => { + startCalled += 1 + if (input.redirect) { + t.strictEqual(input, 'badrequest') + t.deepStrictEqual(init, { redirect: 'error' }) + } else { + t.strictEqual(input, `http://localhost:${server.address().port}`) + t.deepStrictEqual(init, {}) + } + }) + + let endCalled = 0 + diagnosticsChannel.channel('tracing:undici:fetch:end').subscribe(({ input, init, result, error }) => { + endCalled += 1 + if (init.redirect) { + t.strictEqual(input, 'badrequest') + t.deepStrictEqual(init, { redirect: 'error' }) + } else { + t.strictEqual(input, `http://localhost:${server.address().port}`) + t.deepStrictEqual(init, {}) + } + t.strictEqual(result, null) + }) + + let asyncStartCalled = 0 + diagnosticsChannel.channel('tracing:undici:fetch:asyncStart').subscribe(({ input, init, result, error }) => { + asyncStartCalled += 1 + if (init.redirect) { + t.strictEqual(input, 'badrequest') + t.deepStrictEqual(init, { redirect: 'error' }) + } else { + t.strictEqual(input, `http://localhost:${server.address().port}`) + t.deepStrictEqual(init, {}) + t.ok(result) + } + }) + + let asyncEndCalled = 0 + diagnosticsChannel.channel('tracing:undici:fetch:asyncEnd').subscribe(async ({ input, init, result, error }) => { + asyncEndCalled += 1 + if (init.redirect) { + t.strictEqual(input, 'badrequest') + t.deepStrictEqual(init, { redirect: 'error' }) + t.strictEqual(result, null) + t.ok(error) + t.strictEqual(error.cause.code, 'ERR_INVALID_URL') + } else { + t.strictEqual(input, `http://localhost:${server.address().port}`) + t.deepStrictEqual(init, {}) + t.ok(result) + t.strictEqual(result.status, 200) + t.strictEqual(error, null) + } + }) + + server.listen(0, async () => { + await fetch(`http://localhost:${server.address().port}`) + try { + await fetch('badrequest', { redirect: 'error' }) + } catch (e) { } + server.close() + t.strictEqual(startCalled, 1) + t.strictEqual(endCalled, 1) + t.strictEqual(asyncStartCalled, 1) + t.strictEqual(asyncEndCalled, 1) + }) + + await t.completed + }) +}) From 078afb1e2dac4b3e262c6d56be27c87277e26d42 Mon Sep 17 00:00:00 2001 From: Crystal Magloire Date: Wed, 8 May 2024 15:31:43 -0400 Subject: [PATCH 2/7] requested changes --- lib/web/fetch/index.js | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/web/fetch/index.js b/lib/web/fetch/index.js index 8de57461b7c..153a82cf6d5 100644 --- a/lib/web/fetch/index.js +++ b/lib/web/fetch/index.js @@ -64,6 +64,7 @@ const { dataURLProcessor, serializeAMimeType, minimizeSupportedMimeType } = requ const { getGlobalDispatcher } = require('../../global') const { webidl } = require('./webidl') const { STATUS_CODES } = require('node:http') +const { resourceLimits } = require('node:worker_threads') const GET_OR_HEAD = ['GET', 'HEAD'] const defaultUserAgent = typeof __UNDICI_IS_NODE__ !== 'undefined' || typeof esbuildDetection !== 'undefined' @@ -135,11 +136,11 @@ function fetch (input, init = undefined) { let p = createDeferredPromise() ======= // This will publish all diagnostic events only when we have subscribers. -function ifSubscribersRunStores (request, input, init, callback) { +function ifSubscribersRunStores (req, input, init, callback) { const hasSubscribers = subscribersCheck() if (hasSubscribers) { - const context = { request, input, init, result: null, error: null } + const context = { req, input, init: init ?? {}, result: null, error: null } return channels.start.runStores(context, () => { try { @@ -174,7 +175,7 @@ function createInstrumentedDeferredPromise (context) { res = function (result) { context.result = result channels.asyncStart.runStores(context, () => { - resolve.apply(this, arguments) + resolve(resourceLimits) channels.asyncEnd.publish(context) }) } @@ -182,7 +183,7 @@ function createInstrumentedDeferredPromise (context) { context.error = error channels.error.publish(context) channels.asyncStart.runStores(context, () => { - reject.apply(this, arguments) + reject(error) channels.asyncEnd.publish(context) }) } @@ -192,7 +193,7 @@ function createInstrumentedDeferredPromise (context) { } // https://fetch.spec.whatwg.org/#fetch-method -function fetch (input, init = {}) { +function fetch (input, init = undefined) { webidl.argumentLengthCheck(arguments, 1, { header: 'globalThis.fetch' }) >>>>>>> 01cd0f6e (Diagnostics channel support) From b9dc1ff23190801c9ef3fa494fe72f89a39d42d9 Mon Sep 17 00:00:00 2001 From: Crystal Magloire Date: Wed, 8 May 2024 17:43:02 -0400 Subject: [PATCH 3/7] requested changes --- lib/web/fetch/index.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/web/fetch/index.js b/lib/web/fetch/index.js index 153a82cf6d5..131557a30fa 100644 --- a/lib/web/fetch/index.js +++ b/lib/web/fetch/index.js @@ -64,7 +64,6 @@ const { dataURLProcessor, serializeAMimeType, minimizeSupportedMimeType } = requ const { getGlobalDispatcher } = require('../../global') const { webidl } = require('./webidl') const { STATUS_CODES } = require('node:http') -const { resourceLimits } = require('node:worker_threads') const GET_OR_HEAD = ['GET', 'HEAD'] const defaultUserAgent = typeof __UNDICI_IS_NODE__ !== 'undefined' || typeof esbuildDetection !== 'undefined' @@ -175,7 +174,7 @@ function createInstrumentedDeferredPromise (context) { res = function (result) { context.result = result channels.asyncStart.runStores(context, () => { - resolve(resourceLimits) + resolve(result) channels.asyncEnd.publish(context) }) } From 3fed3a8c808682f3cdb1da38e5c9feb4e81f0802 Mon Sep 17 00:00:00 2001 From: Crystal Magloire Date: Thu, 9 May 2024 15:22:53 -0400 Subject: [PATCH 4/7] requested changes --- lib/web/fetch/index.js | 120 ----------------------------------------- 1 file changed, 120 deletions(-) diff --git a/lib/web/fetch/index.js b/lib/web/fetch/index.js index 131557a30fa..04f9f8f38aa 100644 --- a/lib/web/fetch/index.js +++ b/lib/web/fetch/index.js @@ -122,18 +122,6 @@ class Fetch extends EE { } } -<<<<<<< HEAD -function handleFetchDone (response) { - finalizeAndReportTiming(response, 'fetch') -} - -// https://fetch.spec.whatwg.org/#fetch-method -function fetch (input, init = undefined) { - webidl.argumentLengthCheck(arguments, 1, 'globalThis.fetch') - - // 1. Let p be a new promise. - let p = createDeferredPromise() -======= // This will publish all diagnostic events only when we have subscribers. function ifSubscribersRunStores (req, input, init, callback) { const hasSubscribers = subscribersCheck() @@ -194,7 +182,6 @@ function createInstrumentedDeferredPromise (context) { // https://fetch.spec.whatwg.org/#fetch-method function fetch (input, init = undefined) { webidl.argumentLengthCheck(arguments, 1, { header: 'globalThis.fetch' }) ->>>>>>> 01cd0f6e (Diagnostics channel support) // 2. Let requestObject be the result of invoking the initial value of // Request as constructor with input and init as arguments. If this throws @@ -312,113 +299,6 @@ function fetch (input, init = undefined) { }) // 14. Return p. return p.promise -<<<<<<< HEAD - } - - // 3. Let request be requestObject’s request. - const request = requestObject[kState] - - // 4. If requestObject’s signal’s aborted flag is set, then: - if (requestObject.signal.aborted) { - // 1. Abort the fetch() call with p, request, null, and - // requestObject’s signal’s abort reason. - abortFetch(p, request, null, requestObject.signal.reason) - - // 2. Return p. - return p.promise - } - - // 5. Let globalObject be request’s client’s global object. - const globalObject = request.client.globalObject - - // 6. If globalObject is a ServiceWorkerGlobalScope object, then set - // request’s service-workers mode to "none". - if (globalObject?.constructor?.name === 'ServiceWorkerGlobalScope') { - request.serviceWorkers = 'none' - } - - // 7. Let responseObject be null. - let responseObject = null - - // 8. Let relevantRealm be this’s relevant Realm. - - // 9. Let locallyAborted be false. - let locallyAborted = false - - // 10. Let controller be null. - let controller = null - - // 11. Add the following abort steps to requestObject’s signal: - addAbortListener( - requestObject.signal, - () => { - // 1. Set locallyAborted to true. - locallyAborted = true - - // 2. Assert: controller is non-null. - assert(controller != null) - - // 3. Abort controller with requestObject’s signal’s abort reason. - controller.abort(requestObject.signal.reason) - - const realResponse = responseObject?.deref() - - // 4. Abort the fetch() call with p, request, responseObject, - // and requestObject’s signal’s abort reason. - abortFetch(p, request, realResponse, requestObject.signal.reason) - } - ) - - // 12. Let handleFetchDone given response response be to finalize and - // report timing with response, globalObject, and "fetch". - // see function handleFetchDone - - // 13. Set controller to the result of calling fetch given request, - // with processResponseEndOfBody set to handleFetchDone, and processResponse - // given response being these substeps: - - const processResponse = (response) => { - // 1. If locallyAborted is true, terminate these substeps. - if (locallyAborted) { - return - } - - // 2. If response’s aborted flag is set, then: - if (response.aborted) { - // 1. Let deserializedError be the result of deserialize a serialized - // abort reason given controller’s serialized abort reason and - // relevantRealm. - - // 2. Abort the fetch() call with p, request, responseObject, and - // deserializedError. - - abortFetch(p, request, responseObject, controller.serializedAbortReason) - return - } - - // 3. If response is a network error, then reject p with a TypeError - // and terminate these substeps. - if (response.type === 'error') { - p.reject(new TypeError('fetch failed', { cause: response.error })) - return - } - - // 4. Set responseObject to the result of creating a Response object, - // given response, "immutable", and relevantRealm. - responseObject = new WeakRef(fromInnerResponse(response, 'immutable')) - - // 5. Resolve p with responseObject. - p.resolve(responseObject.deref()) - p = null - } - - controller = fetching({ - request, - processResponseEndOfBody: handleFetchDone, - processResponse, - dispatcher: requestObject[kDispatcher] // undici -======= ->>>>>>> 01cd0f6e (Diagnostics channel support) }) } From d7ef0fb75f654717fcfc1f32b47da70d0a4f97f2 Mon Sep 17 00:00:00 2001 From: Crystal Magloire Date: Thu, 23 May 2024 11:06:18 -0400 Subject: [PATCH 5/7] resolving conflicts --- lib/web/fetch/index.js | 33 +++++++++++++-------- test/node-test/diagnostics-channel/fetch.js | 22 +++++++------- 2 files changed, 31 insertions(+), 24 deletions(-) diff --git a/lib/web/fetch/index.js b/lib/web/fetch/index.js index 04f9f8f38aa..afd515447da 100644 --- a/lib/web/fetch/index.js +++ b/lib/web/fetch/index.js @@ -71,7 +71,6 @@ const defaultUserAgent = typeof __UNDICI_IS_NODE__ !== 'undefined' || typeof esb : 'undici' const channels = require('../../core/diagnostics.js').channels.tracingChannel - /** @type {import('buffer').resolveObjectURL} */ let resolveObjectURL @@ -122,12 +121,17 @@ class Fetch extends EE { } } +function handleFetchDone (response) { + finalizeAndReportTiming(response, 'fetch') +} + // This will publish all diagnostic events only when we have subscribers. function ifSubscribersRunStores (req, input, init, callback) { const hasSubscribers = subscribersCheck() + // console.log(arguments) if (hasSubscribers) { - const context = { req, input, init: init ?? {}, result: null, error: null } + const context = { req, input, init, result: null, error: null } return channels.start.runStores(context, () => { try { @@ -181,7 +185,7 @@ function createInstrumentedDeferredPromise (context) { // https://fetch.spec.whatwg.org/#fetch-method function fetch (input, init = undefined) { - webidl.argumentLengthCheck(arguments, 1, { header: 'globalThis.fetch' }) + webidl.argumentLengthCheck(arguments, 1, 'globalThis.fetch') // 2. Let requestObject be the result of invoking the initial value of // Request as constructor with input and init as arguments. If this throws @@ -200,8 +204,8 @@ function fetch (input, init = undefined) { // 4. If requestObject’s signal’s aborted flag is set, then: if (requestObject.signal.aborted) { - // 1. Abort the fetch() call with p, request, null, and - // requestObject’s signal’s abort reason. + // 1. Abort the fetch() call with p, request, null, and + // requestObject’s signal’s abort reason. abortFetch(p, request, null, requestObject.signal.reason) // 2. Return p. @@ -221,7 +225,6 @@ function fetch (input, init = undefined) { let responseObject = null // 8. Let relevantRealm be this’s relevant Realm. - const relevantRealm = null // 9. Let locallyAborted be false. let locallyAborted = false @@ -242,16 +245,17 @@ function fetch (input, init = undefined) { // 3. Abort controller with requestObject’s signal’s abort reason. controller.abort(requestObject.signal.reason) + const realResponse = responseObject?.deref() + // 4. Abort the fetch() call with p, request, responseObject, // and requestObject’s signal’s abort reason. - abortFetch(p, request, responseObject, requestObject.signal.reason) + abortFetch(p, request, realResponse, requestObject.signal.reason) } ) // 12. Let handleFetchDone given response response be to finalize and // report timing with response, globalObject, and "fetch". - const handleFetchDone = (response) => - finalizeAndReportTiming(response, 'fetch') + // see function handleFetchDone // 13. Set controller to the result of calling fetch given request, // with processResponseEndOfBody set to handleFetchDone, and processResponse @@ -285,10 +289,11 @@ function fetch (input, init = undefined) { // 4. Set responseObject to the result of creating a Response object, // given response, "immutable", and relevantRealm. - responseObject = fromInnerResponse(response, 'immutable', relevantRealm) + responseObject = new WeakRef(fromInnerResponse(response, 'immutable')) // 5. Resolve p with responseObject. - p.resolve(responseObject) + p.resolve(responseObject.deref()) + p = null } controller = fetching({ @@ -297,6 +302,7 @@ function fetch (input, init = undefined) { processResponse, dispatcher: requestObject[kDispatcher] // undici }) + // 14. Return p. return p.promise }) @@ -420,6 +426,7 @@ function fetching ({ useParallelQueue = false, dispatcher = getGlobalDispatcher() // undici }) { + console.log('request', request) // Ensure that the dispatcher is set accordingly assert(dispatcher) @@ -494,9 +501,9 @@ function fetching ({ // 9. If request’s origin is "client", then set request’s origin to request’s // client’s origin. + console.log('reqyest', request) if (request.origin === 'client') { - // TODO: What if request.client is null? - request.origin = request.client?.origin + request.origin = request.client.origin } // 10. If all of the following conditions are true: diff --git a/test/node-test/diagnostics-channel/fetch.js b/test/node-test/diagnostics-channel/fetch.js index 4987f15bfac..49fec9bed36 100644 --- a/test/node-test/diagnostics-channel/fetch.js +++ b/test/node-test/diagnostics-channel/fetch.js @@ -34,47 +34,47 @@ describe('diagnosticsChannel for fetch', { skip }, () => { t = tspl(t, { plan: 17 }) let startCalled = 0 - diagnosticsChannel.channel('tracing:undici:fetch:start').subscribe(({ input, init, result, error }) => { + diagnosticsChannel.channel('tracing:undici:fetch:start').subscribe(({ req, input, init, result, error }) => { startCalled += 1 if (input.redirect) { t.strictEqual(input, 'badrequest') t.deepStrictEqual(init, { redirect: 'error' }) } else { t.strictEqual(input, `http://localhost:${server.address().port}`) - t.deepStrictEqual(init, {}) + t.deepStrictEqual(init, undefined) } }) let endCalled = 0 - diagnosticsChannel.channel('tracing:undici:fetch:end').subscribe(({ input, init, result, error }) => { + diagnosticsChannel.channel('tracing:undici:fetch:end').subscribe(({ req, input, init, result, error }) => { endCalled += 1 - if (init.redirect) { + if (init && init.redirect) { t.strictEqual(input, 'badrequest') t.deepStrictEqual(init, { redirect: 'error' }) } else { t.strictEqual(input, `http://localhost:${server.address().port}`) - t.deepStrictEqual(init, {}) + t.deepStrictEqual(init, undefined) } t.strictEqual(result, null) }) let asyncStartCalled = 0 - diagnosticsChannel.channel('tracing:undici:fetch:asyncStart').subscribe(({ input, init, result, error }) => { + diagnosticsChannel.channel('tracing:undici:fetch:asyncStart').subscribe(({ req, input, init, result, error }) => { asyncStartCalled += 1 - if (init.redirect) { + if (init && init.redirect) { t.strictEqual(input, 'badrequest') t.deepStrictEqual(init, { redirect: 'error' }) } else { t.strictEqual(input, `http://localhost:${server.address().port}`) - t.deepStrictEqual(init, {}) + t.deepStrictEqual(init, undefined) t.ok(result) } }) let asyncEndCalled = 0 - diagnosticsChannel.channel('tracing:undici:fetch:asyncEnd').subscribe(async ({ input, init, result, error }) => { + diagnosticsChannel.channel('tracing:undici:fetch:asyncEnd').subscribe(async ({ req, input, init, result, error }) => { asyncEndCalled += 1 - if (init.redirect) { + if (init && init.redirect) { t.strictEqual(input, 'badrequest') t.deepStrictEqual(init, { redirect: 'error' }) t.strictEqual(result, null) @@ -82,7 +82,7 @@ describe('diagnosticsChannel for fetch', { skip }, () => { t.strictEqual(error.cause.code, 'ERR_INVALID_URL') } else { t.strictEqual(input, `http://localhost:${server.address().port}`) - t.deepStrictEqual(init, {}) + t.deepStrictEqual(init, undefined) t.ok(result) t.strictEqual(result.status, 200) t.strictEqual(error, null) From 2f201371ab8d0e6e9bc2553d100d2437f19140e0 Mon Sep 17 00:00:00 2001 From: Crystal Magloire Date: Thu, 23 May 2024 11:15:38 -0400 Subject: [PATCH 6/7] resolving conflicts --- lib/web/fetch/index.js | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/web/fetch/index.js b/lib/web/fetch/index.js index afd515447da..79991224617 100644 --- a/lib/web/fetch/index.js +++ b/lib/web/fetch/index.js @@ -128,7 +128,6 @@ function handleFetchDone (response) { // This will publish all diagnostic events only when we have subscribers. function ifSubscribersRunStores (req, input, init, callback) { const hasSubscribers = subscribersCheck() - // console.log(arguments) if (hasSubscribers) { const context = { req, input, init, result: null, error: null } @@ -426,7 +425,6 @@ function fetching ({ useParallelQueue = false, dispatcher = getGlobalDispatcher() // undici }) { - console.log('request', request) // Ensure that the dispatcher is set accordingly assert(dispatcher) @@ -501,9 +499,9 @@ function fetching ({ // 9. If request’s origin is "client", then set request’s origin to request’s // client’s origin. - console.log('reqyest', request) if (request.origin === 'client') { - request.origin = request.client.origin + // TODO: What if request.client is null? + request.origin = request.client?.origin } // 10. If all of the following conditions are true: From 1a06cff589ac3d8a4ca8567cfa226ad031fba747 Mon Sep 17 00:00:00 2001 From: Crystal Magloire Date: Mon, 3 Jun 2024 15:33:35 -0400 Subject: [PATCH 7/7] adding comment line for p --- docs/docs/api/DiagnosticsChannel.md | 10 +++++----- lib/web/fetch/index.js | 2 ++ 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/docs/docs/api/DiagnosticsChannel.md b/docs/docs/api/DiagnosticsChannel.md index 9fae2c40c77..faf89a8b961 100644 --- a/docs/docs/api/DiagnosticsChannel.md +++ b/docs/docs/api/DiagnosticsChannel.md @@ -210,7 +210,7 @@ This message is published when `fetch` is called, and will publish the arguments ```js import diagnosticsChannel from 'diagnostics_channel' -diagnosticsChannel.channel('tracing:undici:fetch:start').subscribe(({ input, init }) => { +diagnosticsChannel.channel('tracing:undici:fetch:start').subscribe(({ req, input, init, }) => { console.log('input', input) console.log('init', init) }) @@ -221,7 +221,7 @@ diagnosticsChannel.channel('tracing:undici:fetch:start').subscribe(({ input, ini This message is published at the end of `fetch`'s execution, and will publish any `error` from the synchronous part of `fetch`. Since `fetch` is asynchronous, this should be empty. This channel will publish the same values as `undici:fetch:start`, but we are including it to track when `fetch` finishes execution and to be consistent with [`TracingChannel`](https://nodejs.org/api/diagnostics_channel.html#class-tracingchannel). ```js import diagnosticsChannel from 'diagnostics_channel' -diagnosticsChannel.channel('tracing:undici:fetch:end').subscribe(({ input, init, error }) => { +diagnosticsChannel.channel('tracing:undici:fetch:end').subscribe(({ req, input, init, error }) => { console.log('input', input) console.log('init', init) console.log('error', error) // should be empty @@ -231,7 +231,7 @@ diagnosticsChannel.channel('tracing:undici:fetch:end').subscribe(({ input, init, This message is published after `fetch` resolves or rejects. If `fetch` resolves, it publishes the response in `result`. If it rejects, it publishes the error in `error`. ```js import diagnosticsChannel from 'diagnostics_channel' -diagnosticsChannel.channel('tracing:undici:fetch:asyncStart').subscribe(({ input, init, result, error }) => { +diagnosticsChannel.channel('tracing:undici:fetch:asyncStart').subscribe(({ req, input, init, result, error }) => { console.log('input', input) console.log('init', init) console.log('response', result) @@ -242,7 +242,7 @@ diagnosticsChannel.channel('tracing:undici:fetch:asyncStart').subscribe(({ input This channel gets published the same values as and at the same time as `tracing:undici:fetch:asyncStart` in the case of [`tracingChannel.tracePromise`](https://nodejs.org/api/diagnostics_channel.html#tracingchanneltracepromisefn-context-thisarg-args) ```js import diagnosticsChannel from 'diagnostics_channel' -diagnosticsChannel.channel('tracing:undici:fetch:asyncEnd').subscribe(({ input, init, result, error }) => { +diagnosticsChannel.channel('tracing:undici:fetch:asyncEnd').subscribe(({ req, input, init, result, error }) => { console.log('input', input) console.log('init', init) console.log('response', result) @@ -253,7 +253,7 @@ diagnosticsChannel.channel('tracing:undici:fetch:asyncEnd').subscribe(({ input, This message is published when an error is thrown or promise rejects while calling `fetch`. ```js import diagnosticsChannel from 'diagnostics_channel' -diagnosticsChannel.channel('tracing:undici:fetch:error').subscribe(({ input, init, error }) => { +diagnosticsChannel.channel('tracing:undici:fetch:error').subscribe(({ req, input, init, error }) => { console.log('input', input) console.log('init', init) console.log('error', error) diff --git a/lib/web/fetch/index.js b/lib/web/fetch/index.js index 79991224617..d6a00f129d7 100644 --- a/lib/web/fetch/index.js +++ b/lib/web/fetch/index.js @@ -186,6 +186,8 @@ function createInstrumentedDeferredPromise (context) { function fetch (input, init = undefined) { webidl.argumentLengthCheck(arguments, 1, 'globalThis.fetch') + // 1. Let p be a new promise. + // 2. Let requestObject be the result of invoking the initial value of // Request as constructor with input and init as arguments. If this throws // an exception, reject p with it and return p.