From e22bb12a4b5fe2ff1df8d78a0a7688243e034830 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Thu, 22 Aug 2024 11:29:57 +0800
Subject: [PATCH] fixup

---
 .../org/apache/gluten/sql/shims/spark35/Spark35Shims.scala | 3 +--
 .../src/main/scala/org/apache/spark/ShuffleUtils.scala     | 7 +++----
 .../spark/sql/catalyst/expressions/PromotePrecision.scala  | 3 ---
 .../spark/sql/execution/ExpandOutputPartitioningShim.scala | 1 -
 .../spark/sql/execution/FileSourceScanExecShim.scala       | 6 +++---
 .../sql/execution/datasources/v2/BatchScanExecShim.scala   | 4 +---
 6 files changed, 8 insertions(+), 16 deletions(-)

diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index 93785d7a26595..6474c74fe8f3b 100644
--- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -353,8 +353,7 @@ class Spark35Shims extends SparkShims {
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int)
-      : Tuple2[Iterator[(BlockManagerId, collection.Seq[(BlockId, Long, Int)])], Boolean] = {
+      endPartition: Int): Tuple2[Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])], Boolean] = {
     ShuffleUtils.getReaderParam(handle, startMapIndex, endMapIndex, startPartition, endPartition)
   }
 
diff --git a/shims/spark35/src/main/scala/org/apache/spark/ShuffleUtils.scala b/shims/spark35/src/main/scala/org/apache/spark/ShuffleUtils.scala
index a08f0310d14b1..c2a6cd5cffc1b 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/ShuffleUtils.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/ShuffleUtils.scala
@@ -25,8 +25,7 @@
       startMapIndex: Int,
       endMapIndex: Int,
       startPartition: Int,
-      endPartition: Int)
-      : Tuple2[Iterator[(BlockManagerId, collection.Seq[(BlockId, Long, Int)])], Boolean] = {
+      endPartition: Int): Tuple2[Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])], Boolean] = {
     val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
     if (baseShuffleHandle.dependency.isShuffleMergeFinalizedMarked) {
       val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(
@@ -35,7 +34,7 @@
         endMapIndex,
         startPartition,
         endPartition)
-      (res.iter, res.enableBatchFetch)
+      (res.iter.map(b => (b._1, b._2.toSeq)), res.enableBatchFetch)
     } else {
       val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
         handle.shuffleId,
@@ -43,7 +42,7 @@
         endMapIndex,
         startPartition,
         endPartition)
-      (address, true)
+      (address.map(b => (b._1, b._2.toSeq)), true)
     }
   }
 }
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/catalyst/expressions/PromotePrecision.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/catalyst/expressions/PromotePrecision.scala
index 8de5a07fe045e..b18a79b864e20 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/catalyst/expressions/PromotePrecision.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/catalyst/expressions/PromotePrecision.scala
@@ -18,9 +18,6 @@ package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.expressions.codegen.Block._
-import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 case class PromotePrecision(child: Expression) extends UnaryExpression {
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/ExpandOutputPartitioningShim.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/ExpandOutputPartitioningShim.scala
index 40fc16d2ed956..791490064a967 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/ExpandOutputPartitioningShim.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/ExpandOutputPartitioningShim.scala
@@ -17,7 +17,6 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.plans.{InnerLike, JoinType}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioningLike, Partitioning, PartitioningCollection}
 
 import scala.collection.mutable
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index c8795e31ceb43..5ec4499ec6e93 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -21,9 +21,9 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, DynamicPruningExpression, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, FileSourceMetadataAttribute, PlanExpression, Predicate}
-import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, PartitionDirectory}
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetUtils}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BoundReference, Expression, FileSourceConstantMetadataAttribute, FileSourceGeneratedMetadataAttribute, PlanExpression, Predicate}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.collection.BitSet
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
index 343070d7f209e..fb3078d397fbf 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExecShim.scala
@@ -24,9 +24,7 @@ import org.apache.spark.sql.catalyst.plans.physical.KeyGroupedPartitioning
 import org.apache.spark.sql.catalyst.util.InternalRowComparableWrapper
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
-import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, Scan}
-import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, Scan, SupportsRuntimeV2Filtering}
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.execution.metric.SQLMetric
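
Note (not part of the patch): the `.toSeq` conversions in ShuffleUtils.getReaderParam, and the return-type change from `collection.Seq` to `Seq`, relate to the Scala 2.13 collections redesign, where the default `Seq` alias is `scala.collection.immutable.Seq` rather than `scala.collection.Seq`. A minimal standalone sketch of the conformance issue follows; the object name and sample block tuple are hypothetical, for illustration only.

// Sketch only (hypothetical names): a collection.Seq value needs an explicit
// .toSeq before it conforms to the Scala 2.13 default (immutable) Seq.
import scala.collection.mutable.ArrayBuffer

object SeqConformanceSketch {
  def main(args: Array[String]): Unit = {
    // A possibly-mutable sequence, as 2.13-built tracker-style APIs may hand back.
    val blocks: collection.Seq[(String, Long, Int)] =
      ArrayBuffer(("shuffle_0_0_0", 1024L, 0))
    // val bad: Seq[(String, Long, Int)] = blocks  // does not compile on 2.13
    val ok: Seq[(String, Long, Int)] = blocks.toSeq // explicit, cheap if already immutable
    println(ok)
  }
}

Because the conversion is applied inside `res.iter.map`/`address.map`, each per-executor block list is converted lazily as the iterator is consumed, rather than materialized up front.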