diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index bf1ed0f87901..b6aa2ea4d580 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -22,7 +22,7 @@ import org.apache.gluten.execution._ import org.apache.gluten.metrics.IMetrics import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.plan.PlanNode -import org.apache.gluten.substrait.rel.{LocalFilesBuilder, RawSplitInfo, SplitInfo} +import org.apache.gluten.substrait.rel.{LocalFilesBuilder, LocalFilesNode, RawSplitInfo, SplitInfo} import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat import org.apache.gluten.utils._ import org.apache.gluten.utils.iterator.Iterators @@ -81,14 +81,14 @@ class VeloxIteratorApi extends IteratorApi with Logging { GlutenRawPartition( index, planByteArray, - splitInfos.map(_.asInstanceOf[RawSplitInfo]) + splitInfos ) } } - private def toSplitInfoByteArray(splitInfos: Seq[RawSplitInfo]): Array[Array[Byte]] = { + private def toSplitInfoByteArray(splitInfos: Seq[SplitInfo]): Array[Array[Byte]] = { splitInfos.map { - splitInfo => + case rawSplitInfo: RawSplitInfo => val ( paths, starts, @@ -98,12 +98,12 @@ class VeloxIteratorApi extends IteratorApi with Logging { partitionColumns, metadataColumns) = constructSplitInfo( - splitInfo.getPartitionSchema, - splitInfo.getFilePartition.files, - splitInfo.getMetadataColumn.asScala) + rawSplitInfo.getPartitionSchema, + rawSplitInfo.getFilePartition.files, + rawSplitInfo.getMetadataColumn.asScala) LocalFilesBuilder .makeLocalFiles( - splitInfo.getFilePartition.index, + rawSplitInfo.getFilePartition.index, paths, starts, lengths, @@ -111,12 +111,13 @@ class VeloxIteratorApi extends IteratorApi with Logging { modificationTimes, partitionColumns, metadataColumns, - splitInfo.getFileFormat, + rawSplitInfo.getFileFormat, new JArrayList[String](), - splitInfo.getProperties + rawSplitInfo.getProperties ) .toProtobuf .toByteArray + case localFilesNode: LocalFilesNode => localFilesNode.toProtobuf.toByteArray }.toArray } diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala index f675f6360a55..e401a83d5085 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala @@ -19,7 +19,7 @@ package org.apache.gluten.execution import org.apache.gluten.GlutenConfig import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.metrics.{GlutenTimeMetric, IMetrics} -import org.apache.gluten.substrait.rel.RawSplitInfo +import org.apache.gluten.substrait.rel.SplitInfo import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext} import org.apache.spark.rdd.RDD @@ -49,7 +49,7 @@ case class GlutenPartition( case class GlutenRawPartition( index: Int, plan: Array[Byte], - splitInfos: Seq[RawSplitInfo], + splitInfos: Seq[SplitInfo], files: Array[String] = Array.empty[String] // touched files, for implementing UDF input_file_names ) extends BaseGlutenPartition {