Skip to content

Commit

Permalink
fix(qe-wasm): make traces consistent with node-api engine (#5090)
Browse files Browse the repository at this point in the history
* fix(query-engine-wasm): expand scope of `prisma:engine:query` span

Expand the scope of the `prisma:engine:query` span to contain both the
request handler and the response serialization, identical to the
Node-API engine, preventing `prisma:engine:response_json_serialization`
from being orphaned outside the trace.

This is a follow up to #5089.

* [integration]

* Add missing bits for transactions

* [integration]

* strip traceparents in attributes with driver adapters

* [integration]
  • Loading branch information
aqrln authored Dec 17, 2024
1 parent 40232c9 commit caaf939
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 48 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions libs/telemetry/src/formatting.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use std::fmt;

pub struct QueryForTracing<'a>(pub &'a str);

impl fmt::Display for QueryForTracing<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", strip_query_traceparent(self.0))
}
}

pub fn strip_query_traceparent(query: &str) -> &str {
query.rsplit_once("/* traceparent=").map_or(query, |(str, remainder)| {
if remainder
.split_once("*/")
.is_some_and(|(_, suffix)| suffix.trim_end().is_empty())
{
str.trim_end()
} else {
query
}
})
}
1 change: 1 addition & 0 deletions libs/telemetry/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod collector;
pub mod exporter;
pub mod filter;
pub mod formatting;
pub mod id;
pub mod layer;
pub mod models;
Expand Down
1 change: 1 addition & 0 deletions quaint/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ fmt-sql = ["sqlformat"]
[dependencies]
connection-string = "0.2"
percent-encoding = "2"
telemetry.path = "../libs/telemetry"
tracing.workspace = true
tracing-futures.workspace = true
async-trait.workspace = true
Expand Down
27 changes: 4 additions & 23 deletions quaint/src/connector/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::future::Future;

use crosstarget_utils::time::ElapsedTimeCounter;
use prisma_metrics::{counter, histogram};
use telemetry::formatting::QueryForTracing;
use tracing::{info_span, Instrument};

use crate::ast::{Params, Value};
use crosstarget_utils::time::ElapsedTimeCounter;
use std::{fmt, future::Future};

pub async fn query<'a, F, T, U>(
tag: &'static str,
Expand Down Expand Up @@ -102,24 +104,3 @@ fn trace_query<'a>(query: &'a str, params: &'a [Value<'_>], result: &str, start:
duration_ms = start.elapsed_time().as_millis() as u64,
);
}

struct QueryForTracing<'a>(&'a str);

impl fmt::Display for QueryForTracing<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", strip_query_traceparent(self.0))
}
}

pub(super) fn strip_query_traceparent(query: &str) -> &str {
query.rsplit_once("/* traceparent=").map_or(query, |(str, remainder)| {
if remainder
.split_once("*/")
.is_some_and(|(_, suffix)| suffix.trim_end().is_empty())
{
str.trim_end()
} else {
query
}
})
}
3 changes: 1 addition & 2 deletions quaint/src/connector/postgres/native/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ use std::{
use async_trait::async_trait;
use lru_cache::LruCache;
use postgres_types::Type;
use telemetry::formatting::strip_query_traceparent;
use tokio::sync::Mutex;
use tokio_postgres::{Client, Error, Statement};

use crate::connector::metrics::strip_query_traceparent;

use super::query::{PreparedQuery, QueryMetadata, TypedQuery};

/// Types that can be used as a cache for prepared queries and statements.
Expand Down
1 change: 1 addition & 0 deletions query-engine/driver-adapters/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ once_cell = "1.15"
prisma-metrics.path = "../../libs/metrics"
serde.workspace = true
serde_json.workspace = true
telemetry.path = "../../libs/telemetry"
tracing.workspace = true
tracing-core = "0.1"
uuid.workspace = true
Expand Down
5 changes: 3 additions & 2 deletions query-engine/driver-adapters/src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use quaint::{
prelude::{Query as QuaintQuery, Queryable as QuaintQueryable, ResultSet, TransactionCapable},
visitor::{self, Visitor},
};
use telemetry::formatting::QueryForTracing;
use tracing::{info_span, Instrument};

/// A JsQueryable adapts a Proxy to implement quaint's Queryable interface. It has the
Expand Down Expand Up @@ -188,7 +189,7 @@ impl JsBaseQueryable {
"prisma:engine:js:query:sql",
"otel.kind" = "client",
"db.system" = %self.db_system_name,
"db.query.text" = %sql,
"db.query.text" = %QueryForTracing(sql),
user_facing = true,
);
let result_set = self.proxy.query_raw(query).instrument(sql_span).await?;
Expand Down Expand Up @@ -225,7 +226,7 @@ impl JsBaseQueryable {
"prisma:engine:js:query:sql",
"otel.kind" = "client",
"db.system" = %self.db_system_name,
"db.query.text" = %sql,
"db.query.text" = %QueryForTracing(sql),
user_facing = true,
);
let affected_rows = self.proxy.execute_raw(query).instrument(sql_span).await?;
Expand Down
47 changes: 26 additions & 21 deletions query-engine/query-engine-wasm/src/wasm/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,24 +207,22 @@ impl QueryEngine {

let query = RequestBody::try_from_str(&body, engine.engine_protocol())?;

async move {
let span = tracing::info_span!(
"prisma:engine:query",
user_facing = true,
request_id = tracing::field::Empty,
);
let span = tracing::info_span!(
"prisma:engine:query",
user_facing = true,
request_id = tracing::field::Empty,
);

let traceparent = start_trace(&request_id, &trace, &span, &exporter).await?;
let traceparent = start_trace(&request_id, &trace, &span, &exporter).await?;

async move {
let handler = RequestHandler::new(engine.executor(), engine.query_schema(), engine.engine_protocol());
let response = handler
.handle(query, tx_id.map(TxId::from), traceparent)
.instrument(span)
.await;
let response = handler.handle(query, tx_id.map(TxId::from), traceparent).await;

let serde_span = tracing::info_span!("prisma:engine:response_json_serialization", user_facing = true);
Ok(serde_span.in_scope(|| serde_json::to_string(&response))?)
}
.instrument(span)
.await
}
.with_subscriber(dispatcher)
Expand All @@ -250,17 +248,22 @@ impl QueryEngine {
user_facing = true,
request_id = tracing::field::Empty,
);

start_trace(&request_id, &trace, &span, &exporter).await?;

let tx_opts: TransactionOptions = serde_json::from_str(&input)?;
match engine
.executor()
.start_tx(engine.query_schema().clone(), engine.engine_protocol(), tx_opts)
.await
{
Ok(tx_id) => Ok(json!({ "id": tx_id.to_string() }).to_string()),
Err(err) => Ok(map_known_error(err)?),
async move {
let tx_opts: TransactionOptions = serde_json::from_str(&input)?;
match engine
.executor()
.start_tx(engine.query_schema().clone(), engine.engine_protocol(), tx_opts)
.await
{
Ok(tx_id) => Ok(json!({ "id": tx_id.to_string() }).to_string()),
Err(err) => Ok(map_known_error(err)?),
}
}
.instrument(span)
.await
}
.with_subscriber(dispatcher)
.await
Expand All @@ -286,9 +289,10 @@ impl QueryEngine {
user_facing = true,
request_id = tracing::field::Empty
);

start_trace(&request_id, &trace, &span, &exporter).await?;

match engine.executor().commit_tx(TxId::from(tx_id)).await {
match engine.executor().commit_tx(TxId::from(tx_id)).instrument(span).await {
Ok(_) => Ok("{}".to_string()),
Err(err) => Ok(map_known_error(err)?),
}
Expand Down Expand Up @@ -317,9 +321,10 @@ impl QueryEngine {
user_facing = true,
request_id = tracing::field::Empty,
);

start_trace(&request_id, &trace, &span, &exporter).await?;

match engine.executor().rollback_tx(TxId::from(tx_id)).await {
match engine.executor().rollback_tx(TxId::from(tx_id)).instrument(span).await {
Ok(_) => Ok("{}".to_string()),
Err(err) => Ok(map_known_error(err)?),
}
Expand Down

0 comments on commit caaf939

Please sign in to comment.