Commit
[VL]: Fix VeloxColumnarWriteFilesExec.withNewChildren doesn't replace the dummy child

zhztheplayer committed May 13, 2024
1 parent b0ae7a6 commit c332466
Showing 1 changed file with 42 additions and 19 deletions.
@@ -259,8 +259,9 @@ class VeloxColumnarWriteFilesRDD(
 // we need to expose a dummy child (as right child) with type "WriteFilesExec" to let Spark
 // choose the new write code path (version >= 3.4). The actual plan to write is the left child
 // of this operator.
-case class VeloxColumnarWriteFilesExec(
-    child: SparkPlan,
+case class VeloxColumnarWriteFilesExec private (
+    override val left: SparkPlan,
+    override val right: SparkPlan,
     fileFormat: FileFormat,
     partitionColumns: Seq[Attribute],
     bucketSpec: Option[BucketSpec],
@@ -269,7 +270,8 @@ case class VeloxColumnarWriteFilesExec(
   extends BinaryExecNode
   with GlutenPlan
   with VeloxColumnarWriteFilesExec.ExecuteWriteCompatible {
-  import VeloxColumnarWriteFilesExec._
+
+  val child: SparkPlan = left

   override lazy val references: AttributeSet = AttributeSet.empty

@@ -320,28 +322,49 @@ case class VeloxColumnarWriteFilesExec(
       new VeloxColumnarWriteFilesRDD(rdd, writeFilesSpec, jobTrackerID)
     }
   }
-
-  override def left: SparkPlan = child
-
-  // This is a workaround for FileFormatWriter#write. Vanilla Spark (version >= 3.4) requires for
-  // a plan that has at least one node exactly of type `WriteFilesExec` that is a Scala case-class,
-  // to decide to choose new `#executeWrite` code path over the legacy `#execute` for write
-  // operation.
-  //
-  // So we add a no-op `WriteFilesExec` child to let Spark pick the new code path.
-  //
-  // See: FileFormatWriter#write
-  // See: V1Writes#getWriteFilesOpt
-  override val right: SparkPlan =
-    WriteFilesExec(NoopLeaf(), fileFormat, partitionColumns, bucketSpec, options, staticPartitions)

   override protected def withNewChildrenInternal(
       newLeft: SparkPlan,
       newRight: SparkPlan): SparkPlan =
-    copy(newLeft, fileFormat, partitionColumns, bucketSpec, options, staticPartitions)
+    copy(newLeft, newRight, fileFormat, partitionColumns, bucketSpec, options, staticPartitions)
 }

 object VeloxColumnarWriteFilesExec {
+
+  def apply(
+      child: SparkPlan,
+      fileFormat: FileFormat,
+      partitionColumns: Seq[Attribute],
+      bucketSpec: Option[BucketSpec],
+      options: Map[String, String],
+      staticPartitions: TablePartitionSpec): VeloxColumnarWriteFilesExec = {
+    // This is a workaround for FileFormatWriter#write. Vanilla Spark (version >= 3.4) requires for
+    // a plan that has at least one node exactly of type `WriteFilesExec` that is a Scala
+    // case-class, to decide to choose new `#executeWrite` code path over the legacy `#execute`
+    // for write operation.
+    //
+    // So we add a no-op `WriteFilesExec` child to let Spark pick the new code path.
+    //
+    // See: FileFormatWriter#write
+    // See: V1Writes#getWriteFilesOpt
+    val right: SparkPlan =
+      WriteFilesExec(
+        NoopLeaf(),
+        fileFormat,
+        partitionColumns,
+        bucketSpec,
+        options,
+        staticPartitions)
+
+    VeloxColumnarWriteFilesExec(
+      child,
+      right,
+      fileFormat,
+      partitionColumns,
+      bucketSpec,
+      options,
+      staticPartitions)
+  }

   private case class NoopLeaf() extends LeafExecNode {
     override protected def doExecute(): RDD[InternalRow] =
       throw new GlutenException(s"$nodeName does not support doExecute")
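
Why the dummy right child exists at all: from version 3.4 on, vanilla Spark inspects the physical plan for a node of type WriteFilesExec (see the FileFormatWriter#write and V1Writes#getWriteFilesOpt references in the comment above) and only then routes the write through #executeWrite instead of the legacy #execute. Below is a minimal, self-contained sketch of that kind of plan inspection; Plan, WriteFiles, ColumnarWrite and usesNewWritePath are illustrative stand-ins, not the real Spark API.

object WritePathDemo extends App {
  // Toy model of the Spark (>= 3.4) write-path selection described above.
  // Every name here is an illustrative stand-in, not the real Spark API.
  sealed trait Plan {
    def children: Seq[Plan]
    // Depth-first search for the first node matching the partial function,
    // in the spirit of Spark's TreeNode#collectFirst.
    def collectFirst[T](pf: PartialFunction[Plan, T]): Option[T] =
      pf.lift(this).orElse(
        children.iterator.map(_.collectFirst(pf)).collectFirst { case Some(t) => t })
  }
  case class Leaf() extends Plan { val children = Nil }
  case class WriteFiles(child: Plan) extends Plan { val children = Seq(child) }
  case class ColumnarWrite(left: Plan, right: Plan) extends Plan {
    val children = Seq(left, right)
  }

  // The new code path is chosen only if some node is exactly a WriteFiles
  // node, which is why the Gluten operator exposes a no-op one as its right child.
  def usesNewWritePath(plan: Plan): Boolean =
    plan.collectFirst { case w: WriteFiles => w }.isDefined

  val plan = ColumnarWrite(left = Leaf(), right = WriteFiles(Leaf()))
  assert(usesNewWritePath(plan)) // the dummy child satisfies the check
}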
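The underlying bug is a Scala case-class copy pitfall: in the old shape, right was a val in the class body rather than a constructor parameter, so withNewChildrenInternal had no way to thread newRight through copy, and any rule that rewrote the dummy child had its replacement silently discarded. A reduced, self-contained illustration (Node, Fixed, Before and After are toy names, not the Gluten classes):

object CopyPitfallDemo extends App {
  sealed trait Node { def children: Seq[Node] }
  case class Fixed(tag: String) extends Node { val children = Nil }

  // Old shape: only the left child is a constructor parameter; `right` is
  // rebuilt in the class body, so copy() can never substitute it.
  case class Before(left: Node) extends Node {
    val right: Node = Fixed("dummy")
    val children = Seq(left, right)
    def withNewChildren(newLeft: Node, newRight: Node): Node =
      copy(newLeft) // newRight is silently dropped -- the bug
  }

  // New shape: both children are constructor parameters, so copy() threads
  // the replacement right child through.
  case class After(left: Node, right: Node) extends Node {
    val children = Seq(left, right)
    def withNewChildren(newLeft: Node, newRight: Node): Node =
      copy(newLeft, newRight)
  }

  val buggy = Before(Fixed("scan")).withNewChildren(Fixed("scan2"), Fixed("replacement"))
  val fixed = After(Fixed("scan"), Fixed("dummy")).withNewChildren(Fixed("scan2"), Fixed("replacement"))
  assert(buggy.children.last == Fixed("dummy"))       // replacement was lost
  assert(fixed.children.last == Fixed("replacement")) // replacement applied
}

Moving both children into the now-private primary constructor and routing construction through the companion apply keeps the no-op WriteFilesExec as the default right child at build time, while leaving Spark's tree transforms free to replace either child afterwards.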
