diff --git a/benchmarks/src/bin/cache_client.rs b/benchmarks/src/bin/cache_client.rs
deleted file mode 100644
index 08f1fb1a33247..0000000000000
--- a/benchmarks/src/bin/cache_client.rs
+++ /dev/null
@@ -1,122 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use arrow::util::pretty;
-use datafusion::error::Result;
-use datafusion::physical_plan::collect;
-use datafusion::physical_plan::display::DisplayableExecutionPlan;
-use datafusion::prelude::SessionContext;
-use datafusion_common::exec_datafusion_err;
-use datafusion_flight_table::sql::{FlightSqlDriver, USERNAME};
-use datafusion_flight_table::FlightTableFactory;
-use std::collections::HashMap;
-use std::path::{Path, PathBuf};
-use std::sync::Arc;
-use structopt::StructOpt;
-
-#[derive(Debug, StructOpt)]
-struct Options {
-    #[structopt(long)]
-    queries_path: PathBuf,
-
-    #[structopt(long)]
-    query: Option<usize>,
-}
-
-struct AllQueries {
-    queries: Vec<String>,
-}
-
-impl AllQueries {
-    fn try_new(path: &Path) -> Result<Self> {
-        // ClickBench has all queries in a single file identified by line number
-        let all_queries = std::fs::read_to_string(path)
-            .map_err(|e| exec_datafusion_err!("Could not open {path:?}: {e}"))?;
-        Ok(Self {
-            queries: all_queries.lines().map(|s| s.to_string()).collect(),
-        })
-    }
-
-    /// Returns the text of query `query_id`
-    fn get_query(&self, query_id: usize) -> Result<&str> {
-        self.queries
-            .get(query_id)
-            .ok_or_else(|| {
-                let min_id = self.min_query_id();
-                let max_id = self.max_query_id();
-                exec_datafusion_err!(
-                    "Invalid query id {query_id}. Must be between {min_id} and {max_id}"
-                )
-            })
-            .map(|s| s.as_str())
-    }
-
-    fn min_query_id(&self) -> usize {
-        0
-    }
-
-    fn max_query_id(&self) -> usize {
-        self.queries.len() - 1
-    }
-}
-
-#[tokio::main]
-async fn main() -> datafusion::common::Result<()> {
-    env_logger::init();
-    let options = Options::from_args();
-    let all_queries = AllQueries::try_new(options.queries_path.as_path())?;
-
-    let query_id = options.query.unwrap_or(0);
-    let sql = all_queries.get_query(query_id)?;
-
-    let ctx = SessionContext::new();
-    let mut state = ctx.state();
-    state
-        .config_mut()
-        .options_mut()
-        .execution
-        .parquet
-        .pushdown_filters = false;
-
-    let flight_sql = FlightTableFactory::new(Arc::new(FlightSqlDriver::default()));
-    let table = flight_sql
-        .open_table(
-            "http://localhost:50051",
-            HashMap::from([(USERNAME.into(), "whatever".into())]),
-            "hits",
-        )
-        .await?;
-    ctx.register_table("hits", Arc::new(table))?;
-
-    let plan = ctx.sql(sql).await?;
-    let (state, plan) = plan.into_parts();
-    let plan = state.optimize(&plan)?;
-
-    println!("logical plan: {}", plan);
-    let physical_plan = state.create_physical_plan(&plan).await?;
-    let result = collect(physical_plan.clone(), state.task_ctx()).await?;
-    println!(
-        "=== Physical plan with metrics ===\n{}\n",
-        DisplayableExecutionPlan::with_metrics(physical_plan.as_ref()).indent(true)
-    );
-    if !result.is_empty() {
-        // do not call print_batches if there are no batches as the result is confusing
-        // and makes it look like there is a batch with no columns
-        pretty::print_batches(&result)?;
-    }
-    Ok(())
-}
diff --git a/benchmarks/src/bin/cache_server.rs b/benchmarks/src/bin/cache_server.rs
index 64ad9cdb2faed..639879da365c1 100644
--- a/benchmarks/src/bin/cache_server.rs
+++ b/benchmarks/src/bin/cache_server.rs
@@ -60,6 +60,9 @@ macro_rules! status {
 struct Options {
     #[structopt(short, long)]
     path: PathBuf,
+
+    #[structopt(long)]
+    partitions: Option<usize>,
 }
 
 #[tokio::main]
@@ -82,7 +85,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
     let mut session_config = SessionConfig::from_env()
        .map_err(|e| Status::internal(format!("Error building plan: {e}")))?
-        .with_information_schema(true);
+        .with_information_schema(true)
+        .with_target_partitions(options.partitions.unwrap_or(num_cpus::get()));
 
     session_config
         .options_mut()
@@ -275,10 +279,10 @@ impl FlightSqlService for FlightSqlServiceImpl {
         let execution_plan = self.get_result(&handle)?;
 
         let displayable =
-            datafusion::physical_plan::display::DisplayableExecutionPlan::with_metrics(
+            datafusion::physical_plan::display::DisplayableExecutionPlan::new(
                 execution_plan.as_ref(),
             );
-        info!("physical plan:\n{}", displayable.indent(true));
+        info!("physical plan:\n{}", displayable.indent(false));
 
         let ctx = self.get_ctx(&request)?;
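
Note on the new `--partitions` flag above: when it is omitted, the server falls back to one target partition per logical core. A minimal sketch of that resolution logic (the flag and `num_cpus::get()` come from the diff; the helper below is illustrative and uses the standard library instead of the `num_cpus` crate):

    use std::thread;

    /// Resolve the target partition count the way cache_server does:
    /// an explicit `--partitions N` wins; otherwise fall back to the
    /// logical core count (`num_cpus::get()` in the diff itself).
    fn target_partitions(partitions: Option<usize>) -> usize {
        partitions.unwrap_or_else(|| {
            thread::available_parallelism().map(|n| n.get()).unwrap_or(1)
        })
    }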
diff --git a/benchmarks/src/clickbench.rs b/benchmarks/src/clickbench.rs
index 1ac25f58f5892..80b7fa3180556 100644
--- a/benchmarks/src/clickbench.rs
+++ b/benchmarks/src/clickbench.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashMap;
 use std::env;
 use std::fs::File;
 use std::path::Path;
@@ -34,6 +35,9 @@ use datafusion::{
 };
 use datafusion_common::exec_datafusion_err;
 use datafusion_common::instant::Instant;
+use datafusion_flight_table::sql::FlightSqlDriver;
+use datafusion_flight_table::sql::USERNAME;
+use datafusion_flight_table::FlightTableFactory;
 use object_store::aws::AmazonS3Builder;
 use object_store::ObjectStore;
 use parquet::arrow::arrow_cache::ArrowArrayCache;
@@ -101,6 +105,9 @@ pub struct RunOpt {
     /// Predicate pushdown
     #[structopt(long)]
     pushdown_filters: bool,
+
+    #[structopt(long)]
+    flight_cache: Option<String>,
 }
 
 struct AllQueries {
@@ -171,7 +178,7 @@ impl RunOpt {
         config.options_mut().execution.parquet.pushdown_filters = self.pushdown_filters;
         let ctx = SessionContext::new_with_config(config);
 
-        self.register_hits(&ctx).await?;
+        self.register_hits(&ctx, &self.flight_cache).await?;
 
         let iterations = self.common.iterations;
         let mut benchmark_run = BenchmarkRun::new();
@@ -298,41 +305,65 @@ impl RunOpt {
         Ok(())
     }
 
-    /// Registrs the `hits.parquet` as a table named `hits`
-    async fn register_hits(&self, ctx: &SessionContext) -> Result<()> {
+    /// Registers the `hits.parquet` as a table named `hits`
+    async fn register_hits(
+        &self,
+        ctx: &SessionContext,
+        flight_cache: &Option<String>,
+    ) -> Result<()> {
         let path = self.path.as_os_str().to_str().unwrap();
-        let object_store: Arc<dyn ObjectStore> = if path.starts_with("minio://") {
-            let url = Url::parse(path).unwrap();
-            let bucket_name = url.host_str().unwrap_or("parquet-oo");
-            let object_store = AmazonS3Builder::new()
-                .with_bucket_name(bucket_name)
-                .with_endpoint("http://c220g5-110910.wisc.cloudlab.us:9000")
-                .with_allow_http(true)
-                .with_region("us-east-1")
-                .with_access_key_id(env::var("MINIO_ACCESS_KEY_ID").unwrap())
-                .with_secret_access_key(env::var("MINIO_SECRET_ACCESS_KEY").unwrap())
-                .build()?;
-            let object_store = Arc::new(object_store);
-            ctx.register_object_store(&url, object_store.clone());
-            object_store
-        } else {
-            let url = ObjectStoreUrl::local_filesystem();
-            let object_store = ctx.runtime_env().object_store(url).unwrap();
-            Arc::new(object_store)
-        };
-
-        let mut options: ParquetReadOptions<'_> = Default::default();
-        use datafusion::datasource::physical_plan::parquet::Parquet7FileReaderFactory;
-        options.reader = Some(Arc::new(Parquet7FileReaderFactory::new(object_store)));
+        match flight_cache {
+            Some(flight_cache) => {
+                let flight_sql =
+                    FlightTableFactory::new(Arc::new(FlightSqlDriver::default()));
+                let table = flight_sql
+                    .open_table(
+                        flight_cache,
+                        HashMap::from([(USERNAME.into(), "whatever".into())]),
+                        "hits",
+                    )
+                    .await?;
+                ctx.register_table("hits", Arc::new(table))?;
+                Ok(())
+            }
+            None => {
+                let object_store: Arc<dyn ObjectStore> = if path.starts_with("minio://") {
+                    let url = Url::parse(path).unwrap();
+                    let bucket_name = url.host_str().unwrap_or("parquet-oo");
+                    let object_store = AmazonS3Builder::new()
+                        .with_bucket_name(bucket_name)
+                        .with_endpoint("http://c220g5-110910.wisc.cloudlab.us:9000")
+                        .with_allow_http(true)
+                        .with_region("us-east-1")
+                        .with_access_key_id(env::var("MINIO_ACCESS_KEY_ID").unwrap())
+                        .with_secret_access_key(
+                            env::var("MINIO_SECRET_ACCESS_KEY").unwrap(),
+                        )
+                        .build()?;
+                    let object_store = Arc::new(object_store);
+                    ctx.register_object_store(&url, object_store.clone());
+                    object_store
+                } else {
+                    let url = ObjectStoreUrl::local_filesystem();
+                    let object_store = ctx.runtime_env().object_store(url).unwrap();
+                    Arc::new(object_store)
+                };
+
+                let mut options: ParquetReadOptions<'_> = Default::default();
+                use datafusion::datasource::physical_plan::parquet::Parquet7FileReaderFactory;
+                options.reader =
+                    Some(Arc::new(Parquet7FileReaderFactory::new(object_store)));
+
-        ctx.register_parquet("hits", &path, options)
-            .await
-            .map_err(|e| {
-                DataFusionError::Context(
-                    format!("Registering 'hits' as {path}"),
-                    Box::new(e),
-                )
-            })
+                ctx.register_parquet("hits", &path, options)
+                    .await
+                    .map_err(|e| {
+                        DataFusionError::Context(
+                            format!("Registering 'hits' as {path}"),
+                            Box::new(e),
+                        )
+                    })
+            }
+        }
     }
 }
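
With the `--flight-cache` option above, the ClickBench runner can read `hits` through a cache server instead of local Parquet, replacing the deleted `cache_client` binary. A condensed sketch of that registration path (API names and the dummy username are taken from this diff; the endpoint value is an assumption matching the old client's hardcoded default):

    use std::collections::HashMap;
    use std::sync::Arc;

    use datafusion::prelude::SessionContext;
    use datafusion_common::Result;
    use datafusion_flight_table::sql::{FlightSqlDriver, USERNAME};
    use datafusion_flight_table::FlightTableFactory;

    /// Register the remote `hits` table served by the cache server,
    /// e.g. endpoint = "http://localhost:50051" (the old client's default).
    async fn register_remote_hits(ctx: &SessionContext, endpoint: &str) -> Result<()> {
        let factory = FlightTableFactory::new(Arc::new(FlightSqlDriver::default()));
        let table = factory
            .open_table(
                endpoint,
                HashMap::from([(USERNAME.into(), "whatever".into())]),
                "hits",
            )
            .await?;
        ctx.register_table("hits", Arc::new(table))?;
        Ok(())
    }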
diff --git a/datafusion/flight-table/src/exec.rs b/datafusion/flight-table/src/exec.rs
index ba605f27a6a0a..859a391b50095 100644
--- a/datafusion/flight-table/src/exec.rs
+++ b/datafusion/flight-table/src/exec.rs
@@ -18,27 +18,33 @@
 //! Execution plan for reading flights from Arrow Flight services
 
 use std::any::Any;
-use std::error::Error;
 use std::fmt::{Debug, Formatter};
+use std::pin::Pin;
 use std::str::FromStr;
 use std::sync::Arc;
+use std::task::{ready, Context, Poll};
 
+use crate::metrics::{FlightStreamMetrics, FlightTableMetrics};
 use crate::table::{flight_channel, to_df_err, FlightMetadata, FlightProperties};
-use arrow_flight::error::FlightError;
+use arrow_array::RecordBatch;
 use arrow_flight::flight_service_client::FlightServiceClient;
 use arrow_flight::{FlightClient, FlightEndpoint, Ticket};
 use arrow_schema::SchemaRef;
 use datafusion::config::ConfigOptions;
 use datafusion_common::arrow::datatypes::ToByteSlice;
+use datafusion_common::project_schema;
 use datafusion_common::Result;
-use datafusion_common::{project_schema, DataFusionError};
-use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
 use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
+use datafusion_physical_plan::metrics::{
+    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
+};
 use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion_physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
 };
-use futures::TryStreamExt;
+use futures::future::BoxFuture;
+use futures::{Stream, TryStreamExt};
 use serde::{Deserialize, Serialize};
 use tonic::metadata::{AsciiMetadataKey, MetadataMap};
 
@@ -48,6 +54,7 @@ pub(crate) struct FlightExec {
     config: FlightConfig,
     plan_properties: PlanProperties,
     metadata_map: Arc<MetadataMap>,
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl FlightExec {
@@ -92,10 +99,14 @@ impl FlightExec {
             let value = v.parse().expect("invalid header value");
             mm.insert(key, value);
         }
+
+        let metrics = ExecutionPlanMetricsSet::new();
+
         Ok(Self {
             config,
             plan_properties,
             metadata_map: Arc::from(mm),
+            metrics,
         })
     }
 }
@@ -112,7 +123,7 @@ pub(crate) struct FlightConfig {
 
 /// The minimum information required for fetching a flight stream.
 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
 struct FlightPartition {
-    locations: Arc<[String]>,
+    locations: String,
     ticket: FlightTicket,
 }
 
@@ -138,7 +149,7 @@ impl Debug for FlightTicket {
 impl FlightPartition {
     fn new(endpoint: &FlightEndpoint, fallback_location: String) -> Self {
         let locations = if endpoint.location.is_empty() {
-            [fallback_location].into()
+            fallback_location
         } else {
             endpoint
                 .location
@@ -175,35 +186,21 @@ async fn flight_stream(
     partition: FlightPartition,
     schema: SchemaRef,
     grpc_headers: Arc<MetadataMap>,
+    mut flight_metrics: FlightTableMetrics,
 ) -> Result<SendableRecordBatchStream> {
-    let mut errors: Vec<Box<dyn Error + Send + Sync>> = vec![];
-    for loc in partition.locations.iter() {
-        let client = flight_client(loc, grpc_headers.as_ref()).await?;
-        match try_fetch_stream(client, &partition.ticket, schema.clone()).await {
-            Ok(stream) => return Ok(stream),
-            Err(e) => errors.push(Box::new(e)),
-        }
-    }
-    let err = errors.into_iter().last().unwrap_or_else(|| {
-        Box::new(FlightError::ProtocolError(format!(
-            "No available location for endpoint {:?}",
-            partition.locations
-        )))
-    });
-    Err(DataFusionError::External(err))
-}
+    flight_metrics.time_creating_client.start();
+    let mut client = flight_client(partition.locations, grpc_headers.as_ref()).await?;
+    flight_metrics.time_creating_client.stop();
 
-async fn try_fetch_stream(
-    mut client: FlightClient,
-    ticket: &FlightTicket,
-    schema: SchemaRef,
-) -> arrow_flight::error::Result<SendableRecordBatchStream> {
-    let ticket = Ticket::new(ticket.0.to_vec());
-    let stream = client.do_get(ticket).await?.map_err(to_df_err);
-    Ok(Box::pin(RecordBatchStreamAdapter::new(
-        schema.clone(),
-        stream,
-    )))
-}
+    let ticket = Ticket::new(partition.ticket.0.to_vec());
+    flight_metrics.time_getting_stream.start();
+    // Propagate gRPC failures instead of panicking with `.unwrap()`.
+    let stream = client
+        .do_get(ticket)
+        .await
+        .map_err(to_df_err)?
+        .map_err(to_df_err);
+    flight_metrics.time_getting_stream.stop();
+
+    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
+}
 
 impl DisplayAs for FlightExec {
@@ -217,8 +214,10 @@ impl DisplayAs for FlightExec {
             ),
             DisplayFormatType::Verbose => write!(
                 f,
-                "FlightExec: origin={}, partitions={:?}, properties={:?}",
-                self.config.origin, self.config.partitions, self.config.properties,
+                "FlightExec: origin={}, streams={}, properties={:?}",
+                self.config.origin,
+                self.config.partitions.len(),
+                self.config.properties,
             ),
         }
     }
@@ -253,16 +252,23 @@ impl ExecutionPlan for FlightExec {
     fn execute(
         &self,
         partition: usize,
        _context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
+        let flight_metrics = FlightTableMetrics::new(&self.metrics, partition);
         let future_stream = flight_stream(
             self.config.partitions[partition].clone(),
             self.schema(),
             self.metadata_map.clone(),
+            flight_metrics,
         );
-        let stream = futures::stream::once(future_stream).try_flatten();
-        Ok(Box::pin(RecordBatchStreamAdapter::new(
-            self.schema(),
-            stream,
-        )))
+        let stream_metrics = FlightStreamMetrics::new(&self.metrics, partition);
+        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+        Ok(Box::pin(FlightStream {
+            metrics: stream_metrics,
+            baseline_metrics,
+            _partition: partition,
+            state: FlightStreamState::Init,
+            future_stream: Some(Box::pin(future_stream)),
+            schema: self.schema().clone(),
+        }))
     }
 
     fn fetch(&self) -> Option<usize> {
@@ -279,4 +285,71 @@ impl ExecutionPlan for FlightExec {
             Partitioning::UnknownPartitioning(self.config.partitions.len());
         Ok(Some(Arc::new(new_plan)))
     }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
 }
+
+enum FlightStreamState {
+    Init,
+    GetStream(BoxFuture<'static, Result<SendableRecordBatchStream>>),
+    Processing(SendableRecordBatchStream),
+}
+
+struct FlightStream {
+    metrics: FlightStreamMetrics,
+    baseline_metrics: BaselineMetrics,
+    _partition: usize,
+    state: FlightStreamState,
+    future_stream: Option<BoxFuture<'static, Result<SendableRecordBatchStream>>>,
+    schema: SchemaRef,
+}
+
+impl FlightStream {
+    fn poll_inner(
+        &mut self,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<RecordBatch>>> {
+        loop {
+            match &mut self.state {
+                FlightStreamState::Init => {
+                    self.metrics.time_reading_total.start();
+                    self.state = FlightStreamState::GetStream(
+                        self.future_stream.take().unwrap(),
+                    );
+                    continue;
+                }
+                FlightStreamState::GetStream(fut) => {
+                    // Surface connection errors as stream items rather than panicking.
+                    let stream = match ready!(fut.as_mut().poll(cx)) {
+                        Ok(stream) => stream,
+                        Err(e) => return Poll::Ready(Some(Err(e))),
+                    };
+                    self.state = FlightStreamState::Processing(stream);
+                    continue;
+                }
+                FlightStreamState::Processing(stream) => {
+                    let result = stream.as_mut().poll_next(cx);
+                    self.metrics.poll_count.add(1);
+                    return result;
+                }
+            }
+        }
+    }
+}
+
+impl Stream for FlightStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        self.metrics.time_processing.start();
+        let result = self.poll_inner(cx);
+        self.metrics.time_processing.stop();
+        if let Poll::Ready(None) = result {
+            self.metrics.time_reading_total.stop();
+        }
+        self.baseline_metrics.record_poll(result)
+    }
+}
+
+impl RecordBatchStream for FlightStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
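
The point of threading an `ExecutionPlanMetricsSet` through `FlightExec` above is that the new timers (`time_creating_client`, `time_getting_stream`, `time_processing`, `time_reading_total`) and `poll_count` show up wherever DataFusion renders plan metrics. A sketch of surfacing them after running a query, using the same display call the deleted `cache_client` relied on:

    use datafusion::physical_plan::collect;
    use datafusion::physical_plan::display::DisplayableExecutionPlan;
    use datafusion::prelude::SessionContext;
    use datafusion_common::Result;

    /// Execute `sql` and print the physical plan annotated with the
    /// runtime metrics collected during execution.
    async fn explain_with_metrics(ctx: &SessionContext, sql: &str) -> Result<()> {
        let (state, plan) = ctx.sql(sql).await?.into_parts();
        let physical_plan = state.create_physical_plan(&plan).await?;
        // Metrics are only populated once the plan has actually run.
        let _batches = collect(physical_plan.clone(), state.task_ctx()).await?;
        println!(
            "{}",
            DisplayableExecutionPlan::with_metrics(physical_plan.as_ref()).indent(true)
        );
        Ok(())
    }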
diff --git a/datafusion/flight-table/src/lib.rs b/datafusion/flight-table/src/lib.rs
index 77d45c26ced68..3d0243856f04c 100644
--- a/datafusion/flight-table/src/lib.rs
+++ b/datafusion/flight-table/src/lib.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 mod exec;
+mod metrics;
 pub mod sql;
 mod table;
 pub use table::FlightTableFactory;
diff --git a/datafusion/flight-table/src/metrics.rs b/datafusion/flight-table/src/metrics.rs
new file mode 100644
index 0000000000000..9933760f11ba3
--- /dev/null
+++ b/datafusion/flight-table/src/metrics.rs
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::instant::Instant;
+use datafusion_physical_plan::metrics::{
+    Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
+};
+
+/// A timer that can be started and stopped.
+pub struct StartableTime {
+    pub(crate) metrics: Time,
+    // Records the elapsed time of each part; added into `metrics` on `stop()`.
+    pub(crate) start: Option<Instant>,
+}
+
+impl StartableTime {
+    pub(crate) fn start(&mut self) {
+        assert!(self.start.is_none());
+        self.start = Some(Instant::now());
+    }
+
+    pub(crate) fn stop(&mut self) {
+        if let Some(start) = self.start.take() {
+            self.metrics.add_elapsed(start);
+        }
+    }
+}
+
+pub(crate) struct FlightTableMetrics {
+    pub time_creating_client: StartableTime,
+    pub time_getting_stream: StartableTime,
+}
+
+impl FlightTableMetrics {
+    pub(crate) fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        Self {
+            time_creating_client: StartableTime {
+                metrics: MetricBuilder::new(metrics)
+                    .subset_time("time_creating_client", partition),
+                start: None,
+            },
+            time_getting_stream: StartableTime {
+                metrics: MetricBuilder::new(metrics)
+                    .subset_time("time_getting_stream", partition),
+                start: None,
+            },
+        }
+    }
+}
+
+pub(crate) struct FlightStreamMetrics {
+    pub time_processing: StartableTime,
+    pub time_reading_total: StartableTime,
+    pub poll_count: Count,
+}
+
+impl FlightStreamMetrics {
+    pub(crate) fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        Self {
+            time_processing: StartableTime {
+                metrics: MetricBuilder::new(metrics)
+                    .subset_time("time_processing", partition),
+                start: None,
+            },
+            time_reading_total: StartableTime {
+                metrics: MetricBuilder::new(metrics)
+                    .subset_time("time_reading_total", partition),
+                start: None,
+            },
+            poll_count: MetricBuilder::new(metrics).counter("poll_count", partition),
+        }
+    }
+}
diff --git a/datafusion/flight-table/src/table.rs b/datafusion/flight-table/src/table.rs
index 038b11f122386..e900f07b11b77 100644
--- a/datafusion/flight-table/src/table.rs
+++ b/datafusion/flight-table/src/table.rs
@@ -26,7 +26,7 @@ use datafusion_sql::unparser::Unparser;
 use datafusion_sql::TableReference;
 use log::info;
 use serde::{Deserialize, Serialize};
-use tonic::transport::{Channel, ClientTlsConfig};
+use tonic::transport::Channel;
 
 /// Generic Arrow Flight data source. Requires a [FlightDriver] that allows implementors
 /// to integrate any custom Flight RPC service by producing a [FlightMetadata] for some DDL.
@@ -249,10 +249,9 @@ pub(crate) fn to_df_err<E: Error + Send + Sync + 'static>(err: E) -> DataFusionError
 }
 
 pub(crate) async fn flight_channel(source: impl Into<String>) -> Result<Channel> {
-    let tls_config = ClientTlsConfig::new().with_enabled_roots();
+    // No TLS here, to avoid its overhead: we assume both server and
+    // client are running on a trusted network.
     Channel::from_shared(source.into())
-        .map_err(to_df_err)?
-        .tls_config(tls_config)
        .map_err(to_df_err)?
        .connect()
        .await
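
If the trusted-network assumption behind the change above ever stops holding, the TLS setup can be restored. For reference, a sketch mirroring the removed lines (tonic's `ClientTlsConfig` with system roots; error handling simplified):

    use tonic::transport::{Channel, ClientTlsConfig};

    /// Build a TLS-enabled channel, as flight_channel did before this patch.
    async fn tls_channel(url: String) -> Result<Channel, tonic::transport::Error> {
        let tls_config = ClientTlsConfig::new().with_enabled_roots();
        Channel::from_shared(url)?
            .tls_config(tls_config)?
            .connect()
            .await
    }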