From 11f085a2012c0f4778414c8db2651556ee0ef959 Mon Sep 17 00:00:00 2001 From: Alexey Orlenko Date: Tue, 17 Dec 2024 12:24:38 +0100 Subject: [PATCH] fix(telemetry): assign our own span IDs (#5091) Our logic assumed that span IDs were unique within a trace but that's incorrect: they can be reused as long as the spans don't overlap in time (e.g. sequential siblings). Now we assign our own IDs which are guaranteed to be unique within a trace as long as the trace has less than 18446744073709551616 (2**64) spans. `tracing` tries not to reuse them immediately so this wasn't caught in tests until we started running tests with WASM where the issue immediately became evident (presumably due to the lack of a high quality source of randomness but I haven't actually investigated what the reason behind the difference in behaviour is). --- libs/telemetry/src/collector.rs | 13 ++- libs/telemetry/src/exporter.rs | 6 +- libs/telemetry/src/id.rs | 95 +++++++++++++------ libs/telemetry/src/layer.rs | 28 ++++-- libs/telemetry/src/lib.rs | 2 +- query-engine/query-engine/src/cli.rs | 2 +- query-engine/query-engine/src/context.rs | 2 +- query-engine/query-engine/src/server/mod.rs | 2 +- query-engine/query-engine/src/tests/errors.rs | 2 +- 9 files changed, 103 insertions(+), 49 deletions(-) diff --git a/libs/telemetry/src/collector.rs b/libs/telemetry/src/collector.rs index 8273eae7179..2bed931624a 100644 --- a/libs/telemetry/src/collector.rs +++ b/libs/telemetry/src/collector.rs @@ -7,6 +7,7 @@ use serde::Serialize; use crate::id::{RequestId, SpanId}; use crate::models::{LogLevel, SpanKind}; +use crate::NextId; #[derive(Debug, Clone)] #[cfg_attr(test, derive(Serialize))] @@ -37,10 +38,10 @@ pub(crate) struct SpanBuilder { } impl SpanBuilder { - pub fn new(name: &'static str, id: impl Into, attrs_size_hint: usize) -> Self { + pub fn new(name: &'static str, attrs_size_hint: usize) -> Self { Self { request_id: None, - id: id.into(), + id: SpanId::next(), name: name.into(), start_time: SystemTime::now(), elapsed: ElapsedTimeCounter::start(), @@ -54,6 +55,10 @@ impl SpanBuilder { self.request_id } + pub fn span_id(&self) -> SpanId { + self.id + } + pub fn set_request_id(&mut self, request_id: RequestId) { self.request_id = Some(request_id); } @@ -74,10 +79,10 @@ impl SpanBuilder { self.links.push(link); } - pub fn end(self, parent_id: Option>) -> CollectedSpan { + pub fn end(self, parent_id: Option) -> CollectedSpan { CollectedSpan { id: self.id, - parent_id: parent_id.map(Into::into), + parent_id, name: self.name, start_time: self.start_time, duration: self.elapsed.elapsed_time(), diff --git a/libs/telemetry/src/exporter.rs b/libs/telemetry/src/exporter.rs index 7032999536b..7c5891e945a 100644 --- a/libs/telemetry/src/exporter.rs +++ b/libs/telemetry/src/exporter.rs @@ -272,6 +272,8 @@ impl AllowAttribute for InternalAttributesFilter { mod tests { use std::time::{Duration, SystemTime}; + use crate::NextId; + use super::*; use CaptureTarget::*; @@ -327,7 +329,7 @@ mod tests { let request_id = exporter.start_capturing(RequestId::next(), capture_all()).await; let span = CollectedSpan { - id: tracing::span::Id::from_u64(1).into(), + id: SpanId::try_from(1).unwrap(), parent_id: None, name: "test_span".into(), start_time: SystemTime::UNIX_EPOCH, @@ -381,7 +383,7 @@ mod tests { let request_id = exporter.start_capturing(RequestId::next(), capture_spans()).await; let span = CollectedSpan { - id: tracing::span::Id::from_u64(1).into(), + id: SpanId::try_from(1).unwrap(), parent_id: None, name: "test_span".into(), start_time: SystemTime::UNIX_EPOCH, diff --git a/libs/telemetry/src/id.rs b/libs/telemetry/src/id.rs index fce0b8ba305..b7ce6b6578b 100644 --- a/libs/telemetry/src/id.rs +++ b/libs/telemetry/src/id.rs @@ -16,9 +16,13 @@ impl SerializableNonZeroU64 { pub fn into_u64(self) -> u64 { self.0.get() } +} + +impl TryFrom for SerializableNonZeroU64 { + type Error = u64; - pub fn from_u64(value: u64) -> Option { - NonZeroU64::new(value).map(Self) + fn try_from(value: u64) -> Result { + NonZeroU64::new(value).map(Self).ok_or(value) } } @@ -66,24 +70,35 @@ impl<'de> Deserialize<'de> for SerializableNonZeroU64 { } } -/// A unique identifier for a span. It maps directly to [`tracing::span::Id`] assigned by -/// [`tracing_subscriber::registry::Registry`]. +/// A unique identifier for a span. +/// +/// We don't use the original span IDs assigned by the `tracing` `Subscriber` +/// because they are are only guaranteed to be unique among the spans active at +/// the same time. They may be reused after a span is closed (even for +/// successive sibling spans in the same trace as long as they don't overlap in +/// time), so they are ephemeral and cannot be stored. Since we do need to store +/// them and can only tolerate reuse across different traces but not in a single +/// trace, we generate our own IDs. #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)] #[repr(transparent)] pub struct SpanId(SerializableNonZeroU64); -impl From<&tracing::span::Id> for SpanId { - fn from(id: &tracing::span::Id) -> Self { - Self(SerializableNonZeroU64(id.into_non_zero_u64())) +impl From for SpanId { + fn from(value: NonZeroU64) -> Self { + Self(SerializableNonZeroU64(value)) } } -impl From for SpanId { - fn from(id: tracing::span::Id) -> Self { - Self::from(&id) +impl TryFrom for SpanId { + type Error = u64; + + fn try_from(value: u64) -> Result { + SerializableNonZeroU64::try_from(value).map(Self) } } +impl NextId for SpanId {} + /// A unique identifier for an engine trace, representing a tree of spans. These /// internal traces *do not* correspond to OpenTelemetry trace IDs. One /// OpenTelemetry trace may contain multiple Prisma Client operations, each of @@ -91,39 +106,42 @@ impl From for SpanId { /// requests to the engine, we call these trace IDs "request IDs" to /// disambiguate and avoid confusion. /// -/// We don't use IDs of the root spans themselves for this purpose because span -/// IDs are only guaranteed to be unique among the spans active at the same -/// time. They may be reused after a span is closed, so they are not -/// historically unique. We store the collected spans and events for some short -/// time after the spans are closed until the client requests them, so we need -/// request IDs that are guaranteed to be unique for a very long period of time -/// (although they still don't necessarily have to be unique for the whole -/// lifetime of the process). +/// We store the collected spans and events for some short time after the spans +/// are closed until the client requests them, so we need request IDs that are +/// guaranteed to be unique for a very long period of time (although they still +/// don't necessarily have to be unique for the whole lifetime of the process). +/// +/// We don't use the root span IDs as the request IDs to have more flexibility +/// and allow clients to generate the request IDs on the client side, rather +/// than having us send the generated request ID back to the client. This +/// guarantees we can still fetch the traces from the engine even in a case of +/// an error and no response sent back to the client. #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)] #[repr(transparent)] pub struct RequestId(SerializableNonZeroU64); impl RequestId { - pub fn next() -> Self { - static NEXT_ID: AtomicU64 = AtomicU64::new(1); - - let mut id = 0; - while id == 0 { - id = NEXT_ID.fetch_add(1, Ordering::Relaxed); - } - - Self(SerializableNonZeroU64(NonZeroU64::new(id).unwrap())) - } - pub fn into_u64(self) -> u64 { self.0.into_u64() } +} - pub fn from_u64(value: u64) -> Option { - SerializableNonZeroU64::from_u64(value).map(Self) +impl From for RequestId { + fn from(value: NonZeroU64) -> Self { + Self(SerializableNonZeroU64(value)) } } +impl TryFrom for RequestId { + type Error = u64; + + fn try_from(value: u64) -> Result { + SerializableNonZeroU64::try_from(value).map(Self) + } +} + +impl NextId for RequestId {} + impl Default for RequestId { fn default() -> Self { Self::next() @@ -137,3 +155,18 @@ impl FromStr for RequestId { SerializableNonZeroU64::from_str(s).map(Self) } } + +/// A trait for types that represent sequential IDs and can be losslessly +/// converted from [`NonZeroU64`]. +pub trait NextId: Sized + From { + fn next() -> Self { + static NEXT_ID: AtomicU64 = AtomicU64::new(1); + + let mut id = 0; + while id == 0 { + id = NEXT_ID.fetch_add(1, Ordering::Relaxed); + } + + Self::from(NonZeroU64::new(id).unwrap()) + } +} diff --git a/libs/telemetry/src/layer.rs b/libs/telemetry/src/layer.rs index cc0afc1b5d7..64a2bb66b0d 100644 --- a/libs/telemetry/src/layer.rs +++ b/libs/telemetry/src/layer.rs @@ -72,7 +72,7 @@ where { fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) { let span = Self::require_span(id, &ctx); - let mut span_builder = SpanBuilder::new(span.name(), id, attrs.fields().len()); + let mut span_builder = SpanBuilder::new(span.name(), attrs.fields().len()); if let Some(request_id) = span .parent() @@ -98,11 +98,16 @@ where } fn on_follows_from(&self, span: &Id, follows: &Id, ctx: Context<'_, S>) { + let followed_span = Self::require_span(follows, &ctx); + let Some(followed_id) = followed_span.extensions().get::().map(|sb| sb.span_id()) else { + return; + }; + let span = Self::require_span(span, &ctx); let mut extensions = span.extensions_mut(); if let Some(span_builder) = extensions.get_mut::() { - span_builder.add_link(follows.into()); + span_builder.add_link(followed_id); } } @@ -116,12 +121,18 @@ where return; }; - let Some(request_id) = parent.extensions().get::().and_then(|sb| sb.request_id()) else { + let extensions = parent.extensions(); + + let Some(span_builder) = extensions.get::() else { + return; + }; + + let Some(request_id) = span_builder.request_id() else { return; }; let mut event_builder = EventBuilder::new( - parent.id().into(), + span_builder.span_id(), event.metadata().target(), event.metadata().level().into(), event.metadata().fields().len(), @@ -145,7 +156,10 @@ where return; }; - let parent_id = span.parent().map(|parent| parent.id()); + let parent_id = span + .parent() + .and_then(|parent| parent.extensions().get::().map(|sb| sb.span_id())); + let collected_span = span_builder.end(parent_id); self.collector.add_span(request_id, collected_span); @@ -182,7 +196,7 @@ impl field::Visit for SpanAttributeVisitor<'_, Filter> { fn record_u64(&mut self, field: &field::Field, value: u64) { match field.name() { REQUEST_ID_FIELD => { - if let Some(request_id) = RequestId::from_u64(value) { + if let Ok(request_id) = RequestId::try_from(value) { self.span_builder.set_request_id(request_id); } } @@ -279,7 +293,7 @@ impl field::Visit for EventAttributeVisitor<'_, Filter> #[cfg(test)] mod tests { use crate::collector::{AllowAttribute, CollectedEvent, CollectedSpan}; - use crate::id::RequestId; + use crate::id::{NextId, RequestId}; use super::*; diff --git a/libs/telemetry/src/lib.rs b/libs/telemetry/src/lib.rs index 0149243d6ac..8c85d3e086c 100644 --- a/libs/telemetry/src/lib.rs +++ b/libs/telemetry/src/lib.rs @@ -9,6 +9,6 @@ pub mod time; pub mod traceparent; pub use exporter::Exporter; -pub use id::RequestId; +pub use id::{NextId, RequestId}; pub use layer::layer; pub use traceparent::TraceParent; diff --git a/query-engine/query-engine/src/cli.rs b/query-engine/query-engine/src/cli.rs index d797b11e934..f2fbda908ec 100644 --- a/query-engine/query-engine/src/cli.rs +++ b/query-engine/query-engine/src/cli.rs @@ -9,7 +9,7 @@ use psl::parser_database::Files; use query_core::{protocol::EngineProtocol, schema}; use request_handlers::{dmmf, RequestBody, RequestHandler}; use std::{env, sync::Arc}; -use telemetry::RequestId; +use telemetry::{NextId, RequestId}; pub struct ExecuteRequest { query: String, diff --git a/query-engine/query-engine/src/context.rs b/query-engine/query-engine/src/context.rs index 2a0c8a0c48e..6155b8f3b46 100644 --- a/query-engine/query-engine/src/context.rs +++ b/query-engine/query-engine/src/context.rs @@ -12,7 +12,7 @@ use query_core::{ use request_handlers::{load_executor, ConnectorKind}; use std::{env, fmt, sync::Arc}; use telemetry::exporter::{CaptureSettings, CaptureTarget}; -use telemetry::RequestId; +use telemetry::{NextId, RequestId}; use tracing::Instrument; /// Prisma request context containing all immutable state of the process. diff --git a/query-engine/query-engine/src/server/mod.rs b/query-engine/query-engine/src/server/mod.rs index cf1e658f0a8..b0580fbf685 100644 --- a/query-engine/query-engine/src/server/mod.rs +++ b/query-engine/query-engine/src/server/mod.rs @@ -13,7 +13,7 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::{Duration, Instant}; use telemetry::exporter::{CaptureSettings, CaptureTarget, TraceData}; -use telemetry::{RequestId, TraceParent}; +use telemetry::{NextId, RequestId, TraceParent}; use tracing::{Instrument, Span}; /// Starts up the graphql query engine server diff --git a/query-engine/query-engine/src/tests/errors.rs b/query-engine/query-engine/src/tests/errors.rs index 3e3a524e8d7..b144b1100b3 100644 --- a/query-engine/query-engine/src/tests/errors.rs +++ b/query-engine/query-engine/src/tests/errors.rs @@ -3,7 +3,7 @@ use enumflags2::make_bitflags; use indoc::{formatdoc, indoc}; use query_core::protocol::EngineProtocol; use serde_json::json; -use telemetry::RequestId; +use telemetry::{NextId, RequestId}; #[tokio::test] async fn connection_string_problems_give_a_nice_error() {