
Make aggregate accumulators storage column-based #956

Closed
Tracked by #4973
Dandandan opened this issue Aug 30, 2021 · 9 comments · Fixed by #6904
Labels
enhancement (New feature or request), performance (Make DataFusion faster)

Comments


Dandandan commented Aug 30, 2021

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Currently, the aggregate code keeps the state of each group-by value by storing it in a Vec together with the group-by values.

For low-cardinality aggregates this works OK, as there are only a limited number of keys and thus accumulators.
But for medium/high-cardinality aggregates, the current approach is inefficient:

  • Higher memory usage: each group-by value carries an extra Vec with a number of AccumulatorItem entries, adding at least 2 (Box<dyn T>) + 3 (empty Vec) + 3 (initial state Vec) = 8 pointers (64 bytes) of overhead per item when storing one aggregate per group (e.g. one of count/avg/sum); see the size sketch after this list.
  • Extra allocations when inserting new groups into the data structures and when returning states as Vec<ScalarValue>, plus some cloning.
  • Less cache efficient, because memory is scattered around.
  • More expensive conversion paths: state must be converted into Vecs and ScalarValues and then back into Arrays again.
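
A quick check of the arithmetic in the first bullet (a minimal sketch assuming a 64-bit target; the concrete types are illustrative only):

use std::mem::size_of;

fn main() {
    // Box<dyn Trait> is a fat pointer: data pointer + vtable pointer = 2 words.
    println!("Box<dyn Fn()>: {} bytes", size_of::<Box<dyn Fn()>>()); // 16
    // A Vec is 3 words regardless of its contents: pointer, length, capacity.
    println!("Vec<u64>: {} bytes", size_of::<Vec<u64>>()); // 24
    // 2 + 3 + 3 words = 8 pointers = 64 bytes of bookkeeping per group for a
    // single aggregate, before counting the accumulator state itself.
}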

This issue is for the Accumulator state only, but a similar approach could be applied to the group_by_values.

Describe the solution you'd like
We should define a trait that allows storing the required state in the accumulators in a contiguous/columnar manner.
The idea here is that the required state can be stored in a Vec-like container, where the item at a given index holds the current state for that group.

My proposal is to add an extra index parameter to the methods, so the accumulators can update the state at a certain index.
Some methods could be added to retrieve the entire state of the accumulator and/or convert the state values to array(s) in one go (see the sketch after the trait definition). If Arrow provides mutable Arrays in the future, this could avoid the extra conversion step back to an Array.

pub trait Accumulator: Send + Sync + Debug {
    /// Initializes the state for a new group at `index`
    fn init_state(&mut self, index: usize);

    /// Returns the state of the accumulator at the end of the accumulation.
    // in the case of an average on which we track `sum` and `n`, this function should return a vector
    // of two values, sum and n.
    fn state(&self, index: usize) -> Result<Vec<ScalarValue>>;

    /// updates the accumulator's state from a vector of scalars.
    fn update(&mut self, index: usize, values: &[ScalarValue]) -> Result<()>;

    /// updates the accumulator's state from a vector of arrays.
    fn update_batch(&mut self, index: usize, values: &[ArrayRef]) -> Result<()> {
        if values.is_empty() {
            return Ok(());
        };
        (0..values[0].len()).try_for_each(|idx| {
            let v = values
                .iter()
                .map(|array| ScalarValue::try_from_array(array, idx))
                .collect::<Result<Vec<_>>>()?;
            self.update(index, &v)
        })
    }

    /// updates the accumulator's state from a vector of scalars.
    fn merge(&mut self, index: usize, states: &[ScalarValue]) -> Result<()>;

    /// updates the accumulator's state from a vector of states.
    fn merge_batch(&mut self, index: usize, states: &[ArrayRef]) -> Result<()> {
        if states.is_empty() {
            return Ok(());
        };
        (0..states[0].len()).try_for_each(|idx| {
            let v = states
                .iter()
                .map(|array| ScalarValue::try_from_array(array, idx))
                .collect::<Result<Vec<_>>>()?;
            self.merge(index, &v)
        })
    }

    /// returns its value based on its current state.
    fn evaluate(&self, index: usize) -> Result<ScalarValue>;
}
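
As a hedged sketch of the "retrieve the entire state / convert to arrays in one go" idea above (num_groups is a hypothetical helper, not part of the proposal), an extension trait could look like this:

/// Sketch only: `num_groups` is a hypothetical helper for illustration.
pub trait AccumulatorAllValues: Accumulator {
    /// Number of groups currently tracked by this accumulator.
    fn num_groups(&self) -> usize;

    /// Materializes every group's final value as a single Arrow array,
    /// avoiding a per-group ScalarValue round trip at output time.
    fn evaluate_all(&self) -> Result<ArrayRef> {
        let values = (0..self.num_groups())
            .map(|i| self.evaluate(i))
            .collect::<Result<Vec<ScalarValue>>>()?;
        ScalarValue::iter_to_array(values)
    }
}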

For Count the accumulator would change as follows; most notably, the state is now stored as a Vec<u64>:

 #[derive(Debug)]
 struct CountAccumulator {
-    count: u64,
+    count: Vec<u64>,
 }
 
 impl CountAccumulator {
     /// new count accumulator
     pub fn new() -> Self {
-        Self { count: 0 }
+        Self { count: vec![] }
     }
 }
 
 impl Accumulator for CountAccumulator {
-    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+    fn init_state(&mut self, index: usize) {
+        assert_eq!(self.count.len(), index);
+        self.count.push(0);
+    }
+    fn update_batch(&mut self, index: usize, values: &[ArrayRef]) -> Result<()> {
         let array = &values[0];
-        self.count += (array.len() - array.data().null_count()) as u64;
+        self.count[index] += (array.len() - array.data().null_count()) as u64;
         Ok(())
     }
 
-    fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
+    fn update(&mut self, index: usize, values: &[ScalarValue]) -> Result<()> {
         let value = &values[0];
         if !value.is_null() {
-            self.count += 1;
+            self.count[index] += 1;
         }
         Ok(())
     }
 
-    fn merge(&mut self, states: &[ScalarValue]) -> Result<()> {
+    fn merge(&mut self, index: usize, states: &[ScalarValue]) -> Result<()> {
         let count = &states[0];
         if let ScalarValue::UInt64(Some(delta)) = count {
-            self.count += *delta;
+            self.count[index] += *delta;
         } else {
             unreachable!()
         }
         Ok(())
     }
 
-    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+    fn merge_batch(&mut self, index: usize, states: &[ArrayRef]) -> Result<()> {
         let counts = states[0].as_any().downcast_ref::<UInt64Array>().unwrap();
         let delta = &compute::sum(counts);
         if let Some(d) = delta {
-            self.count += *d;
+            self.count[index] += *d;
         }
         Ok(())
     }
 
-    fn state(&self) -> Result<Vec<ScalarValue>> {
-        Ok(vec![ScalarValue::UInt64(Some(self.count))])
+    fn state(&self, index: usize) -> Result<Vec<ScalarValue>> {
+        Ok(vec![ScalarValue::UInt64(Some(self.count[index]))])
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
-        Ok(ScalarValue::UInt64(Some(self.count)))
+    fn evaluate(&self, index: usize) -> Result<ScalarValue> {
+        Ok(ScalarValue::UInt64(Some(self.count[index])))
     }
 }
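
A minimal usage sketch of the columnar CountAccumulator above, assuming the revised trait and that Result and ScalarValue are in scope as in the snippets above:

use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array};

fn example() -> Result<()> {
    let mut acc = CountAccumulator::new();
    // One state slot per group, initialized as groups are discovered.
    acc.init_state(0);
    acc.init_state(1);
    // Rows belonging to each group arrive as separate slices of the batch.
    let g0: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
    let g1: ArrayRef = Arc::new(Int32Array::from(vec![Some(4)]));
    acc.update_batch(0, &[g0])?;
    acc.update_batch(1, &[g1])?;
    assert_eq!(acc.evaluate(0)?, ScalarValue::UInt64(Some(2))); // the null is excluded
    assert_eq!(acc.evaluate(1)?, ScalarValue::UInt64(Some(1)));
    Ok(())
}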

And the hash aggregate data structures would change as follows (moving the accumulator state out of GroupState into Accumulators):

+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -558,9 +558,6 @@ struct GroupState {
     /// The actual group by values, one for each group column
     group_by_values: Box<[ScalarValue]>,
 
-    // Accumulator state, one for each aggregate
-    accumulator_set: Vec<AccumulatorItem>,
-
     /// scratch space used to collect indices for input rows in a
     /// batch that have values to aggregate. Reset on each batch
     indices: Vec<u32>,
@@ -578,6 +575,9 @@ struct Accumulators {
     /// values: (hash, index into `group_states`)
     map: RawTable<(u64, usize)>,
 
+    // Accumulator state: one per aggregate, holding the state of every group
+    accumulators: Vec<AccumulatorItem>,
+
     /// State for each group
     group_states: Vec<GroupState>,
 }

Describe alternatives you've considered

Additional context

@Dandandan

FYI @alamb this is a follow-up proposal for improving the hash aggregate implementation.


alamb commented Sep 14, 2021

I think the idea of making accumulators (effectively) columnar is a great one, FWIW. Mutable arrow arrays would also be awesome if/when those become available.


ic4y commented Dec 6, 2021

@Dandandan
I am currently working on ways to solve the performance problem of high-cardinality aggregation. I followed your method and tested it, and found a certain performance improvement, but it is not ideal: only about a 10% improvement under high-cardinality aggregation (I think we need a several-times improvement, like Doris's and Trino's performance under high-cardinality aggregation, see #1246). Do you have any better advice or other optimizations for this code?
The relevant code segments are as follows.
Accumulators:

struct Accumulators {
    /// Logically maps group values to an index in `group_states`
    ///
    /// Uses the raw API of hashbrown to avoid actually storing the
    /// keys in the table
    ///
    /// keys: u64 hashes of the GroupValue
    /// values: (hash, index into `group_states`)
    map: RawTable<(u64, usize)>,

    // Accumulator state: one per aggregate, holding the state of every group
    accumulator_items: Vec<AccumulatorItem>,

    //group_states: Vec<GroupState>,
    group_by_values: Vec<Vec<ScalarValue>>,

    group_indices : Vec<Vec<u32>>,
}

group_aggregate_batch:

fn group_aggregate_batch(
    mode: &AggregateMode,
    random_state: &RandomState,
    group_expr: &[Arc<dyn PhysicalExpr>],
    aggr_expr: &[Arc<dyn AggregateExpr>],
    batch: RecordBatch,
    mut accumulators: Accumulators,
    aggregate_expressions: &[Vec<Arc<dyn PhysicalExpr>>],
) -> Result<Accumulators> {
    // evaluate the grouping expressions

    let group_values = evaluate(group_expr, &batch)?;

    // evaluate the aggregation expressions.
    // We could evaluate them after the `take`, but since we need to evaluate all
    // of them anyways, it is more performant to do it while they are together.

    let aggr_input_values = evaluate_many(aggregate_expressions, &batch)?;

    // 1.1 construct the key from the group values
    // 1.2 construct the mapping key if it does not exist
    // 1.3 add the row' index to `indices`

    // track which entries in `accumulators` have rows in this batch to aggregate
    let mut groups_with_rows = vec![];
    // 1.1 Calculate the group keys for the group values
    let mut batch_hashes = vec![0; batch.num_rows()];

    create_hashes(&group_values, random_state, &mut batch_hashes)?;

    for (row, hash) in batch_hashes.into_iter().enumerate() {
        let Accumulators { map, accumulator_items,group_by_values, group_indices } = &mut accumulators;

        let entry = map.get_mut(hash, |(_hash, group_idx)| {
            let group_state_c = &group_by_values[*group_idx];
            group_values
                .iter()
                .zip(group_state_c.iter())
                .all(|(array, scalar)| scalar.eq_array(array, row))
        });

        match entry {
            // Existing entry for this group value
            Some((_hash, group_idx)) => {
                let indices = &mut group_indices[*group_idx];
                // 1.3
                if indices.is_empty() {
                    groups_with_rows.push(*group_idx);
                };
                indices.push(row as u32); // remember this row
            }
            //  1.2 Need to create new entry
            None => {
                // Copy group values out of arrays into `ScalarValue`s
                let col_group_by_values = group_values
                    .iter()
                    .map(|col| ScalarValue::try_from_array(col, row))
                    .collect::<Result<Vec<_>>>()?;

                let group_idx = group_by_values.len();
                group_by_values.push(col_group_by_values);

                // TODO: the state of each aggregate needs to be initialized here
                accumulator_items[0].init_state(group_idx);
                groups_with_rows.push(group_idx);
                group_indices.push(vec![row as u32]);

                // for hasher function, use precomputed hash value
                map.insert(hash, (hash, group_idx), |(hash, _group_idx)| *hash);

            }
        };
    }

    // Collect all indices + offsets based on keys in this vec
    let mut batch_indices_cc: UInt32Builder = UInt32Builder::new(0);
    let mut offsets = vec![0];
    let mut offset_so_far = 0;
    for group_idx in groups_with_rows.iter() {
        let indices = &accumulators.group_indices[*group_idx];
        batch_indices_cc.append_slice(indices)?;
        offset_so_far += indices.len();
        offsets.push(offset_so_far);
    }
    let batch_indices = batch_indices_cc.finish();
    // `Take` all values based on indices into Arrays
    let values: Vec<Vec<Arc<dyn Array>>> = aggr_input_values
        .iter()
        .map(|array| {
            array
                .iter()
                .map(|array| {
                    compute::take(
                        array.as_ref(),
                        &batch_indices,
                        None, // None: no index check
                    )
                        .unwrap()
                })
                .collect()
            // 2.3
        })
        .collect();

    // 2.1 for each key in this batch
    // 2.2 for each aggregation
    // 2.3 `slice` from each of its arrays the keys' values
    // 2.4 update / merge the accumulator with the values
    // 2.5 clear indices
    groups_with_rows.iter()
        .zip(offsets.windows(2))
        .try_for_each(|(group_idx, offsets)| {
            accumulators.group_indices[*group_idx].clear();
            accumulators.accumulator_items.iter_mut()
                .zip(values.iter())
                .try_for_each(|(accumulator, aggr_array)| {
                    let values = aggr_array
                        .iter()
                        .map(|array| {
                            array.slice(offsets[0], offsets[1] - offsets[0])
                        })
                        .collect::<Vec<ArrayRef>>();
                    match mode {
                        AggregateMode::Partial => accumulator.update_batch(*group_idx, &values),
                        AggregateMode::FinalPartitioned | AggregateMode::Final => {
                            accumulator.merge_batch(*group_idx, &values)
                        }
                    }
            })
        });
    Ok(accumulators)
}

Count:

pub struct CountAccumulatorFly {
    count: Vec<u64>,
}

impl CountAccumulatorFly {
    /// new count accumulator
    pub fn new() -> Self {
        Self { count: vec![] }
    }
}

impl Accumulator for CountAccumulatorFly {
    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
        let array = &values[0];
        self.count[0] += (array.len() - array.data().null_count()) as u64;
        Ok(())
    }

    fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
        let value = &values[0];
        if !value.is_null() {
            self.count[0] += 1;
        }
        Ok(())
    }

    fn merge(&mut self, states: &[ScalarValue]) -> Result<()> {
        let count = &states[0];
        if let ScalarValue::UInt64(Some(delta)) = count {
            self.count[0] += *delta;
        } else {
            unreachable!()
        }
        Ok(())
    }

    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
        let counts = states[0].as_any().downcast_ref::<UInt64Array>().unwrap();
        let delta = &compute::sum(counts);
        if let Some(d) = delta {
            self.count[0] += *d;
        }
        Ok(())
    }

    fn state(&self) -> Result<Vec<ScalarValue>> {
        Ok(vec![ScalarValue::UInt64(Some(self.count[0]))])
    }

    fn evaluate(&self) -> Result<ScalarValue> {
        Ok(ScalarValue::UInt64(Some(self.count[0])))
    }
}


impl AccumulatorFly for CountAccumulatorFly {
    fn init_state(&mut self, index: usize) {
        assert_eq!(self.count.len(), index);
        self.count.push(0);
    }
    fn update_batch(&mut self, index: usize, values: &[ArrayRef]) -> Result<()> {
        let array = &values[0];
        self.count[index] += (array.len() - array.data().null_count()) as u64;
        Ok(())
    }

    fn update(&mut self, index: usize, values: &[ScalarValue]) -> Result<()> {
        let value = &values[0];
        if !value.is_null() {
            self.count[index] += 1;
        }
        Ok(())
    }

    fn merge(&mut self, index: usize, states: &[ScalarValue]) -> Result<()> {
        let count = &states[0];
        if let ScalarValue::UInt64(Some(delta)) = count {
            self.count[index] += *delta;
        } else {
            unreachable!()
        }
        Ok(())
    }

    fn merge_batch(&mut self, index: usize, states: &[ArrayRef]) -> Result<()> {
        let counts = states[0].as_any().downcast_ref::<UInt64Array>().unwrap();
        let delta = &compute::sum(counts);
        if let Some(d) = delta {
            self.count[index] += *d;
        }
        Ok(())
    }

    fn state(&self, index: usize) -> Result<Vec<ScalarValue>> {
        Ok(vec![ScalarValue::UInt64(Some(self.count[index]))])
    }

    fn evaluate(&self, index: usize) -> Result<ScalarValue> {
        Ok(ScalarValue::UInt64(Some(self.count[index])))
    }

    fn evaluate_all(&self) -> Result<ArrayRef> {
        let result = ScalarValue::iter_to_array(
            self.count.iter().map(|x| {
                ScalarValue::UInt64(Some(*x))
            }),
        );
        result
    }

    fn state_all(&self) -> Result<Vec<Vec<ScalarValue>>> {
        let dt = Local::now();
        let result = Ok(vec![self.count.iter().map(|x| {
            ScalarValue::UInt64(Some(*x))
        }).collect()]);
        println!(
            "state_all usage millis: {}",
            Local::now().timestamp_millis() - dt.timestamp_millis()
        );
        result
    }
}

more code:
https://github.com/ic4y/arrow-datafusion/blob/9d26b797f2e1565f7f65048ef1fc2ad2940d8f95/datafusion/src/physical_plan/hash_aggregate_fly.rs#L578-L596

@alamb
Copy link
Contributor

alamb commented Dec 6, 2021

@ic4y

I am currently working on ways to solve the performance problem of high-cardinality aggregation. I followed your method and tested it, and found a certain performance improvement, but it is not ideal: only about a 10% improvement under high-cardinality aggregation (I think we need a several-times improvement, like Doris's and Trino's performance under high-cardinality aggregation, see #1246).

Did you do any performance profiling (using pprof for example) to know where the time is being spent in your query? Is it the aggregate updates? Creating the final array? Something else?

There isn't a lot of "low hanging fruit" left in the GroupByHash implementation -- to get "several times improvements" in performance I think we are likely to have to start special casing (e.g. for single column group by vs multi column group by, as well as vectorized aggregators for certain column types)
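
To illustrate the "vectorized aggregators" direction (a sketch only, not DataFusion's actual API): instead of slicing the batch per group, an aggregator could take the whole value column plus a parallel group-index array and update all states in one pass:

use arrow::array::{Array, Int64Array};

struct VectorizedSum {
    sums: Vec<i64>, // one slot per group
}

impl VectorizedSum {
    /// `group_indices[row]` names the group each input row belongs to, so the
    /// whole batch is consumed in one loop without any per-group slicing.
    fn update_batch(&mut self, group_indices: &[usize], values: &Int64Array) {
        for (row, &group) in group_indices.iter().enumerate() {
            if values.is_valid(row) {
                self.sums[group] += values.value(row);
            }
        }
    }
}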


ic4y commented Dec 7, 2021

@alamb For the count SQL query, I used pprof to measure the time consumed by the following methods. Most of the time is spent in array slicing, hashbrown's get_mut, and create_hashes. The time spent creating the final array and in the aggregate update methods is relatively small.

func | time
arrow::array::array::Array::slice | 26% of all
hashbrown::raw::inner::RawTable<T,A>::get_mut | 22% of all
datafusion::physical_plan::hash_utils::create_hashes | 6.5% of all
datafusion::physical_plan::AccumulatorFly::update_batch | 2.5% of all
datafusion::physical_plan::hash_aggregate_fly::create_batch_from_map | <0.1% of all

These correspond to the following code snippets:
1. array slice and update_batch

    groups_with_rows.iter()
        .zip(offsets.windows(2))
        .try_for_each(|(group_idx, offsets)| {
            accumulators.group_indices[*group_idx].clear();
            accumulators.accumulator_items.iter_mut()
                .zip(values.iter())
                .try_for_each(|(accumulator, aggr_array)| {
                    let values = aggr_array
                        .iter()
                        .map(|array| {
                            array.slice(offsets[0], offsets[1] - offsets[0])
                        })
                        .collect::<Vec<ArrayRef>>();
                    match mode {
                        AggregateMode::Partial => accumulator.update_batch(*group_idx, &values),
                        AggregateMode::FinalPartitioned | AggregateMode::Final => {
                            accumulator.merge_batch(*group_idx, &values)
                        }
                    }
            })
        });

2. hashbrown get_mut

        let entry = map.get_mut(hash, |(_hash, group_idx)| {
            let group_state_c = &group_by_values[*group_idx];
            group_values
                .iter()
                .zip(group_state_c.iter())
                .all(|(array, scalar)| scalar.eq_array(array, row)) // eq_array takes a long time
        });

3. create_hashes

create_hashes(&group_values, random_state, &mut batch_hashes)?;

4. create_batch_from_map

let batch = create_batch_from_map(&mode, &accumulators, group_expr.len(), &schema);

@Dandandan

Thanks for picking this up @ic4y

If you are getting a 10% improvement, that is already a fine achievement (provided we don't slow down the low-cardinality case too much?)

The storage is one part of the story; as @alamb says, it will require some changes in other places as well.

Some ideas here:

  • Do not slice in the implementation, but rather append to the aggregates directly (enabled by the change listed here).
  • A more efficient (and vectorized?) implementation of eq_array based on typed arrays (sketched after this list).
  • Avoid converting values into ScalarValue; use values and arrays directly.
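
A sketch of the second idea (illustrative names, not DataFusion API): comparing the stored group key against an array value via a typed downcast instead of ScalarValue::eq_array's per-row dynamic dispatch:

use arrow::array::{Array, ArrayRef, Int64Array};

/// Compares one stored i64 group key against `array[row]`. In real code the
/// downcast would be hoisted out of the per-row loop and done once per batch.
fn group_key_eq_i64(stored: i64, array: &ArrayRef, row: usize) -> bool {
    let typed = array
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("group column should be an Int64Array");
    typed.is_valid(row) && typed.value(row) == stored
}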


ic4y commented Dec 14, 2021

@Dandandan Thank you very much for your suggestions.

  1. Removing the slicing can indeed bring a certain performance improvement, but I tested on arrow2 and found that array.slice performs much better there. So slice performance is not a problem after switching to arrow2.
  2. For eq_array I tried hashbrown's get_each_mut method, but the performance did not change (profiling with pprof showed that eq_array still takes a lot of time). The code is as follows:
      create_hashes(&group_values, random_state, &mut batch_hashes)?;
    let mut absent_rows = vec![];
    const CHUNK_SIZE: usize = 16;
    let (chunks, remainder) = batch_hashes.as_chunks::<CHUNK_SIZE>();
    for (num, chunk) in chunks.into_iter().enumerate() {
        let Accumulators { map, group_states } = &mut accumulators;
        let result = map.get_each_mut(*chunk, |index, (hash, group_idx)| {
            if batch_hashes[num * CHUNK_SIZE + index] != *hash {
                false
            } else {
                let group_state = &group_states[*group_idx];
                group_values
                    .iter()
                    .zip(group_state.group_by_values.iter())
                    .all(|(array, scalar)| scalar.eq_array(array, index + num * CHUNK_SIZE))
            }
        });

        for (i, re) in result.iter().enumerate() {
            let row = num*CHUNK_SIZE+i;
            match re {
                Ok((hash, group_idx)) => {
                    let group_state = &mut group_states[*group_idx];
                    // 1.3
                    if group_state.indices.is_empty() {
                        groups_with_rows.push(*group_idx);
                    };
                    group_state.indices.push(row as u32); // remember this row
                }
                Err(unavailable) => { match unavailable {
                    UnavailableMutError::Absent => {
                        absent_rows.push(row);
                    }
                    UnavailableMutError::Duplicate(index) => {
                        let (hash, group_idx) = &result.get(*index).unwrap().as_ref().unwrap();
                        let group_state = &mut group_states[*group_idx];
                        // 1.3
                        if group_state.indices.is_empty() {
                            groups_with_rows.push(*group_idx);
                        };
                        group_state.indices.push(row as u32); // remember this row
                    }
                } }
            }
        }
    }

    //absent rows
    for row in absent_rows.into_iter() {
        let hash= batch_hashes[row];
        let Accumulators { map, group_states } = &mut accumulators;
        let entry = map.get_mut(hash, |(_hash, group_idx)| {
            let group_state = &group_states[*group_idx];
            if *_hash != hash {
                false
            } else {
                group_values
                    .iter()
                    .zip(group_state.group_by_values.iter())
                    .all(|(array, scalar)| scalar.eq_array(array, row))
            }
        });

        match entry {
            // Existing entry for this group value
            Some((_hash, group_idx)) => {
                let group_state = &mut group_states[*group_idx];
                // 1.3
                if group_state.indices.is_empty() {
                    groups_with_rows.push(*group_idx);
                };
                group_state.indices.push(row as u32); // remember this row
            }
            //  1.2 Need to create new entry
            None => {
                let accumulator_set = create_accumulators(aggr_expr)
                    .map_err(DataFusionError::into_arrow_external_error)?;

                // Copy group values out of arrays into `ScalarValue`s
                let group_by_values = group_values
                    .iter()
                    .map(|col| ScalarValue::try_from_array(col, row))
                    .collect::<Result<Vec<_>>>()?;

                // Add new entry to group_states and save newly created index
                let group_state = GroupState {
                    group_by_values: group_by_values.into_boxed_slice(),
                    accumulator_set,
                    indices: vec![row as u32], // 1.3
                };
                let group_idx = group_states.len();
                group_states.push(group_state);
                groups_with_rows.push(group_idx);

                // for hasher function, use precomputed hash value
                map.insert(hash, (hash, group_idx), |(hash, _group_idx)| *hash);
            }
        };
    }

    //remainder rows
    for (i, &hash) in remainder.into_iter().enumerate() {
        let row = i+CHUNK_SIZE*chunks.len();
        let Accumulators { map, group_states } = &mut accumulators;

        let entry = map.get_mut(hash, |(_hash, group_idx)| {
            let group_state = &group_states[*group_idx];
            group_values
                .iter()
                .zip(group_state.group_by_values.iter())
                .all(|(array, scalar)| scalar.eq_array(array, row))
        });

        match entry {
            // Existing entry for this group value
            Some((_hash, group_idx)) => {
                let group_state = &mut group_states[*group_idx];
                // 1.3
                if group_state.indices.is_empty() {
                    groups_with_rows.push(*group_idx);
                };
                group_state.indices.push(row as u32); // remember this row
            }
            //  1.2 Need to create new entry
            None => {
                let accumulator_set = create_accumulators(aggr_expr)
                    .map_err(DataFusionError::into_arrow_external_error)?;

                // Copy group values out of arrays into `ScalarValue`s
                let group_by_values = group_values
                    .iter()
                    .map(|col| ScalarValue::try_from_array(col, row))
                    .collect::<Result<Vec<_>>>()?;

                // Add new entry to group_states and save newly created index
                let group_state = GroupState {
                    group_by_values: group_by_values.into_boxed_slice(),
                    accumulator_set,
                    indices: vec![row as u32], // 1.3
                };
                let group_idx = group_states.len();
                group_states.push(group_state);
                groups_with_rows.push(group_idx);

                // for hasher function, use precomputed hash value
                map.insert(hash, (hash, group_idx), |(hash, _group_idx)| *hash);
            }
        };
    }
  3. I separated the get_mut and eq_array steps by using hashbrown's iter_hash to process them in two loops, aiming to make the eq_array step amenable to vectorized execution (there is no vectorized implementation yet; I am not very familiar with vectorized execution in Rust). The following code separates most of the get_mut and eq_array work, but how can we vectorize eq_array inside a loop?
    create_hashes(&group_values, random_state, &mut batch_hashes)?;

    let map = &mut accumulators.map;
    let mut allid = Vec::with_capacity(batch_hashes.len());
    unsafe {
        for &hash in batch_hashes.iter() {
            let hash1 = map.iter_hash(hash);
            allid.push(hash1.map(|bucket|{
                let (_hash, group_idx) = bucket.as_ref();
                *group_idx
            }).collect::<Vec<_>>())
        }
    }

    let mut gids = Vec::with_capacity(batch_hashes.len());
    for (row, iter_hash) in allid.iter().enumerate() {
        let group_states = &mut accumulators.group_states;
        let mut gid = None;
        iter_hash.iter().for_each(|group_idx| {
                let group_state = &group_states[*group_idx];
                if group_values
                    .iter()
                    .zip(group_state.group_by_values.iter())
                    .all(|(array, scalar)| scalar.eq_array(array, row))
                {
                    gid = Some(*group_idx);
                }
            });
        gids.push(gid);
    }


    for (row, op_group_idx) in gids.into_iter().enumerate() {
        let Accumulators { map, group_states } = &mut accumulators;
        let hash = batch_hashes[row];
        match op_group_idx {
            None => {
                let entry = map.get_mut(hash, |(_hash, group_idx)| {
                    let group_state = &group_states[*group_idx];
                    group_values
                        .iter()
                        .zip(group_state.group_by_values.iter())
                        .all(|(array, scalar)| scalar.eq_array(array, row))
                });

                match entry {
                    // Existing entry for this group value
                    Some((_hash, group_idx)) => {
                        let group_state = &mut group_states[*group_idx];
                        // 1.3
                        if group_state.indices.is_empty() {
                            groups_with_rows.push(*group_idx);
                        };
                        group_state.indices.push(row as u32); // remember this row
                    }
                    //  1.2 Need to create new entry
                    None => {
                        let accumulator_set = create_accumulators(aggr_expr)
                            .map_err(DataFusionError::into_arrow_external_error)?;

                        // Copy group values out of arrays into `ScalarValue`s
                        let group_by_values = group_values
                            .iter()
                            .map(|col| ScalarValue::try_from_array(col, row))
                            .collect::<Result<Vec<_>>>()?;

                        // Add new entry to group_states and save newly created index
                        let group_state = GroupState {
                            group_by_values: group_by_values.into_boxed_slice(),
                            accumulator_set,
                            indices: vec![row as u32], // 1.3
                        };
                        let group_idx = group_states.len();
                        group_states.push(group_state);
                        groups_with_rows.push(group_idx);

                        // for hasher function, use precomputed hash value
                        map.insert(hash, (hash, group_idx), |(hash, _group_idx)| *hash);
                    }
                };

            }
            Some(group_idx) => {
                let group_state = &mut group_states[group_idx];
                // 1.3
                if group_state.indices.is_empty() {
                    groups_with_rows.push(group_idx);
                };
                group_state.indices.push(row as u32); // remember this row
            }
        }
    }

I really want to solve the performance problem of high-cardinality aggregation, which has troubled me for a long time. Is there any problem with the above code? How can we optimize it further? Thanks!


alamb commented Jun 26, 2023

This proposal is very similar in spirit to what @tustvold and I are proposing in #4973 (comment)


alamb commented Jun 30, 2023

(BTW there is lots of excitement on #4973 for anyone who wants to follow along)
