diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
index 23dff990c4647..1d3d55afb5262 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
@@ -259,8 +259,9 @@ class VeloxColumnarWriteFilesRDD(
 // we need to expose a dummy child (as right child) with type "WriteFilesExec" to let Spark
 // choose the new write code path (version >= 3.4). The actual plan to write is the left child
 // of this operator.
-case class VeloxColumnarWriteFilesExec(
-    child: SparkPlan,
+case class VeloxColumnarWriteFilesExec private (
+    override val left: SparkPlan,
+    override val right: SparkPlan,
     fileFormat: FileFormat,
     partitionColumns: Seq[Attribute],
     bucketSpec: Option[BucketSpec],
@@ -269,7 +270,8 @@ case class VeloxColumnarWriteFilesExec(
   extends BinaryExecNode
   with GlutenPlan
   with VeloxColumnarWriteFilesExec.ExecuteWriteCompatible {
-  import VeloxColumnarWriteFilesExec._
+
+  val child: SparkPlan = left
 
   override lazy val references: AttributeSet = AttributeSet.empty
 
@@ -320,28 +322,49 @@ case class VeloxColumnarWriteFilesExec(
       new VeloxColumnarWriteFilesRDD(rdd, writeFilesSpec, jobTrackerID)
     }
   }
-
-  override def left: SparkPlan = child
-
-  // This is a workaround for FileFormatWriter#write. Vanilla Spark (version >= 3.4) requires for
-  // a plan that has at least one node exactly of type `WriteFilesExec` that is a Scala case-class,
-  // to decide to choose new `#executeWrite` code path over the legacy `#execute` for write
-  // operation.
-  //
-  // So we add a no-op `WriteFilesExec` child to let Spark pick the new code path.
-  //
-  // See: FileFormatWriter#write
-  // See: V1Writes#getWriteFilesOpt
-  override val right: SparkPlan =
-    WriteFilesExec(NoopLeaf(), fileFormat, partitionColumns, bucketSpec, options, staticPartitions)
-
   override protected def withNewChildrenInternal(
       newLeft: SparkPlan,
       newRight: SparkPlan): SparkPlan =
-    copy(newLeft, fileFormat, partitionColumns, bucketSpec, options, staticPartitions)
+    copy(newLeft, newRight, fileFormat, partitionColumns, bucketSpec, options, staticPartitions)
 }
 
 object VeloxColumnarWriteFilesExec {
+
+  def apply(
+      child: SparkPlan,
+      fileFormat: FileFormat,
+      partitionColumns: Seq[Attribute],
+      bucketSpec: Option[BucketSpec],
+      options: Map[String, String],
+      staticPartitions: TablePartitionSpec): VeloxColumnarWriteFilesExec = {
+    // This is a workaround for FileFormatWriter#write. Vanilla Spark (version >= 3.4) requires for
+    // a plan that has at least one node exactly of type `WriteFilesExec` that is a Scala
+    // case-class, to decide to choose new `#executeWrite` code path over the legacy `#execute`
+    // for write operation.
+    //
+    // So we add a no-op `WriteFilesExec` child to let Spark pick the new code path.
+    //
+    // See: FileFormatWriter#write
+    // See: V1Writes#getWriteFilesOpt
+    val right: SparkPlan =
+      WriteFilesExec(
+        NoopLeaf(),
+        fileFormat,
+        partitionColumns,
+        bucketSpec,
+        options,
+        staticPartitions)
+
+    VeloxColumnarWriteFilesExec(
+      child,
+      right,
+      fileFormat,
+      partitionColumns,
+      bucketSpec,
+      options,
+      staticPartitions)
+  }
+
   private case class NoopLeaf() extends LeafExecNode {
     override protected def doExecute(): RDD[InternalRow] =
       throw new GlutenException(s"$nodeName does not support doExecute")