Skip to content

Commit

Permalink
feat: query path instrumentation
Browse files Browse the repository at this point in the history
- spans added for buffer and parquet chunks, including the number of files
  already present in the parquet cache and the SQL query text
  • Loading branch information
praveen-influx committed Mar 6, 2025
1 parent a1f0f2b commit 6ff33bc
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions influxdb3_cache/src/parquet_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ pub trait ParquetCacheOracle: Send + Sync + Debug {

// Get a receiver that is notified when a prune takes place and how much memory was freed
fn prune_notifier(&self) -> watch::Receiver<usize>;

// Check whether the object at the given path has already been fetched into the cache
fn in_cache(&self, path: &Path) -> bool;
}

/// Concrete implementation of the [`ParquetCacheOracle`]
Expand Down Expand Up @@ -246,6 +249,10 @@ impl ParquetCacheOracle for MemCacheOracle {
// Subscribe to prune notifications; per the trait contract, each received
// value is the amount of memory freed by a prune pass.
fn prune_notifier(&self) -> watch::Receiver<usize> {
// Delegates to the watch channel's sender to hand out a fresh receiver.
self.prune_notifier_tx.subscribe()
}

// Report whether the object at `path` has already been fetched into the
// in-memory cache, by querying the backing store's cache directly.
fn in_cache(&self, path: &Path) -> bool {
    let cache = &self.mem_store.cache;
    cache.path_already_fetched(path)
}
}

/// Helper function for creation of a [`MemCachedObjectStore`] and [`MemCacheOracle`]
Expand Down
8 changes: 7 additions & 1 deletion influxdb3_server/src/query_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use tokio::sync::Semaphore;
use trace::ctx::SpanContext;
use trace::span::{Span, SpanExt, SpanRecorder};
use trace::{ctx::SpanContext, span::MetaValue};
use trace_http::ctx::RequestLogContext;
use tracker::{
AsyncSemaphoreMetrics, InstrumentedAsyncOwnedSemaphorePermit, InstrumentedAsyncSemaphore,
Expand Down Expand Up @@ -142,6 +142,12 @@ impl QueryExecutor for QueryExecutorImpl {
) -> Result<SendableRecordBatchStream, QueryExecutorError> {
info!(%database, %query, ?params, "executing sql query");
let db = self.get_db_namespace(database, &span_ctx).await?;
let span_ctxt = span_ctx
.clone()
.map(|span| span.child("query_database_sql"));
let mut recorder = SpanRecorder::new(span_ctxt);
recorder.set_metadata("query", MetaValue::String(query.to_string().into()));

query_database_sql(
db,
query,
Expand Down
1 change: 1 addition & 0 deletions influxdb3_write/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ metric.workspace = true
parquet_file.workspace = true
observability_deps.workspace = true
schema.workspace = true
trace.workspace = true

# Local deps
influxdb3_cache = { path = "../influxdb3_cache" }
Expand Down
42 changes: 40 additions & 2 deletions influxdb3_write/src/write_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod persisted_files;
pub mod queryable_buffer;
mod table_buffer;
use tokio::sync::{oneshot, watch::Receiver};
use trace::span::{MetaValue, SpanRecorder};
pub mod validator;

use crate::persister::Persister;
Expand Down Expand Up @@ -41,14 +42,14 @@ use influxdb3_wal::{
use influxdb3_wal::{CatalogOp::CreateLastCache, DeleteTableDefinition};
use influxdb3_wal::{DatabaseDefinition, FieldDefinition};
use influxdb3_wal::{DeleteDatabaseDefinition, object_store::WalObjectStore};
use iox_query::QueryChunk;
use iox_query::chunk_statistics::{NoColumnRanges, create_chunk_statistics};
use iox_query::{QueryChunk, exec::SessionContextIOxExt};
use iox_time::{Time, TimeProvider};
use metric::Registry;
use metrics::WriteMetrics;
use object_store::path::Path as ObjPath;
use object_store::{ObjectMeta, ObjectStore};
use observability_deps::tracing::{debug, error, warn};
use observability_deps::tracing::{debug, warn};
use parquet_file::storage::ParquetExecInput;
use queryable_buffer::QueryableBufferArgs;
use schema::Schema;
Expand Down Expand Up @@ -331,17 +332,30 @@ impl WriteBufferImpl {
projection: Option<&Vec<usize>>,
ctx: &dyn Session,
) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
let span_ctx = ctx.span_ctx().map(|span| span.child("table_chunks"));
let mut recorder = SpanRecorder::new(span_ctx);

let mut chunks = self.buffer.get_table_chunks(
Arc::clone(&db_schema),
Arc::clone(&table_def),
filter,
projection,
ctx,
)?;
let num_chunks_from_buffer = chunks.len();
recorder.set_metadata(
"buffer_chunks",
MetaValue::Int(num_chunks_from_buffer as i64),
);

let parquet_files =
self.persisted_files
.get_files_filtered(db_schema.id, table_def.table_id, filter);
let num_parquet_files_needed = parquet_files.len();
recorder.set_metadata(
"parquet_files",
MetaValue::Int(num_parquet_files_needed as i64),
);

if parquet_files.len() > self.query_file_limit {
return Err(DataFusionError::External(
Expand All @@ -358,6 +372,30 @@ impl WriteBufferImpl {
));
}

if let Some(parquet_cache) = &self.parquet_cache {
let num_files_already_in_cache = parquet_files
.iter()
.filter(|f| {
parquet_cache
.in_cache(&ObjPath::parse(&f.path).expect("obj path should be parseable"))
})
.count();

recorder.set_metadata(
"parquet_files_already_in_cache",
MetaValue::Int(num_files_already_in_cache as i64),
);
debug!(
num_chunks_from_buffer,
num_parquet_files_needed, num_files_already_in_cache, ">>> query chunks breakdown"
);
} else {
debug!(
num_chunks_from_buffer,
num_parquet_files_needed, ">>> query chunks breakdown (cache disabled)"
);
}

let mut chunk_order = chunks.len() as i64;
// Although this sends a cache request, it does not mean all these
// files will be cached. This depends on parquet cache's capacity
Expand Down

0 comments on commit 6ff33bc

Please sign in to comment.