Skip to content

Commit

Permalink
Improve aggregatation documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored and korowa committed Aug 3, 2024
1 parent 8913fcf commit 5b183d6
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 22 deletions.
105 changes: 103 additions & 2 deletions datafusion/expr/src/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub trait Accumulator: Send + Sync + Debug {
///
/// Intermediate state is used for "multi-phase" grouping in
/// DataFusion, where an aggregate is computed in parallel with
/// multiple `Accumulator` instances, as illustrated below:
/// multiple `Accumulator` instances, as described below:
///
/// # MultiPhase Grouping
///
Expand Down Expand Up @@ -130,7 +130,7 @@ pub trait Accumulator: Send + Sync + Debug {
/// `───────' `───────'
/// ```
///
/// The partial state is serialied as `Arrays` and then combined
/// The partial state is serialized as `Arrays` and then combined
/// with other partial states from different instances of this
/// Accumulator (that ran on different partitions, for example).
///
Expand All @@ -147,6 +147,107 @@ pub trait Accumulator: Send + Sync + Debug {
/// Note that [`ScalarValue::List`] can be used to pass multiple
/// values if the number of intermediate values is not known at
/// planning time (e.g. for `MEDIAN`)
///
/// # Multi-phase repartitioned Grouping
///
/// Many multi-phase grouping plans contain a Repartition operation
/// as well as shown below:
///
/// ```text
/// ▲ ▲
/// │ │
/// │ │
/// │ │
/// │ │
/// │ │
/// ┌───────────────────────┐ ┌───────────────────────┐ 4. Each AggregateMode::Final
/// │GroupBy │ │GroupBy │ GroupBy has an entry for its
/// │(AggregateMode::Final) │ │(AggregateMode::Final) │ subset of groups (in this case
/// │ │ │ │ that means half the entries)
/// └───────────────────────┘ └───────────────────────┘
/// ▲ ▲
/// │ │
/// └─────────────┬────────────┘
/// │
/// │
/// │
/// ┌─────────────────────────┐ 3. Repartitioning by hash(group
/// │ Repartition │ keys) ensures that each distinct
/// │ HASH(x) │ group key now appears in exactly
/// └─────────────────────────┘ one partition
/// ▲
/// │
/// ┌───────────────┴─────────────┐
/// │ │
/// │ │
/// ┌─────────────────────────┐ ┌──────────────────────────┐ 2. Each AggregateMode::Partial
/// │ GroubyBy │ │ GroubyBy │ GroupBy has an entry for *all*
/// │(AggregateMode::Partial) │ │ (AggregateMode::Partial) │ the groups
/// └─────────────────────────┘ └──────────────────────────┘
/// ▲ ▲
/// │ ┌┘
/// │ │
/// .─────────. .─────────.
/// ,─' '─. ,─' '─.
/// ; Input : ; Input : 1. Since input data is
/// : Partition 0 ; : Partition 1 ; arbitrarily or RoundRobin
/// ╲ ╱ ╲ ╱ distributed, each partition
/// '─. ,─' '─. ,─' likely has all distinct
/// `───────' `───────'
/// ```
///
/// This structure is used so that the `AggregateMode::Partial` accumulators
/// reduces the cardinality of the input as soon as possible. Typically,
/// each partial accumulator sees all groups in the input as the group keys
/// are evenly distributed across the input.
///
/// The final output is computed by repartitioning the result of
/// [`Self::state`] from each Partial aggregate and `hash(group keys)` so
/// that each distinct group key appears in exactly one of the
/// `AggregateMode::Final` GroupBy nodes. The output of the final nodes are
/// then unioned together to produce the overall final output.
///
/// Here is an example that shows the distribution of groups in the
/// different phases
///
/// ```text
/// ┌─────┐ ┌─────┐
/// │ 1 │ │ 3 │
/// ├─────┤ ├─────┤
/// │ 2 │ │ 4 │ After repartitioning by
/// └─────┘ └─────┘ hash(group keys), each distinct
/// ┌─────┐ ┌─────┐ group key now appears in exactly
/// │ 1 │ │ 3 │ one partition
/// ├─────┤ ├─────┤
/// │ 2 │ │ 4 │
/// └─────┘ └─────┘
///
///
/// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
///
/// ┌─────┐ ┌─────┐
/// │ 2 │ │ 2 │
/// ├─────┤ ├─────┤
/// │ 1 │ │ 2 │
/// ├─────┤ ├─────┤
/// │ 3 │ │ 3 │
/// ├─────┤ ├─────┤
/// │ 4 │ │ 1 │
/// └─────┘ └─────┘ Input data is arbitrarily or
/// ... ... RoundRobin distributed, each
/// ┌─────┐ ┌─────┐ partition likely has all
/// │ 1 │ │ 4 │ distinct group keys
/// ├─────┤ ├─────┤
/// │ 4 │ │ 3 │
/// ├─────┤ ├─────┤
/// │ 1 │ │ 1 │
/// ├─────┤ ├─────┤
/// │ 4 │ │ 3 │
/// └─────┘ └─────┘
///
/// group values group values
/// in partition 0 in partition 1
/// ```
fn state(&mut self) -> Result<Vec<ScalarValue>>;

/// Updates the accumulator's state from an `Array` containing one
Expand Down
58 changes: 48 additions & 10 deletions datafusion/expr/src/groups_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,18 +128,23 @@ pub trait GroupsAccumulator: Send {
/// Returns the intermediate aggregate state for this accumulator,
/// used for multi-phase grouping, resetting its internal state.
///
/// See [`Accumulator::state`] for more information on multi-phase
/// aggregation.
///
/// For example, `AVG` might return two arrays: `SUM` and `COUNT`
/// but the `MIN` aggregate would just return a single array.
///
/// Note more sophisticated internal state can be passed as
/// single `StructArray` rather than multiple arrays.
///
/// See [`Self::evaluate`] for details on the required output
/// order and `emit_to`.
/// order and `emit_to`.
///
/// [`Accumulator::state`]: crate::Accumulator::state
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;

/// Merges intermediate state (the output from [`Self::state`])
/// into this accumulator's values.
/// into this accumulator's current state.
///
/// For some aggregates (such as `SUM`), `merge_batch` is the same
/// as `update_batch`, but for some aggregates (such as `COUNT`,
Expand All @@ -158,9 +163,41 @@ pub trait GroupsAccumulator: Send {
total_num_groups: usize,
) -> Result<()>;

/// Converts input batch to intermediate aggregate state,
/// without grouping (each input row considered as a separate
/// group).
/// Converts an input batch directly the intermediate aggregate state.
///
/// This is the equivalent of treating each input row as its own group. It
/// is invoked when the Partial phase of a multi-phase aggregation is not
/// reducing the cardinality enough to warrant spending more effort on
/// pre-aggregation (see `Background` section below), and switches to
/// passing intermediate state directly on to the next aggregation phase.
///
/// Examples:
/// * `COUNT`: an array of 1s for each row in the input batch.
/// * `SUM/MIN/MAX`: the input values themselves.
///
/// # Arguments
/// * `values`: the input arguments to the accumulator
/// * `opt_filter`: if present, any row where `opt_filter[i]` is false should be ignored
///
/// # Background
///
/// In a multi-phase aggregation (see [`Accumulator::state`]), the initial
/// Partial phase reduces the cardinality of the input data as soon as
/// possible in the plan.
///
/// This strategy is very effective for queries with a small number of
/// groups, as most of the data is aggregated immediately and only a small
/// amount of data must be repartitioned (see [`Accumulator::state`] for
/// background)
///
/// However, for queries with a large number of groups, the Partial phase
/// often does not reduce the cardinality enough to warrant the memory and
/// CPU cost of actually performing the aggregation. For such cases, the
/// HashAggregate operator will dynamically switch to passing intermediate
/// state directly to the next aggregation phase with minimal processing
/// using this method.
///
/// [`Accumulator::state`]: crate::Accumulator::state
fn convert_to_state(
&self,
_values: &[ArrayRef],
Expand All @@ -169,15 +206,16 @@ pub trait GroupsAccumulator: Send {
not_impl_err!("Input batch conversion to state not implemented")
}

/// Returns `true` is groups accumulator supports input batch
/// to intermediate aggregate state conversion (`convert_to_state`
/// method is implemented).
/// Returns `true` if [`Self::convert_to_state`] is implemented to support
/// intermediate aggregate state conversion.
fn supports_convert_to_state(&self) -> bool {
false
}

/// Amount of memory used to store the state of this accumulator,
/// in bytes. This function is called once per batch, so it should
/// be `O(n)` to compute, not `O(num_groups)`
/// in bytes.
///
/// This function is called once per batch, so it should be `O(n)` to
/// compute, not `O(num_groups)`
fn size(&self) -> usize;
}
4 changes: 3 additions & 1 deletion datafusion/expr/src/udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {

/// Return the fields used to store the intermediate state of this accumulator.
///
/// See [`Accumulator::state`] for background information.
///
/// args: [`StateFieldsArgs`] contains arguments passed to the
/// aggregate function's accumulator.
///
Expand Down Expand Up @@ -388,7 +390,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
/// # Notes
///
/// Even if this function returns true, DataFusion will still use
/// `Self::accumulator` for certain queries, such as when this aggregate is
/// [`Self::accumulator`] for certain queries, such as when this aggregate is
/// used as a window function or when there no GROUP BY columns in the
/// query.
fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
Expand Down
5 changes: 5 additions & 0 deletions datafusion/functions-aggregate/src/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,11 @@ impl GroupsAccumulator for CountGroupsAccumulator {
Ok(vec![Arc::new(counts) as ArrayRef])
}

/// Converts an input batch directly to a state batch
///
/// The state of `COUNT` is always a single Int64Array:
/// * `1` (for non-null, non filtered values)
/// * `0` (for null values)
fn convert_to_state(
&self,
values: &[ArrayRef],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ where
self.update_batch(values, group_indices, opt_filter, total_num_groups)
}

/// Converts an input batch directly to a state batch
///
/// The state is:
/// - self.prim_fn for all non null, non filtered values
/// - null otherwise
///
fn convert_to_state(
&self,
values: &[ArrayRef],
Expand Down
18 changes: 15 additions & 3 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,20 @@ mod topk;
mod topk_stream;

/// Hash aggregate modes
///
/// See [`Accumulator::state`] for background information on multi-phase
/// aggregation and how these modes are used.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AggregateMode {
/// Partial aggregate that can be applied in parallel across input partitions
/// Partial aggregate that can be applied in parallel across input
/// partitions.
///
/// This is the first phase of a multi-phase aggregation.
Partial,
/// Final aggregate that produces a single partition of output
/// Final aggregate that produces a single partition of output by combining
/// the output of multiple partial aggregates.
///
/// This is the second phase of a multi-phase aggregation.
Final,
/// Final aggregate that works on pre-partitioned data.
///
Expand All @@ -72,12 +81,15 @@ pub enum AggregateMode {
/// Applies the entire logical aggregation operation in a single operator,
/// as opposed to Partial / Final modes which apply the logical aggregation using
/// two operators.
///
/// This mode requires that the input is a single partition (like Final)
Single,
/// Applies the entire logical aggregation operation in a single operator,
/// as opposed to Partial / Final modes which apply the logical aggregation using
/// two operators.
/// This mode requires that the input is partitioned by group key (like FinalPartitioned)
///
/// This mode requires that the input is partitioned by group key (like
/// FinalPartitioned)
SinglePartitioned,
}

Expand Down
35 changes: 29 additions & 6 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@ pub(crate) enum ExecutionState {
/// When producing output, the remaining rows to output are stored
/// here and are sliced off as needed in batch_size chunks
ProducingOutput(RecordBatch),
/// Indicates that GroupedHashAggregateStream should produce
/// intermediate aggregate state for each input rows without
/// their aggregation
/// Produce intermediate aggregate state for each input row without
/// aggregation.
///
/// See "partial aggregation" discussion on [`GroupedHashAggregateStream`]
SkippingAggregation,
/// All input has been consumed and all groups have been emitted
Done,
}

Expand Down Expand Up @@ -94,6 +96,9 @@ struct SpillState {
merging_group_by: PhysicalGroupBy,
}

/// Tracks if the aggregate should skip partial aggregations
///
/// See "partial aggregation" discussion on [`GroupedHashAggregateStream`]
struct SkipAggregationProbe {
/// Number of processed input rows
input_rows: usize,
Expand Down Expand Up @@ -204,7 +209,7 @@ impl SkipAggregationProbe {
/// of `x` and one accumulator for `SUM(y)`, specialized for the data
/// type of `y`.
///
/// # Description
/// # Discussion
///
/// [`group_values`] does not store any aggregate state inline. It only
/// assigns "group indices", one for each (distinct) group value. The
Expand All @@ -222,7 +227,25 @@ impl SkipAggregationProbe {
///
/// [`group_values`]: Self::group_values
///
/// # Spilling
/// # Partial Aggregate and multi-phase grouping
///
/// As described on [`Accumulator::state`], this operator is used in the context
/// "multi-phase" grouping when the mode is [`AggregateMode::Partial`].
///
/// An important optimization for multi-phase partial aggregation is to skip
/// partial aggregation when it is not effective enough to warrant the memory or
/// CPU cost, as is often the case for queries many distinct groups (high
/// cardinality group by). Memory is particularly important because each Partial
/// aggregator must store the intermediate state for each group.
///
/// If the ratio of the number of groups to the number of input rows exceeds a
/// threshold, and [`GroupsAccumulator::convert_to_state_supported`] is
/// supported, this operator will stop applying Partial aggregation and directly
/// pass the input rows to the next aggregation phase.
///
/// [`Accumulator::state`]: datafusion_expr::Accumulator::state
///
/// # Spilling (to disk)
///
/// The sizes of group values and accumulators can become large. Before that causes out of memory,
/// this hash aggregator outputs partial states early for partial aggregation or spills to local
Expand Down Expand Up @@ -344,7 +367,7 @@ pub(crate) struct GroupedHashAggregateStream {
group_values_soft_limit: Option<usize>,

/// Optional probe for skipping data aggregation, if supported by
/// current stream
/// current stream.
skip_aggregation_probe: Option<SkipAggregationProbe>,
}

Expand Down

0 comments on commit 5b183d6

Please sign in to comment.