Skip to content

Commit

Permalink
nits
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer committed Aug 15, 2024
1 parent d6008f6 commit 173a482
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package org.apache.gluten.backendsapi.velox

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.ListenerApi
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.execution.datasource.{GlutenOrcWriterInjects, GlutenParquetWriterInjects, GlutenRowSplitter}
import org.apache.gluten.expression.UDFMappings
import org.apache.gluten.init.NativeBackendInitializer
Expand All @@ -27,17 +26,16 @@ import org.apache.gluten.vectorized.{JniLibLoader, JniWorkspace}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.plugin.PluginContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.velox.{VeloxOrcWriterInjects, VeloxParquetWriterInjects, VeloxRowSplitter}
import org.apache.spark.sql.expression.UDFResolver
import org.apache.spark.sql.internal.{GlutenConfigUtil, StaticSQLConf}
import org.apache.spark.util.SparkDirectoryUtil

import org.apache.commons.lang3.StringUtils

import scala.sys.process._

class VeloxListenerApi extends ListenerApi {
private val ARROW_VERSION = "1500"
class VeloxListenerApi extends ListenerApi with Logging {
import VeloxListenerApi._

override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {
val conf = pc.conf()
Expand All @@ -47,106 +45,30 @@ class VeloxListenerApi extends ListenerApi {
StaticSQLConf.SPARK_CACHE_SERIALIZER.key,
"org.apache.spark.sql.execution.ColumnarCachedBatchSerializer")
}
initialize(conf, isDriver = true)
SparkDirectoryUtil.init(conf)
UDFResolver.resolveUdfConf(conf, isDriver = true)
initialize(conf)
}

override def onDriverShutdown(): Unit = shutdown()

override def onExecutorStart(pc: PluginContext): Unit = {
initialize(pc.conf(), isDriver = false)
}

override def onExecutorShutdown(): Unit = shutdown()

private def getLibraryLoaderForOS(
systemName: String,
systemVersion: String,
system: String): SharedLibraryLoader = {
if (systemName.contains("Ubuntu") && systemVersion.startsWith("20.04")) {
new SharedLibraryLoaderUbuntu2004
} else if (systemName.contains("Ubuntu") && systemVersion.startsWith("22.04")) {
new SharedLibraryLoaderUbuntu2204
} else if (systemName.contains("CentOS") && systemVersion.startsWith("9")) {
new SharedLibraryLoaderCentos9
} else if (systemName.contains("CentOS") && systemVersion.startsWith("8")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("CentOS") && systemVersion.startsWith("7")) {
new SharedLibraryLoaderCentos7
} else if (systemName.contains("Alibaba Cloud Linux") && systemVersion.startsWith("3")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Alibaba Cloud Linux") && systemVersion.startsWith("2")) {
new SharedLibraryLoaderCentos7
} else if (systemName.contains("Anolis") && systemVersion.startsWith("8")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Anolis") && systemVersion.startsWith("7")) {
new SharedLibraryLoaderCentos7
} else if (system.contains("tencentos") && system.contains("2.4")) {
new SharedLibraryLoaderCentos7
} else if (system.contains("tencentos") && system.contains("3.2")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Red Hat") && systemVersion.startsWith("9")) {
new SharedLibraryLoaderCentos9
} else if (systemName.contains("Red Hat") && systemVersion.startsWith("8")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Red Hat") && systemVersion.startsWith("7")) {
new SharedLibraryLoaderCentos7
} else if (systemName.contains("Debian") && systemVersion.startsWith("11")) {
new SharedLibraryLoaderDebian11
} else if (systemName.contains("Debian") && systemVersion.startsWith("12")) {
new SharedLibraryLoaderDebian12
} else {
throw new GlutenException(
s"Found unsupported OS($systemName, $systemVersion)! Currently, Gluten's Velox backend" +
" only supports Ubuntu 20.04/22.04, CentOS 7/8, " +
"Alibaba Cloud Linux 2/3 & Anolis 7/8, tencentos 2.4/3.2, RedHat 7/8, " +
"Debian 11/12.")
}
}

private def loadLibFromJar(load: JniLibLoader, conf: SparkConf): Unit = {
val systemName = conf.getOption(GlutenConfig.GLUTEN_LOAD_LIB_OS)
val loader = if (systemName.isDefined) {
val systemVersion = conf.getOption(GlutenConfig.GLUTEN_LOAD_LIB_OS_VERSION)
if (systemVersion.isEmpty) {
throw new GlutenException(
s"${GlutenConfig.GLUTEN_LOAD_LIB_OS_VERSION} must be specified when specifies the " +
s"${GlutenConfig.GLUTEN_LOAD_LIB_OS}")
}
getLibraryLoaderForOS(systemName.get, systemVersion.get, "")
} else {
val system = "cat /etc/os-release".!!
val systemNamePattern = "^NAME=\"?(.*)\"?".r
val systemVersionPattern = "^VERSION=\"?(.*)\"?".r
val systemInfoLines = system.stripMargin.split("\n")
val systemNamePattern(systemName) =
systemInfoLines.find(_.startsWith("NAME=")).getOrElse("")
val systemVersionPattern(systemVersion) =
systemInfoLines.find(_.startsWith("VERSION=")).getOrElse("")
if (systemName.isEmpty || systemVersion.isEmpty) {
throw new GlutenException("Failed to get OS name and version info.")
}
getLibraryLoaderForOS(systemName, systemVersion, system)
}
loader.loadLib(load)
}

private def loadLibWithLinux(conf: SparkConf, loader: JniLibLoader): Unit = {
if (
conf.getBoolean(
GlutenConfig.GLUTEN_LOAD_LIB_FROM_JAR,
GlutenConfig.GLUTEN_LOAD_LIB_FROM_JAR_DEFAULT)
) {
loadLibFromJar(loader, conf)
val conf = pc.conf
if (inLocalMode(conf)) {
// Don't do static initializations from executor side in local mode.
// Driver already did that.
logInfo(
"Gluten is running with Spark local mode. Skip running static initializer for executor.")
return
}
SparkDirectoryUtil.init(conf)
UDFResolver.resolveUdfConf(conf, isDriver = false)
initialize(conf)
}

private def loadLibWithMacOS(loader: JniLibLoader): Unit = {
// Placeholder for loading shared libs on MacOS if user needs.
}
override def onExecutorShutdown(): Unit = shutdown()

private def initialize(conf: SparkConf, isDriver: Boolean): Unit = {
SparkDirectoryUtil.init(conf)
UDFResolver.resolveUdfConf(conf, isDriver = isDriver)
private def initialize(conf: SparkConf): Unit = {
if (conf.getBoolean(GlutenConfig.GLUTEN_DEBUG_KEEP_JNI_WORKSPACE, defaultValue = false)) {
val debugDir = conf.get(GlutenConfig.GLUTEN_DEBUG_KEEP_JNI_WORKSPACE_DIR)
JniWorkspace.enableDebug(debugDir)
Expand Down Expand Up @@ -191,4 +113,37 @@ class VeloxListenerApi extends ListenerApi {
}
}

object VeloxListenerApi {}
object VeloxListenerApi {
// See org.apache.spark.SparkMasterRegex
private val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
private val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r

private def inLocalMode(conf: SparkConf): Boolean = {
val master = conf.get("spark.master")
master match {
case "local" => true
case LOCAL_N_REGEX(_) => true
case LOCAL_N_FAILURES_REGEX(_, _) => true
case _ => false
}
}

private def loadLibFromJar(load: JniLibLoader, conf: SparkConf): Unit = {
val loader = SharedLibraryLoader.find(conf)
loader.loadLib(load)
}

private def loadLibWithLinux(conf: SparkConf, loader: JniLibLoader): Unit = {
if (
conf.getBoolean(
GlutenConfig.GLUTEN_LOAD_LIB_FROM_JAR,
GlutenConfig.GLUTEN_LOAD_LIB_FROM_JAR_DEFAULT)
) {
loadLibFromJar(loader, conf)
}
}

private def loadLibWithMacOS(loader: JniLibLoader): Unit = {
// Placeholder for loading shared libs on MacOS if user needs.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,88 @@
*/
package org.apache.gluten.utils

import org.apache.gluten.GlutenConfig
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.vectorized.JniLibLoader

import org.apache.spark.SparkConf

import scala.sys.process._

trait SharedLibraryLoader {
def loadLib(loader: JniLibLoader): Unit
}

object SharedLibraryLoader {
def find(conf: SparkConf): SharedLibraryLoader = {
val systemName = conf.getOption(GlutenConfig.GLUTEN_LOAD_LIB_OS)
val loader = if (systemName.isDefined) {
val systemVersion = conf.getOption(GlutenConfig.GLUTEN_LOAD_LIB_OS_VERSION)
if (systemVersion.isEmpty) {
throw new GlutenException(
s"${GlutenConfig.GLUTEN_LOAD_LIB_OS_VERSION} must be specified when specifies the " +
s"${GlutenConfig.GLUTEN_LOAD_LIB_OS}")
}
getForOS(systemName.get, systemVersion.get, "")
} else {
val system = "cat /etc/os-release".!!
val systemNamePattern = "^NAME=\"?(.*)\"?".r
val systemVersionPattern = "^VERSION=\"?(.*)\"?".r
val systemInfoLines = system.stripMargin.split("\n")
val systemNamePattern(systemName) =
systemInfoLines.find(_.startsWith("NAME=")).getOrElse("")
val systemVersionPattern(systemVersion) =
systemInfoLines.find(_.startsWith("VERSION=")).getOrElse("")
if (systemName.isEmpty || systemVersion.isEmpty) {
throw new GlutenException("Failed to get OS name and version info.")
}
getForOS(systemName, systemVersion, system)
}
loader
}

private def getForOS(
systemName: String,
systemVersion: String,
system: String): SharedLibraryLoader = {
if (systemName.contains("Ubuntu") && systemVersion.startsWith("20.04")) {
new SharedLibraryLoaderUbuntu2004
} else if (systemName.contains("Ubuntu") && systemVersion.startsWith("22.04")) {
new SharedLibraryLoaderUbuntu2204
} else if (systemName.contains("CentOS") && systemVersion.startsWith("9")) {
new SharedLibraryLoaderCentos9
} else if (systemName.contains("CentOS") && systemVersion.startsWith("8")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("CentOS") && systemVersion.startsWith("7")) {
new SharedLibraryLoaderCentos7
} else if (systemName.contains("Alibaba Cloud Linux") && systemVersion.startsWith("3")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Alibaba Cloud Linux") && systemVersion.startsWith("2")) {
new SharedLibraryLoaderCentos7
} else if (systemName.contains("Anolis") && systemVersion.startsWith("8")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Anolis") && systemVersion.startsWith("7")) {
new SharedLibraryLoaderCentos7
} else if (system.contains("tencentos") && system.contains("2.4")) {
new SharedLibraryLoaderCentos7
} else if (system.contains("tencentos") && system.contains("3.2")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Red Hat") && systemVersion.startsWith("9")) {
new SharedLibraryLoaderCentos9
} else if (systemName.contains("Red Hat") && systemVersion.startsWith("8")) {
new SharedLibraryLoaderCentos8
} else if (systemName.contains("Red Hat") && systemVersion.startsWith("7")) {
new SharedLibraryLoaderCentos7
} else if (systemName.contains("Debian") && systemVersion.startsWith("11")) {
new SharedLibraryLoaderDebian11
} else if (systemName.contains("Debian") && systemVersion.startsWith("12")) {
new SharedLibraryLoaderDebian12
} else {
throw new GlutenException(
s"Found unsupported OS($systemName, $systemVersion)! Currently, Gluten's Velox backend" +
" only supports Ubuntu 20.04/22.04, CentOS 7/8, " +
"Alibaba Cloud Linux 2/3 & Anolis 7/8, tencentos 2.4/3.2, RedHat 7/8, " +
"Debian 11/12.")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ private class CHCelebornColumnarBatchSerializerInstance(
private lazy val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf)
private lazy val capitalizedCompressionCodec = compressionCodec.toUpperCase(Locale.ROOT)
private lazy val compressionLevel =
GlutenShuffleUtils.getCompressionLevel(conf, compressionCodec,
GlutenShuffleUtils.getCompressionLevel(
conf,
compressionCodec,
GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)

override def deserializeStream(in: InputStream): DeserializationStream = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,9 @@ object SparkDirectoryUtil extends Logging {
INSTANCE = new SparkDirectoryUtil(roots)
return
}
if (INSTANCE.roots.toSet != roots.toSet) {
logWarning(
s"Reinitialize SparkDirectoryUtil with different root dirs: old: ${INSTANCE.ROOTS
.mkString("Array(", ", ", ")")}, new: ${roots.mkString("Array(", ", ", ")")}"
)
}
throw new UnsupportedOperationException(
s"SparkDirectoryUtil was already initialized with root dirs: ${INSTANCE.ROOTS
.mkString("Array(", ", ", ")")}")
}

def get(): SparkDirectoryUtil = synchronized {
Expand Down

0 comments on commit 173a482

Please sign in to comment.