diff --git a/Cargo.lock b/Cargo.lock
index 776d4b09a62..8cc836f4c64 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3303,6 +3303,7 @@ dependencies = [
  "test_helpers",
  "thiserror 1.0.69",
  "tokio",
+ "trace",
  "url",
  "uuid",
 ]
diff --git a/influxdb3_cache/src/parquet_cache/mod.rs b/influxdb3_cache/src/parquet_cache/mod.rs
index 7cb6727b1aa..25c7fa251df 100644
--- a/influxdb3_cache/src/parquet_cache/mod.rs
+++ b/influxdb3_cache/src/parquet_cache/mod.rs
@@ -158,6 +158,9 @@ pub trait ParquetCacheOracle: Send + Sync + Debug {

     // Get a receiver that is notified when a prune takes place and how much memory was freed
     fn prune_notifier(&self) -> watch::Receiver<usize>;
+
+    // Check whether the file at the given path has already been fetched into the cache
+    fn in_cache(&self, path: &Path) -> bool;
 }

 /// Concrete implementation of the [`ParquetCacheOracle`]
@@ -246,6 +249,10 @@ impl ParquetCacheOracle for MemCacheOracle {
     fn prune_notifier(&self) -> watch::Receiver<usize> {
         self.prune_notifier_tx.subscribe()
     }
+
+    fn in_cache(&self, path: &Path) -> bool {
+        self.mem_store.cache.path_already_fetched(path)
+    }
 }

 /// Helper function for creation of a [`MemCachedObjectStore`] and [`MemCacheOracle`]
diff --git a/influxdb3_server/src/query_executor/mod.rs b/influxdb3_server/src/query_executor/mod.rs
index 0e5b56a7d75..e04bee6ac10 100644
--- a/influxdb3_server/src/query_executor/mod.rs
+++ b/influxdb3_server/src/query_executor/mod.rs
@@ -43,8 +43,8 @@ use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::Arc;
 use tokio::sync::Semaphore;
-use trace::ctx::SpanContext;
 use trace::span::{Span, SpanExt, SpanRecorder};
+use trace::{ctx::SpanContext, span::MetaValue};
 use trace_http::ctx::RequestLogContext;
 use tracker::{
     AsyncSemaphoreMetrics, InstrumentedAsyncOwnedSemaphorePermit, InstrumentedAsyncSemaphore,
@@ -142,6 +142,12 @@ impl QueryExecutor for QueryExecutorImpl {
     ) -> Result<SendableRecordBatchStream, Self::Error> {
         info!(%database, %query, ?params, "executing sql query");
         let db = self.get_db_namespace(database, &span_ctx).await?;
+        let span_ctxt = span_ctx
+            .clone()
+            .map(|span| span.child("query_database_sql"));
+        let mut recorder = SpanRecorder::new(span_ctxt);
+        recorder.set_metadata("query", MetaValue::String(query.to_string().into()));
+
         query_database_sql(
             db,
             query,
diff --git a/influxdb3_write/Cargo.toml b/influxdb3_write/Cargo.toml
index 170d035bc5c..ddc5b141705 100644
--- a/influxdb3_write/Cargo.toml
+++ b/influxdb3_write/Cargo.toml
@@ -25,6 +25,7 @@ metric.workspace = true
 parquet_file.workspace = true
 observability_deps.workspace = true
 schema.workspace = true
+trace.workspace = true

 # Local deps
 influxdb3_cache = { path = "../influxdb3_cache" }
diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs
index eca5244a527..36bc977df03 100644
--- a/influxdb3_write/src/write_buffer/mod.rs
+++ b/influxdb3_write/src/write_buffer/mod.rs
@@ -5,6 +5,7 @@ pub mod persisted_files;
 pub mod queryable_buffer;
 mod table_buffer;
 use tokio::sync::{oneshot, watch::Receiver};
+use trace::span::{MetaValue, SpanRecorder};
 pub mod validator;

 use crate::persister::Persister;
@@ -41,14 +42,14 @@ use influxdb3_wal::{
 use influxdb3_wal::{CatalogOp::CreateLastCache, DeleteTableDefinition};
 use influxdb3_wal::{DatabaseDefinition, FieldDefinition};
 use influxdb3_wal::{DeleteDatabaseDefinition, object_store::WalObjectStore};
-use iox_query::QueryChunk;
 use iox_query::chunk_statistics::{NoColumnRanges, create_chunk_statistics};
+use iox_query::{QueryChunk, exec::SessionContextIOxExt};
 use iox_time::{Time, TimeProvider};
 use metric::Registry;
 use metrics::WriteMetrics;
 use object_store::path::Path as ObjPath;
 use object_store::{ObjectMeta, ObjectStore};
-use observability_deps::tracing::{debug, error, warn};
+use observability_deps::tracing::{debug, warn};
 use parquet_file::storage::ParquetExecInput;
 use queryable_buffer::QueryableBufferArgs;
 use schema::Schema;
@@ -331,6 +332,9 @@ impl WriteBufferImpl {
         projection: Option<&Vec<usize>>,
         ctx: &dyn Session,
     ) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
+        let span_ctx = ctx.span_ctx().map(|span| span.child("table_chunks"));
+        let mut recorder = SpanRecorder::new(span_ctx);
+
         let mut chunks = self.buffer.get_table_chunks(
             Arc::clone(&db_schema),
             Arc::clone(&table_def),
@@ -338,10 +342,20 @@
             projection,
             ctx,
         )?;
+        let num_chunks_from_buffer = chunks.len();
+        recorder.set_metadata(
+            "buffer_chunks",
+            MetaValue::Int(num_chunks_from_buffer as i64),
+        );

         let parquet_files = self
             .persisted_files
             .get_files_filtered(db_schema.id, table_def.table_id, filter);
+        let num_parquet_files_needed = parquet_files.len();
+        recorder.set_metadata(
+            "parquet_files",
+            MetaValue::Int(num_parquet_files_needed as i64),
+        );

         if parquet_files.len() > self.query_file_limit {
             return Err(DataFusionError::External(
@@ -358,6 +372,30 @@
             ));
         }

+        if let Some(parquet_cache) = &self.parquet_cache {
+            let num_files_already_in_cache = parquet_files
+                .iter()
+                .filter(|f| {
+                    parquet_cache
+                        .in_cache(&ObjPath::parse(&f.path).expect("obj path should be parseable"))
+                })
+                .count();
+
+            recorder.set_metadata(
+                "parquet_files_already_in_cache",
+                MetaValue::Int(num_files_already_in_cache as i64),
+            );
+            debug!(
+                num_chunks_from_buffer,
+                num_parquet_files_needed, num_files_already_in_cache, ">>> query chunks breakdown"
+            );
+        } else {
+            debug!(
+                num_chunks_from_buffer,
+                num_parquet_files_needed, ">>> query chunks breakdown (cache disabled)"
+            );
+        }
+
         let mut chunk_order = chunks.len() as i64;
         // Although this sends a cache request, it does not mean all these
         // files will be cached. This depends on parquet cache's capacity
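
Note: the cache-membership check that `table_chunks` now performs through `ParquetCacheOracle::in_cache` is a read-only lookup: it asks whether a path has already been fetched without issuing a fetch. Below is a minimal, self-contained sketch of that pattern; `PathCache`, `mark_fetched`, and the `main` driver are illustrative stand-ins, not the `influxdb3_cache` API (the real oracle delegates to `mem_store.cache.path_already_fetched`, as the diff shows).

```rust
use std::collections::HashSet;
use std::sync::RwLock;

/// Illustrative stand-in for the membership check added to the cache oracle.
/// The real cache keys parquet file data by object-store path; here a
/// HashSet of path strings behind an RwLock models only the
/// "already fetched?" question.
#[derive(Debug, Default)]
struct PathCache {
    fetched: RwLock<HashSet<String>>,
}

impl PathCache {
    /// Record that a file has been fetched into the cache.
    fn mark_fetched(&self, path: &str) {
        self.fetched.write().unwrap().insert(path.to_string());
    }

    /// Mirrors `ParquetCacheOracle::in_cache`: a read-only membership test
    /// that never triggers a fetch.
    fn in_cache(&self, path: &str) -> bool {
        self.fetched.read().unwrap().contains(path)
    }
}

fn main() {
    let cache = PathCache::default();
    cache.mark_fetched("dbs/foo/tables/bar/file.parquet");

    // Count cache hits the same way the new `table_chunks` code does:
    // filter the candidate file list by membership, then count.
    let parquet_files = [
        "dbs/foo/tables/bar/file.parquet",
        "dbs/foo/tables/bar/other.parquet",
    ];
    let num_files_already_in_cache = parquet_files
        .iter()
        .filter(|path| cache.in_cache(path))
        .count();
    assert_eq!(num_files_already_in_cache, 1);
}
```

Because the check is read-only, it can run on every query without disturbing cache state; the resulting count is what the diff records as the `parquet_files_already_in_cache` span metadata.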