From 0b34e8e941adc7eff84bba9eaaae6ea0e432ac77 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Mon, 1 Jul 2024 07:13:04 +0800 Subject: [PATCH] [GLUTEN-6253] Use internal udf config to avoid modifying the original one (#6255) The current implementation sets spark.gluten.sql.columnar.backend.velox.udfLibraryPaths on the driver side after resolving the library paths. This approach can overwrite the original settings with a local file path on the driver node before sending the SparkConf to all executors, and the executors on different nodes will fail when accessing that path. This PR stores the resolved library paths in an internal config to avoid this conflict. Manually verified on a multi-node cluster. --- .../org/apache/gluten/backendsapi/velox/VeloxBackend.scala | 1 + .../scala/org/apache/spark/sql/expression/UDFResolver.scala | 3 ++- cpp/velox/config/VeloxConfig.h | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 82b45f2d4394..0238508d9699 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -65,6 +65,7 @@ object VeloxBackendSettings extends BackendSettingsApi { val GLUTEN_VELOX_UDF_LIB_PATHS = getBackendConfigPrefix() + ".udfLibraryPaths" val GLUTEN_VELOX_DRIVER_UDF_LIB_PATHS = getBackendConfigPrefix() + ".driver.udfLibraryPaths" + val GLUTEN_VELOX_INTERNAL_UDF_LIB_PATHS = getBackendConfigPrefix() + ".internal.udfLibraryPaths" val MAXIMUM_BATCH_SIZE: Int = 32768 diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala index 8a549c9b4ea9..99f9faf9914a 100644 --- 
a/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/expression/UDFResolver.scala @@ -231,8 +231,9 @@ object UDFResolver extends Logging { udfLibPaths match { case Some(paths) => + // Set resolved paths to the internal config to parse on native side. sparkConf.set( - VeloxBackendSettings.GLUTEN_VELOX_UDF_LIB_PATHS, + VeloxBackendSettings.GLUTEN_VELOX_INTERNAL_UDF_LIB_PATHS, getAllLibraries(sparkConf, isDriver, paths)) case None => } diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index f57f1293e22e..7a96f03f4985 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -99,7 +99,7 @@ const std::string kVeloxAsyncTimeoutOnTaskStopping = const int32_t kVeloxAsyncTimeoutOnTaskStoppingDefault = 30000; // 30s // udf -const std::string kVeloxUdfLibraryPaths = "spark.gluten.sql.columnar.backend.velox.udfLibraryPaths"; +const std::string kVeloxUdfLibraryPaths = "spark.gluten.sql.columnar.backend.velox.internal.udfLibraryPaths"; // backtrace allocation const std::string kBacktraceAllocation = "spark.gluten.backtrace.allocation";