From dfb765b3561dbeab149e46840ff55865d3faa244 Mon Sep 17 00:00:00 2001
From: liuneng1994
Date: Wed, 31 Jul 2024 15:52:09 +0800
Subject: [PATCH] fix bug when soft affinity is enabled

---
 .../commands/GlutenCHCacheDataCommand.scala   | 31 ++++++++++++++-----
 ...enClickHouseMergeTreeCacheDataSSuite.scala |  1 +
 2 files changed, 24 insertions(+), 8 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
index 06a5779024c8b..333b88b064da7 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.execution.commands
 
+import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.expression.ConverterUtils
 import org.apache.gluten.substrait.rel.ExtensionTableBuilder
 
@@ -27,6 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, GreaterThanOrEqual, IsNotNull, Literal}
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
+import org.apache.spark.sql.execution.commands.GlutenCHCacheDataCommand.toExecutorId
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
 import org.apache.spark.sql.types.{BooleanType, StringType}
 import org.apache.spark.util.ThreadUtils
@@ -229,10 +231,14 @@ case class GlutenCHCacheDataCommand(
     if (asynExecute) {
       executorIdsToParts.foreach(
         value => {
-          val executorData = GlutenDriverEndpoint.executorDataMap.get(value._1)
+          val executorData = GlutenDriverEndpoint.executorDataMap.get(toExecutorId(value._1))
           if (executorData != null) {
             executorData.executorEndpointRef.send(
               GlutenMergeTreeCacheLoad(value._2, selectedColumns.toSet.asJava))
+          } else {
+            throw new GlutenException(
+              s"executor ${value._1} not found," +
+                s" all executors are ${GlutenDriverEndpoint.executorDataMap.toString}")
           }
         })
       Seq(Row(true, ""))
@@ -241,13 +247,17 @@
       val resultList = ArrayBuffer[CacheLoadResult]()
       executorIdsToParts.foreach(
         value => {
-          val executorData = GlutenDriverEndpoint.executorDataMap.get(value._1)
-          if (executorData != null) {
-            futureList.append(
-              executorData.executorEndpointRef.ask[CacheLoadResult](
-                GlutenMergeTreeCacheLoad(value._2, selectedColumns.toSet.asJava)
-              ))
-          }
+          val executorData = GlutenDriverEndpoint.executorDataMap.get(toExecutorId(value._1))
+          if (executorData != null) {
+            futureList.append(
+              executorData.executorEndpointRef.ask[CacheLoadResult](
+                GlutenMergeTreeCacheLoad(value._2, selectedColumns.toSet.asJava)
+              ))
+          } else {
+            throw new GlutenException(
+              s"executor ${value._1} not found," +
+                s" all executors are ${GlutenDriverEndpoint.executorDataMap.toString}")
+          }
         })
       futureList.foreach(
         f => {
@@ -265,4 +275,9 @@ case class GlutenCHCacheDataCommand(
 
 object GlutenCHCacheDataCommand {
   val ALL_EXECUTORS = "allExecutors"
+
+  private def toExecutorId(executorId: String): String = {
+    val parts = executorId.split("_")
+    parts(parts.size - 1)
+  }
 }
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala
index dbe7aec531be3..40973edcd9718 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala
@@ -54,6 +54,7 @@ class GlutenClickHouseMergeTreeCacheDataSSuite
       .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
       .set("spark.sql.adaptive.enabled", "true")
       .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "error")
+      .set("spark.gluten.soft-affinity.enabled", "true")
       .set(
         "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
         "false")
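
Note on the toExecutorId helper added above: when soft affinity is enabled, the per-executor keys reaching this command appear to be executor-location strings of the form "executor_<host>_<executorId>" rather than bare executor ids, while GlutenDriverEndpoint.executorDataMap is keyed by the bare id, so the helper keeps only the segment after the last underscore. A minimal, self-contained sketch of that assumption; the sample inputs and the ToExecutorIdSketch name are illustrative, not taken from the patch:

object ToExecutorIdSketch {
  // Mirrors the helper from the patch: keep only the last "_"-separated segment.
  private def toExecutorId(executorId: String): String = {
    val parts = executorId.split("_")
    parts(parts.size - 1)
  }

  def main(args: Array[String]): Unit = {
    // Assumed soft-affinity location format; bare executor ids pass through unchanged.
    assert(toExecutorId("executor_host-1_2") == "2")
    assert(toExecutorId("5") == "5")
    println("toExecutorId sketch OK")
  }
}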