Skip to content

Commit

Permalink
ServiceBusProcessor instrumentation is working
Browse files Browse the repository at this point in the history
  • Loading branch information
tippmar-nr committed Nov 20, 2024
1 parent 2418bd4 commit cbc8de2
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 138 deletions.
1 change: 1 addition & 0 deletions src/Agent/NewRelic/Agent/Core/Metrics/MetricNames.cs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ public enum MessageBrokerAction
Consume,
Peek,
Purge,
Process
}

public const string MessageBrokerPrefix = "MessageBroker";
Expand Down
2 changes: 2 additions & 0 deletions src/Agent/NewRelic/Agent/Core/Transactions/Transaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,8 @@ private static MetricNames.MessageBrokerAction AgentWrapperApiEnumToMetricNamesE
return MetricNames.MessageBrokerAction.Produce;
case MessageBrokerAction.Purge:
return MetricNames.MessageBrokerAction.Purge;
case MessageBrokerAction.Process:
return MetricNames.MessageBrokerAction.Process;
default:
throw new InvalidOperationException("Unexpected enum value: " + wrapper);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public enum MessageBrokerAction
Consume,
Peek,
Purge,
Process
}

///<summary>This enum must be a sequence of values starting with 0 and incrementing by 1. See MetricNames.GetEnumerationFunc</summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@
using System.Threading.Tasks;
using NewRelic.Agent.Api;
using NewRelic.Agent.Extensions.Providers.Wrapper;
using NewRelic.Agent.Extensions.SystemExtensions;

namespace NewRelic.Providers.Wrapper.AzureServiceBus;

public class AzureServiceBusProcessorWrapper : AzureServiceBusWrapperBase
{
public override bool IsTransactionRequired => false;
public override bool IsTransactionRequired => true;

public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo)
{
Expand All @@ -20,50 +19,29 @@ public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMetho

public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction)
{
dynamic serviceBusProcessor = instrumentedMethodCall.MethodCall.InvocationTarget;
string queueName = serviceBusProcessor.EntityPath; // some-queue-name
string fqns = serviceBusProcessor.FullyQualifiedNamespace; // some-service-bus-entity.servicebus.windows.net

//transaction = agent.CreateTransaction(
// destinationType: MessageBrokerDestinationType.Queue,
// BrokerVendorName,
// destination: queueName);

// ???
var segment = agent.CurrentTransaction.StartMethodSegment(instrumentedMethodCall.MethodCall, "Azure.Messaging.ServiceBus.ServiceBusProcessor", "ProcessMessageAsync");

//// start a message broker segment ???
//var segment = transaction.StartMessageBrokerSegment(
// instrumentedMethodCall.MethodCall,
// MessageBrokerDestinationType.Queue,
// MessageBrokerAction.Consume,
// BrokerVendorName,
// queueName,
// serverAddress: fqns);
if (instrumentedMethodCall.IsAsync)
transaction.AttachToAsync();

// this call wraps the client event handler callback, so start a method segment that will time the callback
var segment = transaction.StartMethodSegment(
instrumentedMethodCall.MethodCall,
instrumentedMethodCall.MethodCall.Method.Type.Name,
instrumentedMethodCall.MethodCall.Method.MethodName);

return instrumentedMethodCall.IsAsync ?
Delegates.GetAsyncDelegateFor<Task>(
agent,
segment,
true, // TODO Is this correct??
t =>
{
if (t.IsFaulted)
false, // TODO Is this correct?
onComplete: t =>
{
transaction.NoticeError(t.Exception);
}
if (t.Status == TaskStatus.Faulted)
transaction.NoticeError(t.Exception);
segment.End();
// transaction.End();
}, TaskContinuationOptions.ExecuteSynchronously)
:
Delegates.GetDelegateFor(
onFailure: transaction.NoticeError,
onComplete: () =>
{
segment.End();
// transaction.End();
});
}
)
:
Delegates.GetDelegateFor(segment);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@
// SPDX-License-Identifier: Apache-2.0

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Threading.Tasks;
using NewRelic.Agent.Api;
using NewRelic.Agent.Api.Experimental;
using NewRelic.Agent.Extensions.Providers.Wrapper;
using NewRelic.Reflection;

namespace NewRelic.Providers.Wrapper.AzureServiceBus;

public class AzureServiceBusReceiveWrapper : AzureServiceBusWrapperBase
{
private static readonly ConcurrentDictionary<Type, Func<object, object>> _getResultFromGenericTask = new();

private Func<object, object> _innerReceiverAccessor;
private Func<object, bool> _innerReceiverIsProcessorAccessor;

Expand Down Expand Up @@ -50,17 +54,26 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho
_ => throw new ArgumentOutOfRangeException(nameof(action), $"Unexpected instrumented method call: {instrumentedMethodCall.MethodCall.Method.MethodName}")
};

// if the inner receiver is configured as a processor and this is a ReceiveMessagesAsync call, start a transaction
// the transaction will end at the conclusion of ReceiverManager.ProcessOneMessage()
// If the inner receiver is configured as a processor and this is a ReceiveMessagesAsync call, start a transaction.
// The transaction will end at the conclusion of ReceiverManager.ProcessOneMessageWithinScopeAsync()
if (isProcessor && instrumentedMethodCall.MethodCall.Method.MethodName == "ReceiveMessagesAsync")
{
transaction = agent.CreateTransaction(
destinationType: MessageBrokerDestinationType.Queue,
BrokerVendorName,
destination: queueName);

if (instrumentedMethodCall.IsAsync)
transaction.DetachFromPrimary();
}

if (instrumentedMethodCall.IsAsync)
{
transaction.AttachToAsync();
}

// start a message broker segment

// start a message broker segment (only happens if transaction is not NoOpTransaction)
var segment = transaction.StartMessageBrokerSegment(
instrumentedMethodCall.MethodCall,
MessageBrokerDestinationType.Queue,
Expand All @@ -77,7 +90,7 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho
Delegates.GetAsyncDelegateFor<Task>(
agent,
segment,
true,
true, // TODO Is this correct??
(responseTask) =>
{
try
Expand All @@ -90,9 +103,84 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho
}
},
TaskContinuationOptions.ExecuteSynchronously)
: Delegates.GetDelegateFor<object>(
:
Delegates.GetDelegateFor<object>(
onFailure: transaction.NoticeError,
onComplete: segment.End,
onSuccess: (resultObj) => ExtractDTHeadersIfAvailable(resultObj, transaction, instrumentedMethodName));
}

private static object GetTaskResultFromObject(object taskObj)
{
var task = taskObj as Task;
if (task == null)
{
return null;
}
if (task.IsFaulted)
{
return null;
}
if (task.IsCanceled)
{
return null;
}

var getResponse = _getResultFromGenericTask.GetOrAdd(task.GetType(), t => VisibilityBypasser.Instance.GeneratePropertyAccessor<object>(t, "Result"));
return getResponse(task);
}

private static void HandleReceiveResponse(Task responseTask, string instrumentedMethodName, ITransaction transaction)
{
if (responseTask.IsCanceled)
return;

if (responseTask.IsFaulted)
{
transaction.NoticeError(responseTask.Exception);
return;
}

var resultObj = GetTaskResultFromObject(responseTask);
ExtractDTHeadersIfAvailable(resultObj, transaction, instrumentedMethodName);
}
private static void ExtractDTHeadersIfAvailable(object resultObj, ITransaction transaction, string instrumentedMethodName)
{
if (resultObj != null)
{
switch (instrumentedMethodName)
{
case "ReceiveMessagesAsync":
case "ReceiveDeferredMessagesAsync":
case "PeekMessagesInternalAsync":
// the response contains a list of messages.
// get the first message from the response and extract DT headers
dynamic messages = resultObj;
if (messages.Count > 0)
{
var msg = messages[0];
if (msg.ApplicationProperties is ReadOnlyDictionary<string, object> applicationProperties)
{
transaction.AcceptDistributedTraceHeaders(applicationProperties, ProcessHeaders, TransportType.Queue);
}
}
break;
}
}
}

private static IEnumerable<string> ProcessHeaders(ReadOnlyDictionary<string, object> applicationProperties, string key)
{
var headerValues = new List<string>();
foreach (var item in applicationProperties)
{
if (item.Key.Equals(key, StringComparison.OrdinalIgnoreCase))
{
headerValues.Add(item.Value as string);
}
}

return headerValues;
}

}
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
// Copyright 2020 New Relic, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

using System;
using System.Threading.Tasks;
using NewRelic.Agent.Api;
using NewRelic.Agent.Extensions.Providers.Wrapper;
using NewRelic.Reflection;

namespace NewRelic.Providers.Wrapper.AzureServiceBus
{
public class AzureServiceBusReceiverManagerWrapper : AzureServiceBusWrapperBase
{
public override bool IsTransactionRequired => false;
private Func<object, object> _receiverAccessor;
public override bool IsTransactionRequired => true;

public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo)
{
Expand All @@ -20,22 +23,41 @@ public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMetho
public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent,
ITransaction transaction)
{
// TODO not working at present -- transaction is always NoOpTransaction here but shouldn't be
// make sure the transaction ends when the receiver manager is done processing messages
var receiverManager = instrumentedMethodCall.MethodCall.InvocationTarget;
_receiverAccessor ??= VisibilityBypasser.Instance.GeneratePropertyAccessor<object>(receiverManager.GetType(), "Receiver");
dynamic receiver = _receiverAccessor(receiverManager);

string queueName = receiver.EntityPath; // some-queue-name
string fqns = receiver.FullyQualifiedNamespace; // some-service-bus-entity.servicebus.windows.net

if (instrumentedMethodCall.IsAsync)
transaction.AttachToAsync();

// start a new MessageBroker segment that wraps ProcessOneMessageWithinScopeAsync
var segment = transaction.StartMessageBrokerSegment(
instrumentedMethodCall.MethodCall,
MessageBrokerDestinationType.Queue,
MessageBrokerAction.Process, // TODO: This is a new action, added for this instrumentation
BrokerVendorName,
queueName,
serverAddress: fqns);

if (instrumentedMethodCall.IsAsync)
{
return Delegates.GetAsyncDelegateFor<Task>(
agent,
agent.CurrentTransaction.CurrentSegment,
false,
segment,
true,
onComplete: _ =>
{
agent.CurrentTransaction.End();
segment.End();
transaction.End();
});
}
return Delegates.GetDelegateFor(onComplete: () =>
{
agent.CurrentTransaction.End();
segment.End();
transaction.End();
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho
_ => throw new ArgumentOutOfRangeException(nameof(action), $"Unexpected instrumented method call: {instrumentedMethodCall.MethodCall.Method.MethodName}")
};

if (instrumentedMethodCall.IsAsync)
{
transaction.AttachToAsync();
}

// start a message broker segment
var segment = transaction.StartMessageBrokerSegment(
instrumentedMethodCall.MethodCall,
Expand Down
Loading

0 comments on commit cbc8de2

Please sign in to comment.