[VL] Compatible lag function in spark-3.0 #7168

Closed
wants to merge 2 commits into from
@@ -19,14 +19,26 @@ package org.apache.gluten.execution
class WindowFunctionsValidateSuite extends FunctionsValidateSuite {

test("lag/lead window function with negative input offset") {
runQueryAndCompare(
"select l_suppkey,lag(l_orderkey, -2) over" +
" (partition by l_suppkey order by l_orderkey) from lineitem") {
checkGlutenOperatorMatch[WindowExecTransformer]
}

runQueryAndCompare(
"select l_suppkey, lead(l_orderkey, -2) over" +
" (partition by l_suppkey order by l_orderkey) from lineitem") {
checkGlutenOperatorMatch[WindowExecTransformer]
}

runQueryAndCompare(
"select lag(l_orderkey, -2) over" +
" (partition by l_suppkey order by l_orderkey) from lineitem") {
checkGlutenOperatorMatch[WindowExecTransformer]
}

runQueryAndCompare(
- "select lead(l_orderkey, -2) over" +
+ "select lag(l_orderkey, 2) over" +
" (partition by l_suppkey order by l_orderkey) from lineitem") {
checkGlutenOperatorMatch[WindowExecTransformer]
}
@@ -31,7 +31,7 @@ object WindowFunctionsBuilder {
val substraitFunc = windowFunc match {
// Handle lag with negative inputOffset, e.g., converts lag(c1, -1) to lead(c1, 1).
// Spark uses `-inputOffset` as `offset` for Lag function.
- case lag: Lag if lag.offset.eval(EmptyRow).asInstanceOf[Int] > 0 =>
Contributor

@kecookier, it seems that offset > 0 implies inputOffset < 0, so the original handling is as expected, right?

Contributor Author

@PHILO-HE This PR addresses the lag result mismatch issue in my current version of Spark. Thank you for your input. I have reviewed the Spark code, and now I understand the differences.

In Spark 3.0, the LAG function calculates the frame bound from both the offset and the direction. In versions post Spark 3.1, however, the function no longer considers the direction. Instead, it calculates the bound from a single offset expression relative to the current row. For example, lag() wraps its input offset as UnaryMinus(inputOffset).

Lag in Spark 3.0: https://github.com/apache/spark/blob/branch-3.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala#L474

Lag in Spark 3.2: https://github.com/apache/spark/blob/branch-3.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala#L538
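
The sign convention discussed in this thread can be sketched with a small, self-contained model. This is only an illustration under stated assumptions: `LagOffsetSketch`, `NegatedLag`, `DirectedLag`, `mapNegated`, and `mapDirected` are hypothetical names, not Spark or Gluten classes, and the two conventions are modeled purely from the description above.

```scala
// Simplified, hypothetical model of the two conventions described above.
// None of these types are Spark's or Gluten's real classes.
object LagOffsetSketch {
  sealed trait WindowFunc
  case object LAG extends WindowFunc
  case object LEAD extends WindowFunc

  // Convention A (as described for newer Spark): the stored offset is the
  // negated user input, i.e. offset = -inputOffset.
  final case class NegatedLag(inputOffset: Int) {
    val offset: Int = -inputOffset
  }

  // Convention B (as described for Spark 3.0): the stored offset is the user
  // input itself, and the direction is tracked separately.
  final case class DirectedLag(offset: Int)

  // A negative user offset means "look ahead", so it maps to LEAD.
  // Under convention A that shows up as a positive stored offset.
  def mapNegated(lag: NegatedLag): WindowFunc =
    if (lag.offset > 0) LEAD else LAG

  // Under convention B the same user input shows up as a negative stored offset.
  def mapDirected(lag: DirectedLag): WindowFunc =
    if (lag.offset < 0) LEAD else LAG
}
```

Under these assumptions, a user-facing `lag(c, -2)` maps to `LEAD` in both models, but the guard has to test the opposite sign, which is why a single condition cannot serve both conventions.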

Contributor

@kecookier, I see. Thanks for the clarification! It seems there is no way for Gluten to adapt to both implementations. Should you fix it in your own fork of Gluten?

Contributor Author

Ok, I will close this PR.

+ case lag: Lag if lag.offset.eval(EmptyRow).asInstanceOf[Int] < 0 =>
Some(LEAD)
// Handle lead with negative offset, e.g., converts lead(c1, -1) to lag(c1, 1).
case lead: Lead if lead.offset.eval(EmptyRow).asInstanceOf[Int] < 0 =>
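
The rewrite named in the comments above (`lag(c1, -1)` to `lead(c1, 1)`, and the reverse for `lead`) relies on the two functions being mirror images of each other. A minimal sketch of that equivalence on a single, already ordered partition follows. `LagLeadSketch` and its helpers are hypothetical and operate on a plain `Vector`; they do not use Spark or Gluten, and `None` stands in for the SQL NULL returned outside the partition.

```scala
// Hypothetical, self-contained illustration of lag/lead over one ordered partition.
object LagLeadSketch {
  // For each row index i, return the value at i + shift, or None when that
  // position falls outside the partition.
  private def shifted[A](rows: Vector[A], shift: Int): Vector[Option[A]] =
    rows.indices.toVector.map(i => rows.lift(i + shift))

  // lag looks n rows back; a negative n therefore looks forward.
  def lag[A](rows: Vector[A], n: Int): Vector[Option[A]] = shifted(rows, -n)

  // lead looks n rows ahead; a negative n therefore looks back.
  def lead[A](rows: Vector[A], n: Int): Vector[Option[A]] = shifted(rows, n)
}
```

With these definitions, `LagLeadSketch.lag(rows, -2)` and `LagLeadSketch.lead(rows, 2)` produce the same result for any `rows`, which is the property the conversion depends on.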