Skip to content

Commit

Permalink
[VL] Set Spark memory overhead automatically according to off-heap size when it's not explicitly configured (#7045)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer authored Aug 28, 2024
1 parent 7e800f6 commit 36e435d
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.gluten.vectorized.{JniLibLoader, JniWorkspace}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.plugin.PluginContext
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.execution.datasources.velox.{VeloxOrcWriterInjects, VeloxParquetWriterInjects, VeloxRowSplitter}
import org.apache.spark.sql.expression.UDFResolver
import org.apache.spark.sql.internal.{GlutenConfigUtil, StaticSQLConf}
Expand All @@ -42,12 +43,24 @@ class VeloxListenerApi extends ListenerApi with Logging {
override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {
val conf = pc.conf()

// FIXME: The following is a workaround. Remove once the causes are fixed.
conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY, Long.MaxValue.toString)
logWarning(
"Setting overhead memory that Gluten can use to UNLIMITED. This is currently a" +
" temporary solution to avoid OOM by Velox's global memory pools." +
" See GLUTEN-6960 for more information.")
// Overhead memory limits.
val offHeapSize = conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)
val desiredOverheadSize = (0.1 * offHeapSize).toLong.max(ByteUnit.MiB.toBytes(384))
if (!SparkResourceUtil.isMemoryOverheadSet(conf)) {
// If memory overhead is not set by user, automatically set it according to off-heap settings.
logInfo(
s"Memory overhead is not set. Setting it to $desiredOverheadSize automatically." +
" Gluten doesn't follow Spark's calculation on default value of this option because the" +
" actual required memory overhead will depend on off-heap usage than on on-heap usage.")
conf.set(GlutenConfig.SPARK_OVERHEAD_SIZE_KEY, desiredOverheadSize.toString)
}
val overheadSize: Long = SparkResourceUtil.getMemoryOverheadSize(conf)
if (overheadSize < desiredOverheadSize) {
logWarning(
s"Memory overhead is set to $overheadSize which is smaller than the recommended size" +
s" $desiredOverheadSize. This may cause OOM.")
}
conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY, overheadSize.toString)

// Sql table cache serializer.
if (conf.getBoolean(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key, defaultValue = false)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public MetricRegistry metricRegistry() {
/**
 * Builds the SparkConf exposed by this plugin context.
 *
 * NOTE(review): this appears to be a stub context (likely for tests) that pins the
 * configuration values the code under test reads — confirm against the enclosing class.
 */
public SparkConf conf() {
  // Start from an empty configuration, then apply the two overrides this context needs.
  final SparkConf sparkConf = new SparkConf();
  // Disable Velox connector IO threads; presumably avoids background threads here — TODO confirm.
  sparkConf.set(GlutenConfig.COLUMNAR_VELOX_CONNECTOR_IO_THREADS().key(), "0");
  // Provide a fixed off-heap size so size-dependent code paths have a defined value.
  sparkConf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY(), "1g");
  return sparkConf;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
// value (detected for the platform) is used, consistent with spark.
conf.set(GLUTEN_DEFAULT_SESSION_TIMEZONE_KEY, SQLConf.SESSION_LOCAL_TIMEZONE.defaultValueString)

// task slots
// Task slots.
val taskSlots = SparkResourceUtil.getTaskSlots(conf)
conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, taskSlots.toString)

Expand All @@ -171,9 +171,6 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
1024 * 1024 * 1024
}

val overheadSize: Long = SparkResourceUtil.getMemoryOverheadSize(conf)
conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY, overheadSize.toString)

// If dynamic off-heap sizing is enabled, the off-heap size is calculated based on the on-heap
// size. Otherwise, the off-heap size is set to the value specified by the user (if any).
// Note that this means that we will IGNORE the off-heap size specified by the user if the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.internal.SQLConf

object SparkResourceUtil extends Logging {
// Raw Spark configuration key names. NOTE(review): presumably referenced by string (rather
// than via Spark's ConfigEntry constants) because the entries are not available in every
// Spark version this module supports — confirm against the supported Spark versions.
private val MEMORY_OVERHEAD_FACTOR = "spark.executor.memoryOverheadFactor"
private val MIN_MEMORY_OVERHEAD = "spark.executor.minMemoryOverhead"

/** Get the total cores of the Spark application */
def getTotalCores(sqlConf: SQLConf): Int = {
Expand Down Expand Up @@ -83,12 +85,18 @@ object SparkResourceUtil extends Logging {
Utils.isLocalMaster(conf)
}

/**
 * Returns whether the user has explicitly configured executor memory overhead through any
 * of the Spark options that influence it: the absolute overhead size, the overhead factor,
 * or the minimum overhead.
 */
def isMemoryOverheadSet(conf: SparkConf): Boolean = {
  val overheadKeys =
    Seq(EXECUTOR_MEMORY_OVERHEAD.key, MEMORY_OVERHEAD_FACTOR, MIN_MEMORY_OVERHEAD)
  overheadKeys.exists(key => conf.contains(key))
}

def getMemoryOverheadSize(conf: SparkConf): Long = {
val overheadMib = conf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse {
val executorMemMib = conf.get(EXECUTOR_MEMORY)
val factor =
conf.getDouble("spark.executor.memoryOverheadFactor", 0.1d)
val minMib = conf.getLong("spark.executor.minMemoryOverhead", 384L)
conf.getDouble(MEMORY_OVERHEAD_FACTOR, 0.1d)
val minMib = conf.getLong(MIN_MEMORY_OVERHEAD, 384L)
(executorMemMib * factor).toLong.max(minMib)
}
ByteUnit.MiB.toBytes(overheadMib)
Expand Down

0 comments on commit 36e435d

Please sign in to comment.