From 4eaf6dc481f1a12ef174feb53e90f4b7c7825c25 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Sat, 4 May 2024 16:15:46 +0300 Subject: [PATCH] Use revised TransactionObservabilityManager signature that prevents overwriting ongoing transactions with the same span id --- packages/amqp/lib/AbstractAmqpConsumer.ts | 6 +++++- packages/core/lib/types/MessageQueueTypes.ts | 2 +- packages/sqs/lib/sqs/AbstractSqsConsumer.ts | 6 +++++- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/packages/amqp/lib/AbstractAmqpConsumer.ts b/packages/amqp/lib/AbstractAmqpConsumer.ts index d9215ff9..8075b919 100644 --- a/packages/amqp/lib/AbstractAmqpConsumer.ts +++ b/packages/amqp/lib/AbstractAmqpConsumer.ts @@ -136,7 +136,11 @@ export abstract class AbstractAmqpConsumer< deserializedMessage.result[this.messageTypeField] }` - this.transactionObservabilityManager?.start(transactionSpanId) + this.transactionObservabilityManager?.start( + transactionSpanId, + // @ts-ignore + deserializedMessage.result[this.messageIdField], + ) if (this.logMessages) { const resolvedLogMessage = this.resolveMessageLog(deserializedMessage.result, messageType) this.logMessage(resolvedLogMessage) diff --git a/packages/core/lib/types/MessageQueueTypes.ts b/packages/core/lib/types/MessageQueueTypes.ts index 3c1dab6a..5787f3d7 100644 --- a/packages/core/lib/types/MessageQueueTypes.ts +++ b/packages/core/lib/types/MessageQueueTypes.ts @@ -21,7 +21,7 @@ export interface AsyncPublisher { } export type TransactionObservabilityManager = { - start: (transactionSpanId: string) => unknown + start: (transactionSpanId: string, uniqueTransactionKey: string) => unknown stop: (transactionSpanId: string) => unknown } diff --git a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts index befa7d65..052755c6 100644 --- a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts +++ b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts @@ -204,7 +204,11 @@ export abstract class AbstractSqsConsumer< const messageType = deserializedMessage.result[this.messageTypeField] const transactionSpanId = `queue_${this.queueName}:${messageType}` - this.transactionObservabilityManager?.start(transactionSpanId) + this.transactionObservabilityManager?.start( + transactionSpanId, + // @ts-ignore + deserializedMessage.result[this.messageIdField], + ) if (this.logMessages) { const resolvedLogMessage = this.resolveMessageLog(deserializedMessage.result, messageType) this.logMessage(resolvedLogMessage)