From 5b183d6b01f6e5078ed4a0e4aba0a382d43e0ca6 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Sun, 28 Jul 2024 08:36:46 -0400
Subject: [PATCH] Improve aggregation documentation

---
 datafusion/expr/src/accumulator.rs            | 105 +++++++++++++++++-
 datafusion/expr/src/groups_accumulator.rs     |  58 ++++++++--
 datafusion/expr/src/udaf.rs                   |   4 +-
 datafusion/functions-aggregate/src/count.rs   |   5 +
 .../aggregate/groups_accumulator/prim_op.rs   |   6 +
 .../physical-plan/src/aggregates/mod.rs       |  18 ++-
 .../physical-plan/src/aggregates/row_hash.rs  |  35 +++++-
 7 files changed, 209 insertions(+), 22 deletions(-)

diff --git a/datafusion/expr/src/accumulator.rs b/datafusion/expr/src/accumulator.rs
index f9af7850cb924..262646d8ba3ae 100644
--- a/datafusion/expr/src/accumulator.rs
+++ b/datafusion/expr/src/accumulator.rs
@@ -94,7 +94,7 @@ pub trait Accumulator: Send + Sync + Debug {
     ///
     /// Intermediate state is used for "multi-phase" grouping in
     /// DataFusion, where an aggregate is computed in parallel with
-    /// multiple `Accumulator` instances, as illustrated below:
+    /// multiple `Accumulator` instances, as described below:
     ///
     /// # MultiPhase Grouping
     ///
@@ -130,7 +130,7 @@ pub trait Accumulator: Send + Sync + Debug {
     ///    `───────'         `───────'
     /// ```
     ///
-    /// The partial state is serialied as `Arrays` and then combined
+    /// The partial state is serialized as `Arrays` and then combined
     /// with other partial states from different instances of this
     /// Accumulator (that ran on different partitions, for example).
     ///
@@ -147,6 +147,107 @@ pub trait Accumulator: Send + Sync + Debug {
     /// Note that [`ScalarValue::List`] can be used to pass multiple
     /// values if the number of intermediate values is not known at
     /// planning time (e.g. for `MEDIAN`)
+    ///
+    /// # Multi-phase repartitioned Grouping
+    ///
+    /// Many multi-phase grouping plans contain a Repartition operation
+    /// as well, as shown below:
+    ///
+    /// ```text
+    ///                          ▲                            ▲
+    ///                          │                            │
+    ///                          │                            │
+    ///                          │                            │
+    ///                          │                            │
+    ///                          │                            │
+    ///  ┌───────────────────────┐      ┌───────────────────────┐   4. Each AggregateMode::Final
+    ///  │GroupBy                │      │GroupBy                │   GroupBy has an entry for its
+    ///  │(AggregateMode::Final) │      │(AggregateMode::Final) │   subset of groups (in this case
+    ///  │                       │      │                       │   that means half the entries)
+    ///  └───────────────────────┘      └───────────────────────┘
+    ///              ▲                              ▲
+    ///              │                              │
+    ///              └─────────────┬────────────────┘
+    ///                            │
+    ///                            │
+    ///                            │
+    ///               ┌─────────────────────────┐    3. Repartitioning by hash(group
+    ///               │       Repartition       │    keys) ensures that each distinct
+    ///               │         HASH(x)         │    group key now appears in exactly
+    ///               └─────────────────────────┘    one partition
+    ///                            ▲
+    ///                            │
+    ///              ┌─────────────┴─────────────┐
+    ///              │                           │
+    ///              │                           │
+    ///  ┌─────────────────────────┐  ┌──────────────────────────┐  2. Each AggregateMode::Partial
+    ///  │        GroupBy          │  │         GroupBy          │  GroupBy has an entry for *all*
+    ///  │(AggregateMode::Partial) │  │ (AggregateMode::Partial) │  the groups
+    ///  └─────────────────────────┘  └──────────────────────────┘
+    ///              ▲                              ▲
+    ///              │                             ┌┘
+    ///              │                             │
+    ///         .─────────.                   .─────────.
+    ///      ,─'           '─.             ,─'           '─.
+    ///     ;      Input      :           ;      Input      :     1. Since input data is
+    ///     :   Partition 0   ;           :   Partition 1   ;     arbitrarily or RoundRobin
+    ///      ╲               ╱             ╲               ╱      distributed, each partition
+    ///       '─.         ,─'               '─.         ,─'       likely has all distinct group
+    ///          `───────'                     `───────'           keys
+    /// ```
+    ///
+    /// This structure is used so that the `AggregateMode::Partial` accumulators
+    /// reduce the cardinality of the input as soon as possible. Typically,
+    /// each partial accumulator sees all groups in the input as the group keys
+    /// are evenly distributed across the input.
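+    ///
+    /// For example, the two phases fit together via [`Self::update_batch`],
+    /// [`Self::state`], [`Self::merge_batch`] and [`Self::evaluate`] roughly as
+    /// in the following sketch (`MyAvgAccumulator` is a hypothetical
+    /// accumulator used for illustration; setup and error handling elided):
+    ///
+    /// ```ignore
+    /// // AggregateMode::Partial: each partition aggregates its own input
+    /// let mut partial = MyAvgAccumulator::default();
+    /// partial.update_batch(&[partition_column])?;
+    /// // Serialize the intermediate state, e.g. SUM and COUNT for AVG
+    /// let state: Vec<ScalarValue> = partial.state()?;
+    ///
+    /// // The engine converts the state to Arrays (and, in plans like the one
+    /// // above, repartitions it by hash(group keys)) before the Final phase
+    /// let state_arrays: Vec<ArrayRef> =
+    ///     state.iter().map(|s| s.to_array()).collect::<Result<_>>()?;
+    ///
+    /// // AggregateMode::Final: merge all partial states, then evaluate
+    /// let mut final_acc = MyAvgAccumulator::default();
+    /// final_acc.merge_batch(&state_arrays)?;
+    /// let result: ScalarValue = final_acc.evaluate()?;
+    /// ```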
+    ///
+    /// The final output is computed by repartitioning the result of
+    /// [`Self::state`] from each Partial aggregate by `hash(group keys)` so
+    /// that each distinct group key appears in exactly one of the
+    /// `AggregateMode::Final` GroupBy nodes. The outputs of the final nodes
+    /// are then unioned together to produce the overall final output.
+    ///
+    /// Here is an example that shows the distribution of groups in the
+    /// different phases:
+    ///
+    /// ```text
+    ///    ┌─────┐                ┌─────┐
+    ///    │  1  │                │  3  │
+    ///    ├─────┤                ├─────┤
+    ///    │  2  │                │  4  │        After repartitioning by
+    ///    └─────┘                └─────┘        hash(group keys), each distinct
+    ///    ┌─────┐                ┌─────┐        group key now appears in exactly
+    ///    │  1  │                │  3  │        one partition
+    ///    ├─────┤                ├─────┤
+    ///    │  2  │                │  4  │
+    ///    └─────┘                └─────┘
+    ///
+    ///
+    ///  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
+    ///
+    ///    ┌─────┐                ┌─────┐
+    ///    │  2  │                │  2  │
+    ///    ├─────┤                ├─────┤
+    ///    │  1  │                │  2  │
+    ///    ├─────┤                ├─────┤
+    ///    │  3  │                │  3  │
+    ///    ├─────┤                ├─────┤
+    ///    │  4  │                │  1  │
+    ///    └─────┘                └─────┘        Input data is arbitrarily or
+    ///      ...                    ...          RoundRobin distributed, each
+    ///    ┌─────┐                ┌─────┐        partition likely has all
+    ///    │  1  │                │  4  │        distinct group keys
+    ///    ├─────┤                ├─────┤
+    ///    │  4  │                │  3  │
+    ///    ├─────┤                ├─────┤
+    ///    │  1  │                │  1  │
+    ///    ├─────┤                ├─────┤
+    ///    │  4  │                │  3  │
+    ///    └─────┘                └─────┘
+    ///
+    ///    group values           group values
+    ///    in partition 0         in partition 1
+    /// ```
     fn state(&mut self) -> Result<Vec<ScalarValue>>;

     /// Updates the accumulator's state from an `Array` containing one
diff --git a/datafusion/expr/src/groups_accumulator.rs b/datafusion/expr/src/groups_accumulator.rs
index 620e93a178465..886bd8443e4d3 100644
--- a/datafusion/expr/src/groups_accumulator.rs
+++ b/datafusion/expr/src/groups_accumulator.rs
@@ -128,6 +128,9 @@ pub trait GroupsAccumulator: Send {
     /// Returns the intermediate aggregate state for this accumulator,
     /// used for multi-phase grouping, resetting its internal state.
     ///
+    /// See [`Accumulator::state`] for more information on multi-phase
+    /// aggregation.
+    ///
     /// For example, `AVG` might return two arrays: `SUM` and `COUNT`
     /// but the `MIN` aggregate would just return a single array.
     ///
@@ -135,11 +138,13 @@ pub trait GroupsAccumulator: Send {
     /// single `StructArray` rather than multiple arrays.
     ///
     /// See [`Self::evaluate`] for details on the required output
-    /// order and `emit_to`.
+    /// order and `emit_to`.
+    ///
+    /// [`Accumulator::state`]: crate::Accumulator::state
     fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;

     /// Merges intermediate state (the output from [`Self::state`])
-    /// into this accumulator's values.
+    /// into this accumulator's current state.
     ///
     /// For some aggregates (such as `SUM`), `merge_batch` is the same
     /// as `update_batch`, but for some aggregates (such as `COUNT`,
@@ -158,9 +163,41 @@ pub trait GroupsAccumulator: Send {
         total_num_groups: usize,
     ) -> Result<()>;

-    /// Converts input batch to intermediate aggregate state,
-    /// without grouping (each input row considered as a separate
-    /// group).
+    /// Converts an input batch directly to the intermediate aggregate state.
+    ///
+    /// This is the equivalent of treating each input row as its own group. It
+    /// is invoked when the Partial phase of a multi-phase aggregation is not
+    /// reducing the cardinality enough to warrant spending more effort on
+    /// pre-aggregation (see `Background` section below); the operator then
+    /// switches to passing intermediate state directly on to the next
+    /// aggregation phase.
+    ///
+    /// Examples:
+    /// * `COUNT`: an array of 1s for each row in the input batch.
+    /// * `SUM/MIN/MAX`: the input values themselves.
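+    ///
+    /// Conceptually, for a batch of `n` rows, this method is equivalent to the
+    /// following sketch (a hedged illustration of the contract, not code from
+    /// this crate):
+    ///
+    /// ```ignore
+    /// // one group per input row: group indices are simply 0..n
+    /// let group_indices: Vec<usize> = (0..n).collect();
+    /// let mut acc = make_fresh_accumulator(); // hypothetical helper
+    /// acc.update_batch(values, &group_indices, opt_filter, n)?;
+    /// // convert_to_state(values, opt_filter) returns the same arrays as:
+    /// let state = acc.state(EmitTo::All)?;
+    /// ```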
+    ///
+    /// # Arguments
+    /// * `values`: the input arguments to the accumulator
+    /// * `opt_filter`: if present, any row where `opt_filter[i]` is false should be ignored
+    ///
+    /// # Background
+    ///
+    /// In a multi-phase aggregation (see [`Accumulator::state`]), the initial
+    /// Partial phase reduces the cardinality of the input data as soon as
+    /// possible in the plan.
+    ///
+    /// This strategy is very effective for queries with a small number of
+    /// groups, as most of the data is aggregated immediately and only a small
+    /// amount of data must be repartitioned (see [`Accumulator::state`] for
+    /// background).
+    ///
+    /// However, for queries with a large number of groups, the Partial phase
+    /// often does not reduce the cardinality enough to warrant the memory and
+    /// CPU cost of actually performing the aggregation. For such cases, the
+    /// HashAggregate operator will dynamically switch to passing intermediate
+    /// state directly to the next aggregation phase with minimal processing
+    /// using this method.
+    ///
+    /// [`Accumulator::state`]: crate::Accumulator::state
     fn convert_to_state(
         &self,
         _values: &[ArrayRef],
@@ -169,15 +206,16 @@ pub trait GroupsAccumulator: Send {
         not_impl_err!("Input batch conversion to state not implemented")
     }

-    /// Returns `true` is groups accumulator supports input batch
-    /// to intermediate aggregate state conversion (`convert_to_state`
-    /// method is implemented).
+    /// Returns `true` if [`Self::convert_to_state`] is implemented to support
+    /// intermediate aggregate state conversion.
     fn supports_convert_to_state(&self) -> bool {
         false
     }

     /// Amount of memory used to store the state of this accumulator,
-    /// in bytes. This function is called once per batch, so it should
-    /// be `O(n)` to compute, not `O(num_groups)`
+    /// in bytes.
+    ///
+    /// This function is called once per batch, so it should be `O(n)` to
+    /// compute, not `O(num_groups)`
     fn size(&self) -> usize;
 }
diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs
index f5eeef6b53bbe..3a292b2b49bfb 100644
--- a/datafusion/expr/src/udaf.rs
+++ b/datafusion/expr/src/udaf.rs
@@ -351,6 +351,8 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
     /// Return the fields used to store the intermediate state of this accumulator.
     ///
+    /// See [`Accumulator::state`] for background information.
+    ///
     /// args: [`StateFieldsArgs`] contains arguments passed to the
     /// aggregate function's accumulator.
     ///
@@ -388,7 +390,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
     /// # Notes
     ///
     /// Even if this function returns true, DataFusion will still use
-    /// `Self::accumulator` for certain queries, such as when this aggregate is
+    /// [`Self::accumulator`] for certain queries, such as when this aggregate is
     /// used as a window function or when there are no GROUP BY columns in the
     /// query.
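+    ///
+    /// For example, a UDAF that provides a vectorized accumulator might opt in
+    /// along these lines (a sketch; `MyCountGroupsAccumulator` is a
+    /// hypothetical type, not part of DataFusion):
+    ///
+    /// ```ignore
+    /// fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
+    ///     true // signal that create_groups_accumulator is implemented
+    /// }
+    ///
+    /// fn create_groups_accumulator(
+    ///     &self,
+    ///     _args: AccumulatorArgs,
+    /// ) -> Result<Box<dyn GroupsAccumulator>> {
+    ///     Ok(Box::new(MyCountGroupsAccumulator::default()))
+    /// }
+    /// ```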
     fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs
index e81489005e183..aea05442536ee 100644
--- a/datafusion/functions-aggregate/src/count.rs
+++ b/datafusion/functions-aggregate/src/count.rs
@@ -441,6 +441,11 @@ impl GroupsAccumulator for CountGroupsAccumulator {
         Ok(vec![Arc::new(counts) as ArrayRef])
     }

+    /// Converts an input batch directly to a state batch
+    ///
+    /// The state of `COUNT` is always a single Int64Array:
+    /// * `1` (for non-null, non-filtered values)
+    /// * `0` (for null or filtered-out values)
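+    ///
+    /// For example (an illustrative sketch; the input values are invented):
+    ///
+    /// ```ignore
+    /// // values:          [Some(5), None, Some(7), Some(2)]
+    /// // opt_filter:      [true,    true, false,   true   ]
+    /// // returned state:  [1,       0,    0,       1      ]  (Int64Array)
+    /// let state = accumulator.convert_to_state(&[values], Some(&filter))?;
+    /// ```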
     fn convert_to_state(
         &self,
         values: &[ArrayRef],
diff --git a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs
index a78c6f597cd60..8d69646bd422a 100644
--- a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs
+++ b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs
@@ -136,6 +136,12 @@ where
         self.update_batch(values, group_indices, opt_filter, total_num_groups)
     }

+    /// Converts an input batch directly to a state batch
+    ///
+    /// The state is:
+    /// - the input values themselves, for all non-null, non-filtered rows
+    /// - null otherwise
+    ///
     fn convert_to_state(
         &self,
         values: &[ArrayRef],
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 7eab9e6cc4c4e..8941418c12e1e 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -56,11 +56,20 @@ mod topk;
 mod topk_stream;

 /// Hash aggregate modes
+///
+/// See [`Accumulator::state`] for background information on multi-phase
+/// aggregation and how these modes are used.
 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
 pub enum AggregateMode {
-    /// Partial aggregate that can be applied in parallel across input partitions
+    /// Partial aggregate that can be applied in parallel across input
+    /// partitions.
+    ///
+    /// This is the first phase of a multi-phase aggregation.
     Partial,
-    /// Final aggregate that produces a single partition of output
+    /// Final aggregate that produces a single partition of output by combining
+    /// the output of multiple partial aggregates.
+    ///
+    /// This is the second phase of a multi-phase aggregation.
     Final,
     /// Final aggregate that works on pre-partitioned data.
     ///
@@ -72,12 +81,15 @@ pub enum AggregateMode {
     /// Applies the entire logical aggregation operation in a single operator,
     /// as opposed to Partial / Final modes which apply the logical aggregation using
     /// two operators.
+    ///
     /// This mode requires that the input is a single partition (like Final)
     Single,
     /// Applies the entire logical aggregation operation in a single operator,
     /// as opposed to Partial / Final modes which apply the logical aggregation using
     /// two operators.
-    /// This mode requires that the input is partitioned by group key (like FinalPartitioned)
+    ///
+    /// This mode requires that the input is partitioned by group key (like
+    /// FinalPartitioned)
     SinglePartitioned,
 }
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index e30ac250e41f7..9db81fe28ea43 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -62,10 +62,12 @@ pub(crate) enum ExecutionState {
     /// When producing output, the remaining rows to output are stored
     /// here and are sliced off as needed in batch_size chunks
     ProducingOutput(RecordBatch),
-    /// Indicates that GroupedHashAggregateStream should produce
-    /// intermediate aggregate state for each input rows without
-    /// their aggregation
+    /// Produce intermediate aggregate state for each input row without
+    /// aggregation.
+    ///
+    /// See "partial aggregation" discussion on [`GroupedHashAggregateStream`]
     SkippingAggregation,
+    /// All input has been consumed and all groups have been emitted
     Done,
 }

@@ -94,6 +96,9 @@ struct SpillState {
     merging_group_by: PhysicalGroupBy,
 }

+/// Tracks if the aggregate should skip partial aggregation
+///
+/// See "partial aggregation" discussion on [`GroupedHashAggregateStream`]
 struct SkipAggregationProbe {
     /// Number of processed input rows
     input_rows: usize,
@@ -204,7 +209,7 @@ impl SkipAggregationProbe {
 /// of `x` and one accumulator for `SUM(y)`, specialized for the data
 /// type of `y`.
 ///
-/// # Description
+/// # Discussion
 ///
 /// [`group_values`] does not store any aggregate state inline. It only
 /// assigns "group indices", one for each (distinct) group value. The
@@ -222,7 +227,25 @@ impl SkipAggregationProbe {
 ///
 /// [`group_values`]: Self::group_values
 ///
-/// # Spilling
+/// # Partial Aggregate and multi-phase grouping
+///
+/// As described on [`Accumulator::state`], this operator is used in the
+/// context of "multi-phase" grouping when the mode is
+/// [`AggregateMode::Partial`].
+///
+/// An important optimization for multi-phase partial aggregation is to skip
+/// partial aggregation when it is not effective enough to warrant the memory or
+/// CPU cost, as is often the case for queries with many distinct groups (high
+/// cardinality group by). Memory is particularly important because each Partial
+/// aggregator must store the intermediate state for each group.
+///
+/// If the ratio of the number of groups to the number of input rows exceeds a
+/// threshold, and [`GroupsAccumulator::supports_convert_to_state`] is
+/// supported, this operator will stop applying Partial aggregation and directly
+/// pass the input rows to the next aggregation phase.
+///
+/// [`Accumulator::state`]: datafusion_expr::Accumulator::state
+///
+/// # Spilling (to disk)
 ///
 /// The sizes of group values and accumulators can become large. Before that causes an out-of-memory error,
 /// this hash aggregator outputs partial states early for partial aggregation or spills to local
@@ -344,7 +367,7 @@ pub(crate) struct GroupedHashAggregateStream {
     group_values_soft_limit: Option<usize>,

     /// Optional probe for skipping data aggregation, if supported by
-    /// current stream
+    /// current stream.
     skip_aggregation_probe: Option<SkipAggregationProbe>,
 }
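
The skip-partial-aggregation check described in the new `GroupedHashAggregateStream`
documentation amounts to a ratio test along these lines (a minimal sketch with
illustrative names; the actual `SkipAggregationProbe` fields and thresholds in
DataFusion differ):

```rust
/// Decide whether the Partial phase should stop aggregating and instead
/// stream intermediate state directly to the next phase.
fn should_skip_partial_aggregation(
    input_rows: usize,    // input rows consumed so far
    num_groups: usize,    // distinct groups observed so far
    probe_rows: usize,    // minimum sample size before deciding
    ratio_threshold: f64, // e.g. 0.8: skip when >= 80% of rows are new groups
) -> bool {
    // Not enough data yet to make a reliable decision
    if input_rows < probe_rows {
        return false;
    }
    // A high groups/rows ratio means partial aggregation is not reducing
    // cardinality enough to pay for its memory and CPU cost
    (num_groups as f64) / (input_rows as f64) >= ratio_threshold
}
```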