diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
index 86925fd1d6a8..2cfc4e9a9099 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
@@ -18,7 +18,6 @@ package org.apache.gluten.backendsapi.velox
 
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.ListenerApi
-import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.execution.datasource.{GlutenOrcWriterInjects, GlutenParquetWriterInjects, GlutenRowSplitter}
 import org.apache.gluten.expression.UDFMappings
 import org.apache.gluten.init.NativeBackendInitializer
@@ -27,138 +26,76 @@ import org.apache.gluten.vectorized.{JniLibLoader, JniWorkspace}
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.api.plugin.PluginContext
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution.datasources.velox.{VeloxOrcWriterInjects, VeloxParquetWriterInjects, VeloxRowSplitter}
 import org.apache.spark.sql.expression.UDFResolver
 import org.apache.spark.sql.internal.{GlutenConfigUtil, StaticSQLConf}
-import org.apache.spark.util.SparkDirectoryUtil
+import org.apache.spark.util.{SparkDirectoryUtil, SparkResourceUtil}
 
 import org.apache.commons.lang3.StringUtils
 
-import scala.sys.process._
+import java.util.concurrent.atomic.AtomicBoolean
 
-class VeloxListenerApi extends ListenerApi {
-  private val ARROW_VERSION = "1500"
+class VeloxListenerApi extends ListenerApi with Logging {
+  import VeloxListenerApi._
 
   override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {
     val conf = pc.conf()
-    // sql table cache serializer
+
+    // Sql table cache serializer.
     if (conf.getBoolean(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key, defaultValue = false)) {
       conf.set(
         StaticSQLConf.SPARK_CACHE_SERIALIZER.key,
         "org.apache.spark.sql.execution.ColumnarCachedBatchSerializer")
     }
-    initialize(conf, isDriver = true)
+
+    // Static initializers for driver.
+    if (!driverInitialized.compareAndSet(false, true)) {
+      // Make sure we call the static initializers only once.
+      logInfo(
+        "Skip rerunning static initializers since they are only supposed to run once."
+ + " You see this message probably because you are creating a new SparkSession.") + return + } + + SparkDirectoryUtil.init(conf) + UDFResolver.resolveUdfConf(conf, isDriver = true) + initialize(conf) } override def onDriverShutdown(): Unit = shutdown() override def onExecutorStart(pc: PluginContext): Unit = { - initialize(pc.conf(), isDriver = false) - } - - override def onExecutorShutdown(): Unit = shutdown() + val conf = pc.conf() - private def getLibraryLoaderForOS( - systemName: String, - systemVersion: String, - system: String): SharedLibraryLoader = { - if (systemName.contains("Ubuntu") && systemVersion.startsWith("20.04")) { - new SharedLibraryLoaderUbuntu2004 - } else if (systemName.contains("Ubuntu") && systemVersion.startsWith("22.04")) { - new SharedLibraryLoaderUbuntu2204 - } else if (systemName.contains("CentOS") && systemVersion.startsWith("9")) { - new SharedLibraryLoaderCentos9 - } else if (systemName.contains("CentOS") && systemVersion.startsWith("8")) { - new SharedLibraryLoaderCentos8 - } else if (systemName.contains("CentOS") && systemVersion.startsWith("7")) { - new SharedLibraryLoaderCentos7 - } else if (systemName.contains("Alibaba Cloud Linux") && systemVersion.startsWith("3")) { - new SharedLibraryLoaderCentos8 - } else if (systemName.contains("Alibaba Cloud Linux") && systemVersion.startsWith("2")) { - new SharedLibraryLoaderCentos7 - } else if (systemName.contains("Anolis") && systemVersion.startsWith("8")) { - new SharedLibraryLoaderCentos8 - } else if (systemName.contains("Anolis") && systemVersion.startsWith("7")) { - new SharedLibraryLoaderCentos7 - } else if (system.contains("tencentos") && system.contains("2.4")) { - new SharedLibraryLoaderCentos7 - } else if (system.contains("tencentos") && system.contains("3.2")) { - new SharedLibraryLoaderCentos8 - } else if (systemName.contains("Red Hat") && systemVersion.startsWith("9")) { - new SharedLibraryLoaderCentos9 - } else if (systemName.contains("Red Hat") && systemVersion.startsWith("8")) { - new SharedLibraryLoaderCentos8 - } else if (systemName.contains("Red Hat") && systemVersion.startsWith("7")) { - new SharedLibraryLoaderCentos7 - } else if (systemName.contains("Debian") && systemVersion.startsWith("11")) { - new SharedLibraryLoaderDebian11 - } else if (systemName.contains("Debian") && systemVersion.startsWith("12")) { - new SharedLibraryLoaderDebian12 - } else { - throw new GlutenException( - s"Found unsupported OS($systemName, $systemVersion)! Currently, Gluten's Velox backend" + - " only supports Ubuntu 20.04/22.04, CentOS 7/8, " + - "Alibaba Cloud Linux 2/3 & Anolis 7/8, tencentos 2.4/3.2, RedHat 7/8, " + - "Debian 11/12.") + // Static initializers for executor. + if (!executorInitialized.compareAndSet(false, true)) { + // Make sure we call the static initializers only once. + logInfo( + "Skip rerunning static initializers since they are only supposed to run once." 
+ + " You see this message probably because you are creating a new SparkSession.") + return } - } - - private def loadLibFromJar(load: JniLibLoader, conf: SparkConf): Unit = { - val systemName = conf.getOption(GlutenConfig.GLUTEN_LOAD_LIB_OS) - val loader = if (systemName.isDefined) { - val systemVersion = conf.getOption(GlutenConfig.GLUTEN_LOAD_LIB_OS_VERSION) - if (systemVersion.isEmpty) { - throw new GlutenException( - s"${GlutenConfig.GLUTEN_LOAD_LIB_OS_VERSION} must be specified when specifies the " + - s"${GlutenConfig.GLUTEN_LOAD_LIB_OS}") - } - getLibraryLoaderForOS(systemName.get, systemVersion.get, "") - } else { - val system = "cat /etc/os-release".!! - val systemNamePattern = "^NAME=\"?(.*)\"?".r - val systemVersionPattern = "^VERSION=\"?(.*)\"?".r - val systemInfoLines = system.stripMargin.split("\n") - val systemNamePattern(systemName) = - systemInfoLines.find(_.startsWith("NAME=")).getOrElse("") - val systemVersionPattern(systemVersion) = - systemInfoLines.find(_.startsWith("VERSION=")).getOrElse("") - if (systemName.isEmpty || systemVersion.isEmpty) { - throw new GlutenException("Failed to get OS name and version info.") - } - getLibraryLoaderForOS(systemName, systemVersion, system) + if (inLocalMode(conf)) { + // Don't do static initializations from executor side in local mode. + // Driver already did that. + logInfo( + "Gluten is running with Spark local mode. Skip running static initializer for executor.") + return } - loader.loadLib(load) - } - private def loadLibWithLinux(conf: SparkConf, loader: JniLibLoader): Unit = { - if ( - conf.getBoolean( - GlutenConfig.GLUTEN_LOAD_LIB_FROM_JAR, - GlutenConfig.GLUTEN_LOAD_LIB_FROM_JAR_DEFAULT) - ) { - loadLibFromJar(loader, conf) - } + SparkDirectoryUtil.init(conf) + UDFResolver.resolveUdfConf(conf, isDriver = false) + initialize(conf) } - private def loadLibWithMacOS(loader: JniLibLoader): Unit = { - // Placeholder for loading shared libs on MacOS if user needs. - } + override def onExecutorShutdown(): Unit = shutdown() - private def initialize(conf: SparkConf, isDriver: Boolean): Unit = { - SparkDirectoryUtil.init(conf) - UDFResolver.resolveUdfConf(conf, isDriver = isDriver) + private def initialize(conf: SparkConf): Unit = { if (conf.getBoolean(GlutenConfig.GLUTEN_DEBUG_KEEP_JNI_WORKSPACE, defaultValue = false)) { val debugDir = conf.get(GlutenConfig.GLUTEN_DEBUG_KEEP_JNI_WORKSPACE_DIR) JniWorkspace.enableDebug(debugDir) } - val loader = JniWorkspace.getDefault.libLoader - - val osName = System.getProperty("os.name") - if (osName.startsWith("Mac OS X") || osName.startsWith("macOS")) { - loadLibWithMacOS(loader) - } else { - loadLibWithLinux(conf, loader) - } // Set the system properties. // Use appending policy for children with the same name in a arrow struct vector. @@ -167,6 +104,13 @@ class VeloxListenerApi extends ListenerApi { // Load supported hive/python/scala udfs UDFMappings.loadFromSparkConf(conf) + // Initial library loader. + val loader = JniWorkspace.getDefault.libLoader + + // Load shared native libraries the backend libraries depend on. + SharedLibraryLoader.load(conf, loader) + + // Load backend libraries. val libPath = conf.get(GlutenConfig.GLUTEN_LIB_PATH, StringUtils.EMPTY) if (StringUtils.isNotBlank(libPath)) { // Path based load. Ignore all other loadees. JniLibLoader.loadFromPath(libPath, false) @@ -176,11 +120,11 @@ class VeloxListenerApi extends ListenerApi { loader.mapAndLoad(VeloxBackend.BACKEND_NAME, false) } + // Initial native backend with configurations. 
     val parsed = GlutenConfigUtil.parseConfig(conf.getAll.toMap)
     NativeBackendInitializer.initializeBackend(parsed)
 
-    // inject backend-specific implementations to override spark classes
-    // FIXME: The following set instances twice in local mode?
+    // Inject backend-specific implementations to override spark classes.
     GlutenParquetWriterInjects.setInstance(new VeloxParquetWriterInjects())
     GlutenOrcWriterInjects.setInstance(new VeloxOrcWriterInjects())
     GlutenRowSplitter.setInstance(new VeloxRowSplitter())
@@ -191,4 +135,13 @@ class VeloxListenerApi extends ListenerApi {
   }
 }
 
-object VeloxListenerApi {}
+object VeloxListenerApi {
+  // TODO: Implement graceful shutdown and remove these flags,
+  // as the Spark conf may change when the active Spark session is recreated.
+  private val driverInitialized: AtomicBoolean = new AtomicBoolean(false)
+  private val executorInitialized: AtomicBoolean = new AtomicBoolean(false)
+
+  private def inLocalMode(conf: SparkConf): Boolean = {
+    SparkResourceUtil.isLocalMaster(conf)
+  }
+}
diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoader.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoader.scala
index 137da83c0980..1f3ca30de9f5 100755
--- a/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoader.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/utils/SharedLibraryLoader.scala
@@ -16,8 +16,112 @@
  */
 package org.apache.gluten.utils
 
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.vectorized.JniLibLoader
 
+import org.apache.spark.SparkConf
+
+import scala.sys.process._
+
 trait SharedLibraryLoader {
   def loadLib(loader: JniLibLoader): Unit
 }
+
+object SharedLibraryLoader {
+  def load(conf: SparkConf, jni: JniLibLoader): Unit = {
+    val shouldLoad = conf.getBoolean(
+      GlutenConfig.GLUTEN_LOAD_LIB_FROM_JAR,
+      GlutenConfig.GLUTEN_LOAD_LIB_FROM_JAR_DEFAULT)
+    if (!shouldLoad) {
+      return
+    }
+    val osName = System.getProperty("os.name")
+    if (osName.startsWith("Mac OS X") || osName.startsWith("macOS")) {
+      loadLibWithMacOS(jni)
+    } else {
+      loadLibWithLinux(conf, jni)
+    }
+  }
+
+  private def loadLibWithLinux(conf: SparkConf, jni: JniLibLoader): Unit = {
+    val loader = find(conf)
+    loader.loadLib(jni)
+  }
+
+  private def loadLibWithMacOS(jni: JniLibLoader): Unit = {
+    // Placeholder for loading shared libs on MacOS if user needs.
+  }
+
+  private def find(conf: SparkConf): SharedLibraryLoader = {
+    val systemName = conf.getOption(GlutenConfig.GLUTEN_LOAD_LIB_OS)
+    val loader = if (systemName.isDefined) {
+      val systemVersion = conf.getOption(GlutenConfig.GLUTEN_LOAD_LIB_OS_VERSION)
+      if (systemVersion.isEmpty) {
+        throw new GlutenException(
+          s"${GlutenConfig.GLUTEN_LOAD_LIB_OS_VERSION} must be specified when " +
+            s"${GlutenConfig.GLUTEN_LOAD_LIB_OS} is specified.")
+      }
+      getForOS(systemName.get, systemVersion.get, "")
+    } else {
+      val system = "cat /etc/os-release".!!
+      val systemNamePattern = "^NAME=\"?(.*)\"?".r
+      val systemVersionPattern = "^VERSION=\"?(.*)\"?".r
+      val systemInfoLines = system.stripMargin.split("\n")
+      val systemNamePattern(systemName) =
+        systemInfoLines.find(_.startsWith("NAME=")).getOrElse("")
+      val systemVersionPattern(systemVersion) =
+        systemInfoLines.find(_.startsWith("VERSION=")).getOrElse("")
+      if (systemName.isEmpty || systemVersion.isEmpty) {
+        throw new GlutenException("Failed to get OS name and version info.")
+      }
+      getForOS(systemName, systemVersion, system)
+    }
+    loader
+  }
+
+  private def getForOS(
+      systemName: String,
+      systemVersion: String,
+      system: String): SharedLibraryLoader = {
+    if (systemName.contains("Ubuntu") && systemVersion.startsWith("20.04")) {
+      new SharedLibraryLoaderUbuntu2004
+    } else if (systemName.contains("Ubuntu") && systemVersion.startsWith("22.04")) {
+      new SharedLibraryLoaderUbuntu2204
+    } else if (systemName.contains("CentOS") && systemVersion.startsWith("9")) {
+      new SharedLibraryLoaderCentos9
+    } else if (systemName.contains("CentOS") && systemVersion.startsWith("8")) {
+      new SharedLibraryLoaderCentos8
+    } else if (systemName.contains("CentOS") && systemVersion.startsWith("7")) {
+      new SharedLibraryLoaderCentos7
+    } else if (systemName.contains("Alibaba Cloud Linux") && systemVersion.startsWith("3")) {
+      new SharedLibraryLoaderCentos8
+    } else if (systemName.contains("Alibaba Cloud Linux") && systemVersion.startsWith("2")) {
+      new SharedLibraryLoaderCentos7
+    } else if (systemName.contains("Anolis") && systemVersion.startsWith("8")) {
+      new SharedLibraryLoaderCentos8
+    } else if (systemName.contains("Anolis") && systemVersion.startsWith("7")) {
+      new SharedLibraryLoaderCentos7
+    } else if (system.contains("tencentos") && system.contains("2.4")) {
+      new SharedLibraryLoaderCentos7
+    } else if (system.contains("tencentos") && system.contains("3.2")) {
+      new SharedLibraryLoaderCentos8
+    } else if (systemName.contains("Red Hat") && systemVersion.startsWith("9")) {
+      new SharedLibraryLoaderCentos9
+    } else if (systemName.contains("Red Hat") && systemVersion.startsWith("8")) {
+      new SharedLibraryLoaderCentos8
+    } else if (systemName.contains("Red Hat") && systemVersion.startsWith("7")) {
+      new SharedLibraryLoaderCentos7
+    } else if (systemName.contains("Debian") && systemVersion.startsWith("11")) {
+      new SharedLibraryLoaderDebian11
+    } else if (systemName.contains("Debian") && systemVersion.startsWith("12")) {
+      new SharedLibraryLoaderDebian12
+    } else {
+      throw new GlutenException(
+        s"Found unsupported OS($systemName, $systemVersion)! Currently, Gluten's Velox backend" +
+          " only supports Ubuntu 20.04/22.04, CentOS 7/8, " +
+          "Alibaba Cloud Linux 2/3 & Anolis 7/8, tencentos 2.4/3.2, RedHat 7/8, " +
+          "Debian 11/12.")
+    }
+  }
+}
diff --git a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
index 5072ce6a1a2e..3b8e92bfe1d2 100644
--- a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
+++ b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
@@ -62,7 +62,9 @@ private class CHCelebornColumnarBatchSerializerInstance(
   private lazy val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf)
   private lazy val capitalizedCompressionCodec = compressionCodec.toUpperCase(Locale.ROOT)
   private lazy val compressionLevel =
-    GlutenShuffleUtils.getCompressionLevel(conf, compressionCodec,
+    GlutenShuffleUtils.getCompressionLevel(
+      conf,
+      compressionCodec,
       GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
 
   override def deserializeStream(in: InputStream): DeserializationStream = {
diff --git a/gluten-core/src/main/scala/org/apache/spark/util/SparkDirectoryUtil.scala b/gluten-core/src/main/scala/org/apache/spark/util/SparkDirectoryUtil.scala
index fbc59edfdd6b..833575178c66 100644
--- a/gluten-core/src/main/scala/org/apache/spark/util/SparkDirectoryUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkDirectoryUtil.scala
@@ -79,7 +79,7 @@ object SparkDirectoryUtil extends Logging {
       return
     }
     if (INSTANCE.roots.toSet != roots.toSet) {
-      logWarning(
+      throw new IllegalArgumentException(
         s"Reinitialize SparkDirectoryUtil with different root dirs: old: ${INSTANCE.ROOTS
           .mkString("Array(", ", ", ")")}, new: ${roots.mkString("Array(", ", ", ")")}"
       )
diff --git a/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala b/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
index b16d43de5d68..f8c791fe1374 100644
--- a/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkResourceUtil.scala
@@ -76,4 +76,8 @@ object SparkResourceUtil extends Logging {
     val taskCores = conf.getInt("spark.task.cpus", 1)
     executorCores / taskCores
   }
+
+  def isLocalMaster(conf: SparkConf): Boolean = {
+    Utils.isLocalMaster(conf)
+  }
 }
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 287e8bee32f7..d11873b088a3 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -540,14 +540,14 @@ object GlutenConfig {
   val GLUTEN_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled"
 
   // For Soft Affinity Scheduling
-  // Enable Soft Affinity Scheduling, defalut value is false
+  // Enable Soft Affinity Scheduling, default value is false
   val GLUTEN_SOFT_AFFINITY_ENABLED = "spark.gluten.soft-affinity.enabled"
   val GLUTEN_SOFT_AFFINITY_ENABLED_DEFAULT_VALUE = false
-  // Calculate the number of the replcations for scheduling to the target executors per file
+  // Calculate the number of the replications for scheduling to the target executors per file
   val GLUTEN_SOFT_AFFINITY_REPLICATIONS_NUM = "spark.gluten.soft-affinity.replications.num"
   val GLUTEN_SOFT_AFFINITY_REPLICATIONS_NUM_DEFAULT_VALUE = 2
   // For on HDFS, if there are already target hosts,
-  // and then prefer to use the orginal target hosts to schedule
+  // and then prefer to use the original target hosts to schedule
   val GLUTEN_SOFT_AFFINITY_MIN_TARGET_HOSTS = "spark.gluten.soft-affinity.min.target-hosts"
   val GLUTEN_SOFT_AFFINITY_MIN_TARGET_HOSTS_DEFAULT_VALUE = 1
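
The once-only guard added to VeloxListenerApi is a small pattern worth spelling out. Below is a minimal, self-contained Scala sketch of the same idea (names are illustrative, not from the patch): AtomicBoolean.compareAndSet(false, true) succeeds for exactly one caller per JVM, so repeated onDriverStart/onExecutorStart calls from a recreated SparkSession become no-ops.

    import java.util.concurrent.atomic.AtomicBoolean

    object OnceOnlyInit {
      private val initialized = new AtomicBoolean(false)

      // Runs `body` at most once per JVM; returns true only for the call
      // that actually performed the initialization.
      def initOnce(body: => Unit): Boolean = {
        if (!initialized.compareAndSet(false, true)) {
          return false // someone else already initialized
        }
        body
        true
      }
    }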
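
The executor-side skip relies on the new SparkResourceUtil.isLocalMaster, which delegates to Spark's internal Utils.isLocalMaster. For reference, a hypothetical standalone equivalent would just inspect spark.master, since local masters take the forms local, local[N], local[*], and local[N,maxFailures]:

    import org.apache.spark.SparkConf

    // Hypothetical stand-in for Spark's Utils.isLocalMaster.
    def isLocalMaster(conf: SparkConf): Boolean = {
      val master = conf.get("spark.master", "")
      master == "local" || master.startsWith("local[")
    }

In local mode the driver and executors share one JVM, so the driver's static initialization already covers the executor side.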
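
SharedLibraryLoader.find shells out to cat /etc/os-release and extracts NAME and VERSION with regexes. For comparison, here is a sketch of an alternative parser (illustrative only; the patch keeps the regex approach) that reads the file directly and strips the optional quotes around values:

    import scala.io.Source

    // Parse /etc/os-release KEY=VALUE pairs into a map, dropping comments
    // and stripping optional surrounding quotes from values.
    def parseOsRelease(path: String = "/etc/os-release"): Map[String, String] = {
      val src = Source.fromFile(path)
      try {
        src.getLines()
          .map(_.trim)
          .filter(l => l.nonEmpty && !l.startsWith("#") && l.contains('='))
          .map { l =>
            val Array(k, v) = l.split("=", 2)
            k -> v.stripPrefix("\"").stripSuffix("\"") // NAME="Ubuntu" -> Ubuntu
          }
          .toMap
      } finally src.close()
    }

With this, the OS name and version would be parseOsRelease().getOrElse("NAME", "") and parseOsRelease().getOrElse("VERSION", "").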
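
Finally, SparkDirectoryUtil.init now throws instead of logging a warning when re-initialized with different root dirs, turning a silent misconfiguration into a fail-fast error. A condensed sketch of that contract (hypothetical class, not the real implementation):

    // Idempotent init: identical roots are accepted, different roots fail fast.
    final class DirRegistry {
      private var roots: Option[Set[String]] = None

      def init(newRoots: Set[String]): Unit = synchronized {
        roots match {
          case None => roots = Some(newRoots)
          case Some(r) if r == newRoots => // no-op on identical re-init
          case Some(r) =>
            throw new IllegalArgumentException(
              s"Reinitialize with different root dirs: old: $r, new: $newRoots")
        }
      }
    }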