
[GLUTEN-6849][VL] Call static initializers once in Spark local mode / when session is renewed #6855

Merged · 20 commits · Aug 19, 2024
@@ -18,7 +18,6 @@ package org.apache.gluten.backendsapi.velox

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.ListenerApi
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.execution.datasource.{GlutenOrcWriterInjects, GlutenParquetWriterInjects, GlutenRowSplitter}
import org.apache.gluten.expression.UDFMappings
import org.apache.gluten.init.NativeBackendInitializer
@@ -27,138 +26,73 @@ import org.apache.gluten.vectorized.{JniLibLoader, JniWorkspace}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.plugin.PluginContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.velox.{VeloxOrcWriterInjects, VeloxParquetWriterInjects, VeloxRowSplitter}
import org.apache.spark.sql.expression.UDFResolver
import org.apache.spark.sql.internal.{GlutenConfigUtil, StaticSQLConf}
import org.apache.spark.util.SparkDirectoryUtil
import org.apache.spark.util.{SparkDirectoryUtil, SparkResourceUtil}

import org.apache.commons.lang3.StringUtils

import scala.sys.process._
import java.util.concurrent.atomic.AtomicBoolean

class VeloxListenerApi extends ListenerApi {
private val ARROW_VERSION = "1500"
class VeloxListenerApi extends ListenerApi with Logging {
import VeloxListenerApi._

override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {
if (!driverInitialized.compareAndSet(false, true)) {
// Make sure we call the static initializers only once.
logInfo(
"Skip rerunning static initializers since they are only supposed to run once." +
" You see this message probably because you are creating a new SparkSession.")
Comment on lines +53 to +57
@zml1206 (Contributor) · Aug 15, 2024:

I don't quite understand why onDriverStart would be called multiple times. Can you explain in detail?

Member Author:

It happens in Spark local mode. In that mode, Spark creates one driver and one executor in the current process.
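A minimal sketch of the local-mode setup described here (illustrative names and settings, not part of the PR): with a `local[*]` master, the driver and the lone executor share one JVM, so both plugin callbacks run in the same process.

```scala
import org.apache.spark.sql.SparkSession

// In local mode the driver and the single executor share one JVM, so
// ListenerApi.onDriverStart and ListenerApi.onExecutorStart both run in
// the same process.
val spark = SparkSession
  .builder()
  .master("local[*]") // driver + executor live in the current process
  .appName("local-mode-demo")
  .getOrCreate()
```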

@zml1206 (Contributor) · Aug 15, 2024:

I understand that part. I would expect one call for onDriverStart and one for onExecutorStart, not onDriverStart being called twice.

Member Author:

Oops, I got that wrong.

onDriverStart will be called twice when the Spark session is recreated.
onExecutorStart may be called twice when dynamic allocation is enabled; I am not sure about that one.

Contributor:

Can you give an example of how a Spark session gets recreated: via cloneSession, or something else?

Contributor:

Regarding onExecutorStart, I don't think it will be called twice, because an executor added by dynamic allocation is a new JVM.

Member Author:

> Can you give an example of how a Spark session gets recreated: via cloneSession, or something else?

Please refer to SparkSession.stop or SparkContext.stop.
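A hedged sketch of that recreation path (illustrative only): stopping a session also stops the underlying SparkContext, and building a new session afterwards creates a fresh context, which re-fires the driver plugin callbacks.

```scala
import org.apache.spark.sql.SparkSession

val first = SparkSession.builder().master("local[*]").getOrCreate()
first.stop() // also stops the underlying SparkContext

// A new session now creates a fresh SparkContext, so driver plugins are
// registered again and VeloxListenerApi.onDriverStart runs a second time.
val second = SparkSession.builder().master("local[*]").getOrCreate()
```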

Contributor:

If you only re-create the SparkSession, the driver is not restarted. Re-creating the SparkContext does restart the driver, but a new SparkConf may be set then, so wouldn't it be better to re-initialize in that case?

Member Author:

Yes, eventually we should remove the flags and do re-initialization. See my comment and issue #6862.

return
}

// Static initializers for driver.
val conf = pc.conf()
// sql table cache serializer
// Sql table cache serializer.
if (conf.getBoolean(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key, defaultValue = false)) {
conf.set(
StaticSQLConf.SPARK_CACHE_SERIALIZER.key,
"org.apache.spark.sql.execution.ColumnarCachedBatchSerializer")
}
initialize(conf, isDriver = true)
SparkDirectoryUtil.init(conf)
UDFResolver.resolveUdfConf(conf, isDriver = true)
initialize(conf)
}

override def onDriverShutdown(): Unit = shutdown()

override def onExecutorStart(pc: PluginContext): Unit = {
initialize(pc.conf(), isDriver = false)
}

override def onExecutorShutdown(): Unit = shutdown()

private def getLibraryLoaderForOS(
systemName: String,
systemVersion: String,
system: String): SharedLibraryLoader = {
if (systemName.contains("Ubuntu") && systemVersion.startsWith("20.04")) {
new SharedLibraryLoaderUbuntu2004
} else if (systemName.contains("Ubuntu") && systemVersion.startsWith("22.04")) {
new SharedLibraryLoaderUbuntu2204
} else if (systemName.contains("CentOS") && systemVersion.startsWith("9")) {
new SharedLibraryLoaderCentos9
} else if (systemName.contains("CentOS") && systemVersion.startsWith("8")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("CentOS") && systemVersion.startsWith("7")) {
new SharedLibraryLoaderCentos7
} else if (systemName.contains("Alibaba Cloud Linux") && systemVersion.startsWith("3")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Alibaba Cloud Linux") && systemVersion.startsWith("2")) {
new SharedLibraryLoaderCentos7
} else if (systemName.contains("Anolis") && systemVersion.startsWith("8")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Anolis") && systemVersion.startsWith("7")) {
new SharedLibraryLoaderCentos7
} else if (system.contains("tencentos") && system.contains("2.4")) {
new SharedLibraryLoaderCentos7
} else if (system.contains("tencentos") && system.contains("3.2")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Red Hat") && systemVersion.startsWith("9")) {
new SharedLibraryLoaderCentos9
} else if (systemName.contains("Red Hat") && systemVersion.startsWith("8")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Red Hat") && systemVersion.startsWith("7")) {
new SharedLibraryLoaderCentos7
} else if (systemName.contains("Debian") && systemVersion.startsWith("11")) {
new SharedLibraryLoaderDebian11
} else if (systemName.contains("Debian") && systemVersion.startsWith("12")) {
new SharedLibraryLoaderDebian12
} else {
throw new GlutenException(
s"Found unsupported OS($systemName, $systemVersion)! Currently, Gluten's Velox backend" +
" only supports Ubuntu 20.04/22.04, CentOS 7/8, " +
"Alibaba Cloud Linux 2/3 & Anolis 7/8, tencentos 2.4/3.2, RedHat 7/8, " +
"Debian 11/12.")
if (!executorInitialized.compareAndSet(false, true)) {
// Make sure we call the static initializers only once.
logInfo(
"Skip rerunning static initializers since they are only supposed to run once." +
" You see this message probably because you are creating a new SparkSession.")
return
}
}

private def loadLibFromJar(load: JniLibLoader, conf: SparkConf): Unit = {
val systemName = conf.getOption(GlutenConfig.GLUTEN_LOAD_LIB_OS)
val loader = if (systemName.isDefined) {
val systemVersion = conf.getOption(GlutenConfig.GLUTEN_LOAD_LIB_OS_VERSION)
if (systemVersion.isEmpty) {
throw new GlutenException(
s"${GlutenConfig.GLUTEN_LOAD_LIB_OS_VERSION} must be specified when specifies the " +
s"${GlutenConfig.GLUTEN_LOAD_LIB_OS}")
}
getLibraryLoaderForOS(systemName.get, systemVersion.get, "")
} else {
val system = "cat /etc/os-release".!!
val systemNamePattern = "^NAME=\"?(.*)\"?".r
val systemVersionPattern = "^VERSION=\"?(.*)\"?".r
val systemInfoLines = system.stripMargin.split("\n")
val systemNamePattern(systemName) =
systemInfoLines.find(_.startsWith("NAME=")).getOrElse("")
val systemVersionPattern(systemVersion) =
systemInfoLines.find(_.startsWith("VERSION=")).getOrElse("")
if (systemName.isEmpty || systemVersion.isEmpty) {
throw new GlutenException("Failed to get OS name and version info.")
}
getLibraryLoaderForOS(systemName, systemVersion, system)
val conf = pc.conf
if (inLocalMode(conf)) {
Contributor:

Maybe we can directly call SparkResourceUtil.isLocalMaster?

Member Author:

I am OK with either... keeping the helper shortens the calling code a little.

// Don't do static initializations from executor side in local mode.
// Driver already did that.
logInfo(
"Gluten is running with Spark local mode. Skip running static initializer for executor.")
return
}
loader.loadLib(load)
}

private def loadLibWithLinux(conf: SparkConf, loader: JniLibLoader): Unit = {
if (
conf.getBoolean(
GlutenConfig.GLUTEN_LOAD_LIB_FROM_JAR,
GlutenConfig.GLUTEN_LOAD_LIB_FROM_JAR_DEFAULT)
) {
loadLibFromJar(loader, conf)
}
// Static initializers for executor.
SparkDirectoryUtil.init(conf)
UDFResolver.resolveUdfConf(conf, isDriver = false)
initialize(conf)
}

private def loadLibWithMacOS(loader: JniLibLoader): Unit = {
// Placeholder for loading shared libs on MacOS if needed.
}
override def onExecutorShutdown(): Unit = shutdown()

private def initialize(conf: SparkConf, isDriver: Boolean): Unit = {
SparkDirectoryUtil.init(conf)
UDFResolver.resolveUdfConf(conf, isDriver = isDriver)
private def initialize(conf: SparkConf): Unit = {
if (conf.getBoolean(GlutenConfig.GLUTEN_DEBUG_KEEP_JNI_WORKSPACE, defaultValue = false)) {
val debugDir = conf.get(GlutenConfig.GLUTEN_DEBUG_KEEP_JNI_WORKSPACE_DIR)
JniWorkspace.enableDebug(debugDir)
}
val loader = JniWorkspace.getDefault.libLoader

val osName = System.getProperty("os.name")
if (osName.startsWith("Mac OS X") || osName.startsWith("macOS")) {
loadLibWithMacOS(loader)
} else {
loadLibWithLinux(conf, loader)
}

// Set the system properties.
// Use appending policy for children with the same name in an Arrow struct vector.
Expand All @@ -167,6 +101,13 @@ class VeloxListenerApi extends ListenerApi {
// Load supported hive/python/scala udfs
UDFMappings.loadFromSparkConf(conf)

// Initialize the library loader.
val loader = JniWorkspace.getDefault.libLoader

// Load shared native libraries the backend libraries depend on.
SharedLibraryLoader.load(conf, loader)

// Load backend libraries.
val libPath = conf.get(GlutenConfig.GLUTEN_LIB_PATH, StringUtils.EMPTY)
if (StringUtils.isNotBlank(libPath)) { // Path based load. Ignore all other loadees.
JniLibLoader.loadFromPath(libPath, false)
@@ -176,11 +117,11 @@ class VeloxListenerApi extends ListenerApi {
loader.mapAndLoad(VeloxBackend.BACKEND_NAME, false)
}

// Initialize the native backend with configurations.
val parsed = GlutenConfigUtil.parseConfig(conf.getAll.toMap)
NativeBackendInitializer.initializeBackend(parsed)

// inject backend-specific implementations to override spark classes
// FIXME: The following set instances twice in local mode?
// Inject backend-specific implementations to override spark classes.
GlutenParquetWriterInjects.setInstance(new VeloxParquetWriterInjects())
GlutenOrcWriterInjects.setInstance(new VeloxOrcWriterInjects())
GlutenRowSplitter.setInstance(new VeloxRowSplitter())
@@ -191,4 +132,13 @@ class VeloxListenerApi extends ListenerApi {
}
}

object VeloxListenerApi {}
object VeloxListenerApi {
// TODO: Implement graceful shutdown and remove these flags.
// The Spark conf may change when the active Spark session is recreated.
private val driverInitialized: AtomicBoolean = new AtomicBoolean(false)
private val executorInitialized: AtomicBoolean = new AtomicBoolean(false)

private def inLocalMode(conf: SparkConf): Boolean = {
SparkResourceUtil.isLocalMaster(conf)
}
}
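The once-only guard added in this file reduces to a standard compare-and-set idiom. A self-contained sketch of the pattern (simplified names, not the PR's exact code):

```scala
import java.util.concurrent.atomic.AtomicBoolean

object OnceGuardDemo {
  private val initialized = new AtomicBoolean(false)

  // compareAndSet(false, true) succeeds for exactly one caller; every later
  // (or concurrent) caller sees `false` and skips the initializer body.
  def initializeOnce(doInit: () => Unit): Unit = {
    if (!initialized.compareAndSet(false, true)) {
      println("Static initializers already ran; skipping.")
      return
    }
    doInit()
  }

  def main(args: Array[String]): Unit = {
    initializeOnce(() => println("init runs"))
    initializeOnce(() => println("never printed"))
  }
}
```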
@@ -16,8 +16,112 @@
*/
package org.apache.gluten.utils

import org.apache.gluten.GlutenConfig
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.vectorized.JniLibLoader

import org.apache.spark.SparkConf

import scala.sys.process._

trait SharedLibraryLoader {
def loadLib(loader: JniLibLoader): Unit
}

object SharedLibraryLoader {
def load(conf: SparkConf, jni: JniLibLoader): Unit = {
val shouldLoad = conf.getBoolean(
GlutenConfig.GLUTEN_LOAD_LIB_FROM_JAR,
GlutenConfig.GLUTEN_LOAD_LIB_FROM_JAR_DEFAULT)
if (!shouldLoad) {
return
}
val osName = System.getProperty("os.name")
if (osName.startsWith("Mac OS X") || osName.startsWith("macOS")) {
loadLibWithMacOS(jni)
} else {
loadLibWithLinux(conf, jni)
}
}

private def loadLibWithLinux(conf: SparkConf, jni: JniLibLoader): Unit = {
val loader = find(conf)
loader.loadLib(jni)
}

private def loadLibWithMacOS(jni: JniLibLoader): Unit = {
// Placeholder for loading shared libs on MacOS if needed.
}

private def find(conf: SparkConf): SharedLibraryLoader = {
val systemName = conf.getOption(GlutenConfig.GLUTEN_LOAD_LIB_OS)
val loader = if (systemName.isDefined) {
val systemVersion = conf.getOption(GlutenConfig.GLUTEN_LOAD_LIB_OS_VERSION)
if (systemVersion.isEmpty) {
throw new GlutenException(
s"${GlutenConfig.GLUTEN_LOAD_LIB_OS_VERSION} must be specified when specifies the " +
s"${GlutenConfig.GLUTEN_LOAD_LIB_OS}")
}
getForOS(systemName.get, systemVersion.get, "")
} else {
val system = "cat /etc/os-release".!!
val systemNamePattern = "^NAME=\"?(.*)\"?".r
val systemVersionPattern = "^VERSION=\"?(.*)\"?".r
val systemInfoLines = system.stripMargin.split("\n")
val systemNamePattern(systemName) =
systemInfoLines.find(_.startsWith("NAME=")).getOrElse("")
val systemVersionPattern(systemVersion) =
systemInfoLines.find(_.startsWith("VERSION=")).getOrElse("")
if (systemName.isEmpty || systemVersion.isEmpty) {
throw new GlutenException("Failed to get OS name and version info.")
}
getForOS(systemName, systemVersion, system)
}
loader
}

private def getForOS(
systemName: String,
systemVersion: String,
system: String): SharedLibraryLoader = {
if (systemName.contains("Ubuntu") && systemVersion.startsWith("20.04")) {
new SharedLibraryLoaderUbuntu2004
} else if (systemName.contains("Ubuntu") && systemVersion.startsWith("22.04")) {
new SharedLibraryLoaderUbuntu2204
} else if (systemName.contains("CentOS") && systemVersion.startsWith("9")) {
new SharedLibraryLoaderCentos9
} else if (systemName.contains("CentOS") && systemVersion.startsWith("8")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("CentOS") && systemVersion.startsWith("7")) {
new SharedLibraryLoaderCentos7
} else if (systemName.contains("Alibaba Cloud Linux") && systemVersion.startsWith("3")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Alibaba Cloud Linux") && systemVersion.startsWith("2")) {
new SharedLibraryLoaderCentos7
} else if (systemName.contains("Anolis") && systemVersion.startsWith("8")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Anolis") && systemVersion.startsWith("7")) {
new SharedLibraryLoaderCentos7
} else if (system.contains("tencentos") && system.contains("2.4")) {
new SharedLibraryLoaderCentos7
} else if (system.contains("tencentos") && system.contains("3.2")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Red Hat") && systemVersion.startsWith("9")) {
new SharedLibraryLoaderCentos9
} else if (systemName.contains("Red Hat") && systemVersion.startsWith("8")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Red Hat") && systemVersion.startsWith("7")) {
new SharedLibraryLoaderCentos7
} else if (systemName.contains("Debian") && systemVersion.startsWith("11")) {
new SharedLibraryLoaderDebian11
} else if (systemName.contains("Debian") && systemVersion.startsWith("12")) {
new SharedLibraryLoaderDebian12
} else {
throw new GlutenException(
s"Found unsupported OS($systemName, $systemVersion)! Currently, Gluten's Velox backend" +
" only supports Ubuntu 20.04/22.04, CentOS 7/8, " +
"Alibaba Cloud Linux 2/3 & Anolis 7/8, tencentos 2.4/3.2, RedHat 7/8, " +
"Debian 11/12.")
}
}
}
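A usage note on the config-driven path above (a sketch, with behavior inferred from this diff): auto-detection parses /etc/os-release, and forcing a loader requires both keys, since `find` throws when only the OS name is set.

```scala
import org.apache.gluten.GlutenConfig
import org.apache.spark.SparkConf

// Force a specific shared-library loader instead of parsing /etc/os-release.
// Setting the OS name without the version makes SharedLibraryLoader.find
// throw a GlutenException.
val conf = new SparkConf()
  .set(GlutenConfig.GLUTEN_LOAD_LIB_OS, "CentOS")
  .set(GlutenConfig.GLUTEN_LOAD_LIB_OS_VERSION, "8") // picks SharedLibraryLoaderCentos8
```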
@@ -62,7 +62,9 @@ private class CHCelebornColumnarBatchSerializerInstance(
private lazy val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf)
private lazy val capitalizedCompressionCodec = compressionCodec.toUpperCase(Locale.ROOT)
private lazy val compressionLevel =
GlutenShuffleUtils.getCompressionLevel(conf, compressionCodec,
GlutenShuffleUtils.getCompressionLevel(
conf,
compressionCodec,
GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)

override def deserializeStream(in: InputStream): DeserializationStream = {
@@ -79,7 +79,9 @@ object SparkDirectoryUtil extends Logging {
return
}
if (INSTANCE.roots.toSet != roots.toSet) {
logWarning(
throw new IllegalArgumentException(
s"Reinitialize SparkDirectoryUtil with different root dirs: old: ${INSTANCE.ROOTS
.mkString("Array(", ", ", ")")}, new: ${roots.mkString("Array(", ", ", ")")}"
)
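A hedged sketch of what the stricter behavior means for callers (it is assumed here that the root dirs derive from spark.local.dir; the paths are illustrative):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.util.SparkDirectoryUtil

// First init wins; a second init with different local dirs now fails fast
// instead of merely logging a warning.
val confA = new SparkConf().set("spark.local.dir", "/tmp/gluten-a")
val confB = new SparkConf().set("spark.local.dir", "/tmp/gluten-b")

SparkDirectoryUtil.init(confA)
SparkDirectoryUtil.init(confA) // same roots: no-op
SparkDirectoryUtil.init(confB) // different roots: IllegalArgumentException
```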
@@ -76,4 +76,8 @@ object SparkResourceUtil extends Logging {
val taskCores = conf.getInt("spark.task.cpus", 1)
executorCores / taskCores
}

def isLocalMaster(conf: SparkConf): Boolean = {
Utils.isLocalMaster(conf)
}
}
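For context, the slot arithmetic in the fragment above, worked with assumed values: 8 executor cores and 2 CPUs per task yield 8 / 2 = 4 task slots per executor.

```scala
import org.apache.spark.SparkConf

// Worked example (assumed values): 8 executor cores / 2 cpus per task = 4 slots.
val conf = new SparkConf()
  .set("spark.executor.cores", "8")
  .set("spark.task.cpus", "2")
val taskSlots = conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)
assert(taskSlots == 4)
```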
@@ -540,14 +540,14 @@ object GlutenConfig {
val GLUTEN_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled"

// For Soft Affinity Scheduling
// Enable Soft Affinity Scheduling, defalut value is false
// Enable Soft Affinity Scheduling, default value is false
val GLUTEN_SOFT_AFFINITY_ENABLED = "spark.gluten.soft-affinity.enabled"
val GLUTEN_SOFT_AFFINITY_ENABLED_DEFAULT_VALUE = false
// Calculate the number of the replcations for scheduling to the target executors per file
// Calculate the number of the replications for scheduling to the target executors per file
val GLUTEN_SOFT_AFFINITY_REPLICATIONS_NUM = "spark.gluten.soft-affinity.replications.num"
val GLUTEN_SOFT_AFFINITY_REPLICATIONS_NUM_DEFAULT_VALUE = 2
// For files on HDFS: if there are already target hosts,
// and then prefer to use the orginal target hosts to schedule
// and then prefer to use the original target hosts to schedule
val GLUTEN_SOFT_AFFINITY_MIN_TARGET_HOSTS = "spark.gluten.soft-affinity.min.target-hosts"
val GLUTEN_SOFT_AFFINITY_MIN_TARGET_HOSTS_DEFAULT_VALUE = 1
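An illustrative configuration using the keys defined above (the values are the sketch's choices, not recommendations):

```scala
import org.apache.spark.SparkConf

// Enable soft-affinity scheduling with the keys defined in GlutenConfig.
val conf = new SparkConf()
  .set("spark.gluten.soft-affinity.enabled", "true") // default: false
  .set("spark.gluten.soft-affinity.replications.num", "2") // default: 2
  .set("spark.gluten.soft-affinity.min.target-hosts", "1") // default: 1
```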
