Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize CASE expression for "expr or expr" usage. #13953

Merged
merged 3 commits into from
Jan 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions datafusion/physical-expr/src/expressions/case.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ enum EvalMethod {
/// are literal values
/// CASE WHEN condition THEN literal ELSE literal END
ScalarOrScalar,
/// This is a specialization for a specific use case where we can take a fast path
/// if there is just one when/then pair and both the `then` and `else` are expressions
///
/// CASE WHEN condition THEN expression ELSE expression END
ExpressionOrExpression,
}

/// The CASE expression is similar to a series of nested if/else and there are two forms that
Expand Down Expand Up @@ -149,6 +154,8 @@ impl CaseExpr {
&& else_expr.as_ref().unwrap().as_any().is::<Literal>()
{
EvalMethod::ScalarOrScalar
} else if when_then_expr.len() == 1 && else_expr.is_some() {
EvalMethod::ExpressionOrExpression
} else {
EvalMethod::NoExpression
};
Expand Down Expand Up @@ -394,6 +401,43 @@ impl CaseExpr {

Ok(ColumnarValue::Array(zip(&when_value, &then_value, &else_)?))
}

fn expr_or_expr(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
let return_type = self.data_type(&batch.schema())?;

// evalute when condition on batch
let when_value = self.when_then_expr[0].0.evaluate(batch)?;
let when_value = when_value.into_array(batch.num_rows())?;
let when_value = as_boolean_array(&when_value).map_err(|e| {
DataFusionError::Context(
"WHEN expression did not return a BooleanArray".to_string(),
Box::new(e),
)
Comment on lines +412 to +415
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
DataFusionError::Context(
"WHEN expression did not return a BooleanArray".to_string(),
Box::new(e),
)
internal_datafusion_err!("WHEN expression did not return a BooleanArray")

nit: We can assume all type checks have been done before, then inside this function all cast failures should be unreachable, so we can use internal error instead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @2010YOUY01 for the feedback. I was not aware that all of the type-checking is guaranteed at this point.
One of my main motivations to have this here was to keep it consistent with the rest of the code in the file to minimize any deviation from the previous behavior. I can apply this change for the newly added code, what should happen to the rest of the code? Do you think it would make sense to add a new issue to clean-up the other functions to?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, we can keep the code consistent now, and do clean-up later if possible

Copy link
Contributor

@alamb alamb Jan 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

})?;

// Treat 'NULL' as false value
let when_value = match when_value.null_count() {
0 => Cow::Borrowed(when_value),
_ => Cow::Owned(prep_null_mask_filter(when_value)),
};

let then_value = self.when_then_expr[0]
.1
.evaluate_selection(batch, &when_value)?
.into_array(batch.num_rows())?;

// evaluate else expression on the values not covered by when_value
let remainder = not(&when_value)?;
let e = self.else_expr.as_ref().unwrap();
// keep `else_expr`'s data type and return type consistent
let expr = try_cast(Arc::clone(e), &batch.schema(), return_type.clone())
.unwrap_or_else(|_| Arc::clone(e));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is similar, we can return an internal error directly, and avoid propagating the casting failure

Copy link
Contributor Author

@aweltsch aweltsch Jan 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is also used in all of the other evaluation methods for the CaseExpr I would also like to include this in the same clean-up issue. Would this be fine for you?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be great, thank you

let else_ = expr
.evaluate_selection(batch, &remainder)?
.into_array(batch.num_rows())?;

Ok(ColumnarValue::Array(zip(&remainder, &else_, &then_value)?))
}
}

impl PhysicalExpr for CaseExpr {
Expand Down Expand Up @@ -457,6 +501,7 @@ impl PhysicalExpr for CaseExpr {
self.case_column_or_null(batch)
}
EvalMethod::ScalarOrScalar => self.scalar_or_scalar(batch),
EvalMethod::ExpressionOrExpression => self.expr_or_expr(batch),
}
}

Expand Down Expand Up @@ -1174,6 +1219,45 @@ mod tests {
Ok(())
}

#[test]
fn test_expr_or_expr_specialization() -> Result<()> {
let batch = case_test_batch1()?;
let schema = batch.schema();
let when = binary(
col("a", &schema)?,
Operator::LtEq,
lit(2i32),
&batch.schema(),
)?;
let then = binary(
col("a", &schema)?,
Operator::Plus,
lit(1i32),
&batch.schema(),
)?;
let else_expr = binary(
col("a", &schema)?,
Operator::Minus,
lit(1i32),
&batch.schema(),
)?;
let expr = CaseExpr::try_new(None, vec![(when, then)], Some(else_expr))?;
assert!(matches!(
expr.eval_method,
EvalMethod::ExpressionOrExpression
));
let result = expr
.evaluate(&batch)?
.into_array(batch.num_rows())
.expect("Failed to convert to array");
let result = as_int32_array(&result).expect("failed to downcast to Int32Array");

let expected = &Int32Array::from(vec![Some(2), Some(1), None, Some(4)]);

assert_eq!(expected, result);
Ok(())
}

fn make_col(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
Arc::new(Column::new(name, index))
}
Expand Down
11 changes: 11 additions & 0 deletions datafusion/sqllogictest/test_files/case.slt
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,14 @@ query I
SELECT CASE arrow_cast([1,2,3], 'FixedSizeList(3, Int64)') WHEN arrow_cast([1,2,3], 'FixedSizeList(3, Int64)') THEN 1 ELSE 0 END;
----
1

# CASE WHEN with single predicate and two non-trivial branches (expr or expr usage)
query I
SELECT CASE WHEN a < 5 THEN a + b ELSE b - NVL(a, 0) END FROM foo
----
3
7
1
NULL
NULL
7
Loading