Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Optimize CASE expression for "column or null" use case #11534

Merged
merged 5 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added datafusion/core/example.parquet
Binary file not shown.
41 changes: 37 additions & 4 deletions datafusion/physical-expr/benches/case_when.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,29 @@ fn criterion_benchmark(c: &mut Criterion) {
// create input data
let mut c1 = Int32Builder::new();
let mut c2 = StringBuilder::new();
let mut c3 = StringBuilder::new();
for i in 0..1000 {
c1.append_value(i);
if i % 7 == 0 {
c2.append_null();
} else {
c2.append_value(&format!("string {i}"));
}
if i % 9 == 0 {
c3.append_null();
} else {
c3.append_value(&format!("other string {i}"));
}
}
let c1 = Arc::new(c1.finish());
let c2 = Arc::new(c2.finish());
let c3 = Arc::new(c3.finish());
let schema = Schema::new(vec![
Field::new("c1", DataType::Int32, true),
Field::new("c2", DataType::Utf8, true),
Field::new("c3", DataType::Utf8, true),
]);
let batch = RecordBatch::try_new(Arc::new(schema), vec![c1, c2]).unwrap();
let batch = RecordBatch::try_new(Arc::new(schema), vec![c1, c2, c3]).unwrap();

// use same predicate for all benchmarks
let predicate = Arc::new(BinaryExpr::new(
Expand All @@ -63,7 +71,7 @@ fn criterion_benchmark(c: &mut Criterion) {
make_lit_i32(500),
));

// CASE WHEN expr THEN 1 ELSE 0 END
// CASE WHEN c1 <= 500 THEN 1 ELSE 0 END
c.bench_function("case_when: scalar or scalar", |b| {
let expr = Arc::new(
CaseExpr::try_new(
Expand All @@ -76,13 +84,38 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap()))
});

// CASE WHEN expr THEN col ELSE null END
// CASE WHEN c1 <= 500 THEN c2 [ELSE NULL] END
c.bench_function("case_when: column or null", |b| {
let expr = Arc::new(
CaseExpr::try_new(None, vec![(predicate.clone(), make_col("c2", 1))], None)
.unwrap(),
);
b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap()))
});

// CASE WHEN c1 <= 500 THEN c2 ELSE c3 END
c.bench_function("case_when: expr or expr", |b| {
let expr = Arc::new(
CaseExpr::try_new(
None,
vec![(predicate.clone(), make_col("c2", 1))],
Some(Arc::new(Literal::new(ScalarValue::Utf8(None)))),
Some(make_col("c3", 2)),
)
.unwrap(),
);
b.iter(|| black_box(expr.evaluate(black_box(&batch)).unwrap()))
});

// CASE c1 WHEN 1 THEN c2 WHEN 2 THEN c3 END
c.bench_function("case_when: CASE expr", |b| {
let expr = Arc::new(
CaseExpr::try_new(
Some(make_col("c1", 0)),
vec![
(make_lit_i32(1), make_col("c2", 1)),
(make_lit_i32(2), make_col("c3", 2)),
],
None,
)
.unwrap(),
);
Expand Down
136 changes: 127 additions & 9 deletions datafusion/physical-expr/src/expressions/case.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,29 @@ use datafusion_common::cast::as_boolean_array;
use datafusion_common::{exec_err, internal_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::ColumnarValue;

use datafusion_physical_expr_common::expressions::column::Column;
use itertools::Itertools;

type WhenThen = (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>);

#[derive(Debug, Hash)]
enum EvalMethod {
/// CASE WHEN condition THEN result
/// [WHEN ...]
/// [ELSE result]
/// END
CaseNoExpression,
/// CASE expression
/// WHEN value THEN result
/// [WHEN ...]
/// [ELSE result]
/// END
CaseWithExpression,
/// This is a specialization for a specific use case where we can take a fast path
/// CASE WHEN condition THEN column [ELSE NULL] END
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I'm wondering why this special format of case when is not optimized to if/else during query optimization.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think DataFusion has an if/else expression. We have one in Comet that could be upstreamed.

CaseColumnOrNull,
}

/// The CASE expression is similar to a series of nested if/else and there are two forms that
/// can be used. The first form consists of a series of boolean "when" expressions with
/// corresponding "then" expressions, and an optional "else" expression.
Expand All @@ -61,6 +80,8 @@ pub struct CaseExpr {
when_then_expr: Vec<WhenThen>,
/// Optional "else" expression
else_expr: Option<Arc<dyn PhysicalExpr>>,
/// Evaluation method to use
eval_method: EvalMethod,
}

impl std::fmt::Display for CaseExpr {
Expand Down Expand Up @@ -89,10 +110,22 @@ impl CaseExpr {
if when_then_expr.is_empty() {
exec_err!("There must be at least one WHEN clause")
} else {
let eval_method = if expr.is_some() {
EvalMethod::CaseWithExpression
} else if when_then_expr.len() == 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about this optimization and I think it would be valid for any CASE expression that had a NULL ELSE, not just column

So in other words, I think you could remove the when_then_expr[0].1.as_any().is::<Column>() check and this would still work fine

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally, it is only safe to use this approach for expressions that are infallible. One of the main reasons that regular CaseExpr is expensive is that we need to only evaluate the "true" expression on rows where the predicate has evaluated to true.

I do think that this could be extended beyond just Column expressions though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add a comment in the code to explain this

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added more comments to explain the rationale and what is safe or not

&& when_then_expr[0].1.as_any().is::<Column>()
&& else_expr.is_none()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If else value is a null literal, is else_expr still none? Or is it a null literal?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From SQL planning, it will be None, but it makes sense to add code here to normalize this for other use cases (other query engines that delegate to DataFusion). I have added this.

{
EvalMethod::CaseColumnOrNull
} else {
EvalMethod::CaseNoExpression
};

Ok(Self {
expr,
when_then_expr,
else_expr,
eval_method,
})
}
}
Expand Down Expand Up @@ -256,6 +289,36 @@ impl CaseExpr {

Ok(ColumnarValue::Array(current_value))
}

/// This function evaluates the specialized case of:
///
/// CASE WHEN condition THEN column
/// [ELSE NULL]
/// END
fn case_column_or_null(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
let when_expr = &self.when_then_expr[0].0;
let then_expr = &self.when_then_expr[0].1;
if let ColumnarValue::Array(bit_mask) = when_expr.evaluate(batch)? {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is evaluated to Scalar, maybe we still can work on it? I.e., we can convert the scalar value to an boolean array.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am planning on adding a specialization for scalar values as well to avoid converting scalars to arrays

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, nm, your question is different. Yes, we could implement special handling for CASE WHEN [true|false] THEN but I'm not sure that is a real-world use case that is worth optimizing

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I forgot that in this optimization, the when expression could only be a Column.

let bit_mask = bit_mask
.as_any()
.downcast_ref::<BooleanArray>()
.expect("predicate should evaluate to a boolean array");
// invert the bitmask
let bit_mask = not(bit_mask)?;
Copy link
Contributor

@Dandandan Dandandan Jul 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a further optimization I think the when predicate can be transformed to be not(condition) so it's possible to use bit_mask directly instead of not(bit_mask).
This makes it possible to optimize/simplify a condition like x=1 to x!=1 instead of not(x=1), saving the invert.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I will look at this suggestion as a follow on. Great idea.

match then_expr.evaluate(batch)? {
ColumnarValue::Array(array) => {
Ok(ColumnarValue::Array(nullif(&array, &bit_mask)?))
}
ColumnarValue::Scalar(_) => Err(DataFusionError::Execution(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this might be better as an internal error

You could also use the macros like

internal_err!("expression did not evaluate to an array")

"expression did not evaluate to an array".to_string(),
)),
}
} else {
Err(DataFusionError::Execution(
"predicate did not evaluate to an array".to_string(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @viirya I think it would be fairly straightforward here to handle ColumnarValue::Scalar s well. I don't think we need to do it in this PR

Given you say in https://github.com/apache/datafusion/pull/11534/files#r1683081792

I am planning on adding a specialization for scalar values as well to avoid converting scalars to arrays

I think we could definitely do it in a follow on PR

))
}
}
}

impl PhysicalExpr for CaseExpr {
Expand Down Expand Up @@ -303,14 +366,21 @@ impl PhysicalExpr for CaseExpr {
}

fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
if self.expr.is_some() {
// this use case evaluates "expr" and then compares the values with the "when"
// values
self.case_when_with_expr(batch)
} else {
// The "when" conditions all evaluate to boolean in this use case and can be
// arbitrary expressions
self.case_when_no_expr(batch)
match self.eval_method {
EvalMethod::CaseWithExpression => {
// this use case evaluates "expr" and then compares the values with the "when"
// values
self.case_when_with_expr(batch)
}
EvalMethod::CaseNoExpression => {
// The "when" conditions all evaluate to boolean in this use case and can be
// arbitrary expressions
self.case_when_no_expr(batch)
}
EvalMethod::CaseColumnOrNull => {
// Specialization for CASE WHEN expr THEN column [ELSE NULL] END
self.case_column_or_null(batch)
}
}
}

Expand Down Expand Up @@ -409,7 +479,7 @@ pub fn case(
#[cfg(test)]
mod tests {
use super::*;
use crate::expressions::{binary, cast, col, lit};
use crate::expressions::{binary, cast, col, lit, BinaryExpr};

use arrow::buffer::Buffer;
use arrow::datatypes::DataType::Float64;
Expand All @@ -419,6 +489,7 @@ mod tests {
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_expr::type_coercion::binary::comparison_coercion;
use datafusion_expr::Operator;
use datafusion_physical_expr_common::expressions::Literal;

#[test]
fn case_with_expr() -> Result<()> {
Expand Down Expand Up @@ -998,6 +1069,53 @@ mod tests {
Ok(())
}

#[test]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be good to make sure we had a slt level test to cover this as well,

Maybe in
https://github.com/apache/datafusion/blob/382bf4f3c7a730828684b9e4ce01369b89717e19/datafusion/sqllogictest/test_files/expr.slt

Or we could start adding a file just for CASE if we are about to spend a bunch of time optimizing it 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I willl add slt tests

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am planning on more CASE optimizations so will create a separate file

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the slt test. This is my first time using slt and it is very cool

fn test_column_or_null_specialization() -> Result<()> {
// create input data
let mut c1 = Int32Builder::new();
let mut c2 = StringBuilder::new();
for i in 0..1000 {
c1.append_value(i);
if i % 7 == 0 {
c2.append_null();
} else {
c2.append_value(&format!("string {i}"));
}
}
let c1 = Arc::new(c1.finish());
let c2 = Arc::new(c2.finish());
let schema = Schema::new(vec![
Field::new("c1", DataType::Int32, true),
Field::new("c2", DataType::Utf8, true),
]);
let batch = RecordBatch::try_new(Arc::new(schema), vec![c1, c2]).unwrap();

// CaseWhenExprOrNull should produce same results as CaseExpr
let predicate = Arc::new(BinaryExpr::new(
make_col("c1", 0),
Operator::LtEq,
make_lit_i32(250),
));
let expr = CaseExpr::try_new(None, vec![(predicate, make_col("c2", 1))], None)?;
assert!(matches!(expr.eval_method, EvalMethod::CaseColumnOrNull));
match expr.evaluate(&batch)? {
ColumnarValue::Array(array) => {
assert_eq!(1000, array.len());
assert_eq!(785, array.null_count());
}
_ => unreachable!(),
}
Ok(())
}

fn make_col(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
Arc::new(Column::new(name, index))
}

fn make_lit_i32(n: i32) -> Arc<dyn PhysicalExpr> {
Arc::new(Literal::new(ScalarValue::Int32(Some(n))))
}

fn generate_case_when_with_type_coercion(
expr: Option<Arc<dyn PhysicalExpr>>,
when_thens: Vec<WhenThen>,
Expand Down
Loading