Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Azure AI inference events timestamp in AzMon workaround #38955

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 70 additions & 38 deletions sdk/ai/azure-ai-inference/azure/ai/inference/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import importlib
import logging
import os
from time import time_ns
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union
from urllib.parse import urlparse

Expand Down Expand Up @@ -193,23 +194,27 @@ def _set_attributes(self, span: "AbstractSpan", *attrs: Tuple[str, Any]) -> None
if value is not None:
span.add_attribute(key, value)

def _add_request_chat_message_event(self, span: "AbstractSpan", **kwargs: Any) -> None:
def _add_request_chat_message_events(self, span: "AbstractSpan", **kwargs: Any) -> int:
timestamp = 0
for message in kwargs.get("messages", []):
try:
message = message.as_dict()
except AttributeError:
pass

if message.get("role"):
name = f"gen_ai.{message.get('role')}.message"
span.span_instance.add_event(
name=name,
attributes={
timestamp = self._record_event(
span,
f"gen_ai.{message.get('role')}.message",
{
"gen_ai.system": _INFERENCE_GEN_AI_SYSTEM_NAME,
"gen_ai.event.content": json.dumps(message),
},
timestamp,
)

return timestamp

def _parse_url(self, url):
parsed = urlparse(url)
server_address = parsed.hostname
Expand Down Expand Up @@ -280,8 +285,11 @@ def _get_finish_reason_for_choice(self, choice):

return "none"

def _add_response_chat_message_event(self, span: "AbstractSpan", result: _models.ChatCompletions) -> None:
def _add_response_chat_message_events(self, span: "AbstractSpan",
result: _models.ChatCompletions, last_event_timestamp_ns: int
) -> None:
for choice in result.choices:
attributes = {}
if _trace_inference_content:
full_response: Dict[str, Any] = {
"message": {"content": choice.message.content},
Expand Down Expand Up @@ -312,7 +320,7 @@ def _add_response_chat_message_event(self, span: "AbstractSpan", result: _models
"gen_ai.system": _INFERENCE_GEN_AI_SYSTEM_NAME,
"gen_ai.event.content": json.dumps(response),
}
span.span_instance.add_event(name="gen_ai.choice", attributes=attributes)
last_event_timestamp_ns = self._record_event(span, "gen_ai.choice", attributes, last_event_timestamp_ns)

def _add_response_chat_attributes(
self,
Expand All @@ -336,15 +344,16 @@ def _add_response_chat_attributes(
if not finish_reasons is None:
span.add_attribute("gen_ai.response.finish_reasons", finish_reasons) # type: ignore

def _add_request_span_attributes(self, span: "AbstractSpan", _span_name: str, args: Any, kwargs: Any) -> None:
def _add_request_details(self, span: "AbstractSpan", args: Any, kwargs: Any) -> int:
self._add_request_chat_attributes(span, *args, **kwargs)
if _trace_inference_content:
self._add_request_chat_message_event(span, **kwargs)
return self._add_request_chat_message_events(span, **kwargs)
return 0

def _add_response_span_attributes(self, span: "AbstractSpan", result: object) -> None:
def _add_response_details(self, span: "AbstractSpan", result: object, last_event_timestamp_ns: int) -> None:
if isinstance(result, _models.ChatCompletions):
self._add_response_chat_attributes(span, result)
self._add_response_chat_message_event(span, result)
self._add_response_chat_message_events(span, result, last_event_timestamp_ns)
# TODO add more models here

def _accumulate_response(self, item, accumulate: Dict[str, Any]) -> None:
Expand Down Expand Up @@ -410,7 +419,7 @@ def _accumulate_async_streaming_response(self, item, accumulate: Dict[str, Any])
accumulate["message"]["tool_calls"][-1]["function"]["arguments"] += tool_call.function.arguments

def _wrapped_stream(
self, stream_obj: _models.StreamingChatCompletions, span: "AbstractSpan"
self, stream_obj: _models.StreamingChatCompletions, span: "AbstractSpan", previous_event_timestamp: int
) -> _models.StreamingChatCompletions:
class StreamWrapper(_models.StreamingChatCompletions):
def __init__(self, stream_obj, instrumentor):
Expand Down Expand Up @@ -467,29 +476,27 @@ def __iter__( # pyright: ignore [reportIncompatibleMethodOverride]
accumulate["message"]["tool_calls"] = list(
tool_calls_function_names_and_arguments_removed
)

span.span_instance.add_event(
name="gen_ai.choice",
attributes={
"gen_ai.system": _INFERENCE_GEN_AI_SYSTEM_NAME,
"gen_ai.event.content": json.dumps(accumulate),
},
)
attributes={
"gen_ai.system": _INFERENCE_GEN_AI_SYSTEM_NAME,
"gen_ai.event.content": json.dumps(accumulate),
}
self._instrumentor._record_event(span, "gen_ai.choice", attributes, previous_event_timestamp)
span.finish()

return StreamWrapper(stream_obj, self)

def _async_wrapped_stream(
self, stream_obj: _models.AsyncStreamingChatCompletions, span: "AbstractSpan"
self, stream_obj: _models.AsyncStreamingChatCompletions, span: "AbstractSpan", last_event_timestamp_ns: int
) -> _models.AsyncStreamingChatCompletions:
class AsyncStreamWrapper(_models.AsyncStreamingChatCompletions):
def __init__(self, stream_obj, instrumentor, span):
def __init__(self, stream_obj, instrumentor, span, last_event_timestamp_ns):
super().__init__(stream_obj._response)
self._instrumentor = instrumentor
self._accumulate: Dict[str, Any] = {}
self._stream_obj = stream_obj
self.span = span
self._last_result = None
self._last_event_timestamp_ns = last_event_timestamp_ns

async def __anext__(self) -> "_models.StreamingChatCompletionsUpdate":
try:
Expand Down Expand Up @@ -523,19 +530,44 @@ def _trace_stream_content(self) -> None:
self._accumulate["message"]["tool_calls"]
)
self._accumulate["message"]["tool_calls"] = list(tools_no_recording)

self.span.span_instance.add_event(
name="gen_ai.choice",
attributes={
"gen_ai.system": _INFERENCE_GEN_AI_SYSTEM_NAME,
"gen_ai.event.content": json.dumps(self._accumulate),
},
)
attributes={
"gen_ai.system": _INFERENCE_GEN_AI_SYSTEM_NAME,
"gen_ai.event.content": json.dumps(self._accumulate),
}
self._last_event_timestamp_ns = self._instrumentor._record_event( # pylint: disable=protected-access, line-too-long # pyright: ignore [reportFunctionMemberAccess]
span,
"gen_ai.choice",
attributes,
self._last_event_timestamp_ns
)
span.finish()

async_stream_wrapper = AsyncStreamWrapper(stream_obj, self, span)
async_stream_wrapper = AsyncStreamWrapper(stream_obj, self, span, last_event_timestamp_ns)
return async_stream_wrapper

def _record_event(
    self, span: "AbstractSpan", name: str, attributes: Dict[str, Any], last_event_timestamp_ns: int
) -> int:
    """Add an event to the span, spacing it at least 1 microsecond after the previous event.

    :param span: Span wrapper; the event is added to ``span.span_instance``.
    :param name: Name of the event to record.
    :param attributes: Attributes attached to the event.
    :param last_event_timestamp_ns: Timestamp (in nanoseconds) returned for the previously
        recorded event, or 0 when there is no previous event.
    :return: The timestamp (in nanoseconds) that was assigned to this event. Pass it back in
        as ``last_event_timestamp_ns`` when recording the next event.
    """
    event_time_ns = time_ns()

    # Several events are recorded back to back, some within (hundreds of) nanoseconds of
    # each other. time.time_ns resolution is not high enough on Windows to guarantee a
    # unique timestamp per message; Azure Monitor also truncates resolution to microseconds
    # and some other backends truncate to milliseconds.
    #
    # Users still need a way to restore event order, so every event is placed at least
    # 1 microsecond (1000 ns) after the one before it.
    #
    # This is a workaround until a generic and better solution is found - see
    # https://github.com/open-telemetry/semantic-conventions/issues/1701
    if last_event_timestamp_ns > 0:
        earliest_allowed_ns = last_event_timestamp_ns + 1000
        event_time_ns = max(event_time_ns, earliest_allowed_ns)

    span.span_instance.add_event(name=name, attributes=attributes, timestamp=event_time_ns)

    return event_time_ns

def _trace_sync_function(
self,
function: Callable,
Expand Down Expand Up @@ -580,16 +612,16 @@ def inner(*args, **kwargs):
name=span_name,
kind=SpanKind.CLIENT, # pyright: ignore [reportPossiblyUnboundVariable]
)

try:
# tracing events not supported in azure-core-tracing-opentelemetry
# so need to access the span instance directly
with span_impl_type.change_context(span.span_instance):
self._add_request_span_attributes(span, span_name, args, kwargs)
last_event_timestamp_ns = self._add_request_details(span, args, kwargs)
result = function(*args, **kwargs)
if kwargs.get("stream") is True:
return self._wrapped_stream(result, span)
self._add_response_span_attributes(span, result)

return self._wrapped_stream(result, span, last_event_timestamp_ns)
self._add_response_details(span, result, last_event_timestamp_ns)
except Exception as exc:
# Set the span status to error
if isinstance(span.span_instance, Span): # pyright: ignore [reportPossiblyUnboundVariable]
Expand Down Expand Up @@ -659,11 +691,11 @@ async def inner(*args, **kwargs):
# tracing events not supported in azure-core-tracing-opentelemetry
# so need to access the span instance directly
with span_impl_type.change_context(span.span_instance):
self._add_request_span_attributes(span, span_name, args, kwargs)
last_event_timestamp_ns = self._add_request_details(span, args, kwargs)
result = await function(*args, **kwargs)
if kwargs.get("stream") is True:
return self._async_wrapped_stream(result, span)
self._add_response_span_attributes(span, result)
return self._async_wrapped_stream(result, span, last_event_timestamp_ns)
self._add_response_details(span, result, last_event_timestamp_ns)

except Exception as exc:
# Set the span status to error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
where `your-deployment-name` is your unique AI Model deployment name, and
`your-azure-region` is the Azure region where your model is deployed.
2) AZURE_AI_CHAT_KEY - Your model key (a 32-character string). Keep it secret.
3) AZURE_TRACING_GEN_AI_CONTENT_RECORDING_ENABLED - Optional. Set to 'true'
for detailed traces, including chat request and response messages.
3) AZURE_TRACING_GEN_AI_CONTENT_RECORDING_ENABLED - Set to 'true' to enable content recording.
"""


Expand Down
7 changes: 7 additions & 0 deletions sdk/ai/azure-ai-inference/tests/gen_ai_trace_verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,11 @@ def check_span_events(self, span, expected_events):
if len(span_events) > 0: # If there are any additional events in the span_events
return False

prev_event = None
for actual_event in list(span.events):
if prev_event is not None and actual_event.timestamp <= prev_event.timestamp:
print(f"Event {actual_event.name} has a timestamp {actual_event.timestamp} that is not greater than the previous event's timestamp {prev_event.timestamp}, {prev_event.name}")
return False
prev_event = actual_event

return True
Loading