Add config to specify the window type in velox backend (#3703)
JkSelf authored Nov 21, 2023
1 parent 60fc2a0 commit 92356bc
Showing 4 changed files with 117 additions and 55 deletions.
@@ -194,55 +194,59 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla
   }
 
   test("window expression") {
-    runQueryAndCompare(
-      "select row_number() over" +
-        " (partition by l_suppkey order by l_orderkey) from lineitem ") { _ => }
-
-    runQueryAndCompare(
-      "select rank() over" +
-        " (partition by l_suppkey order by l_orderkey) from lineitem ") { _ => }
-
-    runQueryAndCompare(
-      "select dense_rank() over" +
-        " (partition by l_suppkey order by l_orderkey) from lineitem ") { _ => }
-
-    runQueryAndCompare(
-      "select percent_rank() over" +
-        " (partition by l_suppkey order by l_orderkey) from lineitem ") { _ => }
-
-    runQueryAndCompare(
-      "select cume_dist() over" +
-        " (partition by l_suppkey order by l_orderkey) from lineitem ") { _ => }
-
-    runQueryAndCompare(
-      "select l_suppkey, l_orderkey, nth_value(l_orderkey, 2) over" +
-        " (partition by l_suppkey order by l_orderkey) from lineitem ") {
-      df =>
-        {
-          assert(
-            getExecutedPlan(df).count(
-              plan => {
-                plan.isInstanceOf[WindowExecTransformer]
-              }) > 0)
-        }
-    }
-
-    runQueryAndCompare(
-      "select sum(l_partkey + 1) over" +
-        " (partition by l_suppkey order by l_orderkey) from lineitem") { _ => }
-
-    runQueryAndCompare(
-      "select max(l_partkey) over" +
-        " (partition by l_suppkey order by l_orderkey) from lineitem ") { _ => }
-
-    runQueryAndCompare(
-      "select min(l_partkey) over" +
-        " (partition by l_suppkey order by l_orderkey) from lineitem ") { _ => }
-
-    runQueryAndCompare(
-      "select avg(l_partkey) over" +
-        " (partition by l_suppkey order by l_orderkey) from lineitem ") { _ => }
+    Seq("sort", "streaming").foreach {
+      windowType =>
+        withSQLConf("spark.gluten.sql.columnar.backend.velox.window.type" -> windowType.toString) {
+          runQueryAndCompare(
+            "select row_number() over" +
+              " (partition by l_suppkey order by l_orderkey) from lineitem ") { _ => }
+
+          runQueryAndCompare(
+            "select rank() over" +
+              " (partition by l_suppkey order by l_orderkey) from lineitem ") { _ => }
+
+          runQueryAndCompare(
+            "select dense_rank() over" +
+              " (partition by l_suppkey order by l_orderkey) from lineitem ") { _ => }
+
+          runQueryAndCompare(
+            "select percent_rank() over" +
+              " (partition by l_suppkey order by l_orderkey) from lineitem ") { _ => }
+
+          runQueryAndCompare(
+            "select cume_dist() over" +
+              " (partition by l_suppkey order by l_orderkey) from lineitem ") { _ => }
+
+          runQueryAndCompare(
+            "select l_suppkey, l_orderkey, nth_value(l_orderkey, 2) over" +
+              " (partition by l_suppkey order by l_orderkey) from lineitem ") {
+            df =>
+              {
+                assert(
+                  getExecutedPlan(df).count(
+                    plan => {
+                      plan.isInstanceOf[WindowExecTransformer]
+                    }) > 0)
+              }
+          }
+
+          runQueryAndCompare(
+            "select sum(l_partkey + 1) over" +
+              " (partition by l_suppkey order by l_orderkey) from lineitem") { _ => }
+
+          runQueryAndCompare(
+            "select max(l_partkey) over" +
+              " (partition by l_suppkey order by l_orderkey) from lineitem ") { _ => }
+
+          runQueryAndCompare(
+            "select min(l_partkey) over" +
+              " (partition by l_suppkey order by l_orderkey) from lineitem ") { _ => }
+
+          runQueryAndCompare(
+            "select avg(l_partkey) over" +
+              " (partition by l_suppkey order by l_orderkey) from lineitem ") { _ => }
+        }
+    }
   }
 
   test("chr function") {
32 changes: 23 additions & 9 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -636,15 +636,29 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
   }
 
   auto [sortingKeys, sortingOrders] = processSortField(windowRel.sorts(), inputType);
-  return std::make_shared<core::WindowNode>(
-      nextPlanNodeId(),
-      partitionKeys,
-      sortingKeys,
-      sortingOrders,
-      windowColumnNames,
-      windowNodeFunctions,
-      true /*inputsSorted*/,
-      childNode);
+
+  if (windowRel.has_advanced_extension() &&
+      SubstraitParser::configSetInOptimization(windowRel.advanced_extension(), "isStreaming=")) {
+    return std::make_shared<core::WindowNode>(
+        nextPlanNodeId(),
+        partitionKeys,
+        sortingKeys,
+        sortingOrders,
+        windowColumnNames,
+        windowNodeFunctions,
+        true /*inputsSorted*/,
+        childNode);
+  } else {
+    return std::make_shared<core::WindowNode>(
+        nextPlanNodeId(),
+        partitionKeys,
+        sortingKeys,
+        sortingOrders,
+        windowColumnNames,
+        windowNodeFunctions,
+        false /*inputsSorted*/,
+        childNode);
+  }
 }
 
 core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::SortRel& sortRel) {
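The two branches above differ only in the inputsSorted flag: the WindowNode is built with inputsSorted=true exactly when the advanced extension marks isStreaming (per configSetInOptimization). A minimal equivalent sketch, not part of the commit, using only names from the diff:

    // Sketch: inputsSorted=true means rows already arrive sorted by the partition
    // and order keys (StreamingWindow); false makes Velox sort internally
    // (SortWindow, which can spill).
    const bool inputsSorted = windowRel.has_advanced_extension() &&
        SubstraitParser::configSetInOptimization(windowRel.advanced_extension(), "isStreaming=");
    return std::make_shared<core::WindowNode>(
        nextPlanNodeId(),
        partitionKeys,
        sortingKeys,
        sortingOrders,
        windowColumnNames,
        windowNodeFunctions,
        inputsSorted,
        childNode);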
@@ -16,6 +16,7 @@
  */
 package io.glutenproject.execution
 
+import io.glutenproject.GlutenConfig
 import io.glutenproject.backendsapi.BackendsApiManager
 import io.glutenproject.expression._
 import io.glutenproject.extension.ValidationResult
@@ -33,7 +34,7 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.window.WindowExecBase
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
-import com.google.protobuf.Any
+import com.google.protobuf.{Any, StringValue}
 import io.substrait.proto.SortField
 
 import java.util.{ArrayList => JArrayList}
@@ -68,8 +69,11 @@ case class WindowExecTransformer(
   }
 
   override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-    if (BackendsApiManager.getSettings.requiredChildOrderingForWindow()) {
-      // We still need to do sort for columnar window, see `FLAGS_SkipRowSortInWindowOp`
+    if (
+      BackendsApiManager.getSettings.requiredChildOrderingForWindow()
+      && GlutenConfig.getConf.veloxColumnarWindowType.equals("streaming")
+    ) {
+      // Velox's StreamingWindow requires its child output to be ordered.
       Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec)
     } else {
       Seq(Nil)
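Only the streaming mode asks Spark to enforce a child ordering, so Spark plans a sort below the window; in sort mode, Seq(Nil) declares no requirement and Velox's SortWindow sorts internally. A standalone sketch of that contract (illustrative only; requiredOrderingFor is not a real helper in the codebase):

    import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, SortOrder}

    // Sketch: the required child ordering for each window type.
    def requiredOrderingFor(
        windowType: String,
        partitionSpec: Seq[Expression],
        orderSpec: Seq[SortOrder]): Seq[Seq[SortOrder]] =
      if (windowType == "streaming") {
        // Spark must sort by partition keys, then order keys, before the window.
        Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec)
      } else {
        Seq(Nil) // "sort": Velox sorts inside the window operator and may spill.
      }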
@@ -80,6 +84,26 @@
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
+  def genWindowParametersBuilder(): com.google.protobuf.Any.Builder = {
+    // Start with "WindowParameters:"
+    val windowParametersStr = new StringBuffer("WindowParameters:")
+    // isStreaming: 1 for streaming, 0 for sort
+    val isStreaming: Int =
+      if (GlutenConfig.getConf.veloxColumnarWindowType.equals("streaming")) 1 else 0
+
+    windowParametersStr
+      .append("isStreaming=")
+      .append(isStreaming)
+      .append("\n")
+    val message = StringValue
+      .newBuilder()
+      .setValue(windowParametersStr.toString)
+      .build()
+    com.google.protobuf.Any.newBuilder
+      .setValue(message.toByteString)
+      .setTypeUrl("/google.protobuf.StringValue")
+  }
+
   def getRelNode(
       context: SubstraitContext,
       windowExpression: Seq[NamedExpression],
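genWindowParametersBuilder ships the chosen window type to the native plan as a google.protobuf.StringValue packed inside an Any; on the C++ side, SubstraitParser::configSetInOptimization scans this payload for the "isStreaming=" key (see the SubstraitToVeloxPlan.cc hunk above). A quick round-trip sketch, assuming protobuf-java on the classpath; not part of the commit:

    import com.google.protobuf.{Any, StringValue}

    // Build the same payload the method produces for the streaming case.
    val packed: Any = Any.newBuilder
      .setValue(
        StringValue.newBuilder.setValue("WindowParameters:isStreaming=1\n").build.toByteString)
      .setTypeUrl("/google.protobuf.StringValue")
      .build()

    // Read the payload back directly from the packed bytes.
    val payload = StringValue.parseFrom(packed.getValue).getValue
    assert(payload.contains("isStreaming=1"))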
@@ -132,11 +156,14 @@ case class WindowExecTransformer(
       builder.build()
     }.asJava
     if (!validation) {
+      val extensionNode =
+        ExtensionBuilder.makeAdvancedExtension(genWindowParametersBuilder.build(), null)
       RelBuilder.makeWindowRel(
         input,
         windowExpressions,
         partitionsExpressions,
         sortFieldList,
+        extensionNode,
         context,
         operatorId)
     } else {
17 changes: 17 additions & 0 deletions shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
@@ -61,6 +61,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 
   def enableColumnarWindow: Boolean = conf.getConf(COLUMNAR_WINDOW_ENABLED)
 
+  def veloxColumnarWindowType: String = conf.getConfString(COLUMNAR_VELOX_WINDOW_TYPE.key)
+
   def enableColumnarShuffledHashJoin: Boolean = conf.getConf(COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED)
 
   def enableNativeColumnarToRow: Boolean = conf.getConf(COLUMNAR_COLUMNAR_TO_ROW_ENABLED)
@@ -622,6 +624,21 @@
       .booleanConf
       .createWithDefault(true)
 
+  val COLUMNAR_VELOX_WINDOW_TYPE =
+    buildConf("spark.gluten.sql.columnar.backend.velox.window.type")
+      .internal()
+      .doc(
+        "The Velox backend supports both SortWindow and StreamingWindow operators." +
+          " StreamingWindow skips sorting its input but cannot spill, while" +
+          " SortWindow sorts the input inside the window operator and supports" +
+          " spilling.")
+      .stringConf
+      .checkValues(Set("streaming", "sort"))
+      .createWithDefault("streaming")
+
   val COLUMNAR_FPRCE_SHUFFLED_HASH_JOIN_ENABLED =
     buildConf("spark.gluten.sql.columnar.forceShuffledHashJoin")
       .internal()
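With the new knob in place, a session can switch implementations per workload; for example (illustrative usage, assuming a running SparkSession named spark with the Gluten Velox backend enabled):

    // Fall back to the spill-capable SortWindow for a memory-heavy window query,
    // then restore the default StreamingWindow.
    spark.conf.set("spark.gluten.sql.columnar.backend.velox.window.type", "sort")
    spark.sql(
      "select sum(l_partkey) over (partition by l_suppkey order by l_orderkey) from lineitem")
    spark.conf.set("spark.gluten.sql.columnar.backend.velox.window.type", "streaming")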
