[VL]: Fix VeloxColumnarWriteFilesExec withNewChildren doesn't replace the dummy child #5726

Merged · 3 commits · May 13, 2024

Changes from all commits
@@ -259,8 +259,9 @@ class VeloxColumnarWriteFilesRDD(
 // we need to expose a dummy child (as right child) with type "WriteFilesExec" to let Spark
 // choose the new write code path (version >= 3.4). The actual plan to write is the left child
 // of this operator.
-case class VeloxColumnarWriteFilesExec(
-    child: SparkPlan,
+case class VeloxColumnarWriteFilesExec private (
+    override val left: SparkPlan,
+    override val right: SparkPlan,
     fileFormat: FileFormat,
     partitionColumns: Seq[Attribute],
     bucketSpec: Option[BucketSpec],
@@ -269,7 +270,8 @@ case class VeloxColumnarWriteFilesExec(
   extends BinaryExecNode
   with GlutenPlan
   with VeloxColumnarWriteFilesExec.ExecuteWriteCompatible {
-  import VeloxColumnarWriteFilesExec._
+
+  val child: SparkPlan = left
 
   override lazy val references: AttributeSet = AttributeSet.empty
 
@@ -320,28 +322,49 @@ case class VeloxColumnarWriteFilesExec(
       new VeloxColumnarWriteFilesRDD(rdd, writeFilesSpec, jobTrackerID)
     }
   }
 
-  override def left: SparkPlan = child
-
-  // This is a workaround for FileFormatWriter#write. Vanilla Spark (version >= 3.4) requires for
-  // a plan that has at least one node exactly of type `WriteFilesExec` that is a Scala case-class,
-  // to decide to choose new `#executeWrite` code path over the legacy `#execute` for write
-  // operation.
-  //
-  // So we add a no-op `WriteFilesExec` child to let Spark pick the new code path.
-  //
-  // See: FileFormatWriter#write
-  // See: V1Writes#getWriteFilesOpt
-  override val right: SparkPlan =
-    WriteFilesExec(NoopLeaf(), fileFormat, partitionColumns, bucketSpec, options, staticPartitions)
-
   override protected def withNewChildrenInternal(
       newLeft: SparkPlan,
       newRight: SparkPlan): SparkPlan =
-    copy(newLeft, fileFormat, partitionColumns, bucketSpec, options, staticPartitions)
+    copy(newLeft, newRight, fileFormat, partitionColumns, bucketSpec, options, staticPartitions)
 }
 
 object VeloxColumnarWriteFilesExec {
+
+  def apply(
+      child: SparkPlan,
+      fileFormat: FileFormat,
+      partitionColumns: Seq[Attribute],
+      bucketSpec: Option[BucketSpec],
+      options: Map[String, String],
+      staticPartitions: TablePartitionSpec): VeloxColumnarWriteFilesExec = {
+    // This is a workaround for FileFormatWriter#write. Vanilla Spark (version >= 3.4) requires for
+    // a plan that has at least one node exactly of type `WriteFilesExec` that is a Scala
+    // case-class, to decide to choose new `#executeWrite` code path over the legacy `#execute`
+    // for write operation.
+    //
+    // So we add a no-op `WriteFilesExec` child to let Spark pick the new code path.
+    //
+    // See: FileFormatWriter#write
+    // See: V1Writes#getWriteFilesOpt
+    val right: SparkPlan =
+      WriteFilesExec(
+        NoopLeaf(),
+        fileFormat,
+        partitionColumns,
+        bucketSpec,
+        options,
+        staticPartitions)
+
+    VeloxColumnarWriteFilesExec(
+      child,
+      right,
+      fileFormat,
+      partitionColumns,
+      bucketSpec,
+      options,
+      staticPartitions)
+  }
 
   private case class NoopLeaf() extends LeafExecNode {
     override protected def doExecute(): RDD[InternalRow] =
       throw new GlutenException(s"$nodeName does not support doExecute")
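
Why the change matters: Spark's tree-rewrite machinery hands withNewChildrenInternal the already-rewritten children and expects the returned copy to use all of them. Before this PR, copy(newLeft, fileFormat, ...) rebuilt the dummy right child from the constructor fields, so any replacement of the right child supplied by the caller was silently dropped; after the PR, both newLeft and newRight are forwarded, and the dummy WriteFilesExec is constructed once in the companion apply. Below is a minimal, self-contained sketch of that failure mode using toy types (Node, Leaf, Binary and the method names are illustrative stand-ins, not the Gluten or Spark API).

sealed trait Node { def children: Seq[Node] }

case class Leaf(name: String) extends Node {
  override def children: Seq[Node] = Nil
}

case class Binary(left: Node, right: Node) extends Node {
  override def children: Seq[Node] = Seq(left, right)

  // Buggy variant, analogous to the old copy(newLeft, fileFormat, ...):
  // the right child is rebuilt from scratch, so the caller's replacement
  // is silently discarded.
  def withNewChildrenBuggy(newLeft: Node, newRight: Node): Binary =
    copy(left = newLeft, right = Leaf("dummy"))

  // Fixed variant, analogous to copy(newLeft, newRight, fileFormat, ...):
  // both rewritten children survive.
  def withNewChildrenFixed(newLeft: Node, newRight: Node): Binary =
    copy(left = newLeft, right = newRight)
}

object WithNewChildrenDemo extends App {
  val plan = Binary(Leaf("scan"), Leaf("dummy"))

  val buggy = plan.withNewChildrenBuggy(Leaf("scan2"), Leaf("dummy2"))
  assert(buggy.right == Leaf("dummy")) // the replacement child was lost

  val fixed = plan.withNewChildrenFixed(Leaf("scan2"), Leaf("dummy2"))
  assert(fixed.right == Leaf("dummy2")) // the replacement child is kept
}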