Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer committed Aug 15, 2024
1 parent 721323b commit 5cbca36
Showing 1 changed file with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,28 @@ import org.apache.spark.util.SparkDirectoryUtil

import org.apache.commons.lang3.StringUtils

import java.util.concurrent.atomic.AtomicBoolean

class VeloxListenerApi extends ListenerApi with Logging {
import VeloxListenerApi._

// TODO: Implement graceful shutdown and remove these flags.
// As spark conf may change when active Spark session is recreated.
private val driverInitialized: AtomicBoolean = new AtomicBoolean(false)
private val executorInitialized: AtomicBoolean = new AtomicBoolean(false)

override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {
if (!driverInitialized.compareAndSet(false, true)) {
// Make sure we call the static initializers only once.
logInfo(
"Skip rerunning static initializers since they are already called." +
" It's probably because you are creating a new SparkSession.")
return
}

// Static initializers for driver.
val conf = pc.conf()
// sql table cache serializer
// Sql table cache serializer.
if (conf.getBoolean(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key, defaultValue = false)) {
conf.set(
StaticSQLConf.SPARK_CACHE_SERIALIZER.key,
Expand All @@ -53,6 +69,13 @@ class VeloxListenerApi extends ListenerApi with Logging {
override def onDriverShutdown(): Unit = shutdown()

override def onExecutorStart(pc: PluginContext): Unit = {
if (!executorInitialized.compareAndSet(false, true)) {
// Make sure we call the static initializers only once.
logInfo(
"Skip rerunning static initializers since they are already called." +
" It's probably because you are creating a new SparkSession.")
return
}
val conf = pc.conf
if (inLocalMode(conf)) {
// Don't do static initializations from executor side in local mode.
Expand All @@ -61,6 +84,8 @@ class VeloxListenerApi extends ListenerApi with Logging {
"Gluten is running with Spark local mode. Skip running static initializer for executor.")
return
}

// Static initializers for executor.
SparkDirectoryUtil.init(conf)
UDFResolver.resolveUdfConf(conf, isDriver = false)
initialize(conf)
Expand Down

0 comments on commit 5cbca36

Please sign in to comment.