From 6dce41905953c3b157b15b1d46a37d1db4ede389 Mon Sep 17 00:00:00 2001 From: gurke Date: Tue, 28 Mar 2023 12:58:16 +0200 Subject: [PATCH] fix(amqp): publish span not being transmitted when confirm cb is missing (#745) --- .../messaging/amqp/publisherCallbacks.js | 18 ++++++++++++++++++ .../messaging/amqp/publisherControls.js | 12 ++++++++++++ .../messaging/amqp/publisherPromises.js | 18 ++++++++++++++++++ .../test/tracing/messaging/amqp/test.js | 14 ++++++++++++++ .../tracing/instrumentation/messaging/amqp.js | 13 ++++++++++++- 5 files changed, 74 insertions(+), 1 deletion(-) diff --git a/packages/collector/test/tracing/messaging/amqp/publisherCallbacks.js b/packages/collector/test/tracing/messaging/amqp/publisherCallbacks.js index abf9cf3904..61fdc228c5 100644 --- a/packages/collector/test/tracing/messaging/amqp/publisherCallbacks.js +++ b/packages/collector/test/tracing/messaging/amqp/publisherCallbacks.js @@ -170,6 +170,24 @@ app.post('/send-to-get-queue', (req, res) => { }); }); +app.post('/publish-to-confirm-channel-without-callback', (req, res) => { + // @golevelup/nestjs-rabbitmq 3.3.0 did not pass the confirm callback + // eslint-disable-next-line max-len + // https://github.com/golevelup/nestjs/blob/%40golevelup/nestjs-rabbitmq%403.3.0/packages/rabbitmq/src/amqp/connection.ts#L541 + // https://github.com/jwalton/node-amqp-connection-manager/blob/v4.1.11/src/ChannelWrapper.ts#L13 + // https://github.com/amqp-node/amqplib/blob/v0.10.3/lib/channel_model.js#L265 + confirmChannel.publish(exchange, '', Buffer.from(req.body.message)); + + request(`http://127.0.0.1:${agentPort}`) + .then(() => { + res.status(201).send('OK'); + }) + .catch(err2 => { + log(err2); + res.sendStatus(500); + }); +}); + app.post('/send-to-confirm-queue', (req, res) => { confirmChannel.sendToQueue(queueNameConfirm, Buffer.from(req.body.message), {}, err => { if (err) { diff --git a/packages/collector/test/tracing/messaging/amqp/publisherControls.js b/packages/collector/test/tracing/messaging/amqp/publisherControls.js index 11b241a02e..c720305042 100644 --- a/packages/collector/test/tracing/messaging/amqp/publisherControls.js +++ b/packages/collector/test/tracing/messaging/amqp/publisherControls.js @@ -90,6 +90,18 @@ exports.sendToGetQueue = (message, headers) => } }); +exports.publishToConfirmChannelWithoutCallback = (message, headers) => + request({ + method: 'POST', + url: `http://127.0.0.1:${appPort}/publish-to-confirm-channel-without-callback`, + json: true, + simple: true, + headers, + body: { + message + } + }); + exports.sendToConfirmQueue = (message, headers) => request({ method: 'POST', diff --git a/packages/collector/test/tracing/messaging/amqp/publisherPromises.js b/packages/collector/test/tracing/messaging/amqp/publisherPromises.js index d2ca9df5a0..03e9aea634 100644 --- a/packages/collector/test/tracing/messaging/amqp/publisherPromises.js +++ b/packages/collector/test/tracing/messaging/amqp/publisherPromises.js @@ -120,6 +120,24 @@ app.post('/send-to-get-queue', (req, res) => { }); }); +app.post('/publish-to-confirm-channel-without-callback', (req, res) => { + // @golevelup/nestjs-rabbitmq 3.3.0 did not pass the confirm callback + // eslint-disable-next-line max-len + // https://github.com/golevelup/nestjs/blob/%40golevelup/nestjs-rabbitmq%403.3.0/packages/rabbitmq/src/amqp/connection.ts#L541 + // https://github.com/jwalton/node-amqp-connection-manager/blob/v4.1.11/src/ChannelWrapper.ts#L13 + // https://github.com/amqp-node/amqplib/blob/v0.10.3/lib/channel_model.js#L265 + confirmChannel.publish(exchange, '', Buffer.from(req.body.message)); + + request(`http://127.0.0.1:${agentPort}`) + .then(() => { + res.status(201).send('OK'); + }) + .catch(err2 => { + log(err2); + res.sendStatus(500); + }); +}); + app.post('/send-to-confirm-queue', (req, res) => { // Even with a ConfirmChannel and even in the promise API case, sendToQueue/publish do not return a promise, but only // accept a callback, see diff --git a/packages/collector/test/tracing/messaging/amqp/test.js b/packages/collector/test/tracing/messaging/amqp/test.js index e48f04a0cc..5fc21ce04e 100644 --- a/packages/collector/test/tracing/messaging/amqp/test.js +++ b/packages/collector/test/tracing/messaging/amqp/test.js @@ -125,6 +125,20 @@ const mochaSuiteFn = supportedVersion(process.versions.node) ? describe : descri ) )); + it('must record an exit span for ConfirmChannel#publish without confirm callback', () => + publisherControls.publishToConfirmChannelWithoutCallback('Ohai!').then(() => + retry(() => + agentControls.getSpans().then(spans => { + const httpEntry = verifyHttpEntry(spans); + const rabbitMqExit = verifyRabbitMqExit(spans, httpEntry); + expect(rabbitMqExit.data.rabbitmq.exchange).to.eql(exchange); + expect(rabbitMqExit.data.rabbitmq.key).to.not.exist; + verifyHttpExit(spans, httpEntry); + }) + ) + )); + + // `sendToQueue` calls .publish internally it('must record an exit span for ConfirmChannel#sendToQueue', () => publisherControls.sendToConfirmQueue('Ohai!').then(() => retry(() => diff --git a/packages/core/src/tracing/instrumentation/messaging/amqp.js b/packages/core/src/tracing/instrumentation/messaging/amqp.js index 2672c4ae9d..2763fd7e8c 100644 --- a/packages/core/src/tracing/instrumentation/messaging/amqp.js +++ b/packages/core/src/tracing/instrumentation/messaging/amqp.js @@ -62,7 +62,6 @@ function instrumentedSendMessage(ctx, originalSendMessage, originalArgs) { propagateSuppression(originalArgs[0]); propagateSuppression(originalArgs[1]); } - return originalSendMessage.apply(ctx, originalArgs); } @@ -71,6 +70,7 @@ function instrumentedSendMessage(ctx, originalSendMessage, originalArgs) { return originalSendMessage.apply(ctx, originalArgs); } + // instrumentedChannelModelPublish starts the span without data. The fn is responsible to transmit the span. if (isExitSpan && parentSpan.n === 'rabbitmq') { // if ConfirmChannel#publish/sendToQueue has been invoked, we have already created a new cls context in // instrumentedChannelModelPublish and must not do so again here. @@ -412,6 +412,7 @@ function instrumentedChannelModelPublish(ctx, originalFunction, originalArgs) { return cls.ns.runAndReturn(() => { const span = cls.startSpan('rabbitmq', constants.EXIT); + // everything else is handled in instrumentedSendMessage/processExitSpan if (originalArgs.length >= 5 && typeof originalArgs[4] === 'function') { const originalCb = originalArgs[4]; @@ -420,7 +421,12 @@ function instrumentedChannelModelPublish(ctx, originalFunction, originalArgs) { span.transmit(); originalCb.apply(this, arguments); }); + } else { + // CASE: confirm callback missing. amqplib does not throw any error, just transmit the span + span.d = Date.now() - span.ts; + span.transmit(); } + return originalFunction.apply(ctx, originalArgs); }); } @@ -453,7 +459,12 @@ function instrumentedCallbackModelPublish(ctx, originalFunction, originalArgs) { span.transmit(); originalCb.apply(this, arguments); }); + } else { + // CASE: confirm callback missing. amqplib does not throw any error, just transmit the span + span.d = Date.now() - span.ts; + span.transmit(); } + return originalFunction.apply(ctx, originalArgs); }); }