[comet-parquet-exec] Handle CometNativeScan's input RDD when it is a DataSourceRDD instead of a FileScanRDD (#1088)

* DataSourceRDD handling (this appears to be related to prefetching, so it may not be relevant for our ParquetExec).

* Refactor to reduce duplicate code.
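
As background for the diff below, here is a minimal sketch of recovering `FilePartition`s from either RDD type, mirroring the match this commit adds. The wrapper object and helper name are hypothetical, and it assumes the same Spark internals the change itself relies on (`FileScanRDD.filePartitions`, `DataSourceRDDPartition.inputPartitions`):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDD, DataSourceRDDPartition}

object FilePartitionExtraction {
  // Hypothetical helper mirroring the commit's match: gather FilePartitions
  // whether the scan produced a DSv1 FileScanRDD or a DSv2 DataSourceRDD.
  def collectFilePartitions(inputRDD: RDD[_]): Seq[FilePartition] = inputRDD match {
    // DSv1: FileScanRDD exposes its FilePartitions directly.
    case rdd: FileScanRDD => rdd.filePartitions
    // DSv2: each DataSourceRDDPartition wraps InputPartitions, which are
    // FilePartitions for file-based scans such as Parquet.
    case rdd: DataSourceRDD =>
      rdd.partitions.toSeq.flatMap {
        case p: DataSourceRDDPartition =>
          p.inputPartitions.collect { case fp: FilePartition => fp }
        case _ => Seq.empty
      }
    case _ => Seq.empty
  }
}
```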
mbutrovich authored Nov 15, 2024
1 parent 786250a commit 8a0df9d
Showing 1 changed file with 34 additions and 17 deletions.
spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala (34 additions, 17 deletions)
@@ -35,8 +35,9 @@ import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregateExec, ObjectHashAggregateExec}
-import org.apache.spark.sql.execution.datasources.FileScanRDD
+import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD}
 import org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDD, DataSourceRDDPartition}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashJoin, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.window.WindowExec
@@ -2497,22 +2498,22 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
     val dataFilters = scan.dataFilters.map(exprToProto(_, scan.output))
     nativeScanBuilder.addAllDataFilters(dataFilters.map(_.get).asJava)
 
-    // Eventually we'll want to modify CometNativeScan to generate the file partitions
-    // for us without instantiating the RDD.
-    val file_partitions = scan.inputRDD.asInstanceOf[FileScanRDD].filePartitions;
-    file_partitions.foreach(partition => {
-      val partitionBuilder = OperatorOuterClass.SparkFilePartition.newBuilder()
-      partition.files.foreach(file => {
-        val fileBuilder = OperatorOuterClass.SparkPartitionedFile.newBuilder()
-        fileBuilder
-          .setFilePath(file.pathUri.toString)
-          .setStart(file.start)
-          .setLength(file.length)
-          .setFileSize(file.fileSize)
-        partitionBuilder.addPartitionedFile(fileBuilder.build())
-      })
-      nativeScanBuilder.addFilePartitions(partitionBuilder.build())
-    })
+    // TODO: modify CometNativeScan to generate the file partitions without instantiating RDD.
+    scan.inputRDD match {
+      case rdd: DataSourceRDD =>
+        val partitions = rdd.partitions
+        partitions.foreach(p => {
+          val inputPartitions = p.asInstanceOf[DataSourceRDDPartition].inputPartitions
+          inputPartitions.foreach(partition => {
+            partition2Proto(partition.asInstanceOf[FilePartition], nativeScanBuilder)
+          })
+        })
+      case rdd: FileScanRDD =>
+        rdd.filePartitions.foreach(partition => {
+          partition2Proto(partition, nativeScanBuilder)
+        })
+      case _ =>
+    }
 
     val requiredSchemaParquet =
       new SparkToParquetSchemaConverter(conf).convert(scan.requiredSchema)
@@ -3185,4 +3186,20 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim

     true
   }
+
+  private def partition2Proto(
+      partition: FilePartition,
+      nativeScanBuilder: OperatorOuterClass.NativeScan.Builder): Unit = {
+    val partitionBuilder = OperatorOuterClass.SparkFilePartition.newBuilder()
+    partition.files.foreach(file => {
+      val fileBuilder = OperatorOuterClass.SparkPartitionedFile.newBuilder()
+      fileBuilder
+        .setFilePath(file.pathUri.toString)
+        .setStart(file.start)
+        .setLength(file.length)
+        .setFileSize(file.fileSize)
+      partitionBuilder.addPartitionedFile(fileBuilder.build())
+    })
+    nativeScanBuilder.addFilePartitions(partitionBuilder.build())
+  }
 }
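
To make the serialized output concrete, here is a hedged sketch of assembling the partition message for one file split, using only the builder calls visible in the diff (`SparkPartitionedFile`, `SparkFilePartition`, and their setters); the wrapper object, example path, and byte values are made up:

```scala
import org.apache.comet.serde.OperatorOuterClass // generated proto classes (package assumed)

object FilePartitionProtoExample {
  def main(args: Array[String]): Unit = {
    // Build one SparkPartitionedFile for a single Parquet split; the path
    // and byte values below are illustrative only.
    val fileProto = OperatorOuterClass.SparkPartitionedFile
      .newBuilder()
      .setFilePath("file:///tmp/warehouse/t/part-00000.parquet")
      .setStart(0L) // byte offset where this split begins
      .setLength(134217728L) // bytes covered by this split (128 MiB)
      .setFileSize(268435456L) // total on-disk size of the file (256 MiB)
      .build()

    // One SparkFilePartition groups the files a task reads together; this is
    // the message partition2Proto appends via addFilePartitions(...).
    val partitionProto = OperatorOuterClass.SparkFilePartition
      .newBuilder()
      .addPartitionedFile(fileProto)
      .build()

    println(partitionProto)
  }
}
```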
