[GLUTEN-3361] Support spark 3.4 in Gluten (#3360)
Main changes:

In Spark 3.4, there was an API change in BatchScanExec (details here): a new table parameter has been added to its constructor, so it needs to be handled in the shim layer; see the shim sketch after this list.
We have copied some code from vanilla Spark into FileSourceScanExecTransformer (you can find it here). In Spark 3.2 and 3.3 the copied code was private, but in Spark 3.4 it is protected, so it should be placed in the shim layer. Additionally, GlutenTimeMetric.scala and Arm.scala need to be placed in the shim layer to resolve compilation issues.
Offset.scala is newly introduced in Spark 3.4, so it should be added to the Spark 3.2 and Spark 3.3 shim layers; see the placeholder sketch after this list.
Empty2Null is defined in org.apache.spark.sql.execution.datasources.FileFormatWriter in Spark 3.2 and 3.3, so we also need to add Empty2Null under org.apache.spark.sql.execution.datasources.FileFormatWriter in the Spark 3.4 shim to fix the compilation issue.
The PartitionedFile API changed the filePath field from a String to a SparkPath object in Spark 3.4; see the path-handling sketch after this list.
In Spark 3.4, StatFunctions.scala is no longer needed in the shim layer, so it has been moved from the common shim layer into the Spark 3.2 and Spark 3.3 shim layers.
PromotePrecision.scala has been deleted in Spark 3.4.
The unused DwrfScan code has been removed from the Spark 3.2 and Spark 3.3 shim layers.
Since DataSourceStrategyUtil.scala is not used in Gluten, it has been deleted.
A new API, visitOffset(p: Offset), has been added to the LogicalPlanVisitor in Spark 3.4.
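To illustrate the shim approach for the new BatchScanExec table parameter, here is a minimal sketch of a version-dispatched accessor. The SparkShims trait and the getBatchScanExecTable name mirror the calls visible in the diff below (SparkShimLoader.getSparkShims.getBatchScanExecTable); the two implementations and their bodies are illustrative assumptions, each living in its own version-specific shim module.

// Sketch only: each implementation compiles in its own version-specific shim module.
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

trait SparkShims {
  // Callers never touch the 3.4-only `table` field directly; they go through the shim.
  def getBatchScanExecTable(batchScan: BatchScanExec): Table
}

// Spark 3.2 / 3.3 shim: BatchScanExec has no `table` parameter, so there is nothing to return.
class Spark32Shims extends SparkShims {
  override def getBatchScanExecTable(batchScan: BatchScanExec): Table = null
}

// Spark 3.4 shim: the new constructor parameter is exposed as a field, so simply forward it.
class Spark34Shims extends SparkShims {
  override def getBatchScanExecTable(batchScan: BatchScanExec): Table = batchScan.table
}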
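For the Offset.scala item, the sketch below shows one way a placeholder logical operator could look in the Spark 3.2/3.3 shim layers so that shared code referencing Offset (and the new visitOffset(p: Offset) visitor method) keeps compiling. The constructor shape mirrors Spark 3.4's Offset node; the parent class and exact file contents are assumptions, not necessarily what the commit ships.

// Sketch only: a compile-time placeholder; Spark 3.2/3.3 never produce this node at runtime.
package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}

case class Offset(offsetExpr: Expression, child: LogicalPlan) extends UnaryNode {
  // OFFSET does not change the schema of its child.
  override def output: Seq[Attribute] = child.output
  override protected def withNewChildInternal(newChild: LogicalPlan): Offset =
    copy(child = newChild)
}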
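For the PartitionedFile change, the pattern used throughout the diff is to call .toString on filePath, which compiles on every supported version: on Spark 3.2/3.3 filePath is already a String (toString is a no-op), while on Spark 3.4 it is a SparkPath whose string form is used instead. A small illustrative helper (the decodedFilePath name is hypothetical) showing the same call shape:

import java.net.URLDecoder
import java.nio.charset.StandardCharsets

import org.apache.spark.sql.execution.datasources.PartitionedFile

object PartitionedFilePaths {
  // Sketch only: compiles unchanged against Spark 3.2/3.3 (filePath: String)
  // and Spark 3.4 (filePath: SparkPath), mirroring the call sites in the diff.
  def decodedFilePath(file: PartitionedFile): String =
    URLDecoder.decode(file.filePath.toString, StandardCharsets.UTF_8.name())
}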
JkSelf authored Oct 20, 2023
1 parent 60e92e4 commit 3e73f32
Showing 57 changed files with 5,329 additions and 348 deletions.
13 changes: 13 additions & 0 deletions .github/workflows/velox_be.yml
@@ -254,6 +254,19 @@ jobs:
--local --preset=velox --benchmark-type=h --error-on-memleak --disable-aqe --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \
&& GLUTEN_IT_JVM_ARGS=-Xmx20G sbin/gluten-it.sh queries-compare \
--local --preset=velox --benchmark-type=ds --error-on-memleak --off-heap-size=30g -s=10.0 --threads=32 --iterations=1'
- name: Build for Spark 3.4.1
run: |
docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c '
cd /opt/gluten && \
mvn clean install -Pspark-3.4 -Pbackends-velox -Prss -DskipTests'
- name: TPC-H SF1.0 && TPC-DS SF10.0 Parquet local spark3.4
run: |
docker exec ubuntu2204-test-$GITHUB_RUN_ID bash -c 'cd /opt/gluten/tools/gluten-it && \
mvn clean install -Pspark-3.4 \
&& GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
--local --preset=velox --benchmark-type=h --error-on-memleak --disable-aqe --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \
&& GLUTEN_IT_JVM_ARGS=-Xmx20G sbin/gluten-it.sh queries-compare \
--local --preset=velox --benchmark-type=ds --error-on-memleak --off-heap-size=30g -s=10.0 --threads=32 --iterations=1'
- name: Exit docker container
if: ${{ always() }}
run: |
@@ -71,7 +71,7 @@ class IteratorApiImpl extends IteratorApi with Logging {
val partitionColumns = mutable.ArrayBuffer.empty[Map[String, String]]
files.foreach {
file =>
paths.append(URLDecoder.decode(file.filePath, StandardCharsets.UTF_8.name()))
paths.append(URLDecoder.decode(file.filePath.toString, StandardCharsets.UTF_8.name()))
starts.append(java.lang.Long.valueOf(file.start))
lengths.append(java.lang.Long.valueOf(file.length))

1 change: 1 addition & 0 deletions dev/buildbundle-veloxbe.sh
@@ -6,3 +6,4 @@ source "$BASEDIR/builddeps-veloxbe.sh"
cd $GLUTEN_DIR
mvn clean package -Pbackends-velox -Prss -Pspark-3.2 -DskipTests
mvn clean package -Pbackends-velox -Prss -Pspark-3.3 -DskipTests
mvn clean package -Pbackends-velox -Prss -Pspark-3.4 -DskipTests
@@ -21,6 +21,7 @@ import io.glutenproject.expression.{ConverterUtils, ExpressionConverter, Express
import io.glutenproject.extension.{GlutenPlan, ValidationResult}
import io.glutenproject.extension.columnar.TransformHints
import io.glutenproject.metrics.MetricsUpdater
import io.glutenproject.sql.shims.SparkShimLoader
import io.glutenproject.substrait.`type`.{TypeBuilder, TypeNode}
import io.glutenproject.substrait.SubstraitContext
import io.glutenproject.substrait.expression.ExpressionNode
@@ -45,7 +46,6 @@ import scala.collection.JavaConverters._
abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkPlan)
extends UnaryTransformSupport
with PredicateHelper
with AliasAwareOutputPartitioning
with Logging {

// Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
@@ -104,8 +104,6 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP
}
}

override protected def outputExpressions: Seq[NamedExpression] = output

override def output: Seq[Attribute] = {
child.output.map {
a =>
@@ -188,7 +186,6 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP
case class ProjectExecTransformer private (projectList: Seq[NamedExpression], child: SparkPlan)
extends UnaryTransformSupport
with PredicateHelper
with AliasAwareOutputPartitioning
with Logging {

// Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
@@ -304,8 +301,6 @@ case class ProjectExecTransformer private (projectList: Seq[NamedExpression], ch
throw new UnsupportedOperationException(s"This operator doesn't support doExecuteColumnar().")
}

override protected def outputExpressions: Seq[NamedExpression] = projectList

override protected def withNewChildInternal(newChild: SparkPlan): ProjectExecTransformer =
copy(child = newChild)
}
@@ -505,7 +500,11 @@ object FilterHandler {
getLeftFilters(scan.dataFilters, flattenCondition(plan.condition))
val newPartitionFilters =
ExpressionConverter.transformDynamicPruningExpr(scan.partitionFilters, reuseSubquery)
new BatchScanExecTransformer(batchScan.output, scan, leftFilters ++ newPartitionFilters)
new BatchScanExecTransformer(
batchScan.output,
scan,
leftFilters ++ newPartitionFilters,
table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScan))
case _ =>
if (batchScan.runtimeFilters.isEmpty) {
throw new UnsupportedOperationException(
@@ -19,10 +19,13 @@ package io.glutenproject.execution
import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.extension.ValidationResult
import io.glutenproject.metrics.MetricsUpdater
import io.glutenproject.sql.shims.SparkShimLoader
import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.read.{InputPartition, Scan}
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExecShim, FileScan}
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -41,8 +44,13 @@ class BatchScanExecTransformer(
output: Seq[AttributeReference],
@transient scan: Scan,
runtimeFilters: Seq[Expression],
keyGroupedPartitioning: Option[Seq[Expression]] = None)
extends BatchScanExecShim(output, scan, runtimeFilters)
keyGroupedPartitioning: Option[Seq[Expression]] = None,
ordering: Option[Seq[SortOrder]] = None,
@transient table: Table,
commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
applyPartialClustering: Boolean = false,
replicatePartitions: Boolean = false)
extends BatchScanExecShim(output, scan, runtimeFilters, table)
with BasicScanExecTransformer {

// Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
@@ -117,7 +125,8 @@ class BatchScanExecTransformer(
new BatchScanExecTransformer(
canonicalized.output,
canonicalized.scan,
canonicalized.runtimeFilters
canonicalized.runtimeFilters,
table = SparkShimLoader.getSparkShims.getBatchScanExecTable(canonicalized)
)
}
}
@@ -19,25 +19,23 @@ package io.glutenproject.execution
import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.expression.ConverterUtils
import io.glutenproject.extension.ValidationResult
import io.glutenproject.metrics.{GlutenTimeMetric, MetricsUpdater}
import io.glutenproject.metrics.MetricsUpdater
import io.glutenproject.substrait.SubstraitContext
import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat
import io.glutenproject.substrait.rel.ReadRelNode

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, PlanExpression, Predicate}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PlanExpression}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.{FileSourceScanExecShim, InSubqueryExec, ScalarSubquery, SQLExecution}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
import org.apache.spark.sql.execution.FileSourceScanExecShim
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.collection.BitSet

import java.util.concurrent.TimeUnit.NANOSECONDS

import scala.collection.{mutable, JavaConverters}
import scala.collection.JavaConverters

class FileSourceScanExecTransformer(
@transient relation: HadoopFsRelation,
@@ -64,10 +62,10 @@ class FileSourceScanExecTransformer(
// Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
@transient override lazy val metrics: Map[String, SQLMetric] =
BackendsApiManager.getMetricsApiInstance
.genFileSourceScanTransformerMetrics(sparkContext) ++ staticMetrics
.genFileSourceScanTransformerMetrics(sparkContext) ++ staticMetricsAlias

/** SQL metrics generated only for scans using dynamic partition pruning. */
private lazy val staticMetrics =
private lazy val staticMetricsAlias =
if (partitionFilters.exists(FileSourceScanExecTransformer.isDynamicPruningFilter)) {
Map(
"staticFilesNum" -> SQLMetrics.createMetric(sparkContext, "static number of files read"),
@@ -135,96 +133,6 @@ class FileSourceScanExecTransformer(
override def metricsUpdater(): MetricsUpdater =
BackendsApiManager.getMetricsApiInstance.genFileSourceScanTransformerMetricsUpdater(metrics)

// The codes below are copied from FileSourceScanExec in Spark,
// all of them are private.
protected lazy val driverMetrics: mutable.HashMap[String, Long] = mutable.HashMap.empty

/**
* Send the driver-side metrics. Before calling this function, selectedPartitions has been
* initialized. See SPARK-26327 for more details.
*/
protected def sendDriverMetrics(): Unit = {
driverMetrics.foreach(e => metrics(e._1).add(e._2))
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(
sparkContext,
executionId,
metrics.filter(e => driverMetrics.contains(e._1)).values.toSeq)
}

protected def setFilesNumAndSizeMetric(
partitions: Seq[PartitionDirectory],
static: Boolean): Unit = {
val filesNum = partitions.map(_.files.size.toLong).sum
val filesSize = partitions.map(_.files.map(_.getLen).sum).sum
if (!static || !partitionFilters.exists(FileSourceScanExecTransformer.isDynamicPruningFilter)) {
driverMetrics("numFiles") = filesNum
driverMetrics("filesSize") = filesSize
} else {
driverMetrics("staticFilesNum") = filesNum
driverMetrics("staticFilesSize") = filesSize
}
if (relation.partitionSchema.nonEmpty) {
driverMetrics("numPartitions") = partitions.length
}
}

@transient override lazy val selectedPartitions: Array[PartitionDirectory] = {
val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
GlutenTimeMetric.withNanoTime {
val ret =
relation.location.listFiles(
partitionFilters.filterNot(FileSourceScanExecTransformer.isDynamicPruningFilter),
dataFilters)
setFilesNumAndSizeMetric(ret, static = true)
ret
}(t => driverMetrics("metadataTime") = NANOSECONDS.toMillis(t + optimizerMetadataTimeNs))
}.toArray

// We can only determine the actual partitions at runtime when a dynamic partition filter is
// present. This is because such a filter relies on information that is only available at run
// time (for instance the keys used in the other side of a join).
@transient lazy val dynamicallySelectedPartitions: Array[PartitionDirectory] = {
val dynamicPartitionFilters =
partitionFilters.filter(FileSourceScanExecTransformer.isDynamicPruningFilter)
val selected = if (dynamicPartitionFilters.nonEmpty) {
// When it includes some DynamicPruningExpression,
// it needs to execute InSubqueryExec first,
// because doTransform path can't execute 'doExecuteColumnar' which will
// execute prepare subquery first.
dynamicPartitionFilters.foreach {
case DynamicPruningExpression(inSubquery: InSubqueryExec) =>
executeInSubqueryForDynamicPruningExpression(inSubquery)
case e: Expression =>
e.foreach {
case s: ScalarSubquery => s.updateResult()
case _ =>
}
case _ =>
}
GlutenTimeMetric.withMillisTime {
// call the file index for the files matching all filters except dynamic partition filters
val predicate = dynamicPartitionFilters.reduce(And)
val partitionColumns = relation.partitionSchema
val boundPredicate = Predicate.create(
predicate.transform {
case a: AttributeReference =>
val index = partitionColumns.indexWhere(a.name == _.name)
BoundReference(index, partitionColumns(index).dataType, nullable = true)
},
Nil
)
val ret = selectedPartitions.filter(p => boundPredicate.eval(p.values))
setFilesNumAndSizeMetric(ret, static = false)
ret
}(t => driverMetrics("pruningTime") = t)
} else {
selectedPartitions
}
sendDriverMetrics()
selected
}

override val nodeNamePrefix: String = "NativeFile"

override val nodeName: String = {
@@ -22,6 +22,7 @@ import io.glutenproject.execution._
import io.glutenproject.expression.ExpressionConverter
import io.glutenproject.extension.columnar._
import io.glutenproject.metrics.GlutenTimeMetric
import io.glutenproject.sql.shims.SparkShimLoader
import io.glutenproject.utils.{ColumnarShuffleUtil, LogLevelUtil, PhysicalPlanSelector}

import org.apache.spark.api.python.EvalPythonExecTransformer
@@ -578,7 +579,12 @@ case class TransformPreOverrides(isAdaptiveContext: Boolean)
case _ =>
ExpressionConverter.transformDynamicPruningExpr(plan.runtimeFilters, reuseSubquery)
}
val transformer = new BatchScanExecTransformer(plan.output, plan.scan, newPartitionFilters)
val transformer = new BatchScanExecTransformer(
plan.output,
plan.scan,
newPartitionFilters,
table = SparkShimLoader.getSparkShims.getBatchScanExecTable(plan))

val validationResult = transformer.doValidate()
if (validationResult.isValid) {
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
@@ -20,6 +20,7 @@ import io.glutenproject.GlutenConfig
import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.execution._
import io.glutenproject.extension.{GlutenPlan, ValidationResult}
import io.glutenproject.sql.shims.SparkShimLoader
import io.glutenproject.utils.PhysicalPlanSelector

import org.apache.spark.api.python.EvalPythonExecTransformer
@@ -333,8 +334,11 @@ case class AddTransformHintRule() extends Rule[SparkPlan] {
if (plan.runtimeFilters.nonEmpty) {
TransformHints.tagTransformable(plan)
} else {
val transformer =
new BatchScanExecTransformer(plan.output, plan.scan, plan.runtimeFilters)
val transformer = new BatchScanExecTransformer(
plan.output,
plan.scan,
plan.runtimeFilters,
table = SparkShimLoader.getSparkShims.getBatchScanExecTable(plan))
TransformHints.tag(plan, transformer.doValidate().toTransformHint)
}
}
@@ -40,6 +40,8 @@ class GlutenDriverEndpoint extends IsolatedRpcEndpoint with Logging {
private val driverEndpoint: RpcEndpointRef =
rpcEnv.setupEndpoint(GlutenRpcConstants.GLUTEN_DRIVER_ENDPOINT_NAME, this)

// TODO(yuan): get thread cnt from spark context
override def threadCount(): Int = 1
override def receive: PartialFunction[Any, Unit] = {
case GlutenOnExecutionStart(executionId) =>
if (executionId == null) {
@@ -40,7 +40,8 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf)
@volatile var driverEndpointRef: RpcEndpointRef = null

rpcEnv.setupEndpoint(GlutenRpcConstants.GLUTEN_EXECUTOR_ENDPOINT_NAME, this)

// TODO(yuan): get thread cnt from spark context
override def threadCount(): Int = 1
override def onStart(): Unit = {
rpcEnv
.asyncSetupEndpointRefByURI(driverUrl)
@@ -42,8 +42,8 @@ object SoftAffinityUtil extends LogLevelUtil with Logging {
// using SoftAffinityManager to generate target executors.
// Only using the first file to calculate the target executors
// Only get one file to calculate the target host
val file = filePartition.files.sortBy(_.filePath).head
val locations = SoftAffinityManager.askExecutors(file.filePath)
val file = filePartition.files.sortBy(_.filePath.toString).head
val locations = SoftAffinityManager.askExecutors(file.filePath.toString)
if (!locations.isEmpty) {
logOnLevel(
softAffinityLogLevel,