Skip to content

Commit

Permalink
UDAF refactor: Add PhysicalExpr trait dependency on datafusion-expr
Browse files Browse the repository at this point in the history
… and remove logical expressions requirement for creating physical aggregate expression (#11845)

* init draft

Signed-off-by: jayzhan211 <[email protected]>

* production ready

Signed-off-by: jayzhan211 <[email protected]>

* cleanup

Signed-off-by: jayzhan211 <[email protected]>

* fix merge conflict

Signed-off-by: jayzhan211 <[email protected]>

* mv accumulator out

Signed-off-by: jayzhan211 <[email protected]>

* fix doc

Signed-off-by: jayzhan211 <[email protected]>

* rename

Signed-off-by: jayzhan211 <[email protected]>

* fix test

Signed-off-by: jayzhan211 <[email protected]>

* fix test

Signed-off-by: jayzhan211 <[email protected]>

* doc

Signed-off-by: jayzhan211 <[email protected]>

* fix doc and cleanup

Signed-off-by: jayzhan211 <[email protected]>

* fix doc

Signed-off-by: jayzhan211 <[email protected]>

* clippy + doc

Signed-off-by: jayzhan211 <[email protected]>

* cleanup

Signed-off-by: jayzhan211 <[email protected]>

* cleanup

Signed-off-by: jayzhan211 <[email protected]>

* rename exprs

Signed-off-by: jayzhan211 <[email protected]>

* rm create_aggregate_expr_with_dfschema

Signed-off-by: jayzhan211 <[email protected]>

* revert change in calc_requirements

Signed-off-by: jayzhan211 <[email protected]>

* fmt

Signed-off-by: jayzhan211 <[email protected]>

* doc and cleanup

Signed-off-by: jayzhan211 <[email protected]>

* rm dfschema

Signed-off-by: jayzhan211 <[email protected]>

* rm input types

Signed-off-by: jayzhan211 <[email protected]>

* rename return_type

Signed-off-by: jayzhan211 <[email protected]>

* upd doc

Signed-off-by: jayzhan211 <[email protected]>

* move group accumulator adapter to functions-aggregate-common

Signed-off-by: jayzhan211 <[email protected]>

* fix

Signed-off-by: jayzhan211 <[email protected]>

---------

Signed-off-by: jayzhan211 <[email protected]>
  • Loading branch information
jayzhan211 authored Aug 9, 2024
1 parent b5d7931 commit e088945
Show file tree
Hide file tree
Showing 100 changed files with 1,776 additions and 1,846 deletions.
10 changes: 8 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ members = [
"datafusion/catalog",
"datafusion/core",
"datafusion/expr",
"datafusion/expr-common",
"datafusion/execution",
"datafusion/functions-aggregate",
"datafusion/functions",
"datafusion/functions-aggregate",
"datafusion/functions-aggregate-common",
"datafusion/functions-nested",
"datafusion/optimizer",
"datafusion/physical-expr-common",
"datafusion/physical-expr",
"datafusion/physical-expr-common",
"datafusion/physical-expr-functions-aggregate",
"datafusion/physical-optimizer",
"datafusion/physical-plan",
"datafusion/proto",
Expand Down Expand Up @@ -94,12 +97,15 @@ datafusion-common = { path = "datafusion/common", version = "41.0.0", default-fe
datafusion-common-runtime = { path = "datafusion/common-runtime", version = "41.0.0" }
datafusion-execution = { path = "datafusion/execution", version = "41.0.0" }
datafusion-expr = { path = "datafusion/expr", version = "41.0.0" }
datafusion-expr-common = { path = "datafusion/expr-common", version = "41.0.0" }
datafusion-functions = { path = "datafusion/functions", version = "41.0.0" }
datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "41.0.0" }
datafusion-functions-aggregate-common = { path = "datafusion/functions-aggregate-common", version = "41.0.0" }
datafusion-functions-nested = { path = "datafusion/functions-nested", version = "41.0.0" }
datafusion-optimizer = { path = "datafusion/optimizer", version = "41.0.0", default-features = false }
datafusion-physical-expr = { path = "datafusion/physical-expr", version = "41.0.0", default-features = false }
datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "41.0.0", default-features = false }
datafusion-physical-expr-functions-aggregate = { path = "datafusion/physical-expr-functions-aggregate", version = "41.0.0" }
datafusion-physical-optimizer = { path = "datafusion/physical-optimizer", version = "41.0.0" }
datafusion-physical-plan = { path = "datafusion/physical-plan", version = "41.0.0" }
datafusion-proto = { path = "datafusion/proto", version = "41.0.0" }
Expand Down
47 changes: 46 additions & 1 deletion datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ datafusion-functions-nested = { workspace = true, optional = true }
datafusion-optimizer = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-expr-functions-aggregate = { workspace = true }
datafusion-physical-optimizer = { workspace = true }
datafusion-physical-plan = { workspace = true }
datafusion-sql = { workspace = true }
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,11 @@ pub mod physical_expr_common {
pub use datafusion_physical_expr_common::*;
}

/// re-export of [`datafusion_physical_expr_functions_aggregate`] crate
pub mod physical_expr_functions_aggregate {
pub use datafusion_physical_expr_functions_aggregate::*;
}

/// re-export of [`datafusion_physical_expr`] crate
pub mod physical_expr {
pub use datafusion_physical_expr::*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ mod tests {
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_functions_aggregate::sum::sum_udaf;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr_common::aggregate::AggregateExprBuilder;
use datafusion_physical_expr_functions_aggregate::aggregate::AggregateExprBuilder;

/// Runs the CombinePartialFinalAggregate optimizer and asserts the plan against the expected
macro_rules! assert_optimized {
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/physical_optimizer/limit_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,8 @@ mod tests {
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::BinaryExpr;
use datafusion_physical_expr::expressions::{col, lit};
use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr_common::expressions::column::col;
use datafusion_physical_expr_common::expressions::lit;
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::empty::EmptyExec;
Expand Down
39 changes: 24 additions & 15 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ use crate::physical_plan::unnest::UnnestExec;
use crate::physical_plan::values::ValuesExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{
displayable, udaf, windows, AggregateExpr, ExecutionPlan, ExecutionPlanProperties,
displayable, windows, AggregateExpr, ExecutionPlan, ExecutionPlanProperties,
InputOrderMode, Partitioning, PhysicalExpr, WindowExpr,
};

Expand All @@ -73,7 +73,8 @@ use datafusion_common::{
};
use datafusion_expr::dml::CopyTo;
use datafusion_expr::expr::{
self, physical_name, AggregateFunction, Alias, GroupingSet, WindowFunction,
self, create_function_physical_name, physical_name, AggregateFunction, Alias,
GroupingSet, WindowFunction,
};
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
Expand All @@ -83,6 +84,7 @@ use datafusion_expr::{
};
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_expr_functions_aggregate::aggregate::AggregateExprBuilder;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_sql::utils::window_expr_common_partition_keys;

Expand Down Expand Up @@ -1559,6 +1561,17 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
order_by,
null_treatment,
}) => {
let name = if let Some(name) = name {
name
} else {
create_function_physical_name(
func.name(),
*distinct,
args,
order_by.as_ref(),
)?
};

let physical_args =
create_physical_exprs(args, logical_input_schema, execution_props)?;
let filter = match filter {
Expand All @@ -1575,7 +1588,6 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
== NullTreatment::IgnoreNulls;

let (agg_expr, filter, order_by) = {
let sort_exprs = order_by.clone().unwrap_or(vec![]);
let physical_sort_exprs = match order_by {
Some(exprs) => Some(create_physical_sort_exprs(
exprs,
Expand All @@ -1588,18 +1600,15 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
let ordering_reqs: Vec<PhysicalSortExpr> =
physical_sort_exprs.clone().unwrap_or(vec![]);

let agg_expr = udaf::create_aggregate_expr_with_dfschema(
func,
&physical_args,
args,
&sort_exprs,
&ordering_reqs,
logical_input_schema,
name,
ignore_nulls,
*distinct,
false,
)?;
let schema: Schema = logical_input_schema.clone().into();
let agg_expr =
AggregateExprBuilder::new(func.to_owned(), physical_args.to_vec())
.order_by(ordering_reqs.to_vec())
.schema(Arc::new(schema))
.alias(name)
.with_ignore_nulls(ignore_nulls)
.with_distinct(*distinct)
.build()?;

(agg_expr, filter, physical_sort_exprs)
};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use datafusion_physical_expr::{

use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_physical_expr_common::aggregate::AggregateExprBuilder;
use datafusion_physical_expr_functions_aggregate::aggregate::AggregateExprBuilder;
use futures::Stream;
use tempfile::TempDir;
// backwards compatibility
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use arrow::util::pretty::pretty_format_batches;
use arrow_array::types::Int64Type;
use datafusion::common::Result;
use datafusion::datasource::MemTable;
use datafusion::physical_expr_functions_aggregate::aggregate::AggregateExprBuilder;
use datafusion::physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
Expand All @@ -35,7 +36,6 @@ use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}
use datafusion_functions_aggregate::sum::sum_udaf;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_expr_common::aggregate::AggregateExprBuilder;
use datafusion_physical_plan::InputOrderMode;
use test_utils::{add_empty_batches, StringBatchGenerator};

Expand Down
43 changes: 43 additions & 0 deletions datafusion/expr-common/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "datafusion-expr-common"
description = "Logical plan and expression representation for DataFusion query engine"
keywords = ["datafusion", "logical", "plan", "expressions"]
readme = "README.md"
version = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
repository = { workspace = true }
license = { workspace = true }
authors = { workspace = true }
rust-version = { workspace = true }

[lints]
workspace = true

[lib]
name = "datafusion_expr_common"
path = "src/lib.rs"

[features]

[dependencies]
arrow = { workspace = true }
datafusion-common = { workspace = true }
paste = "^1.0"
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Vectorized [`GroupsAccumulator`]
use arrow_array::{ArrayRef, BooleanArray};
use arrow::array::{ArrayRef, BooleanArray};
use datafusion_common::{not_impl_err, Result};

/// Describes how many rows should be emitted during grouping.
Expand Down Expand Up @@ -75,7 +75,7 @@ impl EmitTo {
/// expected that each `GroupAccumulator` will use something like `Vec<..>`
/// to store the group states.
///
/// [`Accumulator`]: crate::Accumulator
/// [`Accumulator`]: crate::accumulator::Accumulator
/// [Aggregating Millions of Groups Fast blog]: https://arrow.apache.org/blog/2023/08/05/datafusion_fast_grouping/
pub trait GroupsAccumulator: Send {
/// Updates the accumulator's state from its arguments, encoded as
Expand Down Expand Up @@ -140,7 +140,7 @@ pub trait GroupsAccumulator: Send {
/// See [`Self::evaluate`] for details on the required output
/// order and `emit_to`.
///
/// [`Accumulator::state`]: crate::Accumulator::state
/// [`Accumulator::state`]: crate::accumulator::Accumulator::state
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;

/// Merges intermediate state (the output from [`Self::state`])
Expand Down Expand Up @@ -197,7 +197,7 @@ pub trait GroupsAccumulator: Send {
/// state directly to the next aggregation phase with minimal processing
/// using this method.
///
/// [`Accumulator::state`]: crate::Accumulator::state
/// [`Accumulator::state`]: crate::accumulator::Accumulator::state
fn convert_to_state(
&self,
_values: &[ArrayRef],
Expand Down
Loading

0 comments on commit e088945

Please sign in to comment.