diff --git a/benchmarks/src/bin/cache_server.rs b/benchmarks/src/bin/cache_server.rs
index efc89d61375e2..afcfe9f18731c 100644
--- a/benchmarks/src/bin/cache_server.rs
+++ b/benchmarks/src/bin/cache_server.rs
@@ -21,26 +21,26 @@ use arrow_flight::flight_descriptor::DescriptorType;
 use arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
 use arrow_flight::sql::server::{FlightSqlService, PeekableFlightDataStream};
 use arrow_flight::sql::{
-    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
-    ActionCreatePreparedStatementResult, Any, CommandGetDbSchemas, CommandGetTables,
-    CommandPreparedStatementQuery, CommandPreparedStatementUpdate, CommandStatementQuery,
-    ProstMessageExt, SqlInfo,
+    ActionClosePreparedStatementRequest, Any, CommandGetDbSchemas,
+    CommandPreparedStatementUpdate, CommandStatementQuery, ProstMessageExt, SqlInfo,
 };
 use arrow_flight::{
     Action, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest,
-    HandshakeResponse, IpcMessage, SchemaAsIpc, Ticket,
+    HandshakeResponse, Ticket,
 };
 use dashmap::DashMap;
 use datafusion::execution::object_store::ObjectStoreUrl;
-use datafusion::logical_expr::LogicalPlan;
 use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
+use datafusion_flight_table::GcStream;
 use futures::{Stream, TryStreamExt};
 use log::{debug, info};
 use mimalloc::MiMalloc;
+use parquet::arrow::arrow_cache::ArrowArrayCache;
 use prost::Message;
 use std::path::PathBuf;
 use std::pin::Pin;
+use std::sync::atomic::AtomicU32;
 use std::sync::Arc;
 use structopt::StructOpt;
 use tonic::metadata::MetadataValue;
@@ -114,15 +114,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         ctx.runtime_env().object_store(url).unwrap()
     };
     let mut parquet_options: ParquetReadOptions<'_> = Default::default();
-    use datafusion::datasource::physical_plan::parquet::Parquet7FileReaderFactory;
-    parquet_options.reader = Some(Arc::new(Parquet7FileReaderFactory::new(object_store)));
+    use datafusion::datasource::physical_plan::parquet::Parquet8FileReaderFactory;
+    parquet_options.reader = Some(Arc::new(Parquet8FileReaderFactory::new(object_store)));

     // register parquet file with the execution context
     ctx.register_parquet(&file_name, &options.path.to_string_lossy(), parquet_options)
         .await
         .map_err(|e| status!("Error registering table", e))?;

-    let service = FlightSqlServiceImpl::new(file_name, options.path, ctx);
+    let service = FlightSqlServiceImpl::new(file_name, ctx);
     info!("Listening on {addr:?}");
     let svc = FlightServiceServer::new(service);
@@ -132,77 +132,36 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 }

 pub struct FlightSqlServiceImpl {
-    contexts: Arc<DashMap<String, Arc<SessionContext>>>,
-    statements: Arc<DashMap<String, LogicalPlan>>,
-    results: Arc<DashMap<String, Arc<dyn ExecutionPlan>>>,
+    execution_plans: Arc<DashMap<String, Arc<dyn ExecutionPlan>>>,
     default_ctx: Arc<SessionContext>,
     table_name: String,
-    table_path: String,
+    iteration: AtomicU32,
 }

 impl FlightSqlServiceImpl {
-    fn new(table_name: String, path: PathBuf, default_ctx: Arc<SessionContext>) -> Self {
+    fn new(table_name: String, default_ctx: Arc<SessionContext>) -> Self {
         Self {
-            contexts: Default::default(),
-            statements: Default::default(),
-            results: Default::default(),
+            execution_plans: Default::default(),
             table_name,
-            table_path: path.to_string_lossy().to_string(),
             default_ctx,
+            iteration: AtomicU32::new(0),
         }
     }
 }

 impl FlightSqlServiceImpl {
-    async fn create_ctx(&self) -> Result<String, Status> {
-        let uuid = Uuid::new_v4().hyphenated().to_string();
-
-        let session_config = SessionConfig::from_env()
-            .map_err(|e| Status::internal(format!("Error building plan: {e}")))?
-            .with_information_schema(true);
-        let ctx = Arc::new(SessionContext::new_with_config(session_config));
-
-        // register parquet file with the execution context
-        ctx.register_parquet(
-            &self.table_name,
-            &self.table_path,
-            ParquetReadOptions::default(),
-        )
-        .await
-        .map_err(|e| status!("Error registering table", e))?;
-
-        self.contexts.insert(uuid.clone(), ctx);
+    fn create_ctx(&self) -> Result<Uuid, Status> {
+        let uuid = Uuid::new_v4();
         debug!("Created context with uuid: {uuid}");
         Ok(uuid)
     }

-    fn get_ctx<T>(&self, req: &Request<T>) -> Result<Arc<SessionContext>, Status> {
-        // get the token from the authorization header on Request
-        if let Some(auth) = req.metadata().get("authorization") {
-            let str = auth
-                .to_str()
-                .map_err(|e| Status::internal(format!("Error parsing header: {e}")))?;
-            let authorization = str.to_string();
-            let bearer = "Bearer ";
-            if !authorization.starts_with(bearer) {
-                Err(Status::internal("Invalid auth header!"))?;
-            }
-            let auth = authorization[bearer.len()..].to_string();
-
-            if let Some(context) = self.contexts.get(&auth) {
-                Ok(context.clone())
-            } else {
-                Err(Status::internal(format!(
-                    "Context handle not found: {auth}"
-                )))?
-            }
-        } else {
-            Ok(self.default_ctx.clone())
-        }
+    fn get_ctx<T>(&self, _req: &Request<T>) -> Result<Arc<SessionContext>, Status> {
+        Ok(self.default_ctx.clone())
     }

     fn get_result(&self, handle: &str) -> Result<Arc<dyn ExecutionPlan>, Status> {
-        if let Some(result) = self.results.get(handle) {
+        if let Some(result) = self.execution_plans.get(handle) {
             Ok(result.clone())
         } else {
             Err(Status::internal(format!(
@@ -211,13 +170,8 @@ impl FlightSqlServiceImpl {
         }
     }

-    fn remove_plan(&self, handle: &str) -> Result<(), Status> {
-        self.statements.remove(&handle.to_string());
-        Ok(())
-    }
-
     fn remove_result(&self, handle: &str) -> Result<(), Status> {
-        self.results.remove(&handle.to_string());
+        self.execution_plans.remove(&handle.to_string());
         Ok(())
     }
 }
@@ -238,7 +192,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
         // see Ballista implementation for example of basic auth
         // in this case, we simply accept the connection and create a new SessionContext
         // the SessionContext will be re-used within this same connection/session
-        let token = self.create_ctx().await?;
+        let token = self.create_ctx()?;

         let result = HandshakeResponse {
             protocol_version: 0,
@@ -307,11 +261,10 @@ impl FlightSqlService for FlightSqlServiceImpl {
         debug!("execution plan schema: {:?}", schema);
         let stream = execution_plan
             .execute(fetch_results.partition as usize, ctx.task_ctx())
-            .unwrap()
-            // .map_err(|e| status!("Error executing plan", e))?
-            .map_err(|e| {
-                panic!("Error executing plan: {:?}", e);
-            });
+            .unwrap();
+        let stream = GcStream::new(stream).map_err(|e| {
+            panic!("Error executing plan: {:?}", e);
+        });

         let ipc_options = IpcWriteOptions::default().with_preserve_dict_id(false);
         let stream = FlightDataEncoderBuilder::new()
@@ -330,6 +283,16 @@ impl FlightSqlService for FlightSqlServiceImpl {
         let user_query = cmd.query.as_str();
         info!("running query: {user_query}");

+        let iteration = self
+            .iteration
+            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+        if iteration == 1 {
+            // A dirty hack to show memory usage.
+            let current_cache_memory_usage =
+                ArrowArrayCache::get().stats().memory_usage();
+            info!("cache memory usage: {}", current_cache_memory_usage);
+        }
+
         let ctx = self.get_ctx(&request)?;

         let plan = ctx.sql(user_query).await.expect("Error generating plan");
@@ -345,7 +308,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
         let schema = physical_plan.schema();

         let handle = Uuid::new_v4().hyphenated().to_string();
-        self.results.insert(handle.clone(), physical_plan);
+        self.execution_plans.insert(handle.clone(), physical_plan);

         let flight_desc = FlightDescriptor {
             r#type: DescriptorType::Cmd.into(),
@@ -373,24 +336,6 @@ impl FlightSqlService for FlightSqlServiceImpl {
         Ok(resp)
     }

-    async fn get_flight_info_prepared_statement(
-        &self,
-        _cmd: CommandPreparedStatementQuery,
-        _request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        info!("get_flight_info_prepared_statement");
-        panic!("not implemented");
-    }
-
-    async fn get_flight_info_tables(
-        &self,
-        _query: CommandGetTables,
-        _request: Request<FlightDescriptor>,
-    ) -> Result<Response<FlightInfo>, Status> {
-        info!("get_flight_info_tables");
-        panic!("not implemented");
-    }
-
     async fn do_put_prepared_statement_update(
         &self,
         _handle: CommandPreparedStatementUpdate,
@@ -402,42 +347,6 @@ impl FlightSqlService for FlightSqlServiceImpl {
         Ok(-1)
     }

-    async fn do_action_create_prepared_statement(
-        &self,
-        query: ActionCreatePreparedStatementRequest,
-        request: Request<Action>,
-    ) -> Result<ActionCreatePreparedStatementResult, Status> {
-        let user_query = query.query.as_str();
-        info!("do_action_create_prepared_statement: {user_query}");
-
-        let ctx = self.get_ctx(&request)?;
-
-        let plan = ctx
-            .sql(user_query)
-            .await
-            .and_then(|df| df.into_optimized_plan())
-            .map_err(|e| Status::internal(format!("Error building plan: {e}")))?;
-
-        // store a copy of the plan, it will be used for execution
-        let plan_uuid = Uuid::new_v4().hyphenated().to_string();
-        self.statements.insert(plan_uuid.clone(), plan.clone());
-
-        let plan_schema = plan.schema();
-
-        let arrow_schema = (&**plan_schema).into();
-        let message = SchemaAsIpc::new(&arrow_schema, &IpcWriteOptions::default())
-            .try_into()
-            .map_err(|e| status!("Unable to serialize schema", e))?;
-        let IpcMessage(schema_bytes) = message;
-
-        let res = ActionCreatePreparedStatementResult {
-            prepared_statement_handle: plan_uuid.into(),
-            dataset_schema: schema_bytes,
-            parameter_schema: Default::default(),
-        };
-        Ok(res)
-    }
-
     async fn do_action_close_prepared_statement(
         &self,
         handle: ActionClosePreparedStatementRequest,
@@ -446,7 +355,6 @@
         let handle = std::str::from_utf8(&handle.prepared_statement_handle);
         if let Ok(handle) = handle {
             info!("do_action_close_prepared_statement: removing plan and results for {handle}");
-            let _ = self.remove_plan(handle);
             let _ = self.remove_result(handle);
         }
         Ok(())
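Note on the `do_get_fallback` change above: batches are now routed through `GcStream` before Flight encoding, so oversized dictionary columns are compacted before they hit the wire. A minimal sketch of that composition; the helper name `encode_for_flight` is illustrative, and the error mapping uses a `FlightError` where the patch itself keeps its `panic!`:

```rust
use arrow_flight::encode::{FlightDataEncoder, FlightDataEncoderBuilder};
use arrow_flight::error::FlightError;
use arrow_ipc::writer::IpcWriteOptions;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_flight_table::GcStream;
use futures::TryStreamExt;

/// Hypothetical helper mirroring do_get_fallback: compact each batch with
/// GcStream, then encode the stream as Flight IPC data.
fn encode_for_flight(stream: SendableRecordBatchStream) -> FlightDataEncoder {
    let stream =
        GcStream::new(stream).map_err(|e| FlightError::ExternalError(Box::new(e)));
    FlightDataEncoderBuilder::new()
        .with_options(IpcWriteOptions::default().with_preserve_dict_id(false))
        .build(stream)
}
```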
diff --git a/benchmarks/src/clickbench.rs b/benchmarks/src/clickbench.rs
index fe0519d974345..31afa138ce33d 100644
--- a/benchmarks/src/clickbench.rs
+++ b/benchmarks/src/clickbench.rs
@@ -29,6 +29,8 @@ use datafusion::execution::cache::cache_unit::Cache37;
 use datafusion::execution::object_store::ObjectStoreUrl;
 use datafusion::physical_plan::collect;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_plan::ExecutionPlanVisitor;
 use datafusion::prelude::ParquetReadOptions;
 use datafusion::{
     error::{DataFusionError, Result},
@@ -38,6 +40,7 @@ use datafusion_common::exec_datafusion_err;
 use datafusion_common::instant::Instant;
 use datafusion_flight_table::sql::FlightSqlDriver;
 use datafusion_flight_table::sql::USERNAME;
+use datafusion_flight_table::FlightExec;
 use datafusion_flight_table::FlightTableFactory;
 use object_store::aws::AmazonS3Builder;
 use object_store::ObjectStore;
@@ -229,6 +232,7 @@ impl RunOpt {
                     pretty::print_batches(&result)?;
                 }
             }
+
             let elapsed = start.elapsed();
             let ms = elapsed.as_secs_f64() * 1000.0;
             let row_count: usize = result.iter().map(|b| b.num_rows()).sum();
@@ -297,10 +301,17 @@ impl RunOpt {
                 }
             }

-            benchmark_run.write_iter(
+            let mut metrics_visitor = ExecutionPlanMetricCollector::new();
+            datafusion::physical_plan::visit_execution_plan(
+                physical_plan.as_ref(),
+                &mut metrics_visitor,
+            )?;
+
+            benchmark_run.write_iter_with_metrics(
                 elapsed,
                 row_count,
                 Cache37::consume_bytes_read(),
+                metrics_visitor.into_hashmap(),
             );
         }
     }
@@ -375,3 +386,40 @@ impl RunOpt {
         }
     }
 }
+
+struct ExecutionPlanMetricCollector {
+    metrics: HashMap<String, usize>,
+}
+
+impl ExecutionPlanMetricCollector {
+    fn new() -> Self {
+        Self {
+            metrics: HashMap::new(),
+        }
+    }
+
+    fn into_hashmap(self) -> HashMap<String, usize> {
+        self.metrics
+    }
+}
+
+impl ExecutionPlanVisitor for ExecutionPlanMetricCollector {
+    type Error = DataFusionError;
+
+    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
+        if let Some(_flight_exec) = plan.as_any().downcast_ref::<FlightExec>() {
+            if let Some(metrics) = plan.metrics() {
+                let agg = metrics.aggregate_by_name().timestamps_removed();
+                for m in agg.iter() {
+                    self.metrics
+                        .insert(m.value().name().to_string(), m.value().as_usize());
+                }
+            }
+        }
+        Ok(true)
+    }
+
+    fn post_visit(&mut self, _plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
+        Ok(true)
+    }
+}
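The visitor above only harvests counters from `FlightExec` nodes and keys them by metric name. Driving it is a plain plan walk; a sketch, where the wrapper `collect_flight_metrics` is illustrative rather than part of the patch:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::{visit_execution_plan, ExecutionPlan};

/// Illustrative wrapper around the ExecutionPlanMetricCollector defined above:
/// walk an executed plan and return the aggregated FlightExec counters,
/// e.g. {"bytes_transferred": .., "poll_count": .., "output_rows": ..}.
fn collect_flight_metrics(
    physical_plan: &Arc<dyn ExecutionPlan>,
) -> Result<HashMap<String, usize>> {
    let mut visitor = ExecutionPlanMetricCollector::new();
    visit_execution_plan(physical_plan.as_ref(), &mut visitor)?;
    Ok(visitor.into_hashmap())
}
```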
diff --git a/benchmarks/src/util/run.rs b/benchmarks/src/util/run.rs
index 64b78e42b73e7..b3562b64d40ff 100644
--- a/benchmarks/src/util/run.rs
+++ b/benchmarks/src/util/run.rs
@@ -84,6 +84,7 @@ struct QueryIter {
     elapsed: Duration,
     row_count: usize,
     parquet_cache_bytes_read: usize,
+    customized_metrics: Option<HashMap<String, usize>>,
 }
 /// A single benchmark case
 #[derive(Debug, Serialize)]
@@ -147,6 +148,26 @@ impl BenchmarkRun {
                 elapsed,
                 row_count,
                 parquet_cache_bytes_read,
+                customized_metrics: None,
             })
         } else {
             panic!("no cases existed yet");
         }
     }

+    pub fn write_iter_with_metrics(
+        &mut self,
+        elapsed: Duration,
+        row_count: usize,
+        parquet_cache_bytes_read: usize,
+        customized_metrics: HashMap<String, usize>,
+    ) {
+        if let Some(idx) = self.current_case {
+            self.queries[idx].iterations.push(QueryIter {
+                elapsed,
+                row_count,
+                parquet_cache_bytes_read,
+                customized_metrics: Some(customized_metrics),
+            })
+        } else {
+            panic!("no cases existed yet");
+        }
+    }
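Existing callers of `write_iter` are unaffected: `customized_metrics` is an `Option`, so (assuming `QueryIter` derives `Serialize` like the surrounding structs) older iterations serialize it as `null`. A hypothetical call site for the new API, matching how clickbench.rs drives it; the values are placeholders:

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Illustrative only: record one iteration plus the FlightExec counters.
/// `run` is a BenchmarkRun with a case already opened.
fn record_iteration(run: &mut BenchmarkRun, metrics: HashMap<String, usize>) {
    // Same as write_iter(..), except customized_metrics lands as Some(map).
    run.write_iter_with_metrics(Duration::from_millis(250), 42, 0, metrics);
}
```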
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 2115fa3e069cd..b039ffbe95280 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -62,7 +62,10 @@ pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
 pub use metrics::ParquetFileMetrics;
 use opener::ParquetOpener;
 pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
-pub use reader7::{Parquet7FileReader, Parquet7FileReaderFactory};
+pub use reader7::{
+    Parquet7FileReader, Parquet7FileReaderFactory, Parquet8FileReader,
+    Parquet8FileReaderFactory,
+};
 pub use row_filter::can_expr_be_pushed_down_with_schemas;
 pub use writer::plan_to_parquet;
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/reader7.rs b/datafusion/core/src/datasource/physical_plan/parquet/reader7.rs
index 8bee3970aa967..9257b5635a4f0 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/reader7.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/reader7.rs
@@ -61,6 +61,97 @@ pub struct Parquet7FileReader {
 }

 impl AsyncFileReader for Parquet7FileReader {
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<usize>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
+        let total = ranges.iter().map(|r| r.end - r.start).sum();
+        self.file_metrics.bytes_scanned.add(total);
+        self.inner.get_byte_ranges(ranges)
+    }
+
+    fn get_bytes(
+        &mut self,
+        range: Range<usize>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        self.file_metrics.bytes_scanned.add(range.end - range.start);
+        self.inner.get_bytes(range)
+    }
+
+    fn get_metadata(
+        &mut self,
+    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        let cache = Cache37::meta_cache().read().unwrap();
+        let path = &self.inner.meta.location;
+
+        if let Some(meta) = cache.get(path) {
+            let meta = meta.clone();
+            return async move { Ok(meta) }.boxed();
+        }
+
+        drop(cache);
+
+        let path = self.inner.meta.location.clone();
+        let get_meta = self.inner.get_metadata();
+        async move {
+            let meta = get_meta.await?;
+            let mut cache = Cache37::meta_cache().write().unwrap();
+            cache.entry(path).or_insert(meta.clone());
+            Ok(meta)
+        }
+        .boxed()
+    }
+}
+
+/// Doc
+#[derive(Debug)]
+pub struct Parquet8FileReaderFactory {
+    store: Arc<dyn ObjectStore>,
+}
+
+impl Parquet8FileReaderFactory {
+    /// Doc
+    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
+        Self { store }
+    }
+}
+
+impl ParquetFileReaderFactory for Parquet8FileReaderFactory {
+    fn create_reader(
+        &self,
+        partition_index: usize,
+        file_meta: FileMeta,
+        metadata_size_hint: Option<usize>,
+        metrics: &ExecutionPlanMetricsSet,
+    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
+        let file_metrics = ParquetFileMetrics::new(
+            partition_index,
+            file_meta.location().as_ref(),
+            metrics,
+        );
+        let store = Arc::clone(&self.store);
+        let mut inner = ParquetObjectReader::new(store, file_meta.object_meta);
+
+        if let Some(hint) = metadata_size_hint {
+            inner = inner.with_footer_size_hint(hint)
+        };
+
+        Ok(Box::new(Parquet7FileReader {
+            inner,
+            file_metrics,
+        }))
+    }
+}
+
+/// doc
+pub struct Parquet8FileReader {
+    /// doc
+    pub file_metrics: ParquetFileMetrics,
+    /// doc
+    pub inner: ParquetObjectReader,
+}
+
+impl AsyncFileReader for Parquet8FileReader {
     fn get_byte_ranges(
         &mut self,
         ranges: Vec<Range<usize>>,
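The `get_metadata` override above uses a read-then-write locking pattern: probe under the read lock, drop it before the await, and let `or_insert` settle the race when two tasks fetch the same footer concurrently (the loser returns its own fetch result but does not overwrite the winner's cache entry). A standalone sketch of the same pattern, with a plain `RwLock<HashMap<..>>` standing in for `Cache37::meta_cache()`:

```rust
use std::collections::HashMap;
use std::future::Future;
use std::sync::{Arc, RwLock};

/// Stand-in for Cache37::meta_cache(): object-store path -> cached footer.
struct MetaCache(RwLock<HashMap<String, Arc<[u8]>>>);

impl MetaCache {
    async fn get_or_fetch<F, Fut>(&self, path: &str, fetch: F) -> Arc<[u8]>
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = Arc<[u8]>>,
    {
        // Fast path: probe under the read lock, which is released before
        // the await below so no lock is held across a suspension point.
        if let Some(meta) = self.0.read().unwrap().get(path) {
            return meta.clone();
        }
        let meta = fetch().await;
        // or_insert_with keeps the first writer's value if two tasks raced.
        self.0
            .write()
            .unwrap()
            .entry(path.to_string())
            .or_insert_with(|| meta.clone());
        meta
    }
}
```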
diff --git a/datafusion/flight-table/src/exec.rs b/datafusion/flight-table/src/exec.rs
index 2af941fc27848..70e80c88b9f98 100644
--- a/datafusion/flight-table/src/exec.rs
+++ b/datafusion/flight-table/src/exec.rs
@@ -49,7 +49,7 @@ use tonic::metadata::{AsciiMetadataKey, MetadataMap};

 /// Arrow Flight physical plan that maps flight endpoints to partitions
 #[derive(Clone, Debug)]
-pub(crate) struct FlightExec {
+pub struct FlightExec {
     config: FlightConfig,
     plan_properties: PlanProperties,
     metadata_map: Arc<MetadataMap>,
@@ -348,6 +348,9 @@ impl Stream for FlightStream {
         match result {
             Poll::Ready(Some(Ok(batch))) => {
                 self.metrics.output_rows.add(batch.num_rows());
+                self.metrics
+                    .bytes_transferred
+                    .add(batch.get_array_memory_size());
                 let new_batch = self.schema_mapper.map_batch(batch).unwrap();
                 return Poll::Ready(Some(Ok(new_batch)));
             }
diff --git a/datafusion/flight-table/src/lib.rs b/datafusion/flight-table/src/lib.rs
index 3d0243856f04c..9129f32f0ddad 100644
--- a/datafusion/flight-table/src/lib.rs
+++ b/datafusion/flight-table/src/lib.rs
@@ -20,3 +20,6 @@ mod metrics;
 pub mod sql;
 mod table;
 pub use table::FlightTableFactory;
+pub use exec::FlightExec;
+mod utils;
+pub use utils::GcStream;
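One caveat on the new counter: `get_array_memory_size()` reports the decoded in-memory footprint of each batch after it arrives, not the encoded IPC payload size on the wire, so `bytes_transferred` is an upper-bound proxy for network traffic. A self-contained illustration:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch};

fn main() {
    let col: ArrayRef = Arc::new(Int64Array::from(vec![7_i64; 1024]));
    let batch = RecordBatch::try_from_iter([("v", col)]).unwrap();
    // Counts allocated buffer bytes (here >= 1024 * 8), not wire bytes.
    println!("bytes_transferred += {}", batch.get_array_memory_size());
}
```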
diff --git a/datafusion/flight-table/src/metrics.rs b/datafusion/flight-table/src/metrics.rs
index f88ab39ff59cc..eddbfecdc5f52 100644
--- a/datafusion/flight-table/src/metrics.rs
+++ b/datafusion/flight-table/src/metrics.rs
@@ -67,6 +67,7 @@ pub(crate) struct FlightStreamMetrics {
     pub time_reading_total: StartableTime,
     pub poll_count: Count,
     pub output_rows: Count,
+    pub bytes_transferred: Count,
 }

 impl FlightStreamMetrics {
@@ -84,6 +85,8 @@
             },
             output_rows: MetricBuilder::new(metrics).output_rows(partition),
             poll_count: MetricBuilder::new(metrics).counter("poll_count", partition),
+            bytes_transferred: MetricBuilder::new(metrics)
+                .counter("bytes_transferred", partition),
         }
     }
 }
diff --git a/datafusion/flight-table/src/utils.rs b/datafusion/flight-table/src/utils.rs
new file mode 100644
index 0000000000000..fcbe39fbc28b7
--- /dev/null
+++ b/datafusion/flight-table/src/utils.rs
@@ -0,0 +1,72 @@
+use std::{
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use arrow_array::{
+    builder::StringDictionaryBuilder, cast::AsArray, types::UInt16Type, Array,
+    RecordBatch, StringArray,
+};
+use datafusion::error::Result;
+use futures::{ready, Stream};
+use futures::{stream::BoxStream, StreamExt};
+
+/// A stream that garbage collects the memory of the record batches it yields.
+/// Targets encodings whose backing buffers may not be compact: currently only
+/// `UInt16` string dictionaries are compacted; `StringViewArray` is a candidate
+/// for the same treatment. Useful before sending data over the network.
+pub struct GcStream {
+    inner: BoxStream<'static, Result<RecordBatch>>,
+}
+
+impl GcStream {
+    pub fn new<S: Stream<Item = Result<RecordBatch>> + Send + 'static>(inner: S) -> Self {
+        Self {
+            inner: inner.boxed(),
+        }
+    }
+}
+
+impl Stream for GcStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let batch = ready!(self.inner.poll_next_unpin(cx));
+        match batch {
+            Some(Ok(batch)) => {
+                let batch = gc_batch(batch);
+                Poll::Ready(Some(Ok(batch)))
+            }
+            Some(Err(e)) => Poll::Ready(Some(Err(e))),
+            None => Poll::Ready(None),
+        }
+    }
+}
+
+fn gc_batch(batch: RecordBatch) -> RecordBatch {
+    let new_columns = batch.columns().iter().map(|column| {
+        if let Some(dict_array) = column.as_dictionary_opt::<UInt16Type>() {
+            if let Some(typed_dict_array) = dict_array.downcast_dict::<StringArray>() {
+                let array_len = dict_array.len();
+                let values_len = typed_dict_array.values().len();
+                // Re-encode only when the dictionary carries more values than
+                // the batch references, i.e. the buffer is not compact.
+                if values_len > array_len {
+                    let mut gc_array =
+                        StringDictionaryBuilder::<UInt16Type>::with_capacity(
+                            array_len, values_len, 1024,
+                        );
+                    for v in typed_dict_array.into_iter() {
+                        gc_array.append_option(v);
+                    }
+                    let gc_array = gc_array.finish();
+                    return Arc::new(gc_array) as _;
+                }
+            }
+        }
+        column.clone()
+    });
+
+    RecordBatch::try_new(batch.schema(), new_columns.collect()).unwrap()
+}
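`gc_batch` fires when a dictionary column carries more values than the batch has rows, which is typical for sliced batches: slicing narrows the keys but keeps the full values buffer. A self-contained demonstration of that condition, using arrow-array's builder and slice APIs:

```rust
use arrow_array::builder::StringDictionaryBuilder;
use arrow_array::types::UInt16Type;
use arrow_array::{Array, DictionaryArray};

fn main() {
    let mut builder = StringDictionaryBuilder::<UInt16Type>::new();
    for v in ["a", "b", "c", "d"] {
        builder.append_value(v);
    }
    let dict: DictionaryArray<UInt16Type> = builder.finish();

    // Slicing keeps the full 4-entry values buffer behind only 2 keys,
    // so values_len (4) > array_len (2) and gc_batch would re-encode it.
    let sliced = dict.slice(0, 2);
    assert_eq!(sliced.len(), 2);
    assert_eq!(sliced.values().len(), 4);
}
```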
diff --git a/dev/flight-cache.sh b/dev/flight-cache.sh
new file mode 100644
index 0000000000000..fa99667cb0d90
--- /dev/null
+++ b/dev/flight-cache.sh
@@ -0,0 +1,44 @@
+#!/bin/bash
+
+# Define log files
+query_log="output_queries.log"
+server_log="output_server.log"
+
+# Loop from query 0 to 42
+for i in {0..42}
+do
+    # Kill any existing processes listening on port 50051
+    lsof -ti:50051 | xargs -r kill -9
+    # Start the server in the background and capture its PID
+    RUST_LOG=cache_server=info \
+    cargo run --profile release-nonlto --bin cache_server \
+        -- --path benchmarks/data/hits.parquet >> "$server_log" 2>&1 &
+    server_pid=$!
+
+    # Give the server a moment to start up
+    sleep 1
+
+    # Run the query
+    echo "Running query $i..." | tee -a "$query_log"
+    cargo run --profile release-nonlto --bin dfbench -- clickbench \
+        --queries-path benchmarks/queries/clickbench/queries.sql \
+        --iterations 4 --path benchmarks/data/hits.parquet \
+        --query $i --pushdown-filters \
+        --flight-cache http://localhost:50051 &>> "$query_log"
+    query_status=$?
+
+    # Kill the server
+    kill $server_pid
+    wait $server_pid 2>/dev/null
+
+    # Check query status
+    if [ $query_status -eq 0 ]; then
+        echo "Query $i completed successfully." | tee -a "$query_log"
+    else
+        echo "Query $i failed. Check $query_log for details." | tee -a "$query_log"
+    fi
+
+    # Clean up the cache
+    rm -rf target/arrow-cache
+done