From caaf9396f641afbdd5dda9654fc00192e11265eb Mon Sep 17 00:00:00 2001 From: Alexey Orlenko Date: Tue, 17 Dec 2024 10:21:12 +0100 Subject: [PATCH] fix(qe-wasm): make traces consistent with node-api engine (#5090) * fix(query-engine-wasm): expand scope of `prisma:engine:query` span Expand the scope of the `prisma:engine:query` span to contain both the request handler and the response serialization, identical to the Node-API engine, preventing `prisma:engine:response_json_serialization` from being orphaned outside the trace. This is a follow up to https://github.com/prisma/prisma-engines/pull/5089. * [integration] * Add missing bits for transactions * [integration] * strip traceparents in attributes with driver adapters * [integration] --- Cargo.lock | 2 + libs/telemetry/src/formatting.rs | 22 +++++++++ libs/telemetry/src/lib.rs | 1 + quaint/Cargo.toml | 1 + quaint/src/connector/metrics.rs | 27 ++--------- quaint/src/connector/postgres/native/cache.rs | 3 +- query-engine/driver-adapters/Cargo.toml | 1 + query-engine/driver-adapters/src/queryable.rs | 5 +- .../query-engine-wasm/src/wasm/engine.rs | 47 ++++++++++--------- 9 files changed, 61 insertions(+), 48 deletions(-) create mode 100644 libs/telemetry/src/formatting.rs diff --git a/Cargo.lock b/Cargo.lock index e5eecdf2a1e..8167930b59e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1210,6 +1210,7 @@ dependencies = [ "serde-wasm-bindgen", "serde_json", "serde_repr", + "telemetry", "tokio", "tracing", "tracing-core", @@ -3586,6 +3587,7 @@ dependencies = [ "serde", "serde_json", "sqlformat", + "telemetry", "thiserror", "tiberius", "tokio", diff --git a/libs/telemetry/src/formatting.rs b/libs/telemetry/src/formatting.rs new file mode 100644 index 00000000000..9b3bdcf29be --- /dev/null +++ b/libs/telemetry/src/formatting.rs @@ -0,0 +1,22 @@ +use std::fmt; + +pub struct QueryForTracing<'a>(pub &'a str); + +impl fmt::Display for QueryForTracing<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", strip_query_traceparent(self.0)) + } +} + +pub fn strip_query_traceparent(query: &str) -> &str { + query.rsplit_once("/* traceparent=").map_or(query, |(str, remainder)| { + if remainder + .split_once("*/") + .is_some_and(|(_, suffix)| suffix.trim_end().is_empty()) + { + str.trim_end() + } else { + query + } + }) +} diff --git a/libs/telemetry/src/lib.rs b/libs/telemetry/src/lib.rs index 57f8338d2d5..0149243d6ac 100644 --- a/libs/telemetry/src/lib.rs +++ b/libs/telemetry/src/lib.rs @@ -1,6 +1,7 @@ pub mod collector; pub mod exporter; pub mod filter; +pub mod formatting; pub mod id; pub mod layer; pub mod models; diff --git a/quaint/Cargo.toml b/quaint/Cargo.toml index 7d066e74466..0c388c44596 100644 --- a/quaint/Cargo.toml +++ b/quaint/Cargo.toml @@ -71,6 +71,7 @@ fmt-sql = ["sqlformat"] [dependencies] connection-string = "0.2" percent-encoding = "2" +telemetry.path = "../libs/telemetry" tracing.workspace = true tracing-futures.workspace = true async-trait.workspace = true diff --git a/quaint/src/connector/metrics.rs b/quaint/src/connector/metrics.rs index 6d9449d7b0f..8607082f0a2 100644 --- a/quaint/src/connector/metrics.rs +++ b/quaint/src/connector/metrics.rs @@ -1,9 +1,11 @@ +use std::future::Future; + +use crosstarget_utils::time::ElapsedTimeCounter; use prisma_metrics::{counter, histogram}; +use telemetry::formatting::QueryForTracing; use tracing::{info_span, Instrument}; use crate::ast::{Params, Value}; -use crosstarget_utils::time::ElapsedTimeCounter; -use std::{fmt, future::Future}; pub async fn query<'a, F, T, U>( tag: &'static str, @@ -102,24 +104,3 @@ fn trace_query<'a>(query: &'a str, params: &'a [Value<'_>], result: &str, start: duration_ms = start.elapsed_time().as_millis() as u64, ); } - -struct QueryForTracing<'a>(&'a str); - -impl fmt::Display for QueryForTracing<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", strip_query_traceparent(self.0)) - } -} - -pub(super) fn strip_query_traceparent(query: &str) -> &str { - query.rsplit_once("/* traceparent=").map_or(query, |(str, remainder)| { - if remainder - .split_once("*/") - .is_some_and(|(_, suffix)| suffix.trim_end().is_empty()) - { - str.trim_end() - } else { - query - } - }) -} diff --git a/quaint/src/connector/postgres/native/cache.rs b/quaint/src/connector/postgres/native/cache.rs index 447326608fa..a9e4f2e9603 100644 --- a/quaint/src/connector/postgres/native/cache.rs +++ b/quaint/src/connector/postgres/native/cache.rs @@ -6,11 +6,10 @@ use std::{ use async_trait::async_trait; use lru_cache::LruCache; use postgres_types::Type; +use telemetry::formatting::strip_query_traceparent; use tokio::sync::Mutex; use tokio_postgres::{Client, Error, Statement}; -use crate::connector::metrics::strip_query_traceparent; - use super::query::{PreparedQuery, QueryMetadata, TypedQuery}; /// Types that can be used as a cache for prepared queries and statements. diff --git a/query-engine/driver-adapters/Cargo.toml b/query-engine/driver-adapters/Cargo.toml index 606b33e9642..bb942b44eb1 100644 --- a/query-engine/driver-adapters/Cargo.toml +++ b/query-engine/driver-adapters/Cargo.toml @@ -15,6 +15,7 @@ once_cell = "1.15" prisma-metrics.path = "../../libs/metrics" serde.workspace = true serde_json.workspace = true +telemetry.path = "../../libs/telemetry" tracing.workspace = true tracing-core = "0.1" uuid.workspace = true diff --git a/query-engine/driver-adapters/src/queryable.rs b/query-engine/driver-adapters/src/queryable.rs index fbd355e68d9..7746d6137f1 100644 --- a/query-engine/driver-adapters/src/queryable.rs +++ b/query-engine/driver-adapters/src/queryable.rs @@ -13,6 +13,7 @@ use quaint::{ prelude::{Query as QuaintQuery, Queryable as QuaintQueryable, ResultSet, TransactionCapable}, visitor::{self, Visitor}, }; +use telemetry::formatting::QueryForTracing; use tracing::{info_span, Instrument}; /// A JsQueryable adapts a Proxy to implement quaint's Queryable interface. It has the @@ -188,7 +189,7 @@ impl JsBaseQueryable { "prisma:engine:js:query:sql", "otel.kind" = "client", "db.system" = %self.db_system_name, - "db.query.text" = %sql, + "db.query.text" = %QueryForTracing(sql), user_facing = true, ); let result_set = self.proxy.query_raw(query).instrument(sql_span).await?; @@ -225,7 +226,7 @@ impl JsBaseQueryable { "prisma:engine:js:query:sql", "otel.kind" = "client", "db.system" = %self.db_system_name, - "db.query.text" = %sql, + "db.query.text" = %QueryForTracing(sql), user_facing = true, ); let affected_rows = self.proxy.execute_raw(query).instrument(sql_span).await?; diff --git a/query-engine/query-engine-wasm/src/wasm/engine.rs b/query-engine/query-engine-wasm/src/wasm/engine.rs index 637b0484e81..5adc474e5cf 100644 --- a/query-engine/query-engine-wasm/src/wasm/engine.rs +++ b/query-engine/query-engine-wasm/src/wasm/engine.rs @@ -207,24 +207,22 @@ impl QueryEngine { let query = RequestBody::try_from_str(&body, engine.engine_protocol())?; - async move { - let span = tracing::info_span!( - "prisma:engine:query", - user_facing = true, - request_id = tracing::field::Empty, - ); + let span = tracing::info_span!( + "prisma:engine:query", + user_facing = true, + request_id = tracing::field::Empty, + ); - let traceparent = start_trace(&request_id, &trace, &span, &exporter).await?; + let traceparent = start_trace(&request_id, &trace, &span, &exporter).await?; + async move { let handler = RequestHandler::new(engine.executor(), engine.query_schema(), engine.engine_protocol()); - let response = handler - .handle(query, tx_id.map(TxId::from), traceparent) - .instrument(span) - .await; + let response = handler.handle(query, tx_id.map(TxId::from), traceparent).await; let serde_span = tracing::info_span!("prisma:engine:response_json_serialization", user_facing = true); Ok(serde_span.in_scope(|| serde_json::to_string(&response))?) } + .instrument(span) .await } .with_subscriber(dispatcher) @@ -250,17 +248,22 @@ impl QueryEngine { user_facing = true, request_id = tracing::field::Empty, ); + start_trace(&request_id, &trace, &span, &exporter).await?; - let tx_opts: TransactionOptions = serde_json::from_str(&input)?; - match engine - .executor() - .start_tx(engine.query_schema().clone(), engine.engine_protocol(), tx_opts) - .await - { - Ok(tx_id) => Ok(json!({ "id": tx_id.to_string() }).to_string()), - Err(err) => Ok(map_known_error(err)?), + async move { + let tx_opts: TransactionOptions = serde_json::from_str(&input)?; + match engine + .executor() + .start_tx(engine.query_schema().clone(), engine.engine_protocol(), tx_opts) + .await + { + Ok(tx_id) => Ok(json!({ "id": tx_id.to_string() }).to_string()), + Err(err) => Ok(map_known_error(err)?), + } } + .instrument(span) + .await } .with_subscriber(dispatcher) .await @@ -286,9 +289,10 @@ impl QueryEngine { user_facing = true, request_id = tracing::field::Empty ); + start_trace(&request_id, &trace, &span, &exporter).await?; - match engine.executor().commit_tx(TxId::from(tx_id)).await { + match engine.executor().commit_tx(TxId::from(tx_id)).instrument(span).await { Ok(_) => Ok("{}".to_string()), Err(err) => Ok(map_known_error(err)?), } @@ -317,9 +321,10 @@ impl QueryEngine { user_facing = true, request_id = tracing::field::Empty, ); + start_trace(&request_id, &trace, &span, &exporter).await?; - match engine.executor().rollback_tx(TxId::from(tx_id)).await { + match engine.executor().rollback_tx(TxId::from(tx_id)).instrument(span).await { Ok(_) => Ok("{}".to_string()), Err(err) => Ok(map_known_error(err)?), }