diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
index 335fb065f..ef67167c4 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
@@ -19,6 +19,7 @@
 
 package org.apache.spark.sql.comet.execution.shuffle
 
+import java.util.Collections
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
@@ -61,7 +62,23 @@ class CometShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
 
   private lazy val shuffleExecutorComponents = loadShuffleExecutorComponents(conf)
 
-  override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
+  override val shuffleBlockResolver: IndexShuffleBlockResolver = {
+    // Different patch versions of Spark 3.4 have different constructor signatures:
+    // see https://github.com/apache/spark/commit/5180694705be3508bd21dd9b863a59b8cb8ba193
+    // We look up the proper constructor via reflection.
+    classOf[IndexShuffleBlockResolver].getDeclaredConstructors
+      .filter(c => List(2, 3).contains(c.getParameterCount()))
+      .map { c =>
+        c.getParameterCount match {
+          case 2 =>
+            c.newInstance(conf, null).asInstanceOf[IndexShuffleBlockResolver]
+          case 3 =>
+            c.newInstance(conf, null, Collections.emptyMap())
+              .asInstanceOf[IndexShuffleBlockResolver]
+        }
+      }
+      .head
+  }
 
   /**
    * (override) Obtains a [[ShuffleHandle]] to pass to tasks.
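
For context, the sketch below is a minimal, self-contained illustration of the same reflection pattern outside Spark. `Resolver` and `ReflectiveCtorDemo` are hypothetical stand-ins, not Comet or Spark APIs; in the real patch only one of the two constructor arities exists for a given Spark 3.4.x release, so the `filter`/`map`/`head` chain resolves to whichever constructor is actually present.

```scala
import java.util.Collections

// Hypothetical stand-in for IndexShuffleBlockResolver: one "version" exposes a
// 2-arg constructor, another adds a third java.util.Map parameter.
class Resolver(conf: String, blockManager: AnyRef) {
  def this(conf: String, blockManager: AnyRef, taskIdMap: java.util.Map[String, String]) =
    this(conf, blockManager)
  override def toString: String = s"Resolver($conf)"
}

object ReflectiveCtorDemo {
  def main(args: Array[String]): Unit = {
    val conf = "dummy-conf"

    // Same pattern as the patch: enumerate the declared constructors, keep the
    // arities we know how to call, and invoke whichever one is available.
    val resolver: Resolver = classOf[Resolver].getDeclaredConstructors
      .filter(c => List(2, 3).contains(c.getParameterCount))
      .map { c =>
        c.getParameterCount match {
          case 2 => c.newInstance(conf, null).asInstanceOf[Resolver]
          case 3 => c.newInstance(conf, null, Collections.emptyMap()).asInstanceOf[Resolver]
        }
      }
      .head

    println(resolver) // prints Resolver(dummy-conf)
  }
}
```

Binding the constructor at runtime this way lets a single Comet build run against multiple Spark 3.4 patch releases without taking a compile-time dependency on either constructor signature.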