Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer committed Aug 16, 2024
1 parent 17f6232 commit 19d3313
Showing 1 changed file with 13 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ class VeloxListenerApi extends ListenerApi with Logging {
import VeloxListenerApi._

override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {
val conf = pc.conf()

// Sql table cache serializer.
if (conf.getBoolean(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key, defaultValue = false)) {
conf.set(
StaticSQLConf.SPARK_CACHE_SERIALIZER.key,
"org.apache.spark.sql.execution.ColumnarCachedBatchSerializer")
}

// Static initializers for driver.
if (!driverInitialized.compareAndSet(false, true)) {
// Make sure we call the static initializers only once.
logInfo(
Expand All @@ -48,14 +58,6 @@ class VeloxListenerApi extends ListenerApi with Logging {
return
}

// Static initializers for driver.
val conf = pc.conf()
// Sql table cache serializer.
if (conf.getBoolean(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key, defaultValue = false)) {
conf.set(
StaticSQLConf.SPARK_CACHE_SERIALIZER.key,
"org.apache.spark.sql.execution.ColumnarCachedBatchSerializer")
}
SparkDirectoryUtil.init(conf)
UDFResolver.resolveUdfConf(conf, isDriver = true)
initialize(conf)
Expand All @@ -64,14 +66,16 @@ class VeloxListenerApi extends ListenerApi with Logging {
override def onDriverShutdown(): Unit = shutdown()

override def onExecutorStart(pc: PluginContext): Unit = {
val conf = pc.conf()

// Static initializers for executor.
if (!executorInitialized.compareAndSet(false, true)) {
// Make sure we call the static initializers only once.
logInfo(
"Skip rerunning static initializers since they are only supposed to run once." +
" You see this message probably because you are creating a new SparkSession.")
return
}
val conf = pc.conf
if (inLocalMode(conf)) {
// Don't do static initializations from executor side in local mode.
// Driver already did that.
Expand All @@ -80,7 +84,6 @@ class VeloxListenerApi extends ListenerApi with Logging {
return
}

// Static initializers for executor.
SparkDirectoryUtil.init(conf)
UDFResolver.resolveUdfConf(conf, isDriver = false)
initialize(conf)
Expand Down

0 comments on commit 19d3313

Please sign in to comment.