WIP: support spark-3.4
Signed-off-by: Yuan Zhou <[email protected]>
zhouyuan authored and JkSelf committed Oct 13, 2023
1 parent 60d2946 commit 4a4f0ff
Showing 40 changed files with 4,510 additions and 406 deletions.
@@ -71,7 +71,7 @@ class IteratorHandler extends IteratorApi with Logging {
val partitionColumns = mutable.ArrayBuffer.empty[Map[String, String]]
files.foreach {
file =>
paths.append(URLDecoder.decode(file.filePath, StandardCharsets.UTF_8.name()))
paths.append(URLDecoder.decode(file.filePath.toString, StandardCharsets.UTF_8.name()))
starts.append(java.lang.Long.valueOf(file.start))
lengths.append(java.lang.Long.valueOf(file.length))

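Note: Spark 3.4 changes PartitionedFile.filePath from a plain String to org.apache.spark.paths.SparkPath, which is why the append above now goes through file.filePath.toString before URL-decoding. A minimal sketch of the two conversions this commit relies on (the path literal is illustrative):

import java.net.URLDecoder
import java.nio.charset.StandardCharsets

import org.apache.spark.paths.SparkPath

// String -> SparkPath, as the updated SoftAffinitySuite below builds PartitionedFile instances
val path: SparkPath = SparkPath.fromPathString("/data/part-00000.parquet")

// SparkPath -> String, then URL-decode, as IteratorHandler does above
val decoded: String = URLDecoder.decode(path.toString, StandardCharsets.UTF_8.name())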
@@ -47,7 +47,6 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP
extends UnaryExecNode
with TransformSupport
with PredicateHelper
with AliasAwareOutputPartitioning
with Logging {

// Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
@@ -129,8 +128,6 @@ }
}
}

override protected def outputExpressions: Seq[NamedExpression] = output

override def output: Seq[Attribute] = {
child.output.map {
a =>
@@ -219,7 +216,6 @@ case class ProjectExecTransformer private (projectList: Seq[NamedExpression], ch
extends UnaryExecNode
with TransformSupport
with PredicateHelper
with AliasAwareOutputPartitioning
with Logging {

// Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
@@ -358,8 +354,6 @@ case class ProjectExecTransformer private (projectList: Seq[NamedExpression], ch
throw new UnsupportedOperationException(s"This operator doesn't support doExecuteColumnar().")
}

override protected def outputExpressions: Seq[NamedExpression] = projectList

override protected def doExecute()
: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
@@ -564,7 +558,11 @@ object FilterHandler {
getLeftFilters(scan.dataFilters, flattenCondition(plan.condition))
val newPartitionFilters =
ExpressionConverter.transformDynamicPruningExpr(scan.partitionFilters, reuseSubquery)
new BatchScanExecTransformer(batchScan.output, scan, leftFilters ++ newPartitionFilters)
new BatchScanExecTransformer(
batchScan.output,
scan,
leftFilters ++ newPartitionFilters,
batchScan.table)
case _ =>
if (batchScan.runtimeFilters.isEmpty) {
throw new UnsupportedOperationException(
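Note: the AliasAwareOutputPartitioning mixin and its outputExpressions overrides are dropped from both FilterExecTransformerBase and ProjectExecTransformer; Spark 3.4 reworked alias-aware output partitioning, so the 3.3-era trait is no longer there to mix in. If equivalent behaviour is still needed on 3.4 it would have to come from a version-specific shim; a purely hypothetical sketch of such a shim hook (the trait name is an assumption, not a Spark or Gluten API):

import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.execution.UnaryExecNode

// Hypothetical shim trait: each Spark-version module would mix in the matching
// alias-aware partitioning support and read outputExpressions from the operator
// (output for the filter transformer, projectList for the project transformer,
// mirroring the overrides removed in this hunk).
trait TransformerOutputAliasing extends UnaryExecNode {
  protected def outputExpressions: Seq[NamedExpression]
}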
@@ -23,7 +23,9 @@ import io.glutenproject.metrics.MetricsUpdater
import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.read.{InputPartition, Scan}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExecShim, FileScan}
@@ -43,8 +45,22 @@ class BatchScanExecTransformer(
output: Seq[AttributeReference],
@transient scan: Scan,
runtimeFilters: Seq[Expression],
keyGroupedPartitioning: Option[Seq[Expression]] = None)
extends BatchScanExecShim(output, scan, runtimeFilters)
@transient table: Table,
keyGroupedPartitioning: Option[Seq[Expression]] = None,
ordering: Option[Seq[SortOrder]] = None,
commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
applyPartialClustering: Boolean = false,
replicatePartitions: Boolean = false)
extends BatchScanExecShim(
output,
scan,
runtimeFilters,
keyGroupedPartitioning,
ordering,
table,
commonPartitionValues,
applyPartialClustering,
replicatePartitions)
with BasicScanExecTransformer {

// Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
@@ -137,7 +153,8 @@ class BatchScanExecTransformer(
new BatchScanExecTransformer(
canonicalized.output,
canonicalized.scan,
canonicalized.runtimeFilters
canonicalized.runtimeFilters,
canonicalized.table
)
}
}
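Note: the transformer's constructor now mirrors the fields Spark 3.4 threads through BatchScanExec: the DSv2 Table plus the storage-partitioned-join parameters (ordering, commonPartitionValues, applyPartialClustering, replicatePartitions), all forwarded to BatchScanExecShim. A minimal sketch of building the transformer from an existing scan node with only the four required arguments (the defaults above cover the rest; the package io.glutenproject.execution is assumed):

import io.glutenproject.execution.BatchScanExecTransformer

import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

// `plan` is a Spark 3.4 BatchScanExec; keyGroupedPartitioning, ordering and the
// SPJ-related fields keep the defaults declared by the transformer above.
def toTransformer(plan: BatchScanExec): BatchScanExecTransformer =
  new BatchScanExecTransformer(plan.output, plan.scan, plan.runtimeFilters, plan.table)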
@@ -38,7 +38,7 @@ import org.apache.spark.util.collection.BitSet

import java.util.concurrent.TimeUnit.NANOSECONDS

import scala.collection.{mutable, JavaConverters}
import scala.collection.JavaConverters

class FileSourceScanExecTransformer(
@transient relation: HadoopFsRelation,
@@ -68,7 +68,7 @@ class FileSourceScanExecTransformer(
.genFileSourceScanTransformerMetrics(sparkContext) ++ staticMetrics

/** SQL metrics generated only for scans using dynamic partition pruning. */
private lazy val staticMetrics =
override protected lazy val staticMetrics =
if (partitionFilters.exists(FileSourceScanExecTransformer.isDynamicPruningFilter)) {
Map(
"staticFilesNum" -> SQLMetrics.createMetric(sparkContext, "static number of files read"),
@@ -162,19 +162,14 @@ class FileSourceScanExecTransformer(

// The codes below are copied from FileSourceScanExec in Spark,
// all of them are private.
protected lazy val driverMetrics: mutable.HashMap[String, Long] = mutable.HashMap.empty

/**
* Send the driver-side metrics. Before calling this function, selectedPartitions has been
* initialized. See SPARK-26327 for more details.
*/
protected def sendDriverMetrics(): Unit = {
driverMetrics.foreach(e => metrics(e._1).add(e._2))
override protected def sendDriverMetrics(): Unit = {
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(
sparkContext,
executionId,
metrics.filter(e => driverMetrics.contains(e._1)).values.toSeq)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, driverMetrics.values.toSeq)
}

protected def setFilesNumAndSizeMetric(
@@ -183,14 +178,14 @@
val filesNum = partitions.map(_.files.size.toLong).sum
val filesSize = partitions.map(_.files.map(_.getLen).sum).sum
if (!static || !partitionFilters.exists(FileSourceScanExecTransformer.isDynamicPruningFilter)) {
driverMetrics("numFiles") = filesNum
driverMetrics("filesSize") = filesSize
driverMetrics("numFiles").set(filesNum)
driverMetrics("filesSize").set(filesSize)
} else {
driverMetrics("staticFilesNum") = filesNum
driverMetrics("staticFilesSize") = filesSize
driverMetrics("staticFilesNum").set(filesNum)
driverMetrics("staticFilesSize").set(filesSize)
}
if (relation.partitionSchema.nonEmpty) {
driverMetrics("numPartitions") = partitions.length
driverMetrics("numPartitions").set(partitions.length)
}
}

@@ -203,13 +198,13 @@
dataFilters)
setFilesNumAndSizeMetric(ret, static = true)
ret
}(t => driverMetrics("metadataTime") = NANOSECONDS.toMillis(t + optimizerMetadataTimeNs))
}(t => driverMetrics("metadataTime").set(NANOSECONDS.toMillis(t + optimizerMetadataTimeNs)))
}.toArray

// We can only determine the actual partitions at runtime when a dynamic partition filter is
// present. This is because such a filter relies on information that is only available at run
// time (for instance the keys used in the other side of a join).
@transient lazy val dynamicallySelectedPartitions: Array[PartitionDirectory] = {
@transient override lazy val dynamicallySelectedPartitions: Array[PartitionDirectory] = {
val dynamicPartitionFilters =
partitionFilters.filter(FileSourceScanExecTransformer.isDynamicPruningFilter)
val selected = if (dynamicPartitionFilters.nonEmpty) {
Expand Down Expand Up @@ -242,7 +237,7 @@ class FileSourceScanExecTransformer(
val ret = selectedPartitions.filter(p => boundPredicate.eval(p.values))
setFilesNumAndSizeMetric(ret, static = false)
ret
}(t => driverMetrics("pruningTime") = t)
}(t => driverMetrics("pruningTime").set(t))
} else {
selectedPartitions
}
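Note: the assignments driverMetrics("numFiles") = filesNum become driverMetrics("numFiles").set(filesNum) because the locally defined mutable.HashMap[String, Long] is gone; on Spark 3.4 the transformer reuses the parent's driverMetrics map, whose values are SQLMetric objects, and sendDriverMetrics posts them directly. A condensed, self-contained sketch of that flow (method and key names follow the diff; the map type is an assumption based on the calls above):

import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Record a driver-side value on its SQLMetric, then post every driver metric
// for the current execution in a single update.
def reportDriverMetrics(
    sc: SparkContext,
    driverMetrics: Map[String, SQLMetric],
    filesNum: Long): Unit = {
  driverMetrics("numFiles").set(filesNum)
  val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
  SQLMetrics.postDriverMetricUpdates(sc, executionId, driverMetrics.values.toSeq)
}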
@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
import org.apache.spark.sql.execution.ScalarSubquery
import org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null

object ExpressionMappings {

@@ -580,7 +580,8 @@ case class TransformPreOverrides(isAdaptiveContext: Boolean)
case _ =>
ExpressionConverter.transformDynamicPruningExpr(plan.runtimeFilters, reuseSubquery)
}
val transformer = new BatchScanExecTransformer(plan.output, plan.scan, newPartitionFilters)
val transformer =
new BatchScanExecTransformer(plan.output, plan.scan, newPartitionFilters, plan.table)
val validationResult = transformer.doValidate()
if (validationResult.isValid) {
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
@@ -334,7 +334,11 @@ case class AddTransformHintRule() extends Rule[SparkPlan] {
TransformHints.tagTransformable(plan)
} else {
val transformer =
new BatchScanExecTransformer(plan.output, plan.scan, plan.runtimeFilters)
new BatchScanExecTransformer(
plan.output,
plan.scan,
plan.runtimeFilters,
plan.table)
TransformHints.tag(plan, transformer.doValidate().toTransformHint)
}
}
@@ -40,6 +40,8 @@ class GlutenDriverEndpoint extends IsolatedRpcEndpoint with Logging {
private val driverEndpoint: RpcEndpointRef =
rpcEnv.setupEndpoint(GlutenRpcConstants.GLUTEN_DRIVER_ENDPOINT_NAME, this)

// TODO(yuan): get thread cnt from spark context
override def threadCount(): Int = 1
override def receive: PartialFunction[Any, Unit] = {
case GlutenOnExecutionStart(executionId) =>
if (executionId == null) {
@@ -40,7 +40,8 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf)
@volatile var driverEndpointRef: RpcEndpointRef = null

rpcEnv.setupEndpoint(GlutenRpcConstants.GLUTEN_EXECUTOR_ENDPOINT_NAME, this)

// TODO(yuan): get thread cnt from spark context
override def threadCount(): Int = 1
override def onStart(): Unit = {
rpcEnv
.asyncSetupEndpointRefByURI(driverUrl)
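Note: both RPC endpoints gain an explicit threadCount() override; on Spark 3.4 IsolatedRpcEndpoint no longer appears to supply a usable default, so each Gluten endpoint pins itself to a single thread for now (the TODOs note the count should eventually come from the Spark context). A minimal sketch of the pattern, assuming the endpoint is compiled under an org.apache.spark package as Gluten's endpoints are, since IsolatedRpcEndpoint is Spark-internal:

package org.apache.spark.rpc

// Single-threaded isolated endpoint: Spark 3.4 requires the implementation to
// state its thread count explicitly.
class SingleThreadedEndpoint(override val rpcEnv: RpcEnv) extends IsolatedRpcEndpoint {

  override def threadCount(): Int = 1

  override def receive: PartialFunction[Any, Unit] = {
    case _ => // messages would be handled here on the endpoint's dedicated thread
  }
}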
@@ -42,8 +42,8 @@ object SoftAffinityUtil extends LogLevelUtil with Logging {
// using SoftAffinityManager to generate target executors.
// Only using the first file to calculate the target executors
// Only get one file to calculate the target host
val file = filePartition.files.sortBy(_.filePath).head
val locations = SoftAffinityManager.askExecutors(file.filePath)
val file = filePartition.files.sortBy(_.filePath.toString).head
val locations = SoftAffinityManager.askExecutors(file.filePath.toString)
if (!locations.isEmpty) {
logOnLevel(
softAffinityLogLevel,
@@ -17,8 +17,8 @@
package org.apache.spark.sql.utils

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy

object DataSourceStrategyUtil {

@@ -28,6 +28,6 @@ object DataSourceStrategyUtil {
* Runtime filters usually contain a subquery that must be evaluated before the translation. If
* the underlying subquery hasn't completed yet, this method will throw an exception.
*/
def translateRuntimeFilter(expr: Expression): Option[Filter] =
DataSourceStrategy.translateRuntimeFilter(expr)
def translateRuntimeFilter(expr: Expression): Option[Predicate] =
DataSourceV2Strategy.translateRuntimeFilterV2(expr)
}
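Note: DataSourceStrategyUtil now goes through the V2 code path; Spark 3.4's DataSourceV2Strategy.translateRuntimeFilterV2 converts a catalyst runtime-filter expression (typically a dynamic-pruning subquery that has already finished) into a connector-level Predicate rather than a V1 sources.Filter. A small usage sketch against the wrapper above, assuming Spark 3.4's SupportsRuntimeV2Filtering scan interface:

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeV2Filtering}
import org.apache.spark.sql.utils.DataSourceStrategyUtil

// Translate each runtime filter to a V2 Predicate and push the result into a
// scan that accepts runtime filtering; scans that don't are left untouched.
def pushRuntimeFilters(scan: Scan, runtimeFilters: Seq[Expression]): Unit = scan match {
  case s: SupportsRuntimeV2Filtering =>
    val predicates = runtimeFilters.flatMap(DataSourceStrategyUtil.translateRuntimeFilter)
    if (predicates.nonEmpty) s.filter(predicates.toArray)
  case _ => // scan cannot consume runtime filters
}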
@@ -23,6 +23,7 @@ import io.glutenproject.softaffinity.scheduler.SoftAffinityListener
import io.glutenproject.substrait.plan.PlanBuilder

import org.apache.spark.SparkConf
import org.apache.spark.paths.SparkPath
import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListenerExecutorRemoved}
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.sql.QueryTest
@@ -42,8 +43,18 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate
val partition = FilePartition(
0,
Seq(
PartitionedFile(InternalRow.empty, "fakePath0", 0, 100, Array("host-1", "host-2")),
PartitionedFile(InternalRow.empty, "fakePath1", 0, 200, Array("host-2", "host-3"))
PartitionedFile(
InternalRow.empty,
SparkPath.fromPathString("fakePath0"),
0,
100,
Array("host-1", "host-2")),
PartitionedFile(
InternalRow.empty,
SparkPath.fromPathString("fakePath1"),
0,
200,
Array("host-2", "host-3"))
).toArray
)

@@ -59,8 +70,18 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate
val partition = FilePartition(
0,
Seq(
PartitionedFile(InternalRow.empty, "fakePath0", 0, 100, Array("host-1", "host-2")),
PartitionedFile(InternalRow.empty, "fakePath1", 0, 200, Array("host-4", "host-5"))
PartitionedFile(
InternalRow.empty,
SparkPath.fromPathString("fakePath0"),
0,
100,
Array("host-1", "host-2")),
PartitionedFile(
InternalRow.empty,
SparkPath.fromPathString("fakePath1"),
0,
200,
Array("host-4", "host-5"))
).toArray
)

@@ -77,8 +98,18 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate
val partition = FilePartition(
0,
Seq(
PartitionedFile(InternalRow.empty, "fakePath0", 0, 100, Array("host-1", "host-2")),
PartitionedFile(InternalRow.empty, "fakePath1", 0, 200, Array("host-5", "host-6"))
PartitionedFile(
InternalRow.empty,
SparkPath.fromPathString("fakePath0"),
0,
100,
Array("host-1", "host-2")),
PartitionedFile(
InternalRow.empty,
SparkPath.fromPathString("fakePath1"),
0,
200,
Array("host-5", "host-6"))
).toArray
)

@@ -107,8 +138,18 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate
val partition = FilePartition(
0,
Seq(
PartitionedFile(InternalRow.empty, "fakePath0", 0, 100, Array("host-1", "host-2")),
PartitionedFile(InternalRow.empty, "fakePath1", 0, 200, Array("host-5", "host-6"))
PartitionedFile(
InternalRow.empty,
SparkPath.fromPathString("fakePath0"),
0,
100,
Array("host-1", "host-2")),
PartitionedFile(
InternalRow.empty,
SparkPath.fromPathString("fakePath1"),
0,
200,
Array("host-5", "host-6"))
).toArray
)

@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.PartitionIdPassthrough
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
@@ -210,7 +209,9 @@ object ExecUtil {
dependency
}
}

private[spark] class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner {
override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}
case class CloseablePairedColumnarBatchIterator(iter: Iterator[(Int, ColumnarBatch)])
extends Iterator[(Int, ColumnarBatch)]
with Logging {
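Note: PartitionIdPassthrough was previously imported from org.apache.spark.sql.execution; that private helper is apparently no longer reachable there on Spark 3.4 (the import is dropped), so ExecUtil now carries its own copy. It is a trivial Partitioner that treats the record key itself as the target partition id, which is how the columnar shuffle routes already-partitioned (Int, ColumnarBatch) pairs. A quick self-contained illustration:

import org.apache.spark.Partitioner

// Same shape as the private class added above: the key already is the partition id.
class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}

val partitioner = new PartitionIdPassthrough(8)
assert(partitioner.getPartition(3) == 3) // key 3 maps straight to partition 3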