diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 41a69f7a3..d91ae5e4d 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2535,8 +2535,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } }.toArray - val windowExprProto = winExprs.map(windowExprToProto(_, output)) + if (winExprs.length != windowExpression.length) { + withInfo(op, "Unsupported window expression(s)") + return None + } + val windowExprProto = winExprs.map(windowExprToProto(_, output)) val partitionExprs = partitionSpec.map(exprToProto(_, child.output)) val sortOrders = orderSpec.map(exprToProto(_, child.output)) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index e657af9b9..b8c0d5668 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -63,6 +63,15 @@ class CometExecSuite extends CometTestBase { } } + test("Unsupported window expression should fall back to Spark") { + checkAnswer( + spark.sql("select sum(a) over () from values 1.0, 2.0, 3.0 T(a)"), + Row(6.0) :: Row(6.0) :: Row(6.0) :: Nil) + checkAnswer( + spark.sql("select avg(a) over () from values 1.0, 2.0, 3.0 T(a)"), + Row(2.0) :: Row(2.0) :: Row(2.0) :: Nil) + } + test("fix CometNativeExec.doCanonicalize for ReusedExchangeExec") { assume(isSpark34Plus, "ChunkedByteBuffer is not serializable before Spark 3.4+") withSQLConf(