Skip to content

Commit

Permalink
fix bug when soft affinity enable
Browse files Browse the repository at this point in the history
  • Loading branch information
liuneng1994 committed Jul 31, 2024
1 parent 26ea59d commit dfb765b
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution.commands

import org.apache.gluten.exception.GlutenException
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.substrait.rel.ExtensionTableBuilder

Expand All @@ -27,6 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, GreaterThanOrEqual, IsNotNull, Literal}
import org.apache.spark.sql.delta._
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.execution.commands.GlutenCHCacheDataCommand.toExecutorId
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.types.{BooleanType, StringType}
import org.apache.spark.util.ThreadUtils
Expand Down Expand Up @@ -229,10 +231,14 @@ case class GlutenCHCacheDataCommand(
if (asynExecute) {
executorIdsToParts.foreach(
value => {
val executorData = GlutenDriverEndpoint.executorDataMap.get(value._1)
val executorData = GlutenDriverEndpoint.executorDataMap.get(toExecutorId(value._1))
if (executorData != null) {
executorData.executorEndpointRef.send(
GlutenMergeTreeCacheLoad(value._2, selectedColumns.toSet.asJava))
} else {
throw new GlutenException(
s"executor ${value._1} not found," +
s" all executors are ${GlutenDriverEndpoint.executorDataMap.toString}")
}
})
Seq(Row(true, ""))
Expand All @@ -241,13 +247,18 @@ case class GlutenCHCacheDataCommand(
val resultList = ArrayBuffer[CacheLoadResult]()
executorIdsToParts.foreach(
value => {
val executorData = GlutenDriverEndpoint.executorDataMap.get(value._1)
if (executorData != null) {
futureList.append(
executorData.executorEndpointRef.ask[CacheLoadResult](
GlutenMergeTreeCacheLoad(value._2, selectedColumns.toSet.asJava)
))
}
val executorData = GlutenDriverEndpoint.executorDataMap.get(toExecutorId(value._1))
if (executorData == null)
if (executorData != null) {
futureList.append(
executorData.executorEndpointRef.ask[CacheLoadResult](
GlutenMergeTreeCacheLoad(value._2, selectedColumns.toSet.asJava)
))
} else {
throw new GlutenException(
s"executor ${value._1} not found," +
s" all executors are ${GlutenDriverEndpoint.executorDataMap.toString}")
}
})
futureList.foreach(
f => {
Expand All @@ -265,4 +276,9 @@ case class GlutenCHCacheDataCommand(

object GlutenCHCacheDataCommand {
val ALL_EXECUTORS = "allExecutors"

private def toExecutorId(executorId: String): String = {
val parts = executorId.split("_")
parts(parts.size - 1)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class GlutenClickHouseMergeTreeCacheDataSSuite
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "error")
.set("spark.gluten.soft-affinity.enabled", "true")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
"false")
Expand Down

0 comments on commit dfb765b

Please sign in to comment.