From 5cbca3691364319c5d928beeec72396dbb5e23b3 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 15 Aug 2024 13:25:49 +0800 Subject: [PATCH] fixup --- .../backendsapi/velox/VeloxListenerApi.scala | 27 ++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala index dc81ec65ba42..1519ecb8ab22 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala @@ -34,12 +34,28 @@ import org.apache.spark.util.SparkDirectoryUtil import org.apache.commons.lang3.StringUtils +import java.util.concurrent.atomic.AtomicBoolean + class VeloxListenerApi extends ListenerApi with Logging { import VeloxListenerApi._ + // TODO: Implement graceful shutdown and remove these flags. + // As spark conf may change when active Spark session is recreated. + private val driverInitialized: AtomicBoolean = new AtomicBoolean(false) + private val executorInitialized: AtomicBoolean = new AtomicBoolean(false) + override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = { + if (!driverInitialized.compareAndSet(false, true)) { + // Make sure we call the static initializers only once. + logInfo( + "Skip rerunning static initializers since they are already called." + + " It's probably because you are creating a new SparkSession.") + return + } + + // Static initializers for driver. val conf = pc.conf() - // sql table cache serializer + // Sql table cache serializer. if (conf.getBoolean(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key, defaultValue = false)) { conf.set( StaticSQLConf.SPARK_CACHE_SERIALIZER.key, @@ -53,6 +69,13 @@ class VeloxListenerApi extends ListenerApi with Logging { override def onDriverShutdown(): Unit = shutdown() override def onExecutorStart(pc: PluginContext): Unit = { + if (!executorInitialized.compareAndSet(false, true)) { + // Make sure we call the static initializers only once. + logInfo( + "Skip rerunning static initializers since they are already called." + + " It's probably because you are creating a new SparkSession.") + return + } val conf = pc.conf if (inLocalMode(conf)) { // Don't do static initializations from executor side in local mode. @@ -61,6 +84,8 @@ class VeloxListenerApi extends ListenerApi with Logging { "Gluten is running with Spark local mode. Skip running static initializer for executor.") return } + + // Static initializers for executor. SparkDirectoryUtil.init(conf) UDFResolver.resolveUdfConf(conf, isDriver = false) initialize(conf)