Skip to content

Commit

Permalink
[GLUTEN-6849][VL] Call static initializers once in Spark local mode /…
Browse files Browse the repository at this point in the history
… when session is renewed (apache#6855)
  • Loading branch information
zhztheplayer authored and shamirchen committed Oct 14, 2024
1 parent 8ded15c commit bb87635
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package org.apache.gluten.backendsapi.velox

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.ListenerApi
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.execution.datasource.{GlutenOrcWriterInjects, GlutenParquetWriterInjects, GlutenRowSplitter}
import org.apache.gluten.expression.UDFMappings
import org.apache.gluten.init.NativeBackendInitializer
Expand All @@ -27,138 +26,76 @@ import org.apache.gluten.vectorized.{JniLibLoader, JniWorkspace}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.plugin.PluginContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.velox.{VeloxOrcWriterInjects, VeloxParquetWriterInjects, VeloxRowSplitter}
import org.apache.spark.sql.expression.UDFResolver
import org.apache.spark.sql.internal.{GlutenConfigUtil, StaticSQLConf}
import org.apache.spark.util.SparkDirectoryUtil
import org.apache.spark.util.{SparkDirectoryUtil, SparkResourceUtil}

import org.apache.commons.lang3.StringUtils

import scala.sys.process._
import java.util.concurrent.atomic.AtomicBoolean

class VeloxListenerApi extends ListenerApi {
private val ARROW_VERSION = "1500"
class VeloxListenerApi extends ListenerApi with Logging {
import VeloxListenerApi._

override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {
val conf = pc.conf()
// sql table cache serializer

// Sql table cache serializer.
if (conf.getBoolean(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key, defaultValue = false)) {
conf.set(
StaticSQLConf.SPARK_CACHE_SERIALIZER.key,
"org.apache.spark.sql.execution.ColumnarCachedBatchSerializer")
}
initialize(conf, isDriver = true)

// Static initializers for driver.
if (!driverInitialized.compareAndSet(false, true)) {
// Make sure we call the static initializers only once.
logInfo(
"Skip rerunning static initializers since they are only supposed to run once." +
" You see this message probably because you are creating a new SparkSession.")
return
}

SparkDirectoryUtil.init(conf)
UDFResolver.resolveUdfConf(conf, isDriver = true)
initialize(conf)
}

override def onDriverShutdown(): Unit = shutdown()

override def onExecutorStart(pc: PluginContext): Unit = {
initialize(pc.conf(), isDriver = false)
}

override def onExecutorShutdown(): Unit = shutdown()
val conf = pc.conf()

private def getLibraryLoaderForOS(
systemName: String,
systemVersion: String,
system: String): SharedLibraryLoader = {
if (systemName.contains("Ubuntu") && systemVersion.startsWith("20.04")) {
new SharedLibraryLoaderUbuntu2004
} else if (systemName.contains("Ubuntu") && systemVersion.startsWith("22.04")) {
new SharedLibraryLoaderUbuntu2204
} else if (systemName.contains("CentOS") && systemVersion.startsWith("9")) {
new SharedLibraryLoaderCentos9
} else if (systemName.contains("CentOS") && systemVersion.startsWith("8")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("CentOS") && systemVersion.startsWith("7")) {
new SharedLibraryLoaderCentos7
} else if (systemName.contains("Alibaba Cloud Linux") && systemVersion.startsWith("3")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Alibaba Cloud Linux") && systemVersion.startsWith("2")) {
new SharedLibraryLoaderCentos7
} else if (systemName.contains("Anolis") && systemVersion.startsWith("8")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Anolis") && systemVersion.startsWith("7")) {
new SharedLibraryLoaderCentos7
} else if (system.contains("tencentos") && system.contains("2.4")) {
new SharedLibraryLoaderCentos7
} else if (system.contains("tencentos") && system.contains("3.2")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Red Hat") && systemVersion.startsWith("9")) {
new SharedLibraryLoaderCentos9
} else if (systemName.contains("Red Hat") && systemVersion.startsWith("8")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Red Hat") && systemVersion.startsWith("7")) {
new SharedLibraryLoaderCentos7
} else if (systemName.contains("Debian") && systemVersion.startsWith("11")) {
new SharedLibraryLoaderDebian11
} else if (systemName.contains("Debian") && systemVersion.startsWith("12")) {
new SharedLibraryLoaderDebian12
} else {
throw new GlutenException(
s"Found unsupported OS($systemName, $systemVersion)! Currently, Gluten's Velox backend" +
" only supports Ubuntu 20.04/22.04, CentOS 7/8, " +
"Alibaba Cloud Linux 2/3 & Anolis 7/8, tencentos 2.4/3.2, RedHat 7/8, " +
"Debian 11/12.")
// Static initializers for executor.
if (!executorInitialized.compareAndSet(false, true)) {
// Make sure we call the static initializers only once.
logInfo(
"Skip rerunning static initializers since they are only supposed to run once." +
" You see this message probably because you are creating a new SparkSession.")
return
}
}

private def loadLibFromJar(load: JniLibLoader, conf: SparkConf): Unit = {
val systemName = conf.getOption(GlutenConfig.GLUTEN_LOAD_LIB_OS)
val loader = if (systemName.isDefined) {
val systemVersion = conf.getOption(GlutenConfig.GLUTEN_LOAD_LIB_OS_VERSION)
if (systemVersion.isEmpty) {
throw new GlutenException(
s"${GlutenConfig.GLUTEN_LOAD_LIB_OS_VERSION} must be specified when specifies the " +
s"${GlutenConfig.GLUTEN_LOAD_LIB_OS}")
}
getLibraryLoaderForOS(systemName.get, systemVersion.get, "")
} else {
val system = "cat /etc/os-release".!!
val systemNamePattern = "^NAME=\"?(.*)\"?".r
val systemVersionPattern = "^VERSION=\"?(.*)\"?".r
val systemInfoLines = system.stripMargin.split("\n")
val systemNamePattern(systemName) =
systemInfoLines.find(_.startsWith("NAME=")).getOrElse("")
val systemVersionPattern(systemVersion) =
systemInfoLines.find(_.startsWith("VERSION=")).getOrElse("")
if (systemName.isEmpty || systemVersion.isEmpty) {
throw new GlutenException("Failed to get OS name and version info.")
}
getLibraryLoaderForOS(systemName, systemVersion, system)
if (inLocalMode(conf)) {
// Don't do static initializations from executor side in local mode.
// Driver already did that.
logInfo(
"Gluten is running with Spark local mode. Skip running static initializer for executor.")
return
}
loader.loadLib(load)
}

private def loadLibWithLinux(conf: SparkConf, loader: JniLibLoader): Unit = {
if (
conf.getBoolean(
GlutenConfig.GLUTEN_LOAD_LIB_FROM_JAR,
GlutenConfig.GLUTEN_LOAD_LIB_FROM_JAR_DEFAULT)
) {
loadLibFromJar(loader, conf)
}
SparkDirectoryUtil.init(conf)
UDFResolver.resolveUdfConf(conf, isDriver = false)
initialize(conf)
}

private def loadLibWithMacOS(loader: JniLibLoader): Unit = {
// Placeholder for loading shared libs on MacOS if user needs.
}
override def onExecutorShutdown(): Unit = shutdown()

private def initialize(conf: SparkConf, isDriver: Boolean): Unit = {
SparkDirectoryUtil.init(conf)
UDFResolver.resolveUdfConf(conf, isDriver = isDriver)
private def initialize(conf: SparkConf): Unit = {
if (conf.getBoolean(GlutenConfig.GLUTEN_DEBUG_KEEP_JNI_WORKSPACE, defaultValue = false)) {
val debugDir = conf.get(GlutenConfig.GLUTEN_DEBUG_KEEP_JNI_WORKSPACE_DIR)
JniWorkspace.enableDebug(debugDir)
}
val loader = JniWorkspace.getDefault.libLoader

val osName = System.getProperty("os.name")
if (osName.startsWith("Mac OS X") || osName.startsWith("macOS")) {
loadLibWithMacOS(loader)
} else {
loadLibWithLinux(conf, loader)
}

// Set the system properties.
// Use appending policy for children with the same name in a arrow struct vector.
Expand All @@ -167,6 +104,13 @@ class VeloxListenerApi extends ListenerApi {
// Load supported hive/python/scala udfs
UDFMappings.loadFromSparkConf(conf)

// Initial library loader.
val loader = JniWorkspace.getDefault.libLoader

// Load shared native libraries the backend libraries depend on.
SharedLibraryLoader.load(conf, loader)

// Load backend libraries.
val libPath = conf.get(GlutenConfig.GLUTEN_LIB_PATH, StringUtils.EMPTY)
if (StringUtils.isNotBlank(libPath)) { // Path based load. Ignore all other loadees.
JniLibLoader.loadFromPath(libPath, false)
Expand All @@ -176,11 +120,11 @@ class VeloxListenerApi extends ListenerApi {
loader.mapAndLoad(VeloxBackend.BACKEND_NAME, false)
}

// Initial native backend with configurations.
val parsed = GlutenConfigUtil.parseConfig(conf.getAll.toMap)
NativeBackendInitializer.initializeBackend(parsed)

// inject backend-specific implementations to override spark classes
// FIXME: The following set instances twice in local mode?
// Inject backend-specific implementations to override spark classes.
GlutenParquetWriterInjects.setInstance(new VeloxParquetWriterInjects())
GlutenOrcWriterInjects.setInstance(new VeloxOrcWriterInjects())
GlutenRowSplitter.setInstance(new VeloxRowSplitter())
Expand All @@ -191,4 +135,13 @@ class VeloxListenerApi extends ListenerApi {
}
}

object VeloxListenerApi {}
object VeloxListenerApi {
// TODO: Implement graceful shutdown and remove these flags.
// As spark conf may change when active Spark session is recreated.
private val driverInitialized: AtomicBoolean = new AtomicBoolean(false)
private val executorInitialized: AtomicBoolean = new AtomicBoolean(false)

private def inLocalMode(conf: SparkConf): Boolean = {
SparkResourceUtil.isLocalMaster(conf)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,112 @@
*/
package org.apache.gluten.utils

import org.apache.gluten.GlutenConfig
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.vectorized.JniLibLoader

import org.apache.spark.SparkConf

import scala.sys.process._

trait SharedLibraryLoader {
def loadLib(loader: JniLibLoader): Unit
}

object SharedLibraryLoader {
def load(conf: SparkConf, jni: JniLibLoader): Unit = {
val shouldLoad = conf.getBoolean(
GlutenConfig.GLUTEN_LOAD_LIB_FROM_JAR,
GlutenConfig.GLUTEN_LOAD_LIB_FROM_JAR_DEFAULT)
if (!shouldLoad) {
return
}
val osName = System.getProperty("os.name")
if (osName.startsWith("Mac OS X") || osName.startsWith("macOS")) {
loadLibWithMacOS(jni)
} else {
loadLibWithLinux(conf, jni)
}
}

private def loadLibWithLinux(conf: SparkConf, jni: JniLibLoader): Unit = {
val loader = find(conf)
loader.loadLib(jni)
}

private def loadLibWithMacOS(jni: JniLibLoader): Unit = {
// Placeholder for loading shared libs on MacOS if user needs.
}

private def find(conf: SparkConf): SharedLibraryLoader = {
val systemName = conf.getOption(GlutenConfig.GLUTEN_LOAD_LIB_OS)
val loader = if (systemName.isDefined) {
val systemVersion = conf.getOption(GlutenConfig.GLUTEN_LOAD_LIB_OS_VERSION)
if (systemVersion.isEmpty) {
throw new GlutenException(
s"${GlutenConfig.GLUTEN_LOAD_LIB_OS_VERSION} must be specified when specifies the " +
s"${GlutenConfig.GLUTEN_LOAD_LIB_OS}")
}
getForOS(systemName.get, systemVersion.get, "")
} else {
val system = "cat /etc/os-release".!!
val systemNamePattern = "^NAME=\"?(.*)\"?".r
val systemVersionPattern = "^VERSION=\"?(.*)\"?".r
val systemInfoLines = system.stripMargin.split("\n")
val systemNamePattern(systemName) =
systemInfoLines.find(_.startsWith("NAME=")).getOrElse("")
val systemVersionPattern(systemVersion) =
systemInfoLines.find(_.startsWith("VERSION=")).getOrElse("")
if (systemName.isEmpty || systemVersion.isEmpty) {
throw new GlutenException("Failed to get OS name and version info.")
}
getForOS(systemName, systemVersion, system)
}
loader
}

private def getForOS(
systemName: String,
systemVersion: String,
system: String): SharedLibraryLoader = {
if (systemName.contains("Ubuntu") && systemVersion.startsWith("20.04")) {
new SharedLibraryLoaderUbuntu2004
} else if (systemName.contains("Ubuntu") && systemVersion.startsWith("22.04")) {
new SharedLibraryLoaderUbuntu2204
} else if (systemName.contains("CentOS") && systemVersion.startsWith("9")) {
new SharedLibraryLoaderCentos9
} else if (systemName.contains("CentOS") && systemVersion.startsWith("8")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("CentOS") && systemVersion.startsWith("7")) {
new SharedLibraryLoaderCentos7
} else if (systemName.contains("Alibaba Cloud Linux") && systemVersion.startsWith("3")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Alibaba Cloud Linux") && systemVersion.startsWith("2")) {
new SharedLibraryLoaderCentos7
} else if (systemName.contains("Anolis") && systemVersion.startsWith("8")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Anolis") && systemVersion.startsWith("7")) {
new SharedLibraryLoaderCentos7
} else if (system.contains("tencentos") && system.contains("2.4")) {
new SharedLibraryLoaderCentos7
} else if (system.contains("tencentos") && system.contains("3.2")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Red Hat") && systemVersion.startsWith("9")) {
new SharedLibraryLoaderCentos9
} else if (systemName.contains("Red Hat") && systemVersion.startsWith("8")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Red Hat") && systemVersion.startsWith("7")) {
new SharedLibraryLoaderCentos7
} else if (systemName.contains("Debian") && systemVersion.startsWith("11")) {
new SharedLibraryLoaderDebian11
} else if (systemName.contains("Debian") && systemVersion.startsWith("12")) {
new SharedLibraryLoaderDebian12
} else {
throw new GlutenException(
s"Found unsupported OS($systemName, $systemVersion)! Currently, Gluten's Velox backend" +
" only supports Ubuntu 20.04/22.04, CentOS 7/8, " +
"Alibaba Cloud Linux 2/3 & Anolis 7/8, tencentos 2.4/3.2, RedHat 7/8, " +
"Debian 11/12.")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ private class CHCelebornColumnarBatchSerializerInstance(
private lazy val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf)
private lazy val capitalizedCompressionCodec = compressionCodec.toUpperCase(Locale.ROOT)
private lazy val compressionLevel =
GlutenShuffleUtils.getCompressionLevel(conf, compressionCodec,
GlutenShuffleUtils.getCompressionLevel(
conf,
compressionCodec,
GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)

override def deserializeStream(in: InputStream): DeserializationStream = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ object SparkDirectoryUtil extends Logging {
return
}
if (INSTANCE.roots.toSet != roots.toSet) {
logWarning(
throw new IllegalArgumentException(
s"Reinitialize SparkDirectoryUtil with different root dirs: old: ${INSTANCE.ROOTS
.mkString("Array(", ", ", ")")}, new: ${roots.mkString("Array(", ", ", ")")}"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,8 @@ object SparkResourceUtil extends Logging {
val taskCores = conf.getInt("spark.task.cpus", 1)
executorCores / taskCores
}

def isLocalMaster(conf: SparkConf): Boolean = {
Utils.isLocalMaster(conf)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -540,14 +540,14 @@ object GlutenConfig {
val GLUTEN_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled"

// For Soft Affinity Scheduling
// Enable Soft Affinity Scheduling, defalut value is false
// Enable Soft Affinity Scheduling, default value is false
val GLUTEN_SOFT_AFFINITY_ENABLED = "spark.gluten.soft-affinity.enabled"
val GLUTEN_SOFT_AFFINITY_ENABLED_DEFAULT_VALUE = false
// Calculate the number of the replcations for scheduling to the target executors per file
// Calculate the number of the replications for scheduling to the target executors per file
val GLUTEN_SOFT_AFFINITY_REPLICATIONS_NUM = "spark.gluten.soft-affinity.replications.num"
val GLUTEN_SOFT_AFFINITY_REPLICATIONS_NUM_DEFAULT_VALUE = 2
// For on HDFS, if there are already target hosts,
// and then prefer to use the orginal target hosts to schedule
// and then prefer to use the original target hosts to schedule
val GLUTEN_SOFT_AFFINITY_MIN_TARGET_HOSTS = "spark.gluten.soft-affinity.min.target-hosts"
val GLUTEN_SOFT_AFFINITY_MIN_TARGET_HOSTS_DEFAULT_VALUE = 1

Expand Down

0 comments on commit bb87635

Please sign in to comment.