Commit

fixup
zhztheplayer committed Aug 22, 2024
1 parent 1ad26c2 commit e22bb12
Showing 6 changed files with 8 additions and 16 deletions.
@@ -353,8 +353,7 @@ class Spark35Shims extends SparkShims {
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int)
-      : Tuple2[Iterator[(BlockManagerId, collection.Seq[(BlockId, Long, Int)])], Boolean] = {
+      endPartition: Int): Tuple2[Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])], Boolean] = {
     ShuffleUtils.getReaderParam(handle, startMapIndex, endMapIndex, startPartition, endPartition)
   }

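Note on the return-type change above: dropping the explicit collection. qualifier is significant under Scala 2.13, where the bare Seq alias resolves to scala.collection.immutable.Seq rather than the broader scala.collection.Seq. A minimal standalone sketch of that distinction (illustrative only, not taken from the commit):

import scala.collection.mutable.ArrayBuffer

object SeqAliasSketch extends App {
  // On Scala 2.13 the unqualified Seq alias is scala.collection.immutable.Seq,
  // while collection.Seq is the read-only supertype it shares with mutable.Seq.
  val general: collection.Seq[Int] = ArrayBuffer(1, 2, 3)
  // Assigning `general` directly to Seq[Int] does not compile on 2.13;
  // an explicit .toSeq conversion is required.
  val immutableCopy: Seq[Int] = general.toSeq
  println(immutableCopy)
}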
@@ -25,8 +25,7 @@ object ShuffleUtils {
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int)
-      : Tuple2[Iterator[(BlockManagerId, collection.Seq[(BlockId, Long, Int)])], Boolean] = {
+      endPartition: Int): Tuple2[Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])], Boolean] = {
     val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
     if (baseShuffleHandle.dependency.isShuffleMergeFinalizedMarked) {
       val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(
@@ -35,15 +34,15 @@
         endMapIndex,
         startPartition,
         endPartition)
-      (res.iter, res.enableBatchFetch)
+      (res.iter.map(b => (b._1, b._2.toSeq)), res.enableBatchFetch)
     } else {
       val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
         handle.shuffleId,
         startMapIndex,
         endMapIndex,
         startPartition,
         endPartition)
-      (address, true)
+      (address.map(b => (b._1, b._2.toSeq)), true)
     }
   }
 }
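The two hunks above follow from that signature change: the block lists inside the iterators returned by the map-output tracker are evidently only collection.Seq (hence the previous collection.Seq in the signature), so each one is converted with .toSeq to satisfy the immutable Seq that getReaderParam now declares. A minimal sketch of that mapping, with placeholder executor and block values standing in for Spark's real types:

import scala.collection.mutable.ArrayBuffer

object ToSeqMappingSketch {
  // Hypothetical stand-in for the (BlockManagerId, blocks) pairs returned by the
  // map-output tracker; the inner sequence is only known to be a collection.Seq.
  def mapSizesByExecutor(): Iterator[(String, collection.Seq[(String, Long, Int)])] =
    Iterator("executor-1" -> ArrayBuffer(("shuffle_0_0_0", 128L, 0)))

  def main(args: Array[String]): Unit = {
    // Mirrors the fixup: converting each block list with .toSeq yields the
    // immutable Seq element type that the narrowed return type declares.
    val converted: Iterator[(String, Seq[(String, Long, Int)])] =
      mapSizesByExecutor().map(b => (b._1, b._2.toSeq))
    converted.foreach(println)
  }
}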
@@ -18,9 +18,6 @@ package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.expressions.codegen.Block._
-import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 case class PromotePrecision(child: Expression) extends UnaryExpression {
@@ -17,7 +17,6 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.{InnerLike, JoinType}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioningLike, Partitioning, PartitioningCollection}
 
 import scala.collection.mutable
@@ -21,9 +21,9 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, FileSourceMetadataAttribute, PlanExpression, Predicate}
-import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, PartitionDirectory}
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetUtils}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, PlanExpression, Predicate}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.collection.BitSet
@@ -24,9 +24,7 @@ import org.apache.spark.sql.catalyst.plans.physical.KeyGroupedPartitioning
 import org.apache.spark.sql.catalyst.util.InternalRowComparableWrapper
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
-import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, Scan}
-import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, Scan, SupportsRuntimeV2Filtering}
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.execution.metric.SQLMetric
