diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index b02bfee2454e..be742a1a3c64 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -350,7 +350,7 @@ run_tpch() { RESULTS_FILE="${RESULTS_DIR}/tpch_sf${SCALE_FACTOR}.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running tpch benchmark..." - $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format parquet -o "${RESULTS_FILE}" + $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --query 18 --prefer_hash_join "${PREFER_HASH_JOIN}" --format parquet -o "${RESULTS_FILE}" } # Runs the tpch in memory diff --git a/benchmarks/queries/clickbench/queries.sql b/benchmarks/queries/clickbench/queries.sql index 52e72e02e1e0..7ffbd31662d2 100644 --- a/benchmarks/queries/clickbench/queries.sql +++ b/benchmarks/queries/clickbench/queries.sql @@ -1,43 +1 @@ -SELECT COUNT(*) FROM hits; -SELECT COUNT(*) FROM hits WHERE "AdvEngineID" <> 0; -SELECT SUM("AdvEngineID"), COUNT(*), AVG("ResolutionWidth") FROM hits; -SELECT AVG("UserID") FROM hits; -SELECT COUNT(DISTINCT "UserID") FROM hits; -SELECT COUNT(DISTINCT "SearchPhrase") FROM hits; -SELECT MIN("EventDate"::INT::DATE), MAX("EventDate"::INT::DATE) FROM hits; -SELECT "AdvEngineID", COUNT(*) FROM hits WHERE "AdvEngineID" <> 0 GROUP BY "AdvEngineID" ORDER BY COUNT(*) DESC; -SELECT "RegionID", COUNT(DISTINCT "UserID") AS u FROM hits GROUP BY "RegionID" ORDER BY u DESC LIMIT 10; -SELECT "RegionID", SUM("AdvEngineID"), COUNT(*) AS c, AVG("ResolutionWidth"), COUNT(DISTINCT "UserID") FROM hits GROUP BY "RegionID" ORDER BY c DESC LIMIT 10; -SELECT "MobilePhoneModel", COUNT(DISTINCT "UserID") AS u FROM hits WHERE "MobilePhoneModel" <> '' GROUP BY "MobilePhoneModel" ORDER BY u DESC LIMIT 10; -SELECT "MobilePhone", "MobilePhoneModel", COUNT(DISTINCT "UserID") AS u FROM hits WHERE "MobilePhoneModel" <> '' GROUP BY "MobilePhone", "MobilePhoneModel" ORDER BY u DESC LIMIT 10; -SELECT "SearchPhrase", COUNT(*) AS c FROM hits WHERE "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10; -SELECT "SearchPhrase", COUNT(DISTINCT "UserID") AS u FROM hits WHERE "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY u DESC LIMIT 10; -SELECT "SearchEngineID", "SearchPhrase", COUNT(*) AS c FROM hits WHERE "SearchPhrase" <> '' GROUP BY "SearchEngineID", "SearchPhrase" ORDER BY c DESC LIMIT 10; -SELECT "UserID", COUNT(*) FROM hits GROUP BY "UserID" ORDER BY COUNT(*) DESC LIMIT 10; -SELECT "UserID", "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10; -SELECT "UserID", "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", "SearchPhrase" LIMIT 10; -SELECT "UserID", extract(minute FROM to_timestamp_seconds("EventTime")) AS m, "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", m, "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10; -SELECT "UserID" FROM hits WHERE "UserID" = 435090932899640449; -SELECT COUNT(*) FROM hits WHERE "URL" LIKE '%google%'; -SELECT "SearchPhrase", MIN("URL"), COUNT(*) AS c FROM hits WHERE "URL" LIKE '%google%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10; -SELECT "SearchPhrase", MIN("URL"), MIN("Title"), COUNT(*) AS c, COUNT(DISTINCT "UserID") FROM hits WHERE "Title" LIKE '%Google%' AND "URL" NOT LIKE '%.google.%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10; -SELECT * FROM hits WHERE "URL" LIKE '%google%' ORDER BY to_timestamp_seconds("EventTime") LIMIT 10; -SELECT 
"SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY to_timestamp_seconds("EventTime") LIMIT 10; -SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY "SearchPhrase" LIMIT 10; -SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY to_timestamp_seconds("EventTime"), "SearchPhrase" LIMIT 10; -SELECT "CounterID", AVG(length("URL")) AS l, COUNT(*) AS c FROM hits WHERE "URL" <> '' GROUP BY "CounterID" HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25; -SELECT REGEXP_REPLACE("Referer", '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k, AVG(length("Referer")) AS l, COUNT(*) AS c, MIN("Referer") FROM hits WHERE "Referer" <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25; -SELECT SUM("ResolutionWidth"), SUM("ResolutionWidth" + 1), SUM("ResolutionWidth" + 2), SUM("ResolutionWidth" + 3), SUM("ResolutionWidth" + 4), SUM("ResolutionWidth" + 5), SUM("ResolutionWidth" + 6), SUM("ResolutionWidth" + 7), SUM("ResolutionWidth" + 8), SUM("ResolutionWidth" + 9), SUM("ResolutionWidth" + 10), SUM("ResolutionWidth" + 11), SUM("ResolutionWidth" + 12), SUM("ResolutionWidth" + 13), SUM("ResolutionWidth" + 14), SUM("ResolutionWidth" + 15), SUM("ResolutionWidth" + 16), SUM("ResolutionWidth" + 17), SUM("ResolutionWidth" + 18), SUM("ResolutionWidth" + 19), SUM("ResolutionWidth" + 20), SUM("ResolutionWidth" + 21), SUM("ResolutionWidth" + 22), SUM("ResolutionWidth" + 23), SUM("ResolutionWidth" + 24), SUM("ResolutionWidth" + 25), SUM("ResolutionWidth" + 26), SUM("ResolutionWidth" + 27), SUM("ResolutionWidth" + 28), SUM("ResolutionWidth" + 29), SUM("ResolutionWidth" + 30), SUM("ResolutionWidth" + 31), SUM("ResolutionWidth" + 32), SUM("ResolutionWidth" + 33), SUM("ResolutionWidth" + 34), SUM("ResolutionWidth" + 35), SUM("ResolutionWidth" + 36), SUM("ResolutionWidth" + 37), SUM("ResolutionWidth" + 38), SUM("ResolutionWidth" + 39), SUM("ResolutionWidth" + 40), SUM("ResolutionWidth" + 41), SUM("ResolutionWidth" + 42), SUM("ResolutionWidth" + 43), SUM("ResolutionWidth" + 44), SUM("ResolutionWidth" + 45), SUM("ResolutionWidth" + 46), SUM("ResolutionWidth" + 47), SUM("ResolutionWidth" + 48), SUM("ResolutionWidth" + 49), SUM("ResolutionWidth" + 50), SUM("ResolutionWidth" + 51), SUM("ResolutionWidth" + 52), SUM("ResolutionWidth" + 53), SUM("ResolutionWidth" + 54), SUM("ResolutionWidth" + 55), SUM("ResolutionWidth" + 56), SUM("ResolutionWidth" + 57), SUM("ResolutionWidth" + 58), SUM("ResolutionWidth" + 59), SUM("ResolutionWidth" + 60), SUM("ResolutionWidth" + 61), SUM("ResolutionWidth" + 62), SUM("ResolutionWidth" + 63), SUM("ResolutionWidth" + 64), SUM("ResolutionWidth" + 65), SUM("ResolutionWidth" + 66), SUM("ResolutionWidth" + 67), SUM("ResolutionWidth" + 68), SUM("ResolutionWidth" + 69), SUM("ResolutionWidth" + 70), SUM("ResolutionWidth" + 71), SUM("ResolutionWidth" + 72), SUM("ResolutionWidth" + 73), SUM("ResolutionWidth" + 74), SUM("ResolutionWidth" + 75), SUM("ResolutionWidth" + 76), SUM("ResolutionWidth" + 77), SUM("ResolutionWidth" + 78), SUM("ResolutionWidth" + 79), SUM("ResolutionWidth" + 80), SUM("ResolutionWidth" + 81), SUM("ResolutionWidth" + 82), SUM("ResolutionWidth" + 83), SUM("ResolutionWidth" + 84), SUM("ResolutionWidth" + 85), SUM("ResolutionWidth" + 86), SUM("ResolutionWidth" + 87), SUM("ResolutionWidth" + 88), SUM("ResolutionWidth" + 89) FROM hits; -SELECT "SearchEngineID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits WHERE "SearchPhrase" <> '' GROUP BY "SearchEngineID", "ClientIP" ORDER BY c DESC LIMIT 10; -SELECT "WatchID", 
"ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits WHERE "SearchPhrase" <> '' GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10; -SELECT "WatchID", "ClientIP", COUNT(*) AS c, SUM("IsRefresh"), AVG("ResolutionWidth") FROM hits GROUP BY "WatchID", "ClientIP" ORDER BY c DESC LIMIT 10; -SELECT "URL", COUNT(*) AS c FROM hits GROUP BY "URL" ORDER BY c DESC LIMIT 10; -SELECT 1, "URL", COUNT(*) AS c FROM hits GROUP BY 1, "URL" ORDER BY c DESC LIMIT 10; -SELECT "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3, COUNT(*) AS c FROM hits GROUP BY "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3 ORDER BY c DESC LIMIT 10; -SELECT "URL", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "DontCountHits" = 0 AND "IsRefresh" = 0 AND "URL" <> '' GROUP BY "URL" ORDER BY PageViews DESC LIMIT 10; -SELECT "Title", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "DontCountHits" = 0 AND "IsRefresh" = 0 AND "Title" <> '' GROUP BY "Title" ORDER BY PageViews DESC LIMIT 10; -SELECT "URL", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "IsLink" <> 0 AND "IsDownload" = 0 GROUP BY "URL" ORDER BY PageViews DESC LIMIT 10 OFFSET 1000; -SELECT "TraficSourceID", "SearchEngineID", "AdvEngineID", CASE WHEN ("SearchEngineID" = 0 AND "AdvEngineID" = 0) THEN "Referer" ELSE '' END AS Src, "URL" AS Dst, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 GROUP BY "TraficSourceID", "SearchEngineID", "AdvEngineID", Src, Dst ORDER BY PageViews DESC LIMIT 10 OFFSET 1000; -SELECT "URLHash", "EventDate"::INT::DATE, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "TraficSourceID" IN (-1, 6) AND "RefererHash" = 3594120000172545465 GROUP BY "URLHash", "EventDate"::INT::DATE ORDER BY PageViews DESC LIMIT 10 OFFSET 100; -SELECT "WindowClientWidth", "WindowClientHeight", COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-01' AND "EventDate"::INT::DATE <= '2013-07-31' AND "IsRefresh" = 0 AND "DontCountHits" = 0 AND "URLHash" = 2868770270353813622 GROUP BY "WindowClientWidth", "WindowClientHeight" ORDER BY PageViews DESC LIMIT 10 OFFSET 10000; -SELECT DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) AS M, COUNT(*) AS PageViews FROM hits WHERE "CounterID" = 62 AND "EventDate"::INT::DATE >= '2013-07-14' AND "EventDate"::INT::DATE <= '2013-07-15' AND "IsRefresh" = 0 AND "DontCountHits" = 0 GROUP BY DATE_TRUNC('minute', to_timestamp_seconds("EventTime")) ORDER BY DATE_TRUNC('minute', M) LIMIT 10 OFFSET 1000; +SELECT * FROM hits WHERE "URL" LIKE '%google%' ORDER BY to_timestamp_seconds("EventTime") LIMIT 10; \ No newline at end of file diff --git a/datafusion/physical-plan/src/coalesce/mod.rs b/datafusion/physical-plan/src/coalesce/mod.rs index 46875fae94fc..ad31b343ffa7 100644 --- a/datafusion/physical-plan/src/coalesce/mod.rs +++ b/datafusion/physical-plan/src/coalesce/mod.rs @@ -114,6 +114,7 @@ impl BatchCoalescer { /// Push next batch, and returns [`CoalescerState`] indicating the current /// state of the buffer. 
     pub fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState {
+        // println!("batch num_rows: {}", batch.num_rows());
         let batch = gc_string_view_batch(&batch);
         if self.limit_reached(&batch) {
             CoalescerState::LimitReached
@@ -124,6 +125,18 @@ impl BatchCoalescer {
         }
     }
 
+    /// Push next batch without running `gc_string_view_batch` on it first, and
+    /// returns [`CoalescerState`] indicating the current state of the buffer.
+    pub fn push_batch_without_gc(&mut self, batch: RecordBatch) -> CoalescerState {
+        if self.limit_reached(&batch) {
+            CoalescerState::LimitReached
+        } else if self.target_reached(batch) {
+            CoalescerState::TargetReached
+        } else {
+            CoalescerState::Continue
+        }
+    }
+
     /// Return true if there is no data buffered
     pub fn is_empty(&self) -> bool {
         self.buffer.is_empty()
@@ -170,6 +183,7 @@ impl BatchCoalescer {
 
     /// Concatenates and returns all buffered batches, and clears the buffer.
     pub fn finish_batch(&mut self) -> datafusion_common::Result<RecordBatch> {
+        // println!("buffered_rows: {}", self.buffered_rows);
         let batch = concat_batches(&self.schema, &self.buffer)?;
         self.buffer.clear();
         self.buffered_rows = 0;
@@ -244,6 +258,7 @@ fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch {
             // Re-creating the array copies data and can be time consuming.
             // We only do it if the array is sparse
             if actual_buffer_size > (ideal_buffer_size * 2) {
+                // println!("apply gc!");
                 // We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerates later concat_batches.
                 // See https://github.com/apache/arrow-rs/issues/6094 for more details.
                 let mut builder = StringViewBuilder::with_capacity(s.len());
diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs
index 11678e7a4696..49b1c3485706 100644
--- a/datafusion/physical-plan/src/coalesce_batches.rs
+++ b/datafusion/physical-plan/src/coalesce_batches.rs
@@ -271,7 +271,7 @@ impl Stream for CoalesceBatchesStream {
 /// - updated buffer: `{[4000]}`
 /// - next state: `Exhausted`
 #[derive(Debug, Clone, Eq, PartialEq)]
-enum CoalesceBatchesStreamState {
+pub enum CoalesceBatchesStreamState {
     /// State to pull a new batch from the input stream.
     Pull,
     /// State to return a buffered batch.
@@ -295,16 +295,26 @@ impl CoalesceBatchesStream {
                     let _timer = cloned_time.timer();
 
                     match input_batch {
-                        Some(Ok(batch)) => match self.coalescer.push_batch(batch) {
-                            CoalescerState::Continue => {}
-                            CoalescerState::LimitReached => {
-                                self.inner_state = CoalesceBatchesStreamState::Exhausted;
+                        Some(Ok(batch)) => {
+                            // let num_rows = batch.num_rows();
+                            // println!("num_rows: {:?}", num_rows);
+
+                            // if num_rows >= 8192 {
+                            //     return Poll::Ready(Some(Ok(batch)));
+                            // }
+
+                            match self.coalescer.push_batch(batch) {
+                                CoalescerState::Continue => {}
+                                CoalescerState::LimitReached => {
+                                    self.inner_state =
+                                        CoalesceBatchesStreamState::Exhausted;
+                                }
+                                CoalescerState::TargetReached => {
+                                    self.inner_state =
+                                        CoalesceBatchesStreamState::ReturnBuffer;
+                                }
                             }
-                            CoalescerState::TargetReached => {
-                                self.inner_state =
-                                    CoalesceBatchesStreamState::ReturnBuffer;
-                            }
-                        },
+                        }
                         None => {
                             // End of input stream, but buffered batches might still be present.
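+                            // The `Exhausted` state then drains the remaining
+                            // buffered rows before the stream reports completion.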
                            self.inner_state = CoalesceBatchesStreamState::Exhausted;
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 07898e8d22d8..ed1cb4827840 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+mod filter_coalesce;
+
 use std::any::Any;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -24,6 +26,7 @@ use super::{
     ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties,
     RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
+use crate::coalesce::{BatchCoalescer, CoalescerState};
 use crate::common::can_project;
 use crate::{
     metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
@@ -31,8 +34,13 @@ use crate::{
 };
 
 use arrow::compute::filter_record_batch;
-use arrow::datatypes::{DataType, SchemaRef};
+use arrow::datatypes::{
+    BinaryViewType, DataType, Date32Type, Date64Type, Float32Type, Float64Type,
+    Int16Type, Int32Type, Int64Type, Int8Type, SchemaRef, StringViewType, UInt16Type,
+    UInt32Type, UInt64Type, UInt8Type,
+};
 use arrow::record_batch::RecordBatch;
+use arrow_array::{BooleanArray, RecordBatchOptions};
 use datafusion_common::cast::as_boolean_array;
 use datafusion_common::stats::Precision;
 use datafusion_common::{
@@ -40,6 +49,7 @@ use datafusion_common::{
 };
 use datafusion_execution::TaskContext;
 use datafusion_expr::Operator;
+use datafusion_physical_expr::binary_map::OutputType;
 use datafusion_physical_expr::equivalence::ProjectionMapping;
 use datafusion_physical_expr::expressions::BinaryExpr;
 use datafusion_physical_expr::intervals::utils::check_support;
@@ -47,11 +57,34 @@ use datafusion_physical_expr::utils::collect_columns;
 use datafusion_physical_expr::{
     analyze, split_conjunction, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr,
 };
+use filter_coalesce::{
+    ByteFilterBuilder, ByteViewFilterBuilder, FilterBuilder, FilterCoalescer,
+    PrimitiveFilterBuilder,
+};
 
 use crate::execution_plan::CardinalityEffect;
 use futures::stream::{Stream, StreamExt};
 use log::trace;
 
+/// instantiates a [`PrimitiveFilterBuilder`] and pushes it into $v
+///
+/// Arguments:
+/// `$v`: the vector to push the new builder into
+/// `$nullable`: whether the input can contain nulls
+/// `$t`: the primitive type of the builder
+///
+macro_rules! instantiate_primitive {
+    ($v:expr, $nullable:expr, $t:ty) => {
+        if $nullable {
+            let b = PrimitiveFilterBuilder::<$t, true>::new();
+            $v.push(Box::new(b) as _)
+        } else {
+            let b = PrimitiveFilterBuilder::<$t, false>::new();
+            $v.push(Box::new(b) as _)
+        }
+    };
+}
+
 /// FilterExec evaluates a boolean predicate against all input batches to determine which rows to
 /// include in its output batches.
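+///
+/// With the experimental coalescing added in this patch, when every column type
+/// is supported (see `filter_coalesce::supported_schema`) the stream buffers the
+/// rows selected by the predicate in per-column [`FilterCoalescer`] builders and
+/// emits a batch only once roughly `8192` rows have accumulated, rather than
+/// producing one (possibly tiny) batch per input batch.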
 #[derive(Debug, Clone)]
@@ -355,12 +388,71 @@ impl ExecutionPlan for FilterExec {
     ) -> Result<SendableRecordBatchStream> {
         trace!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
         let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+
+        let len = self.schema().fields().len();
+        let mut v = Vec::with_capacity(len);
+        let schema = self.schema();
+        // print!("types: {:?}", schema.fields().iter().map(|f| f.data_type()).collect::<Vec<_>>());
+        if filter_coalesce::supported_schema(&schema) {
+            let nullable = true;
+            for f in self.schema().fields().iter() {
+                match *f.data_type() {
+                    DataType::Int8 => instantiate_primitive!(v, nullable, Int8Type),
+                    DataType::Int16 => instantiate_primitive!(v, nullable, Int16Type),
+                    DataType::Int32 => instantiate_primitive!(v, nullable, Int32Type),
+                    DataType::Int64 => instantiate_primitive!(v, nullable, Int64Type),
+                    DataType::UInt8 => instantiate_primitive!(v, nullable, UInt8Type),
+                    DataType::UInt16 => instantiate_primitive!(v, nullable, UInt16Type),
+                    DataType::UInt32 => instantiate_primitive!(v, nullable, UInt32Type),
+                    DataType::UInt64 => instantiate_primitive!(v, nullable, UInt64Type),
+                    DataType::Float32 => {
+                        instantiate_primitive!(v, nullable, Float32Type)
+                    }
+                    DataType::Float64 => {
+                        instantiate_primitive!(v, nullable, Float64Type)
+                    }
+                    DataType::Date32 => instantiate_primitive!(v, nullable, Date32Type),
+                    DataType::Date64 => instantiate_primitive!(v, nullable, Date64Type),
+                    DataType::Utf8 => {
+                        let b = ByteFilterBuilder::<i32>::new(OutputType::Utf8);
+                        v.push(Box::new(b) as _)
+                    }
+                    DataType::LargeUtf8 => {
+                        let b = ByteFilterBuilder::<i64>::new(OutputType::Utf8);
+                        v.push(Box::new(b) as _)
+                    }
+                    DataType::Binary => {
+                        let b = ByteFilterBuilder::<i32>::new(OutputType::Binary);
+                        v.push(Box::new(b) as _)
+                    }
+                    DataType::LargeBinary => {
+                        let b = ByteFilterBuilder::<i64>::new(OutputType::Binary);
+                        v.push(Box::new(b) as _)
+                    }
+                    DataType::Utf8View => {
+                        let b = ByteViewFilterBuilder::<StringViewType>::new();
+                        v.push(Box::new(b) as _)
+                    }
+                    DataType::BinaryView => {
+                        let b = ByteViewFilterBuilder::<BinaryViewType>::new();
+                        v.push(Box::new(b) as _)
+                    }
+                    // `supported_schema` returned true, so every field type is
+                    // covered by the arms above
+                    _ => unreachable!("checked by supported_schema"),
+                }
+            }
+            assert!(!v.is_empty());
+        } else {
+            assert_eq!(v.len(), 0);
+        }
+        // println!("created filter builders: {:?}", v.len());
         Ok(Box::pin(FilterExecStream {
             schema: self.schema(),
             predicate: Arc::clone(&self.predicate),
             input: self.input.execute(partition, context)?,
             baseline_metrics,
             projection: self.projection.clone(),
+            coalescer: BatchCoalescer::new(self.schema(), 8192, None),
+            filter_builder: v,
         }))
     }
 
@@ -434,6 +526,12 @@ struct FilterExecStream {
     baseline_metrics: BaselineMetrics,
     /// The projection indices of the columns in the input schema
     projection: Option<Vec<usize>>,
+    /// Buffer for combining batches
+    coalescer: BatchCoalescer,
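+    /// Per-column builders that buffer the rows selected by the predicate;
+    /// see `emit_if_possible`, which emits once 8192 rows have accumulated.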
+    filter_builder: Vec<Box<dyn FilterCoalescer>>,
 }
 
 pub fn batch_filter(
@@ -476,6 +574,67 @@ fn filter_and_project(
         })
 }
 
+impl FilterExecStream {
+    pub fn filter_record_batch_v2(
+        &mut self,
+        record_batch: &RecordBatch,
+        predicate: &BooleanArray,
+    ) -> Result<()> {
+        let mut filter_builder = FilterBuilder::new(predicate);
+        if record_batch.num_columns() > 1 {
+            // Only optimize if filtering more than one column
+            // Otherwise, the overhead of optimization can be more than the benefit
+            filter_builder = filter_builder.optimize();
+        }
+        let filter = filter_builder.build();
+
+        record_batch
+            .columns()
+            .iter()
+            .enumerate()
+            .map(|(i, a)| self.filter_builder[i].append_filtered_array(a, &filter))
+            .collect::<Result<Vec<_>, _>>()?;
+
+        let row_count = self.filter_builder[0].row_count();
+        for b in &self.filter_builder {
+            debug_assert_eq!(b.row_count(), row_count);
+        }
+
+        Ok(())
+    }
+
+    fn emit_if_possible(&mut self) -> Result<Option<RecordBatch>> {
+        let row_count = self.filter_builder[0].row_count();
+        // println!("row_count: {:?}", row_count);
+        if row_count >= 8192 {
+            // println!("emit early");
+            return self.emit();
+        }
+
+        Ok(None)
+    }
+
+    fn emit(&mut self) -> Result<Option<RecordBatch>> {
+        let row_count = self.filter_builder[0].row_count();
+        // println!("emit with row_count: {:?}", row_count);
+
+        // NOTE: this consumes the builders, so after the first emit subsequent
+        // batches take the `filter_record_batch` fallback path below.
+        let filter_builder = std::mem::take(&mut self.filter_builder);
+
+        let filtered_arrays = filter_builder
+            .into_iter()
+            .map(|b| b.build())
+            .collect::<Vec<_>>();
+
+        assert_eq!(self.filter_builder.len(), 0);
+
+        let options = RecordBatchOptions::default().with_row_count(Some(row_count));
+        // map arrow error to datafusion error
+        let r =
+            RecordBatch::try_new_with_options(self.schema(), filtered_arrays, &options)?;
+        Ok(Some(r))
+    }
+}
+
 impl Stream for FilterExecStream {
     type Item = Result<RecordBatch>;
 
@@ -487,14 +646,53 @@ impl Stream for FilterExecStream {
         loop {
             match ready!(self.input.poll_next_unpin(cx)) {
                 Some(Ok(batch)) => {
-                    let timer = self.baseline_metrics.elapsed_compute().timer();
-                    let filtered_batch = filter_and_project(
-                        &batch,
-                        &self.predicate,
-                        self.projection.as_ref(),
-                        &self.schema,
-                    )?;
-                    timer.done();
+                    // TODO: add timer back, there is mutable borrow issue
+                    // let timer = self.baseline_metrics.elapsed_compute().timer();
+
+                    let array = self
+                        .predicate
+                        .evaluate(&batch)
+                        .and_then(|v| v.into_array(batch.num_rows()))?;
+                    let filtered_batch =
+                        match (as_boolean_array(&array), self.projection.as_ref()) {
+                            // Apply filter array to record batch
+                            (Ok(filter_array), None) => {
+                                let force_to_old = false;
+                                if force_to_old || self.filter_builder.is_empty() {
+                                    Ok(Some(filter_record_batch(&batch, filter_array)?))
+                                } else {
+                                    self.filter_record_batch_v2(&batch, filter_array)?;
+                                    self.emit_if_possible()
+                                }
+                            }
+                            (Ok(filter_array), Some(projection)) => {
+                                let projected_columns = projection
+                                    .iter()
+                                    .map(|i| Arc::clone(batch.column(*i)))
+                                    .collect();
+                                let projected_batch = RecordBatch::try_new(
+                                    Arc::clone(&self.schema),
+                                    projected_columns,
+                                )?;
+                                let b =
+                                    filter_record_batch(&projected_batch, filter_array)?;
+                                Ok(Some(b))
+                            }
+                            (Err(_), _) => {
+                                internal_err!(
+                                    "Cannot create filter_array from non-boolean predicates"
+                                )
+                            }
+                        }?;
+
+                    // timer.done();
+
+                    if filtered_batch.is_none() {
+                        continue;
+                    }
+
+                    let filtered_batch = filtered_batch.unwrap();
+
                     // Skip entirely filtered batches
                     if filtered_batch.num_rows() == 0 {
                         continue;
@@ -503,7 +701,16 @@ impl Stream for FilterExecStream {
                     break;
                 }
                 value => {
-                    poll = Poll::Ready(value);
+                    if self.filter_builder.is_empty() {
+                        poll =
+                            Poll::Ready(value);
+                    } else {
+                        if let Some(rb) = self.emit()? {
+                            poll = Poll::Ready(Some(Ok(rb)));
+                        } else {
+                            poll = Poll::Ready(value);
+                        }
+                    }
                     break;
                 }
             }
diff --git a/datafusion/physical-plan/src/filter/filter_coalesce.rs b/datafusion/physical-plan/src/filter/filter_coalesce.rs
new file mode 100644
index 000000000000..b55689f76838
--- /dev/null
+++ b/datafusion/physical-plan/src/filter/filter_coalesce.rs
@@ -0,0 +1,395 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod bytes;
+mod bytes_view;
+
+use std::sync::Arc;
+
+use arrow_schema::{DataType, Schema};
+pub(crate) use bytes::ByteFilterBuilder;
+pub(crate) use bytes_view::ByteViewFilterBuilder;
+
+use arrow::{
+    array::AsArray,
+    compute::{prep_null_mask_filter, SlicesIterator},
+};
+use arrow_array::{Array, ArrayRef, ArrowPrimitiveType, BooleanArray, PrimitiveArray};
+use arrow_buffer::{bit_iterator::BitIndexIterator, ScalarBuffer};
+use datafusion_common::Result;
+
+use crate::null_builder::MaybeNullBufferBuilder;
+
+/// If the filter selects more than this fraction of rows, use
+/// [`SlicesIterator`] to copy ranges of values. Otherwise iterate
+/// over individual rows using [`IndexIterator`]
+///
+/// Threshold of 0.8 chosen based on <https://dl.acm.org/doi/abs/10.1145/3465998.3466009>
+///
+const FILTER_SLICES_SELECTIVITY_THRESHOLD: f64 = 0.8;
+
+/// The iteration strategy used to evaluate [`FilterPredicate`]
+#[derive(Debug)]
+enum IterationStrategy {
+    /// A lazily evaluated iterator of ranges
+    SlicesIterator,
+    /// A lazily evaluated iterator of indices
+    IndexIterator,
+    /// A precomputed list of indices
+    Indices(Vec<usize>),
+    /// A precomputed array of ranges
+    Slices(Vec<(usize, usize)>),
+    /// Select all rows
+    All,
+    /// Select no rows
+    None,
+}
+
+impl IterationStrategy {
+    /// The default [`IterationStrategy`] for a filter of length `filter_length`
+    /// and selecting `filter_count` rows
+    fn default_strategy(filter_length: usize, filter_count: usize) -> Self {
+        if filter_length == 0 || filter_count == 0 {
+            return IterationStrategy::None;
+        }
+
+        if filter_count == filter_length {
+            return IterationStrategy::All;
+        }
+
+        // Compute the selectivity of the predicate by dividing the number of true
+        // bits in the predicate by the predicate's total length
+        //
+        // This can then be used as a heuristic for the optimal iteration strategy
+        let selectivity_frac = filter_count as f64 / filter_length as f64;
+        if selectivity_frac > FILTER_SLICES_SELECTIVITY_THRESHOLD {
+            return IterationStrategy::SlicesIterator;
+        }
+        IterationStrategy::IndexIterator
+    }
+}
+
+/// A filtering predicate that can be applied to an [`Array`]
+#[derive(Debug)]
+pub struct FilterPredicate {
+    filter: BooleanArray,
+    count: usize,
+    strategy: IterationStrategy,
+}
+
+impl FilterPredicate {
+    // /// Selects rows from `values` based on this [`FilterPredicate`]
+    // pub fn filter(&self, values: &dyn Array) -> Result<ArrayRef> {
+    //     filter_array(values, self)
+    // }
+
+    /// Number of rows being selected based on this [`FilterPredicate`]
+    pub fn count(&self) -> usize {
+        self.count
+    }
+}
+
+pub trait FilterCoalescer: Send + Sync {
+    fn append_filtered_array(
+        &mut self,
+        array: &ArrayRef,
+        predicate: &FilterPredicate,
+    ) -> Result<()>;
+
+    fn row_count(&self) -> usize;
+    fn build(self: Box<Self>) -> ArrayRef;
+}
+
+#[derive(Debug)]
+pub struct PrimitiveFilterBuilder<T: ArrowPrimitiveType, const NULLABLE: bool> {
+    filter_values: Vec<T::Native>,
+    nulls: MaybeNullBufferBuilder,
+}
+
+impl<T, const NULLABLE: bool> PrimitiveFilterBuilder<T, NULLABLE>
+where
+    T: ArrowPrimitiveType,
+{
+    pub fn new() -> Self {
+        Self {
+            filter_values: vec![],
+            nulls: MaybeNullBufferBuilder::new(),
+        }
+    }
+}
+
+impl<T, const NULLABLE: bool> FilterCoalescer for PrimitiveFilterBuilder<T, NULLABLE>
+where
+    T: ArrowPrimitiveType,
+{
+    fn append_filtered_array(
+        &mut self,
+        array: &ArrayRef,
+        predicate: &FilterPredicate,
+    ) -> Result<()> {
+        let arr = array.as_primitive::<T>();
+        let values = arr.values();
+
+        match &predicate.strategy {
+            IterationStrategy::SlicesIterator => {
+                if NULLABLE {
+                    for (start, end) in SlicesIterator::new(&predicate.filter) {
+                        for row in start..end {
+                            if arr.is_valid(row) {
+                                self.filter_values.push(values[row]);
+                                self.nulls.append(false);
+                            } else {
+                                self.filter_values.push(T::default_value());
+                                self.nulls.append(true);
+                            }
+                        }
+                    }
+                } else {
+                    for (start, end) in SlicesIterator::new(&predicate.filter) {
+                        self.filter_values.extend(&values[start..end]);
+                    }
+                }
+            }
+            IterationStrategy::Slices(slices) => {
+                if NULLABLE {
+                    for (start, end) in slices {
+                        let start = *start;
+                        let end = *end;
+                        for row in start..end {
+                            if arr.is_valid(row) {
+                                self.filter_values.push(values[row]);
+                                self.nulls.append(false);
+                            } else {
+                                self.filter_values.push(T::default_value());
+                                self.nulls.append(true);
+                            }
+                        }
+                    }
+                } else {
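+                    // Non-nullable fast path: no validity checks needed, so each
+                    // pre-computed range can be copied wholesale.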
+                    for (start, end) in slices {
+                        self.filter_values.extend(&values[*start..*end]);
+                    }
+                }
+            }
+            IterationStrategy::IndexIterator => {
+                if NULLABLE {
+                    for row in IndexIterator::new(&predicate.filter, predicate.count) {
+                        if arr.is_valid(row) {
+                            self.filter_values.push(values[row]);
+                            self.nulls.append(false);
+                        } else {
+                            self.filter_values.push(T::default_value());
+                            self.nulls.append(true);
+                        }
+                    }
+                } else {
+                    for row in IndexIterator::new(&predicate.filter, predicate.count) {
+                        self.filter_values.push(values[row]);
+                    }
+                }
+            }
+            IterationStrategy::Indices(indices) => {
+                if NULLABLE {
+                    for row in indices {
+                        if arr.is_valid(*row) {
+                            self.filter_values.push(values[*row]);
+                            self.nulls.append(false);
+                        } else {
+                            self.filter_values.push(T::default_value());
+                            self.nulls.append(true);
+                        }
+                    }
+                } else {
+                    let iter = indices.iter().map(|x| values[*x]);
+                    self.filter_values.extend(iter);
+                }
+            }
+            IterationStrategy::None => {}
+            IterationStrategy::All => {
+                for v in arr.iter() {
+                    if let Some(v) = v {
+                        self.filter_values.push(v);
+                        self.nulls.append(false);
+                    } else {
+                        self.filter_values.push(T::default_value());
+                        self.nulls.append(true);
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    fn row_count(&self) -> usize {
+        self.filter_values.len()
+    }
+
+    fn build(self: Box<Self>) -> ArrayRef {
+        let Self {
+            filter_values,
+            nulls,
+        } = *self;
+
+        let nulls = nulls.build();
+        if !NULLABLE {
+            assert!(nulls.is_none(), "unexpected nulls in non nullable input");
+        }
+
+        Arc::new(PrimitiveArray::<T>::new(
+            ScalarBuffer::from(filter_values),
+            nulls,
+        ))
+    }
+}
+
+/// An iterator of `usize` whose index in [`BooleanArray`] is true
+///
+/// This provides the best performance on most predicates, apart from those which keep
+/// large runs and therefore favour [`SlicesIterator`]
+struct IndexIterator<'a> {
+    remaining: usize,
+    iter: BitIndexIterator<'a>,
+}
+
+impl<'a> IndexIterator<'a> {
+    fn new(filter: &'a BooleanArray, remaining: usize) -> Self {
+        assert_eq!(filter.null_count(), 0);
+        let iter = filter.values().set_indices();
+        Self { remaining, iter }
+    }
+}
+
+impl Iterator for IndexIterator<'_> {
+    type Item = usize;
+
+    fn next(&mut self) -> Option<usize> {
+        if self.remaining != 0 {
+            // Fascinatingly swapping these two lines around results in a 50%
+            // performance regression for some benchmarks
+            let next = self.iter.next().expect("IndexIterator exhausted early");
+            self.remaining -= 1;
+            // Must panic if exhausted early as trusted length iterator
+            return Some(next);
+        }
+        None
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (self.remaining, Some(self.remaining))
+    }
+}
+
+/// Returns true if filter coalescing is supported for the specified schema
+pub fn supported_schema(schema: &Schema) -> bool {
+    schema
+        .fields()
+        .iter()
+        .map(|f| f.data_type())
+        .all(supported_type)
+}
+
+/// Returns true if the specified data type is supported by filter coalescing
+///
+/// In order to be supported, there must be a specialized implementation of
+/// [`FilterCoalescer`] for the data type, instantiated in `FilterExec::execute`
+fn supported_type(data_type: &DataType) -> bool {
+    matches!(
+        *data_type,
+        DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Float32
+            | DataType::Float64
+            | DataType::Utf8
+            | DataType::LargeUtf8
+            | DataType::Binary
+            | DataType::LargeBinary
+            | DataType::Date32
+            | DataType::Date64
+            | DataType::Utf8View
+            |
DataType::BinaryView + ) +} + +/// A builder to construct [`FilterPredicate`] +#[derive(Debug)] +pub struct FilterBuilder { + filter: BooleanArray, + count: usize, + strategy: IterationStrategy, +} + +impl FilterBuilder { + /// Create a new [`FilterBuilder`] that can be used to construct a [`FilterPredicate`] + pub fn new(filter: &BooleanArray) -> Self { + let filter = match filter.null_count() { + 0 => filter.clone(), + _ => prep_null_mask_filter(filter), + }; + + let count = filter_count(&filter); + let strategy = IterationStrategy::default_strategy(filter.len(), count); + + Self { + filter, + count, + strategy, + } + } + + /// Compute an optimised representation of the provided `filter` mask that can be + /// applied to an array more quickly. + /// + /// Note: There is limited benefit to calling this to then filter a single array + /// Note: This will likely have a larger memory footprint than the original mask + pub fn optimize(mut self) -> Self { + match self.strategy { + IterationStrategy::SlicesIterator => { + let slices = SlicesIterator::new(&self.filter).collect(); + self.strategy = IterationStrategy::Slices(slices) + } + IterationStrategy::IndexIterator => { + let indices = IndexIterator::new(&self.filter, self.count).collect(); + self.strategy = IterationStrategy::Indices(indices) + } + _ => {} + } + self + } + + /// Construct the final `FilterPredicate` + pub fn build(self) -> FilterPredicate { + FilterPredicate { + filter: self.filter, + count: self.count, + strategy: self.strategy, + } + } +} + +/// Counts the number of set bits in `filter` +fn filter_count(filter: &BooleanArray) -> usize { + filter.values().count_set_bits() +} diff --git a/datafusion/physical-plan/src/filter/filter_coalesce/bytes.rs b/datafusion/physical-plan/src/filter/filter_coalesce/bytes.rs new file mode 100644 index 000000000000..a55200567b35 --- /dev/null +++ b/datafusion/physical-plan/src/filter/filter_coalesce/bytes.rs @@ -0,0 +1,424 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+use std::sync::Arc;
+
+use arrow::{
+    array::{ArrayDataBuilder, AsArray},
+    compute::SlicesIterator,
+    datatypes::{ByteArrayType, GenericBinaryType, GenericStringType},
+};
+use arrow_array::{
+    Array, ArrayRef, GenericBinaryArray, GenericByteArray, GenericStringArray,
+    OffsetSizeTrait,
+};
+use arrow_buffer::{
+    bit_util, ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, Buffer,
+    BufferBuilder, MutableBuffer, NullBuffer, OffsetBuffer, ScalarBuffer,
+};
+use arrow_schema::DataType;
+use datafusion_common::Result;
+use datafusion_physical_expr::binary_map::OutputType;
+use datafusion_physical_expr_common::binary_map::INITIAL_BUFFER_CAPACITY;
+
+use super::{
+    FilterCoalescer, FilterPredicate, IndexIterator, IterationStrategy,
+    MaybeNullBufferBuilder,
+};
+
+pub struct ByteFilterBuilder<O>
+where
+    O: OffsetSizeTrait,
+{
+    output_type: OutputType,
+    buffer: BufferBuilder<u8>,
+    /// Offsets into `buffer` for each value. These offsets are used
+    /// directly to create the final `GenericBinaryArray`. The `i`th string is
+    /// stored in the range `offsets[i]..offsets[i+1]` in `buffer`. Null values
+    /// are stored as a zero length string.
+    offsets: Vec<O>,
+    nulls: MaybeNullBufferBuilder,
+}
+
+impl<O> ByteFilterBuilder<O>
+where
+    O: OffsetSizeTrait,
+{
+    pub fn new(output_type: OutputType) -> Self {
+        Self {
+            output_type,
+            buffer: BufferBuilder::new(INITIAL_BUFFER_CAPACITY),
+            offsets: vec![O::default()],
+            nulls: MaybeNullBufferBuilder::new(),
+        }
+    }
+
+    fn append_filtered_array<B>(
+        &mut self,
+        array: &ArrayRef,
+        predicate: &FilterPredicate,
+    ) -> Result<()>
+    where
+        B: ByteArrayType,
+    {
+        let arr = array.as_bytes::<B>();
+        match &predicate.strategy {
+            IterationStrategy::SlicesIterator => {
+                for (start, end) in SlicesIterator::new(&predicate.filter) {
+                    for row in start..end {
+                        if arr.is_null(row) {
+                            self.nulls.append(true);
+                            // nulls need a zero length in the offset buffer
+                            let offset = self.buffer.len();
+                            self.offsets.push(O::usize_as(offset));
+                        } else {
+                            self.nulls.append(false);
+                            self.do_append_val_inner(arr, row);
+                        }
+                    }
+                }
+            }
+            IterationStrategy::Slices(slices) => {
+                for (start, end) in slices {
+                    for row in *start..*end {
+                        if arr.is_null(row) {
+                            self.nulls.append(true);
+                            // nulls need a zero length in the offset buffer
+                            let offset = self.buffer.len();
+                            self.offsets.push(O::usize_as(offset));
+                        } else {
+                            self.nulls.append(false);
+                            self.do_append_val_inner(arr, row);
+                        }
+                    }
+                }
+            }
+            IterationStrategy::IndexIterator => {
+                for row in IndexIterator::new(&predicate.filter, predicate.count) {
+                    if arr.is_null(row) {
+                        self.nulls.append(true);
+                        // nulls need a zero length in the offset buffer
+                        let offset = self.buffer.len();
+                        self.offsets.push(O::usize_as(offset));
+                    } else {
+                        self.nulls.append(false);
+                        self.do_append_val_inner(arr, row);
+                    }
+                }
+            }
+            IterationStrategy::Indices(indices) => {
+                for row in indices.iter() {
+                    let row = *row;
+                    if arr.is_null(row) {
+                        self.nulls.append(true);
+                        // nulls need a zero length in the offset buffer
+                        let offset = self.buffer.len();
+                        self.offsets.push(O::usize_as(offset));
+                    } else {
+                        self.nulls.append(false);
+                        self.do_append_val_inner(arr, row);
+                    }
+                }
+            }
+            IterationStrategy::None => {}
+            IterationStrategy::All => {
+                for row in 0..arr.len() {
+                    if arr.is_null(row) {
+                        self.nulls.append(true);
+                        // nulls need a zero length in the offset buffer
+                        let offset = self.buffer.len();
+                        self.offsets.push(O::usize_as(offset));
+                    } else {
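+                        // valid row: append its bytes and push the closing offset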
+                        self.nulls.append(false);
+                        self.do_append_val_inner(arr, row);
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    fn do_append_val_inner<B>(&mut self, array: &GenericByteArray<B>, row: usize)
+    where
+        B: ByteArrayType,
+    {
+        let value: &[u8] = array.value(row).as_ref();
+        self.buffer.append_slice(value);
+        self.offsets.push(O::usize_as(self.buffer.len()));
+    }
+}
+
+impl<O> FilterCoalescer for ByteFilterBuilder<O>
+where
+    O: OffsetSizeTrait,
+{
+    fn append_filtered_array(
+        &mut self,
+        array: &ArrayRef,
+        predicate: &FilterPredicate,
+    ) -> Result<()> {
+        // Sanity-check the array type
+        match self.output_type {
+            OutputType::Binary => {
+                debug_assert!(matches!(
+                    array.data_type(),
+                    DataType::Binary | DataType::LargeBinary
+                ));
+                self.append_filtered_array::<GenericBinaryType<O>>(array, predicate)
+            }
+            OutputType::Utf8 => {
+                debug_assert!(matches!(
+                    array.data_type(),
+                    DataType::Utf8 | DataType::LargeUtf8
+                ));
+                self.append_filtered_array::<GenericStringType<O>>(array, predicate)
+            }
+            _ => unreachable!("View types should use `ByteViewFilterBuilder`"),
+        }
+    }
+
+    fn row_count(&self) -> usize {
+        self.offsets.len() - 1
+    }
+
+    fn build(self: Box<Self>) -> ArrayRef {
+        let Self {
+            output_type,
+            mut buffer,
+            offsets,
+            nulls,
+        } = *self;
+
+        let null_buffer = nulls.build();
+
+        // SAFETY: the offsets were constructed correctly in `do_append_val_inner` --
+        // monotonically increasing, overflows were checked.
+        let offsets = unsafe { OffsetBuffer::new_unchecked(ScalarBuffer::from(offsets)) };
+        let values = buffer.finish();
+        match output_type {
+            OutputType::Binary => {
+                // SAFETY: the offsets were constructed correctly
+                Arc::new(unsafe {
+                    GenericBinaryArray::new_unchecked(offsets, values, null_buffer)
+                })
+            }
+            OutputType::Utf8 => {
+                // SAFETY:
+                // 1. the offsets were constructed safely
+                //
+                // 2. the input arrays were all the correct type and thus since
+                //    all the values that went in were valid (e.g. utf8) so are all
+                //    the values that come out
+                Arc::new(unsafe {
+                    GenericStringArray::new_unchecked(offsets, values, null_buffer)
+                })
+            }
+            _ => unreachable!("View types should use `ByteViewFilterBuilder`"),
+        }
+    }
+}
+
+/// [`FilterBytes`] is created from a source [`GenericByteArray`] and can be
+/// used to build a new [`GenericByteArray`] by copying values from the source
+///
+/// TODO(raphael): Could this be used for the take kernel as well?
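+///
+/// Selected values are copied into `dst_values` and fresh offsets are rebuilt
+/// in `dst_offsets`, so the output buffers contain only the selected content.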
+struct FilterBytes<'a, OffsetSize> {
+    src_offsets: &'a [OffsetSize],
+    src_values: &'a [u8],
+    dst_offsets: MutableBuffer,
+    dst_values: MutableBuffer,
+    cur_offset: OffsetSize,
+}
+
+impl<'a, OffsetSize> FilterBytes<'a, OffsetSize>
+where
+    OffsetSize: OffsetSizeTrait,
+{
+    fn new<T>(capacity: usize, array: &'a GenericByteArray<T>) -> Self
+    where
+        T: ByteArrayType<Offset = OffsetSize>,
+    {
+        let num_offsets_bytes = (capacity + 1) * size_of::<OffsetSize>();
+        let mut dst_offsets = MutableBuffer::new(num_offsets_bytes);
+        let dst_values = MutableBuffer::new(0);
+        let cur_offset = OffsetSize::from_usize(0).unwrap();
+        dst_offsets.push(cur_offset);
+
+        Self {
+            src_offsets: array.value_offsets(),
+            src_values: array.value_data(),
+            dst_offsets,
+            dst_values,
+            cur_offset,
+        }
+    }
+
+    /// Returns the byte offset at `idx`
+    #[inline]
+    fn get_value_offset(&self, idx: usize) -> usize {
+        self.src_offsets[idx].as_usize()
+    }
+
+    /// Returns the start and end of the value at index `idx` along with its length
+    #[inline]
+    fn get_value_range(&self, idx: usize) -> (usize, usize, OffsetSize) {
+        // These can only fail if `array` contains invalid data
+        let start = self.get_value_offset(idx);
+        let end = self.get_value_offset(idx + 1);
+        let len = OffsetSize::from_usize(end - start).expect("illegal offset range");
+        (start, end, len)
+    }
+
+    /// Extends the in-progress array by the indexes in the provided iterator
+    fn extend_idx(&mut self, iter: impl Iterator<Item = usize>) {
+        for idx in iter {
+            let (start, end, len) = self.get_value_range(idx);
+            self.cur_offset += len;
+            self.dst_offsets.push(self.cur_offset);
+            self.dst_values
+                .extend_from_slice(&self.src_values[start..end]);
+        }
+    }
+
+    /// Extends the in-progress array by the ranges in the provided iterator
+    fn extend_slices(&mut self, iter: impl Iterator<Item = (usize, usize)>) {
+        for (start, end) in iter {
+            // These can only fail if `array` contains invalid data
+            for idx in start..end {
+                let (_, _, len) = self.get_value_range(idx);
+                self.cur_offset += len;
+                self.dst_offsets.push(self.cur_offset); // push_unchecked?
+            }
+
+            let value_start = self.get_value_offset(start);
+            let value_end = self.get_value_offset(end);
+            self.dst_values
+                .extend_from_slice(&self.src_values[value_start..value_end]);
+        }
+    }
+}
+
+/// `filter` implementation for byte arrays
+///
+/// Note: NULLs with a non-zero slot length in `array` will have the corresponding
+/// data copied across. This allows handling the null mask separately from the data
+fn filter_bytes<T>(
+    array: &GenericByteArray<T>,
+    predicate: &FilterPredicate,
+) -> GenericByteArray<T>
+where
+    T: ByteArrayType,
+{
+    let mut filter = FilterBytes::new(predicate.count, array);
+
+    match &predicate.strategy {
+        IterationStrategy::SlicesIterator => {
+            filter.extend_slices(SlicesIterator::new(&predicate.filter))
+        }
+        IterationStrategy::Slices(slices) => filter.extend_slices(slices.iter().cloned()),
+        IterationStrategy::IndexIterator => {
+            filter.extend_idx(IndexIterator::new(&predicate.filter, predicate.count))
+        }
+        IterationStrategy::Indices(indices) => filter.extend_idx(indices.iter().cloned()),
+        IterationStrategy::All | IterationStrategy::None => unreachable!(),
+    }
+
+    let mut builder = ArrayDataBuilder::new(T::DATA_TYPE)
+        .len(predicate.count)
+        .add_buffer(filter.dst_offsets.into())
+        .add_buffer(filter.dst_values.into());
+
+    if let Some((null_count, nulls)) = filter_null_mask(array.nulls(), predicate) {
+        builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+    }
+
+    let data = unsafe { builder.build_unchecked() };
+    GenericByteArray::from(data)
+}
+
+/// Computes a new null mask for `data` based on `predicate`
+///
+/// If the predicate selected no null-rows, returns `None`, otherwise returns
+/// `Some((null_count, null_buffer))` where `null_count` is the number of nulls
+/// in the filtered output, and `null_buffer` is the filtered null buffer
+///
+fn filter_null_mask(
+    nulls: Option<&NullBuffer>,
+    predicate: &FilterPredicate,
+) -> Option<(usize, Buffer)> {
+    let nulls = nulls?;
+    if nulls.null_count() == 0 {
+        return None;
+    }
+
+    let nulls = filter_bits(nulls.inner(), predicate);
+    // The filtered `nulls` has a length of `predicate.count` bits and
+    // therefore the null count is this minus the number of valid bits
+    let null_count = predicate.count - nulls.count_set_bits_offset(0, predicate.count);
+
+    if null_count == 0 {
+        return None;
+    }
+
+    Some((null_count, nulls))
+}
+
+// Filter the packed bitmask `buffer`, with `predicate` starting at bit offset `offset`
+fn filter_bits(buffer: &BooleanBuffer, predicate: &FilterPredicate) -> Buffer {
+    let src = buffer.values();
+    let offset = buffer.offset();
+
+    match &predicate.strategy {
+        IterationStrategy::IndexIterator => {
+            let bits = IndexIterator::new(&predicate.filter, predicate.count)
+                .map(|src_idx| bit_util::get_bit(src, src_idx + offset));
+
+            // SAFETY: `IndexIterator` reports its size correctly
+            unsafe { MutableBuffer::from_trusted_len_iter_bool(bits).into() }
+        }
+        IterationStrategy::Indices(indices) => {
+            let bits = indices
+                .iter()
+                .map(|src_idx| bit_util::get_bit(src, *src_idx + offset));
+
+            // SAFETY: `Vec::iter()` reports its size correctly
+            unsafe { MutableBuffer::from_trusted_len_iter_bool(bits).into() }
+        }
+        IterationStrategy::SlicesIterator => {
+            let mut builder =
+                BooleanBufferBuilder::new(bit_util::ceil(predicate.count, 8));
+            for (start, end) in SlicesIterator::new(&predicate.filter) {
+                builder.append_packed_range(start + offset..end + offset, src)
+            }
+            builder.into()
+        }
+        IterationStrategy::Slices(slices) => {
+            let mut builder =
+                BooleanBufferBuilder::new(bit_util::ceil(predicate.count, 8));
+            for (start, end) in slices {
+                builder.append_packed_range(*start + offset..*end + offset, src)
+            }
+            builder.into()
+        }
+        IterationStrategy::All | IterationStrategy::None => unreachable!(),
+    }
+}
diff --git a/datafusion/physical-plan/src/filter/filter_coalesce/bytes_view.rs b/datafusion/physical-plan/src/filter/filter_coalesce/bytes_view.rs
new file mode 100644
index 000000000000..2aebbfefdca7
--- /dev/null
+++ b/datafusion/physical-plan/src/filter/filter_coalesce/bytes_view.rs
@@ -0,0 +1,343 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{marker::PhantomData, sync::Arc};
+
+use arrow::{
+    array::{AsArray, ByteView},
+    compute::SlicesIterator,
+    datatypes::ByteViewType,
+};
+use arrow_array::{Array, ArrayRef, GenericByteViewArray};
+use arrow_buffer::{Buffer, ScalarBuffer};
+use datafusion_common::Result;
+
+use super::{
+    FilterCoalescer, FilterPredicate, IndexIterator, IterationStrategy,
+    MaybeNullBufferBuilder,
+};
+
+const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 2 * 1024 * 1024;
+
+pub struct ByteViewFilterBuilder<B: ByteViewType> {
+    /// The views of string values
+    ///
+    /// If string len <= 12, the view's format will be:
+    ///   string(12B) | len(4B)
+    ///
+    /// If string len > 12, its format will be:
+    ///   offset(4B) | buffer_index(4B) | prefix(4B) | len(4B)
+    views: Vec<u128>,
+
+    /// The progressing block
+    ///
+    /// New values are appended to it until its remaining capacity
+    /// is not enough (see `max_block_size` for details).
+    in_progress: Vec<u8>,
+
+    /// The completed blocks
+    completed: Vec<Buffer>,
+
+    /// The max size of `in_progress`
+    ///
+    /// `in_progress` is flushed into `completed`, and a new `in_progress` is
+    /// created, when its remaining capacity (`max_block_size` - `len(in_progress)`)
+    /// is not enough to store the appended value.
+    ///
+    /// Currently it is fixed at 2MB.
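+    ///
+    /// For example, appending a value that does not fit in the remaining
+    /// capacity flushes the current block to `completed`, and the value is
+    /// written into a fresh `in_progress` block.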
+    max_block_size: usize,
+
+    /// Nulls
+    nulls: MaybeNullBufferBuilder,
+
+    /// phantom data so the type requires `<B>`
+    _phantom: PhantomData<B>,
+}
+
+impl<B: ByteViewType> ByteViewFilterBuilder<B> {
+    pub fn new() -> Self {
+        Self {
+            views: Vec::new(),
+            in_progress: Vec::new(),
+            completed: Vec::new(),
+            max_block_size: BYTE_VIEW_MAX_BLOCK_SIZE,
+            nulls: MaybeNullBufferBuilder::new(),
+            _phantom: PhantomData {},
+        }
+    }
+
+    fn do_append_val_inner(
+        &mut self,
+        array: &GenericByteViewArray<B>,
+        row: usize,
+        view: u128,
+    ) where
+        B: ByteViewType,
+    {
+        let value_len = view as u32;
+        if value_len <= 12 {
+            self.views.push(view);
+        } else {
+            // Ensure big enough block to hold the value first
+            self.ensure_in_progress_big_enough(value_len as usize);
+
+            // Append value
+            let buffer_index = self.completed.len();
+            let offset = self.in_progress.len();
+            self.in_progress
+                .extend_from_slice(array.value(row).as_ref());
+
+            let view = make_view_for_non_inline_string(
+                array.value(row).as_ref(),
+                buffer_index as u32,
+                offset as u32,
+            );
+            self.views.push(view);
+        }
+    }
+
+    fn ensure_in_progress_big_enough(&mut self, value_len: usize) {
+        debug_assert!(value_len > 12);
+        let require_cap = self.in_progress.len() + value_len;
+
+        // If current block isn't big enough, flush it and create a new in progress block
+        if require_cap > self.max_block_size {
+            let flushed_block = std::mem::replace(
+                &mut self.in_progress,
+                Vec::with_capacity(self.max_block_size),
+            );
+            let buffer = Buffer::from_vec(flushed_block);
+            self.completed.push(buffer);
+        }
+    }
+
+    // append if all the rows are null
+    fn append_nulls(&mut self, arr_len: usize, predicate: &FilterPredicate) {
+        match &predicate.strategy {
+            IterationStrategy::SlicesIterator => {
+                for (start, end) in SlicesIterator::new(&predicate.filter) {
+                    self.nulls.append_n(end - start, true);
+                    self.views.extend(std::iter::repeat(0).take(end - start));
+                }
+            }
+            IterationStrategy::Slices(slices) => {
+                for (start, end) in slices {
+                    let len = *end - *start;
+                    self.nulls.append_n(len, true);
+                    self.views.extend(std::iter::repeat(0).take(len));
+                }
+            }
+            IterationStrategy::IndexIterator => {
+                self.nulls.append_n(predicate.count, true);
+                self.views
+                    .extend(std::iter::repeat(0).take(predicate.count));
+            }
+            IterationStrategy::Indices(indices) => {
+                self.nulls.append_n(indices.len(), true);
+                self.views.extend(std::iter::repeat(0).take(indices.len()));
+            }
+            IterationStrategy::None => {}
+            IterationStrategy::All => {
+                self.nulls.append_n(arr_len, true);
+                self.views.extend(std::iter::repeat(0).take(arr_len));
+            }
+        }
+    }
+
+    // append if all the rows are non null
+    fn append_non_nulls(
+        &mut self,
+        arr: &GenericByteViewArray<B>,
+        arr_len: usize,
+        predicate: &FilterPredicate,
+    ) {
+        let views = arr.views();
+        match &predicate.strategy {
+            IterationStrategy::SlicesIterator => {
+                for (start, end) in SlicesIterator::new(&predicate.filter) {
+                    self.nulls.append_n(end - start, false);
+                    for row in start..end {
+                        self.do_append_val_inner(arr, row, views[row]);
+                    }
+                }
+            }
+            IterationStrategy::Slices(slices) => {
+                for (start, end) in slices {
+                    let len = *end - *start;
+                    self.nulls.append_n(len, false);
+                    for row in *start..*end {
+                        self.do_append_val_inner(arr, row, views[row]);
+                    }
+                }
+            }
+            IterationStrategy::IndexIterator => {
+                self.nulls.append_n(predicate.count, false);
+                for row in IndexIterator::new(&predicate.filter, predicate.count) {
+                    self.do_append_val_inner(arr, row, views[row]);
+                }
+            }
+            IterationStrategy::Indices(indices) => {
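+                // Pre-computed indices: bulk-append the validity bits, then
+                // copy the selected rows one by one.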
+                self.nulls.append_n(indices.len(), false);
+                for row in indices.iter() {
+                    self.do_append_val_inner(arr, *row, views[*row]);
+                }
+            }
+            IterationStrategy::None => {}
+            IterationStrategy::All => {
+                self.nulls.append_n(arr_len, false);
+                for row in 0..arr_len {
+                    self.do_append_val_inner(arr, row, views[row]);
+                }
+            }
+        }
+    }
+}
+
+impl<B: ByteViewType> FilterCoalescer for ByteViewFilterBuilder<B> {
+    fn append_filtered_array(
+        &mut self,
+        array: &ArrayRef,
+        predicate: &FilterPredicate,
+    ) -> Result<()> {
+        let arr = array.as_byte_view::<B>();
+        let views = arr.views();
+        if arr.null_count() == arr.len() {
+            self.append_nulls(arr.len(), predicate);
+            return Ok(());
+        }
+        if arr.null_count() == 0 {
+            self.append_non_nulls(arr, arr.len(), predicate);
+            return Ok(());
+        }
+
+        match &predicate.strategy {
+            IterationStrategy::SlicesIterator => {
+                for (start, end) in SlicesIterator::new(&predicate.filter) {
+                    for row in start..end {
+                        if arr.is_null(row) {
+                            self.nulls.append(true);
+                            self.views.push(0);
+                        } else {
+                            self.nulls.append(false);
+                            self.do_append_val_inner(arr, row, views[row]);
+                        }
+                    }
+                }
+            }
+            IterationStrategy::Slices(slices) => {
+                for (start, end) in slices {
+                    for row in *start..*end {
+                        if arr.is_null(row) {
+                            self.nulls.append(true);
+                            self.views.push(0);
+                        } else {
+                            self.nulls.append(false);
+                            self.do_append_val_inner(arr, row, views[row]);
+                        }
+                    }
+                }
+            }
+            IterationStrategy::IndexIterator => {
+                for row in IndexIterator::new(&predicate.filter, predicate.count) {
+                    if arr.is_null(row) {
+                        self.nulls.append(true);
+                        self.views.push(0);
+                    } else {
+                        self.nulls.append(false);
+                        self.do_append_val_inner(arr, row, views[row]);
+                    }
+                }
+            }
+            IterationStrategy::Indices(indices) => {
+                for row in indices.iter() {
+                    let row = *row;
+                    if arr.is_null(row) {
+                        self.nulls.append(true);
+                        self.views.push(0);
+                    } else {
+                        self.nulls.append(false);
+                        self.do_append_val_inner(arr, row, views[row]);
+                    }
+                }
+            }
+            IterationStrategy::None => {}
+            IterationStrategy::All => {
+                for row in 0..arr.len() {
+                    if arr.is_null(row) {
+                        self.nulls.append(true);
+                        self.views.push(0);
+                    } else {
+                        self.nulls.append(false);
+                        self.do_append_val_inner(arr, row, views[row]);
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    fn row_count(&self) -> usize {
+        self.views.len()
+    }
+
+    fn build(self: Box<Self>) -> ArrayRef {
+        let Self {
+            views,
+            in_progress,
+            mut completed,
+            nulls,
+            ..
+        } = *self;
+
+        // Build nulls
+        let null_buffer = nulls.build();
+
+        // Build values
+        // Flush `in_progress` first
+        if !in_progress.is_empty() {
+            let buffer = Buffer::from(in_progress);
+            completed.push(buffer);
+        }
+
+        let views = ScalarBuffer::from(views);
+
+        // Safety:
+        // * all views were correctly made
+        // * (if utf8): Input was valid Utf8 so buffer contents are
+        //   valid utf8 as well
+        unsafe {
+            Arc::new(GenericByteViewArray::<B>::new_unchecked(
+                views,
+                completed,
+                null_buffer,
+            ))
+        }
+    }
+}
+
+#[inline(never)]
+pub fn make_view_for_non_inline_string(data: &[u8], block_id: u32, offset: u32) -> u128 {
+    let len = data.len();
+    let view = ByteView {
+        length: len as u32,
+        prefix: u32::from_le_bytes(data[0..4].try_into().unwrap()),
+        buffer_index: block_id,
+        offset,
+    };
+    view.as_u128()
+}
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index 845a74eaea48..de8485f6169c 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -66,6 +66,7 @@ pub mod joins;
 pub mod limit;
 pub mod memory;
 pub mod metrics;
+mod null_builder;
 pub mod placeholder_row;
 pub mod projection;
 pub mod recursive_query;
diff --git a/datafusion/physical-plan/src/null_builder.rs b/datafusion/physical-plan/src/null_builder.rs
new file mode 100644
index 000000000000..a584cf58e50a
--- /dev/null
+++ b/datafusion/physical-plan/src/null_builder.rs
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_buffer::{BooleanBufferBuilder, NullBuffer};
+
+/// Builder for an (optional) null mask
+///
+/// Optimized to avoid creating the bitmask when all values are non-null
+#[derive(Debug)]
+pub(crate) enum MaybeNullBufferBuilder {
+    /// seen `row_count` rows but no nulls yet
+    NoNulls { row_count: usize },
+    /// have at least one null value
+    ///
+    /// Note this is an Arrow *VALIDITY* buffer (so it is false for nulls, true
+    /// for non-nulls)
+    Nulls(BooleanBufferBuilder),
+}
+
+impl MaybeNullBufferBuilder {
+    /// Create a new builder
+    pub fn new() -> Self {
+        Self::NoNulls { row_count: 0 }
+    }
+
+    /// Return true if the row at index `row` is null
+    pub fn is_null(&self, row: usize) -> bool {
+        match self {
+            Self::NoNulls { .. } => false,
+            // validity mask means an unset bit is NULL
+            Self::Nulls(builder) => !builder.get_bit(row),
+        }
+    }
+
+    /// Set the nullness of the next row to `is_null`
+    ///
+    /// The builder itself tracks the current number of rows seen so far
+    ///
+    /// If `is_null` is true, the row is null.
+    /// If `is_null` is false, the row is non null
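+    ///
+    /// A minimal sketch of the expected behaviour (three rows, one null):
+    ///
+    /// ```ignore
+    /// let mut nulls = MaybeNullBufferBuilder::new();
+    /// nulls.append(false); // stays in the cheap `NoNulls` state
+    /// nulls.append(true);  // first null materializes the validity bitmask
+    /// nulls.append(false);
+    /// assert_eq!(nulls.build().unwrap().null_count(), 1);
+    /// ```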
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index 845a74eaea48..de8485f6169c 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -66,6 +66,7 @@ pub mod joins;
 pub mod limit;
 pub mod memory;
 pub mod metrics;
+mod null_builder;
 pub mod placeholder_row;
 pub mod projection;
 pub mod recursive_query;
diff --git a/datafusion/physical-plan/src/null_builder.rs b/datafusion/physical-plan/src/null_builder.rs
new file mode 100644
index 000000000000..a584cf58e50a
--- /dev/null
+++ b/datafusion/physical-plan/src/null_builder.rs
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_buffer::{BooleanBufferBuilder, NullBuffer};
+
+/// Builder for an (optional) null mask
+///
+/// Optimized to avoid creating the bitmask when all values are non-null
+#[derive(Debug)]
+pub(crate) enum MaybeNullBufferBuilder {
+    /// seen `row_count` rows but no nulls yet
+    NoNulls { row_count: usize },
+    /// have at least one null value
+    ///
+    /// Note this is an Arrow *VALIDITY* buffer (so it is false for nulls, true
+    /// for non-nulls)
+    Nulls(BooleanBufferBuilder),
+}
+
+impl MaybeNullBufferBuilder {
+    /// Create a new builder
+    pub fn new() -> Self {
+        Self::NoNulls { row_count: 0 }
+    }
+
+    /// Return true if the row at index `row` is null
+    pub fn is_null(&self, row: usize) -> bool {
+        match self {
+            Self::NoNulls { .. } => false,
+            // in a validity mask an unset bit means NULL
+            Self::Nulls(builder) => !builder.get_bit(row),
+        }
+    }
+
+    /// Set the nullness of the next row to `is_null`
+    ///
+    /// If `is_null` is true, the row is null.
+    /// If `is_null` is false, the row is non-null.
+    pub fn append(&mut self, is_null: bool) {
+        match self {
+            Self::NoNulls { row_count } if is_null => {
+                // have seen no nulls so far, this is the first null,
+                // need to create the nulls buffer for all currently valid values
+                // allocate 2x the current need as more rows are likely to follow
+                let mut nulls = BooleanBufferBuilder::new(*row_count * 2);
+                nulls.append_n(*row_count, true);
+                nulls.append(false);
+                *self = Self::Nulls(nulls);
+            }
+            Self::NoNulls { row_count } => {
+                *row_count += 1;
+            }
+            Self::Nulls(builder) => builder.append(!is_null),
+        }
+    }
+
+    pub fn append_n(&mut self, n: usize, is_null: bool) {
+        match self {
+            Self::NoNulls { row_count } if is_null => {
+                // have seen no nulls so far, this is the first null,
+                // need to create the nulls buffer for all currently valid values
+                // allocate 2x the current need as more rows are likely to follow
+                let mut nulls = BooleanBufferBuilder::new(*row_count * 2);
+                nulls.append_n(*row_count, true);
+                nulls.append_n(n, false);
+                *self = Self::Nulls(nulls);
+            }
+            Self::NoNulls { row_count } => {
+                *row_count += n;
+            }
+            Self::Nulls(builder) => builder.append_n(n, !is_null),
+        }
+    }
+
+    /// Return the number of heap allocated bytes used by this structure to store boolean values
+    pub fn allocated_size(&self) -> usize {
+        match self {
+            Self::NoNulls { .. } => 0,
+            // BooleanBufferBuilder::capacity returns capacity in bits (not bytes)
+            Self::Nulls(builder) => builder.capacity() / 8,
+        }
+    }
+
+    /// Return a NullBuffer representing the accumulated nulls so far
+    pub fn build(self) -> Option<NullBuffer> {
+        match self {
+            Self::NoNulls { .. } => None,
+            Self::Nulls(mut builder) => Some(NullBuffer::from(builder.finish())),
+        }
+    }
+
+    /// Returns a NullBuffer representing the first `n` rows accumulated so far,
+    /// shifting any remaining rows down by `n`
+    pub fn take_n(&mut self, n: usize) -> Option<NullBuffer> {
+        match self {
+            Self::NoNulls { row_count } => {
+                *row_count -= n;
+                None
+            }
+            Self::Nulls(builder) => {
+                // Copy the values at n..len to the start of a
+                // new builder and leave it in self
+                //
+                // TODO: it would be great to use something like `set_bits` from arrow here.
+                let mut new_builder = BooleanBufferBuilder::new(builder.len());
+                for i in n..builder.len() {
+                    new_builder.append(builder.get_bit(i));
+                }
+                std::mem::swap(&mut new_builder, builder);
+
+                // take only first n values from the original builder
+                new_builder.truncate(n);
+                Some(NullBuffer::from(new_builder.finish()))
+            }
+        }
+    }
+}
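A quick usage sketch of `MaybeNullBufferBuilder` (illustrative driver code, not part of this patch): validity is appended row by row, and the bitmask is only materialized once the first null arrives:

    use arrow_buffer::NullBuffer;

    fn null_builder_example() {
        let mut nulls = MaybeNullBufferBuilder::new();
        nulls.append(false); // row 0 valid: stays in the cheap NoNulls variant
        nulls.append(false); // row 1 valid
        nulls.append(true);  // row 2 null: upgrades to a real BooleanBufferBuilder
        assert!(nulls.is_null(2));
        assert!(!nulls.is_null(0));
        // build() would return None if no null had ever been appended
        let mask: Option<NullBuffer> = nulls.build();
        assert_eq!(mask.unwrap().null_count(), 1);
    }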
diff --git a/datafusion/sqllogictest/test_files/test1.slt b/datafusion/sqllogictest/test_files/test1.slt
new file mode 100644
index 000000000000..d4df2c24edf0
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/test1.slt
@@ -0,0 +1,32 @@
+statement ok
+CREATE EXTERNAL TABLE hits
+STORED AS PARQUET
+LOCATION '../../benchmarks/data/hits.parquet';
+
+query IITIIIIIIIIIITTIIIIIIIIIITIIITIIIITTIIITIIIIIIIIIITIIIIITIIIIIITIIIIIIIIIITTTTIIIIIIIITITTITTTTTTTTTTIIII
+SELECT * FROM hits WHERE "URL" LIKE '%google%' ORDER BY to_timestamp_seconds("EventTime") LIMIT 10;
+----
+7675678523794456216 1 Glavnaya gorand. Цветные объявлений районе, вером 1 1372708869 15888 64469 1840073959 2 3714843517822510735 0 44 5 http://e96.ru/search/page.googleTBR%26ad%3D0%26rnd%3D158197%26anbietersburg http://bdsmpeople.ru/obrazom_position/?page 0 13593 158 13606 216 1638 1658 22 15 7 700 0 0 22 nA 1 1 0 0 (empty) (empty) 4005373 -1 0 (empty) 0 0 1052 775 135 1372760422 0 0 0 0 windows 1601 0 0 0 0 (empty) 213893614 0 0 0 0 0 6 1372766291 0 0 0 0 0 1412515749 63522 -1 12 S0 � (empty) (empty) 0 0 0 0 445 1234 0 0 (empty) 0 (empty) NH 0 (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) 0 5972490271588207794 1369713899219085694 0
+6147260061318473746 1 Glavnaya gorand. Цветные объявлений районе, вером 1 1372708889 15888 64469 1840073959 2 3714843517822510735 0 44 5 http://e96.ru/search/page.googleTBR%26ad%3D0%26rnd%3D158197%26anbietersburg http://bdsmpeople.ru/obrazom_position/?page 0 13593 158 13606 216 1638 1658 22 15 7 700 0 0 22 D� 1 1 0 0 (empty) (empty) 4005373 -1 0 (empty) 0 0 1052 775 135 1372760442 0 0 0 0 windows 1601 0 0 0 0 (empty) 810892658 0 0 0 0 0 6 1372766305 0 0 0 0 0 1412515749 63522 -1 13 S0 � (empty) (empty) 0 0 0 0 0 16 0 0 (empty) 0 (empty) NH 0 (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) 0 5972490271588207794 1369713899219085694 0
+5972689683963797854 1 Glavnaya gorand. Цветные объявлений районе, вером 1 1372708931 15888 64469 1840073959 2 3714843517822510735 0 44 5 http://e96.ru/search/page.googleTBR%26ad%3D0%26rnd%3D158197%26anbietersburg http://bdsmpeople.ru/obrazom_position/?page 0 13593 158 13606 216 1638 1658 22 15 7 700 0 0 22 D� 1 1 0 0 (empty) (empty) 4005373 -1 0 (empty) 0 0 1052 775 135 1372760476 0 0 0 0 windows 1601 0 0 0 0 (empty) 47015096 0 0 0 0 0 6 1372766330 0 0 0 0 0 1412515749 63522 -1 13 S0 h1 (empty) (empty) 0 0 0 0 0 0 0 0 (empty) 0 (empty) NH 0 (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) 0 5972490271588207794 1369713899219085694 0
+8008688361303225116 1 Скачать онлайн играй!
- Туризма - Крымский тренчкоты в интернет магазин Wildberries.ru (Работа - IRR.ru - модных словариумных 1 1372709521 15888 63217 1975817788 229 804133623150786791 1 44 7 http://bjdswaps.google-photo http://loveche.html?ctid 0 12409 20 10093 22 1749 867 23 15 3 700.224 0 0 15 D� 1 1 0 0 (empty) (empty) 3056753 -1 0 (empty) 0 0 1608 662 135 1372745995 4 1 16561 0 windows 1 0 0 0 5347008031302181363 (empty) 766531830 0 0 0 0 0 5 1372766425 31 1 3 11237 31 1870660671 -1 -1 -1 E3 _i (empty) (empty) 0 0 0 0 0 0 0 0 (empty) 0 (empty) NH 0 (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) 0 -2736470446903004689 7116991598850737408 0 +7436461208655480623 1 Компании Вино в хорошие 1 1372710744 15888 35534 1741555497 39 7082047337377160280 0 44 5 http://rsdn.ru/catalog/cifrovye-advertisement=little&category=22&input_bdsmpeople.ru/index,google.ru/news/39826 http://kalina?block/?inst_the_book.php?cPath=40_57470493958402/ 0 10634 20 0 0 1996 1781 37 15 7 700 0 0 22 D� 1 1 0 0 (empty) (empty) 808950 5 0 (empty) 0 0 1261 1017 433 1372791792 4 1 16561 0 windows-1251;charset 1601 1 0 0 6804199628189316872 (empty) 626511463 0 0 0 1 0 5 1372732542 31 2 3 694 57 1448806868 -1 -1 -1 E3 _i (empty) (empty) 0 0 0 3 345 147 239 0 (empty) 0 (empty) NH 0 (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) 0 8931346360692564721 1971436513446935197 0 +5564518777317455184 0 «set» в пробег аппах и обслуживатизиров 1 1372711469 15888 5822 1920787234 32 3712346975274085073 1 2 3 http://auto_gruppy/christikha/hotel=-1&trafkey=605&from=&power_name=Платья&produkty%2Furl.google.ru/index http://rmnt.ru/cars/passenger/hellardous/42/~37/?suggest&id=3869551753&custom=0&undefined/undefined/under=28036,5;362;108;16762643539 0 8563 21482 9822 18528 1638 1658 23 15 7 700 0 0 16 D� 1 1 0 0 (empty) (empty) 207348 -1 0 (empty) 0 0 1509 733 135 1372750392 4 1 15738 0 windows-1251;charset 1 0 0 0 8007561756096276896 (empty) 1034507462 0 0 0 0 0 5 1372759436 0 0 0 0 0 2016848722 -1 -1 -1 S0 h1 (empty) (empty) 0 0 0 0 0 0 0 0 (empty) 0 (empty) NH 0 (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) 0 7820066807630413322 -3258566084785303139 0 +7381524648140977766 1 Ploshchad' stolitsi zwy 110911923, Г официальная Прессы и Огонек 1 1372712506 15888 63217 1638850281 59 1564939829982760596 1 44 5 http://bjdleaksbrand=bpc bonprix%2F12.02&he=1024&location=pm;f=inbox;pmsg_1733/page.google http://loveplanet.ru/url?sa=t&rct 0 12409 20 10093 22 1996 1666 37 15 7 700 0 0 22 D� 1 1 0 0 (empty) (empty) 2059788 -1 0 (empty) 0 0 1261 1206 433 1372759478 0 0 0 0 windows 1 0 0 0 0 (empty) 827020970 0 0 0 0 0 5 1372791236 0 0 0 0 0 2001459352 -1 -1 -1 S0 � (empty) (empty) 0 0 0 0 0 0 0 0 (empty) 0 (empty) NH 0 (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) 0 1384301141639030267 9047048983006699504 0 +8614832219462424183 1 Модель для сумки - регеш (Россия) 1 1372713563 15888 3035 2022088895 38 2590751384199385434 1 2 88 http://smeshariki.ru/googleTBR%26ar_ntype=citykurortmag http://holodilnik.ru/GameMain.aspx?color=0&choos&source=web&cd 0 10271 158 13384 216 1917 879 37 15 7 700 0 0 1 D� 1 1 0 0 (empty) (empty) 3991944 -1 0 (empty) 0 0 746 464 322 1372754670 0 0 0 0 windows-1251;charset 1 0 0 0 7607749204513951316 (empty) 427646581 0 0 0 0 0 5 1372720757 50 2 3 0 30 1146019198 -1 -1 -1 S0 � (empty) (empty) 0 0 0 0 0 0 0 0 (empty) 0 (empty) NH 0 (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) 0 
1157471009075867478 1000799766482180932 0 +7168314068394418899 1 по полиция +опытовой рецензии, 1 1372713760 15888 35534 -1420082055 11579 4822773326251181180 0 2 7 http:%2F%2Fsapozhki-advertime-2/#page.google/dodge http://saint-peters-total=меньше 100007&text=b.akhua_deckaya-look/time-2/#page=3&oprnd=6817922197946&ei=JtHTUYWRCqXA&bvm=bv.49784469,d.ZWU&cad=rjt&fu=0&type_id=172&msid=1&marka=88&text=krasnaia-moda 0 14550 952 8565 375 1304 978 37 15 4 700.224 2 7 13 D� 1 1 0 0 (empty) (empty) 2675432 3 3 dave kino 2013 года в ростопримеча 0 0 1972 778 135 1372712707 4 1 16561 0 windows 1601 0 0 0 6494516778257365839 (empty) 393719418 0 0 0 0 0 5 1372782490 31 2 2 14851 1 -1016483843 61823 -1 1 S0 � (empty) (empty) 0 0 0 0 0 0 0 0 (empty) 0 (empty) NH 0 (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) 0 -4470345086215748575 -5322637665780806659 0 +8235569889353442646 1 (empty) 1 1372713773 15888 35534 -1420082055 11579 4822773326251181180 0 2 7 http:%2F%2Fsapozhki-advertime-2/#page.google/dodge (empty) 0 0 0 8565 375 1304 978 37 15 4 700.224 2 7 13 D� 1 1 0 0 (empty) (empty) 2675432 0 0 (empty) 0 1 1972 778 135 1372712722 4 1 16561 0 windows 1601 0 0 1 6494516778257365839 (empty) 393719418 0 0 0 1 0 5 1372782504 31 2 2 14851 1 -1016483843 61823 -1 2 S0 � (empty) (empty) 0 318 0 0 0 0 0 0 (empty) 0 (empty) NH 0 (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) (empty) 0 -296158784638538920 -5322637665780806659 0 + +query TT +explain SELECT * FROM hits WHERE "URL" LIKE '%google%' ORDER BY to_timestamp_seconds("EventTime") LIMIT 10; +---- +logical_plan +01)Sort: to_timestamp_seconds(hits.EventTime) ASC NULLS LAST, fetch=10 +02)--Filter: hits.URL LIKE Utf8View("%google%") +03)----TableScan: hits projection=[WatchID, JavaEnable, Title, GoodEvent, EventTime, EventDate, CounterID, ClientIP, RegionID, UserID, CounterClass, OS, UserAgent, URL, Referer, IsRefresh, RefererCategoryID, RefererRegionID, URLCategoryID, URLRegionID, ResolutionWidth, ResolutionHeight, ResolutionDepth, FlashMajor, FlashMinor, FlashMinor2, NetMajor, NetMinor, UserAgentMajor, UserAgentMinor, CookieEnable, JavascriptEnable, IsMobile, MobilePhone, MobilePhoneModel, Params, IPNetworkID, TraficSourceID, SearchEngineID, SearchPhrase, AdvEngineID, IsArtifical, WindowClientWidth, WindowClientHeight, ClientTimeZone, ClientEventTime, SilverlightVersion1, SilverlightVersion2, SilverlightVersion3, SilverlightVersion4, PageCharset, CodeVersion, IsLink, IsDownload, IsNotBounce, FUniqID, OriginalURL, HID, IsOldCounter, IsEvent, IsParameter, DontCountHits, WithHash, HitColor, LocalEventTime, Age, Sex, Income, Interests, Robotness, RemoteIP, WindowName, OpenerName, HistoryLength, BrowserLanguage, BrowserCountry, SocialNetwork, SocialAction, HTTPError, SendTiming, DNSTiming, ConnectTiming, ResponseStartTiming, ResponseEndTiming, FetchTiming, SocialSourceNetworkID, SocialSourcePage, ParamPrice, ParamOrderID, ParamCurrency, ParamCurrencyID, OpenstatServiceName, OpenstatCampaignID, OpenstatAdID, OpenstatSourceID, UTMSource, UTMMedium, UTMCampaign, UTMContent, UTMTerm, FromTag, HasGCLID, RefererHash, URLHash, CLID], partial_filters=[hits.URL LIKE Utf8View("%google%")] +physical_plan +01)SortPreservingMergeExec: [to_timestamp_seconds(EventTime@4) ASC NULLS LAST], fetch=10 +02)--SortExec: TopK(fetch=10), expr=[to_timestamp_seconds(EventTime@4) ASC NULLS LAST], preserve_partitioning=[true] +03)----CoalesceBatchesExec: target_batch_size=8192 +04)------FilterExec: URL@13 LIKE %google% 
+05)--------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/benchmarks/data/hits.parquet:0..3694994112], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:3694994112..7389988224], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:7389988224..11084982336], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:11084982336..14779976446]]}, projection=[WatchID, JavaEnable, Title, GoodEvent, EventTime, EventDate, CounterID, ClientIP, RegionID, UserID, CounterClass, OS, UserAgent, URL, Referer, IsRefresh, RefererCategoryID, RefererRegionID, URLCategoryID, URLRegionID, ResolutionWidth, ResolutionHeight, ResolutionDepth, FlashMajor, FlashMinor, FlashMinor2, NetMajor, NetMinor, UserAgentMajor, UserAgentMinor, CookieEnable, JavascriptEnable, IsMobile, MobilePhone, MobilePhoneModel, Params, IPNetworkID, TraficSourceID, SearchEngineID, SearchPhrase, AdvEngineID, IsArtifical, WindowClientWidth, WindowClientHeight, ClientTimeZone, ClientEventTime, SilverlightVersion1, SilverlightVersion2, SilverlightVersion3, SilverlightVersion4, PageCharset, CodeVersion, IsLink, IsDownload, IsNotBounce, FUniqID, OriginalURL, HID, IsOldCounter, IsEvent, IsParameter, DontCountHits, WithHash, HitColor, LocalEventTime, Age, Sex, Income, Interests, Robotness, RemoteIP, WindowName, OpenerName, HistoryLength, BrowserLanguage, BrowserCountry, SocialNetwork, SocialAction, HTTPError, SendTiming, DNSTiming, ConnectTiming, ResponseStartTiming, ResponseEndTiming, FetchTiming, SocialSourceNetworkID, SocialSourcePage, ParamPrice, ParamOrderID, ParamCurrency, ParamCurrencyID, OpenstatServiceName, OpenstatCampaignID, OpenstatAdID, OpenstatSourceID, UTMSource, UTMMedium, UTMCampaign, UTMContent, UTMTerm, FromTag, HasGCLID, RefererHash, URLHash, CLID], predicate=URL@13 LIKE %google% diff --git a/datafusion/sqllogictest/test_files/test2.slt b/datafusion/sqllogictest/test_files/test2.slt new file mode 100644 index 000000000000..e631df8cd38d --- /dev/null +++ b/datafusion/sqllogictest/test_files/test2.slt @@ -0,0 +1,11 @@ +statement ok +CREATE EXTERNAL TABLE hits +STORED AS PARQUET +LOCATION '../../benchmarks/data/hits.parquet'; + +query I +SELECT COUNT(*) FROM hits WHERE "URL" LIKE '%google%'; +---- +15911 + + diff --git a/datafusion/sqllogictest/test_files/test3.slt b/datafusion/sqllogictest/test_files/test3.slt new file mode 100644 index 000000000000..8531f3129ad8 --- /dev/null +++ b/datafusion/sqllogictest/test_files/test3.slt @@ -0,0 +1,22 @@ +statement ok +CREATE EXTERNAL TABLE hits +STORED AS PARQUET +LOCATION '../../benchmarks/data/hits.parquet'; + +query TT +explain SELECT "UserID", extract(minute FROM to_timestamp_seconds("EventTime")) AS m, "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", m, "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10; +---- +logical_plan +01)Sort: count(*) DESC NULLS FIRST, fetch=10 +02)--Projection: hits.UserID, date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime)) AS m, hits.SearchPhrase, count(*) +03)----Aggregate: groupBy=[[hits.UserID, date_part(Utf8("MINUTE"), to_timestamp_seconds(hits.EventTime)), hits.SearchPhrase]], aggr=[[count(Int64(1)) AS count(*)]] +04)------TableScan: hits projection=[EventTime, UserID, SearchPhrase] +physical_plan +01)SortPreservingMergeExec: [count(*)@3 DESC], fetch=10 +02)--SortExec: TopK(fetch=10), expr=[count(*)@3 DESC], preserve_partitioning=[true] +03)----ProjectionExec: expr=[UserID@0 as UserID, date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime))@1 as m, SearchPhrase@2 as SearchPhrase, count(*)@3 as count(*)] +04)------AggregateExec: mode=FinalPartitioned, 
gby=[UserID@0 as UserID, date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime))@1 as date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime)), SearchPhrase@2 as SearchPhrase], aggr=[count(*)]
+05)--------CoalesceBatchesExec: target_batch_size=8192
+06)----------RepartitionExec: partitioning=Hash([UserID@0, date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime))@1, SearchPhrase@2], 4), input_partitions=4
+07)------------AggregateExec: mode=Partial, gby=[UserID@1 as UserID, date_part(MINUTE, to_timestamp_seconds(EventTime@0)) as date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime)), SearchPhrase@2 as SearchPhrase], aggr=[count(*)]
+08)--------------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/benchmarks/data/hits.parquet:0..3694994112], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:3694994112..7389988224], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:7389988224..11084982336], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:11084982336..14779976446]]}, projection=[EventTime, UserID, SearchPhrase]
diff --git a/flamegraph.svg b/flamegraph.svg
new file mode 100644
index 000000000000..346e67e6489a
--- /dev/null
+++ b/flamegraph.svg
@@ -0,0 +1,491 @@
+[flamegraph.svg (491 lines of SVG) elided: an interactive flame graph of a dfbench run. Recoverable hot paths: ~85% of samples fall under FilterExecStream::poll_next / CoalesceBatchesStream::poll_next, with ~82% inside ParquetRecordBatchReader::next; within that, ByteViewArrayReader::read_records accounts for ~59% (UTF-8 validation in core::str::converts::from_utf8 ~19%) and SnappyCodec::decompress ~23%; SortPreservingMergeStream::poll_next is ~7%.]
\ No newline at end of file