Skip to content

Commit

Permalink
fix(amqp): publish span not being transmitted when confirm cb is miss…
Browse files Browse the repository at this point in the history
…ing (#745)
  • Loading branch information
kirrg001 authored Mar 28, 2023
1 parent d0ac134 commit 6dce419
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,24 @@ app.post('/send-to-get-queue', (req, res) => {
});
});

app.post('/publish-to-confirm-channel-without-callback', (req, res) => {
// @golevelup/nestjs-rabbitmq 3.3.0 did not pass the confirm callback
// eslint-disable-next-line max-len
// https://github.com/golevelup/nestjs/blob/%40golevelup/nestjs-rabbitmq%403.3.0/packages/rabbitmq/src/amqp/connection.ts#L541
// https://github.com/jwalton/node-amqp-connection-manager/blob/v4.1.11/src/ChannelWrapper.ts#L13
// https://github.com/amqp-node/amqplib/blob/v0.10.3/lib/channel_model.js#L265
confirmChannel.publish(exchange, '', Buffer.from(req.body.message));

request(`http://127.0.0.1:${agentPort}`)
.then(() => {
res.status(201).send('OK');
})
.catch(err2 => {
log(err2);
res.sendStatus(500);
});
});

app.post('/send-to-confirm-queue', (req, res) => {
confirmChannel.sendToQueue(queueNameConfirm, Buffer.from(req.body.message), {}, err => {
if (err) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ exports.sendToGetQueue = (message, headers) =>
}
});

exports.publishToConfirmChannelWithoutCallback = (message, headers) =>
request({
method: 'POST',
url: `http://127.0.0.1:${appPort}/publish-to-confirm-channel-without-callback`,
json: true,
simple: true,
headers,
body: {
message
}
});

exports.sendToConfirmQueue = (message, headers) =>
request({
method: 'POST',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,24 @@ app.post('/send-to-get-queue', (req, res) => {
});
});

app.post('/publish-to-confirm-channel-without-callback', (req, res) => {
// @golevelup/nestjs-rabbitmq 3.3.0 did not pass the confirm callback
// eslint-disable-next-line max-len
// https://github.com/golevelup/nestjs/blob/%40golevelup/nestjs-rabbitmq%403.3.0/packages/rabbitmq/src/amqp/connection.ts#L541
// https://github.com/jwalton/node-amqp-connection-manager/blob/v4.1.11/src/ChannelWrapper.ts#L13
// https://github.com/amqp-node/amqplib/blob/v0.10.3/lib/channel_model.js#L265
confirmChannel.publish(exchange, '', Buffer.from(req.body.message));

request(`http://127.0.0.1:${agentPort}`)
.then(() => {
res.status(201).send('OK');
})
.catch(err2 => {
log(err2);
res.sendStatus(500);
});
});

app.post('/send-to-confirm-queue', (req, res) => {
// Even with a ConfirmChannel and even in the promise API case, sendToQueue/publish do not return a promise, but only
// accept a callback, see
Expand Down
14 changes: 14 additions & 0 deletions packages/collector/test/tracing/messaging/amqp/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,20 @@ const mochaSuiteFn = supportedVersion(process.versions.node) ? describe : descri
)
));

it('must record an exit span for ConfirmChannel#publish without confirm callback', () =>
publisherControls.publishToConfirmChannelWithoutCallback('Ohai!').then(() =>
retry(() =>
agentControls.getSpans().then(spans => {
const httpEntry = verifyHttpEntry(spans);
const rabbitMqExit = verifyRabbitMqExit(spans, httpEntry);
expect(rabbitMqExit.data.rabbitmq.exchange).to.eql(exchange);
expect(rabbitMqExit.data.rabbitmq.key).to.not.exist;
verifyHttpExit(spans, httpEntry);
})
)
));

// `sendToQueue` calls .publish internally
it('must record an exit span for ConfirmChannel#sendToQueue', () =>
publisherControls.sendToConfirmQueue('Ohai!').then(() =>
retry(() =>
Expand Down
13 changes: 12 additions & 1 deletion packages/core/src/tracing/instrumentation/messaging/amqp.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ function instrumentedSendMessage(ctx, originalSendMessage, originalArgs) {
propagateSuppression(originalArgs[0]);
propagateSuppression(originalArgs[1]);
}

return originalSendMessage.apply(ctx, originalArgs);
}

Expand All @@ -71,6 +70,7 @@ function instrumentedSendMessage(ctx, originalSendMessage, originalArgs) {
return originalSendMessage.apply(ctx, originalArgs);
}

// instrumentedChannelModelPublish starts the span without data. The fn is responsible to transmit the span.
if (isExitSpan && parentSpan.n === 'rabbitmq') {
// if ConfirmChannel#publish/sendToQueue has been invoked, we have already created a new cls context in
// instrumentedChannelModelPublish and must not do so again here.
Expand Down Expand Up @@ -412,6 +412,7 @@ function instrumentedChannelModelPublish(ctx, originalFunction, originalArgs) {

return cls.ns.runAndReturn(() => {
const span = cls.startSpan('rabbitmq', constants.EXIT);

// everything else is handled in instrumentedSendMessage/processExitSpan
if (originalArgs.length >= 5 && typeof originalArgs[4] === 'function') {
const originalCb = originalArgs[4];
Expand All @@ -420,7 +421,12 @@ function instrumentedChannelModelPublish(ctx, originalFunction, originalArgs) {
span.transmit();
originalCb.apply(this, arguments);
});
} else {
// CASE: confirm callback missing. amqplib does not throw any error, just transmit the span
span.d = Date.now() - span.ts;
span.transmit();
}

return originalFunction.apply(ctx, originalArgs);
});
}
Expand Down Expand Up @@ -453,7 +459,12 @@ function instrumentedCallbackModelPublish(ctx, originalFunction, originalArgs) {
span.transmit();
originalCb.apply(this, arguments);
});
} else {
// CASE: confirm callback missing. amqplib does not throw any error, just transmit the span
span.d = Date.now() - span.ts;
span.transmit();
}

return originalFunction.apply(ctx, originalArgs);
});
}
Expand Down

0 comments on commit 6dce419

Please sign in to comment.