diff --git a/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs b/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs index 41f7c8729ee3..8baea511c776 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs @@ -15,39 +15,36 @@ // specific language governing permissions and limitations // under the License. +mod native; mod strings; use std::any::Any; -use std::cmp::Eq; use std::collections::HashSet; use std::fmt::Debug; -use std::hash::Hash; use std::sync::Arc; use ahash::RandomState; use arrow::array::{Array, ArrayRef}; use arrow::datatypes::{DataType, Field, TimeUnit}; use arrow_array::types::{ - ArrowPrimitiveType, Date32Type, Date64Type, Decimal128Type, Decimal256Type, - Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, - Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType, + Date32Type, Date64Type, Decimal128Type, Decimal256Type, Float16Type, Float32Type, + Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, Time32MillisecondType, + Time32SecondType, Time64MicrosecondType, Time64NanosecondType, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; -use arrow_array::PrimitiveArray; -use datafusion_common::cast::{as_list_array, as_primitive_array}; -use datafusion_common::utils::array_into_list_array; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::Accumulator; +use crate::aggregate::count_distinct::native::{ + FloatDistinctCountAccumulator, PrimitiveDistinctCountAccumulator, +}; use crate::aggregate::count_distinct::strings::StringDistinctCountAccumulator; -use crate::aggregate::utils::{down_cast_any_ref, Hashable}; +use crate::aggregate::utils::down_cast_any_ref; use crate::expressions::format_state_name; use crate::{AggregateExpr, PhysicalExpr}; -type DistinctScalarValues = ScalarValue; - /// Expression for a COUNT(DISTINCT) aggregation. #[derive(Debug)] pub struct DistinctCount { @@ -101,46 +98,46 @@ impl AggregateExpr for DistinctCount { use TimeUnit::*; Ok(match &self.state_data_type { - Int8 => Box::new(NativeDistinctCountAccumulator::::new()), - Int16 => Box::new(NativeDistinctCountAccumulator::::new()), - Int32 => Box::new(NativeDistinctCountAccumulator::::new()), - Int64 => Box::new(NativeDistinctCountAccumulator::::new()), - UInt8 => Box::new(NativeDistinctCountAccumulator::::new()), - UInt16 => Box::new(NativeDistinctCountAccumulator::::new()), - UInt32 => Box::new(NativeDistinctCountAccumulator::::new()), - UInt64 => Box::new(NativeDistinctCountAccumulator::::new()), + Int8 => Box::new(PrimitiveDistinctCountAccumulator::::new()), + Int16 => Box::new(PrimitiveDistinctCountAccumulator::::new()), + Int32 => Box::new(PrimitiveDistinctCountAccumulator::::new()), + Int64 => Box::new(PrimitiveDistinctCountAccumulator::::new()), + UInt8 => Box::new(PrimitiveDistinctCountAccumulator::::new()), + UInt16 => Box::new(PrimitiveDistinctCountAccumulator::::new()), + UInt32 => Box::new(PrimitiveDistinctCountAccumulator::::new()), + UInt64 => Box::new(PrimitiveDistinctCountAccumulator::::new()), Decimal128(_, _) => { - Box::new(NativeDistinctCountAccumulator::::new()) + Box::new(PrimitiveDistinctCountAccumulator::::new()) } Decimal256(_, _) => { - Box::new(NativeDistinctCountAccumulator::::new()) + Box::new(PrimitiveDistinctCountAccumulator::::new()) } - Date32 => Box::new(NativeDistinctCountAccumulator::::new()), - Date64 => Box::new(NativeDistinctCountAccumulator::::new()), - Time32(Millisecond) => { - Box::new(NativeDistinctCountAccumulator::::new()) - } + Date32 => Box::new(PrimitiveDistinctCountAccumulator::::new()), + Date64 => Box::new(PrimitiveDistinctCountAccumulator::::new()), + Time32(Millisecond) => Box::new(PrimitiveDistinctCountAccumulator::< + Time32MillisecondType, + >::new()), Time32(Second) => { - Box::new(NativeDistinctCountAccumulator::::new()) - } - Time64(Microsecond) => { - Box::new(NativeDistinctCountAccumulator::::new()) + Box::new(PrimitiveDistinctCountAccumulator::::new()) } + Time64(Microsecond) => Box::new(PrimitiveDistinctCountAccumulator::< + Time64MicrosecondType, + >::new()), Time64(Nanosecond) => { - Box::new(NativeDistinctCountAccumulator::::new()) + Box::new(PrimitiveDistinctCountAccumulator::::new()) } - Timestamp(Microsecond, _) => Box::new(NativeDistinctCountAccumulator::< + Timestamp(Microsecond, _) => Box::new(PrimitiveDistinctCountAccumulator::< TimestampMicrosecondType, >::new()), - Timestamp(Millisecond, _) => Box::new(NativeDistinctCountAccumulator::< + Timestamp(Millisecond, _) => Box::new(PrimitiveDistinctCountAccumulator::< TimestampMillisecondType, >::new()), - Timestamp(Nanosecond, _) => { - Box::new(NativeDistinctCountAccumulator::::new()) - } + Timestamp(Nanosecond, _) => Box::new(PrimitiveDistinctCountAccumulator::< + TimestampNanosecondType, + >::new()), Timestamp(Second, _) => { - Box::new(NativeDistinctCountAccumulator::::new()) + Box::new(PrimitiveDistinctCountAccumulator::::new()) } Float16 => Box::new(FloatDistinctCountAccumulator::::new()), @@ -175,9 +172,13 @@ impl PartialEq for DistinctCount { } } +/// General purpose distinct accumulator that works for any DataType by using +/// [`ScalarValue`]. Some types have specialized accumulators that are (much) +/// more efficient such as [`PrimitiveDistinctCountAccumulator`] and +/// [`StringDistinctCountAccumulator`] #[derive(Debug)] struct DistinctCountAccumulator { - values: HashSet, + values: HashSet, state_data_type: DataType, } @@ -186,7 +187,7 @@ impl DistinctCountAccumulator { // This method is faster than .full_size(), however it is not suitable for variable length values like strings or complex types fn fixed_size(&self) -> usize { std::mem::size_of_val(self) - + (std::mem::size_of::() * self.values.capacity()) + + (std::mem::size_of::() * self.values.capacity()) + self .values .iter() @@ -199,7 +200,7 @@ impl DistinctCountAccumulator { // calculates the size as accurate as possible, call to this method is expensive fn full_size(&self) -> usize { std::mem::size_of_val(self) - + (std::mem::size_of::() * self.values.capacity()) + + (std::mem::size_of::() * self.values.capacity()) + self .values .iter() @@ -260,182 +261,6 @@ impl Accumulator for DistinctCountAccumulator { } } -#[derive(Debug)] -struct NativeDistinctCountAccumulator -where - T: ArrowPrimitiveType + Send, - T::Native: Eq + Hash, -{ - values: HashSet, -} - -impl NativeDistinctCountAccumulator -where - T: ArrowPrimitiveType + Send, - T::Native: Eq + Hash, -{ - fn new() -> Self { - Self { - values: HashSet::default(), - } - } -} - -impl Accumulator for NativeDistinctCountAccumulator -where - T: ArrowPrimitiveType + Send + Debug, - T::Native: Eq + Hash, -{ - fn state(&mut self) -> Result> { - let arr = Arc::new(PrimitiveArray::::from_iter_values( - self.values.iter().cloned(), - )) as ArrayRef; - let list = Arc::new(array_into_list_array(arr)); - Ok(vec![ScalarValue::List(list)]) - } - - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - if values.is_empty() { - return Ok(()); - } - - let arr = as_primitive_array::(&values[0])?; - arr.iter().for_each(|value| { - if let Some(value) = value { - self.values.insert(value); - } - }); - - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - if states.is_empty() { - return Ok(()); - } - assert_eq!( - states.len(), - 1, - "count_distinct states must be single array" - ); - - let arr = as_list_array(&states[0])?; - arr.iter().try_for_each(|maybe_list| { - if let Some(list) = maybe_list { - let list = as_primitive_array::(&list)?; - self.values.extend(list.values()) - }; - Ok(()) - }) - } - - fn evaluate(&mut self) -> Result { - Ok(ScalarValue::Int64(Some(self.values.len() as i64))) - } - - fn size(&self) -> usize { - let estimated_buckets = (self.values.len().checked_mul(8).unwrap_or(usize::MAX) - / 7) - .next_power_of_two(); - - // Size of accumulator - // + size of entry * number of buckets - // + 1 byte for each bucket - // + fixed size of HashSet - std::mem::size_of_val(self) - + std::mem::size_of::() * estimated_buckets - + estimated_buckets - + std::mem::size_of_val(&self.values) - } -} - -#[derive(Debug)] -struct FloatDistinctCountAccumulator -where - T: ArrowPrimitiveType + Send, -{ - values: HashSet, RandomState>, -} - -impl FloatDistinctCountAccumulator -where - T: ArrowPrimitiveType + Send, -{ - fn new() -> Self { - Self { - values: HashSet::default(), - } - } -} - -impl Accumulator for FloatDistinctCountAccumulator -where - T: ArrowPrimitiveType + Send + Debug, -{ - fn state(&mut self) -> Result> { - let arr = Arc::new(PrimitiveArray::::from_iter_values( - self.values.iter().map(|v| v.0), - )) as ArrayRef; - let list = Arc::new(array_into_list_array(arr)); - Ok(vec![ScalarValue::List(list)]) - } - - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - if values.is_empty() { - return Ok(()); - } - - let arr = as_primitive_array::(&values[0])?; - arr.iter().for_each(|value| { - if let Some(value) = value { - self.values.insert(Hashable(value)); - } - }); - - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - if states.is_empty() { - return Ok(()); - } - assert_eq!( - states.len(), - 1, - "count_distinct states must be single array" - ); - - let arr = as_list_array(&states[0])?; - arr.iter().try_for_each(|maybe_list| { - if let Some(list) = maybe_list { - let list = as_primitive_array::(&list)?; - self.values - .extend(list.values().iter().map(|v| Hashable(*v))); - }; - Ok(()) - }) - } - - fn evaluate(&mut self) -> Result { - Ok(ScalarValue::Int64(Some(self.values.len() as i64))) - } - - fn size(&self) -> usize { - let estimated_buckets = (self.values.len().checked_mul(8).unwrap_or(usize::MAX) - / 7) - .next_power_of_two(); - - // Size of accumulator - // + size of entry * number of buckets - // + 1 byte for each bucket - // + fixed size of HashSet - std::mem::size_of_val(self) - + std::mem::size_of::() * estimated_buckets - + estimated_buckets - + std::mem::size_of_val(&self.values) - } -} - #[cfg(test)] mod tests { use arrow::array::{ diff --git a/datafusion/physical-expr/src/aggregate/count_distinct/native.rs b/datafusion/physical-expr/src/aggregate/count_distinct/native.rs new file mode 100644 index 000000000000..a44e8b772e5a --- /dev/null +++ b/datafusion/physical-expr/src/aggregate/count_distinct/native.rs @@ -0,0 +1,215 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Specialized implementation of `COUNT DISTINCT` for "Native" arrays such as +//! [`Int64Array`] and [`Float64Array`] +//! +//! [`Int64Array`]: arrow::array::Int64Array +//! [`Float64Array`]: arrow::array::Float64Array +use std::cmp::Eq; +use std::collections::HashSet; +use std::fmt::Debug; +use std::hash::Hash; +use std::sync::Arc; + +use ahash::RandomState; +use arrow::array::ArrayRef; +use arrow_array::types::ArrowPrimitiveType; +use arrow_array::PrimitiveArray; + +use datafusion_common::cast::{as_list_array, as_primitive_array}; +use datafusion_common::utils::array_into_list_array; +use datafusion_common::ScalarValue; +use datafusion_expr::Accumulator; + +use crate::aggregate::utils::Hashable; + +#[derive(Debug)] +pub(super) struct PrimitiveDistinctCountAccumulator +where + T: ArrowPrimitiveType + Send, + T::Native: Eq + Hash, +{ + values: HashSet, +} + +impl PrimitiveDistinctCountAccumulator +where + T: ArrowPrimitiveType + Send, + T::Native: Eq + Hash, +{ + pub(super) fn new() -> Self { + Self { + values: HashSet::default(), + } + } +} + +impl Accumulator for PrimitiveDistinctCountAccumulator +where + T: ArrowPrimitiveType + Send + Debug, + T::Native: Eq + Hash, +{ + fn state(&mut self) -> datafusion_common::Result> { + let arr = Arc::new(PrimitiveArray::::from_iter_values( + self.values.iter().cloned(), + )) as ArrayRef; + let list = Arc::new(array_into_list_array(arr)); + Ok(vec![ScalarValue::List(list)]) + } + + fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> { + if values.is_empty() { + return Ok(()); + } + + let arr = as_primitive_array::(&values[0])?; + arr.iter().for_each(|value| { + if let Some(value) = value { + self.values.insert(value); + } + }); + + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> { + if states.is_empty() { + return Ok(()); + } + assert_eq!( + states.len(), + 1, + "count_distinct states must be single array" + ); + + let arr = as_list_array(&states[0])?; + arr.iter().try_for_each(|maybe_list| { + if let Some(list) = maybe_list { + let list = as_primitive_array::(&list)?; + self.values.extend(list.values()) + }; + Ok(()) + }) + } + + fn evaluate(&mut self) -> datafusion_common::Result { + Ok(ScalarValue::Int64(Some(self.values.len() as i64))) + } + + fn size(&self) -> usize { + let estimated_buckets = (self.values.len().checked_mul(8).unwrap_or(usize::MAX) + / 7) + .next_power_of_two(); + + // Size of accumulator + // + size of entry * number of buckets + // + 1 byte for each bucket + // + fixed size of HashSet + std::mem::size_of_val(self) + + std::mem::size_of::() * estimated_buckets + + estimated_buckets + + std::mem::size_of_val(&self.values) + } +} + +#[derive(Debug)] +pub(super) struct FloatDistinctCountAccumulator +where + T: ArrowPrimitiveType + Send, +{ + values: HashSet, RandomState>, +} + +impl FloatDistinctCountAccumulator +where + T: ArrowPrimitiveType + Send, +{ + pub(super) fn new() -> Self { + Self { + values: HashSet::default(), + } + } +} + +impl Accumulator for FloatDistinctCountAccumulator +where + T: ArrowPrimitiveType + Send + Debug, +{ + fn state(&mut self) -> datafusion_common::Result> { + let arr = Arc::new(PrimitiveArray::::from_iter_values( + self.values.iter().map(|v| v.0), + )) as ArrayRef; + let list = Arc::new(array_into_list_array(arr)); + Ok(vec![ScalarValue::List(list)]) + } + + fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> { + if values.is_empty() { + return Ok(()); + } + + let arr = as_primitive_array::(&values[0])?; + arr.iter().for_each(|value| { + if let Some(value) = value { + self.values.insert(Hashable(value)); + } + }); + + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> { + if states.is_empty() { + return Ok(()); + } + assert_eq!( + states.len(), + 1, + "count_distinct states must be single array" + ); + + let arr = as_list_array(&states[0])?; + arr.iter().try_for_each(|maybe_list| { + if let Some(list) = maybe_list { + let list = as_primitive_array::(&list)?; + self.values + .extend(list.values().iter().map(|v| Hashable(*v))); + }; + Ok(()) + }) + } + + fn evaluate(&mut self) -> datafusion_common::Result { + Ok(ScalarValue::Int64(Some(self.values.len() as i64))) + } + + fn size(&self) -> usize { + let estimated_buckets = (self.values.len().checked_mul(8).unwrap_or(usize::MAX) + / 7) + .next_power_of_two(); + + // Size of accumulator + // + size of entry * number of buckets + // + 1 byte for each bucket + // + fixed size of HashSet + std::mem::size_of_val(self) + + std::mem::size_of::() * estimated_buckets + + estimated_buckets + + std::mem::size_of_val(&self.values) + } +} diff --git a/datafusion/physical-expr/src/aggregate/count_distinct/strings.rs b/datafusion/physical-expr/src/aggregate/count_distinct/strings.rs index d7a9ea5c373d..02d30c350623 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct/strings.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct/strings.rs @@ -15,7 +15,11 @@ // specific language governing permissions and limitations // under the License. -//! Specialized implementation of `COUNT DISTINCT` for `StringArray` and `LargeStringArray` +//! Specialized implementation of `COUNT DISTINCT` for [`StringArray`] +//! and [`LargeStringArray`] +//! +//! [`StringArray`]: arrow::array::StringArray +//! [`LargeStringArray`]: arrow::array::LargeStringArray use ahash::RandomState; use arrow_array::cast::AsArray;