Skip to content

Commit

Permalink
[CORE] Remove IteratorApi.genNativeFileScanRDD, both velox and clickc…
Browse files Browse the repository at this point in the history
…house backend needn't it. (#5937)

[CORE] Remove IteratorApi.genNativeFileScanRDD, both velox and clickchouse backend needn't it.
  • Loading branch information
baibaichen authored May 31, 2024
1 parent c94cde4 commit 2fc808d
Show file tree
Hide file tree
Showing 8 changed files with 6 additions and 368 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@ import org.apache.gluten.{GlutenConfig, GlutenNumaBindingInfo}
import org.apache.gluten.backendsapi.IteratorApi
import org.apache.gluten.execution._
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.metrics.{GlutenTimeMetric, IMetrics, NativeMetrics}
import org.apache.gluten.metrics.{IMetrics, NativeMetrics}
import org.apache.gluten.substrait.plan.PlanNode
import org.apache.gluten.substrait.rel._
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.utils.LogLevelUtil
import org.apache.gluten.vectorized.{CHNativeExpressionEvaluator, CloseableCHColumnBatchIterator, GeneralInIterator, GeneralOutIterator}

import org.apache.spark.{InterruptibleIterator, SparkConf, SparkContext, TaskContext}
import org.apache.spark.{InterruptibleIterator, SparkConf, TaskContext}
import org.apache.spark.affinity.CHAffinity
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.execution.metric.SQLMetric
Expand Down Expand Up @@ -315,44 +314,6 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
context.addTaskCompletionListener[Unit](_ => close())
new CloseableCHColumnBatchIterator(resIter, Some(pipelineTime))
}

/** Generate Native FileScanRDD, currently only for ClickHouse Backend. */
override def genNativeFileScanRDD(
sparkContext: SparkContext,
wsCtx: WholeStageTransformContext,
splitInfos: Seq[SplitInfo],
scan: BasicScanExecTransformer,
numOutputRows: SQLMetric,
numOutputBatches: SQLMetric,
scanTime: SQLMetric): RDD[ColumnarBatch] = {
val substraitPlanPartition = GlutenTimeMetric.withMillisTime {
val planByteArray = wsCtx.root.toProtobuf.toByteArray
splitInfos.zipWithIndex.map {
case (splitInfo, index) =>
val splitInfoByteArray = splitInfo match {
case filesNode: LocalFilesNode =>
setFileSchemaForLocalFiles(filesNode, scan)
filesNode.setFileReadProperties(mapAsJavaMap(scan.getProperties))
filesNode.toProtobuf.toByteArray
case extensionTableNode: ExtensionTableNode =>
extensionTableNode.toProtobuf.toByteArray
}

GlutenPartition(
index,
planByteArray,
Array(splitInfoByteArray),
locations = splitInfo.preferredLocations().asScala.toArray)
}
}(t => logInfo(s"Generating the Substrait plan took: $t ms."))

new NativeFileScanColumnarRDD(
sparkContext,
substraitPlanPartition,
numOutputRows,
numOutputBatches,
scanTime)
}
}

object CHIteratorApi {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.utils._
import org.apache.gluten.vectorized._

import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.softaffinity.SoftAffinity
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
Expand Down Expand Up @@ -232,16 +231,4 @@ class VeloxIteratorApi extends IteratorApi with Logging {
.create()
}
// scalastyle:on argcount

/** Generate Native FileScanRDD, currently only for ClickHouse Backend. */
override def genNativeFileScanRDD(
sparkContext: SparkContext,
wsCxt: WholeStageTransformContext,
splitInfos: Seq[SplitInfo],
scan: BasicScanExecTransformer,
numOutputRows: SQLMetric,
numOutputBatches: SQLMetric,
scanTime: SQLMetric): RDD[ColumnarBatch] = {
throw new UnsupportedOperationException("Cannot support to generate Native FileScanRDD.")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.substrait.rel.SplitInfo

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.StructType
Expand Down Expand Up @@ -81,14 +80,4 @@ trait IteratorApi {
partitionIndex: Int,
materializeInput: Boolean = false): Iterator[ColumnarBatch]
// scalastyle:on argcount

/** Generate Native FileScanRDD, currently only for ClickHouse Backend. */
def genNativeFileScanRDD(
sparkContext: SparkContext,
wsCxt: WholeStageTransformContext,
splitInfos: Seq[SplitInfo],
scan: BasicScanExecTransformer,
numOutputRows: SQLMetric,
numOutputBatches: SQLMetric,
scanTime: SQLMetric): RDD[ColumnarBatch]
}
Loading

0 comments on commit 2fc808d

Please sign in to comment.