Commit

WIP
zhztheplayer committed Aug 23, 2024
1 parent d12bf1f commit 185fa65
Showing 4 changed files with 45 additions and 16 deletions.
@@ -89,8 +89,8 @@ public long borrow(long size) {
         .append(
             String.format(
                 "\t%s=%s",
-                GlutenConfig$.MODULE$.GLUTEN_OFFHEAP_ENABLED(),
-                SQLConf.get().getConfString(GlutenConfig$.MODULE$.GLUTEN_OFFHEAP_ENABLED())))
+                GlutenConfig$.MODULE$.SPARK_OFFHEAP_ENABLED(),
+                SQLConf.get().getConfString(GlutenConfig$.MODULE$.SPARK_OFFHEAP_ENABLED())))
         .append(System.lineSeparator())
         .append(
             String.format(
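The renamed key feeds the human-readable diagnostics block assembled above. As a rough, self-contained sketch of the line that one append call produces (the value shown is an assumption, not output from a real run; Scala is used here for consistency with the rest of the examples):

// Sketch only: reproduces the "\t<key>=<value>" formatting used in the diagnostics above.
object OffHeapDiagnosticsSketch {
  def main(args: Array[String]): Unit = {
    val key = "spark.memory.offHeap.enabled" // the Spark property SPARK_OFFHEAP_ENABLED points at
    val value = "true" // assumed setting for the example
    println(String.format("\t%s=%s", key, value)) // prints: <tab>spark.memory.offHeap.enabled=true
  }
}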
23 changes: 13 additions & 10 deletions gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -156,13 +156,13 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
     val minOffHeapSize = "1MB"
     if (
       !conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false) &&
-      (!conf.getBoolean(GlutenConfig.GLUTEN_OFFHEAP_ENABLED, false) ||
-      conf.getSizeAsBytes(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY, 0) < JavaUtils.byteStringAsBytes(
+      (!conf.getBoolean(GlutenConfig.SPARK_OFFHEAP_ENABLED, false) ||
+      conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY, 0) < JavaUtils.byteStringAsBytes(
         minOffHeapSize))
     ) {
       throw new GlutenException(
-        s"Must set '${GlutenConfig.GLUTEN_OFFHEAP_ENABLED}' to true " +
-          s"and set '${GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY}' to be greater than $minOffHeapSize")
+        s"Must set '${GlutenConfig.SPARK_OFFHEAP_ENABLED}' to true " +
+          s"and set '${GlutenConfig.SPARK_OFFHEAP_SIZE_KEY}' to be greater than $minOffHeapSize")
     }

     // Session's local time zone must be set. If not explicitly set by user, its default
@@ -174,13 +174,16 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
     conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, taskSlots.toString)

     val onHeapSize: Long =
-      if (conf.contains(GlutenConfig.GLUTEN_ONHEAP_SIZE_KEY)) {
-        conf.getSizeAsBytes(GlutenConfig.GLUTEN_ONHEAP_SIZE_KEY)
+      if (conf.contains(GlutenConfig.SPARK_ONHEAP_SIZE_KEY)) {
+        conf.getSizeAsBytes(GlutenConfig.SPARK_ONHEAP_SIZE_KEY)
       } else {
         // 1GB default
         1024 * 1024 * 1024
       }

+    val overheadSize: Long = SparkResourceUtil.getMemoryOverheadSize(conf)
+    conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY, overheadSize.toString)
+
     // If dynamic off-heap sizing is enabled, the off-heap size is calculated based on the on-heap
     // size. Otherwise, the off-heap size is set to the value specified by the user (if any).
     // Note that this means that we will IGNORE the off-heap size specified by the user if the
@@ -200,25 +203,25 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
         // The 300MB value, unfortunately, is hard-coded in Spark code.
         ((onHeapSize - (300 * 1024 * 1024)) *
           conf.getDouble(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION, 0.6d)).toLong
-      } else if (conf.contains(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY)) {
+      } else if (conf.contains(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)) {
         // Optimistic off-heap sizes, assuming all storage memory can be borrowed into execution
         // memory pool, regardless of Spark option spark.memory.storageFraction.
-        conf.getSizeAsBytes(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY)
+        conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)
       } else {
         // Default Spark Value.
         0L
       }

     conf.set(GlutenConfig.GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY, offHeapSize.toString)
-    conf.set(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY, offHeapSize.toString)
+    conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY, offHeapSize.toString)

     val offHeapPerTask = offHeapSize / taskSlots
     conf.set(GlutenConfig.GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY, offHeapPerTask.toString)

     // If we are using dynamic off-heap sizing, we should also enable off-heap memory
     // officially.
     if (conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false)) {
-      conf.set(GlutenConfig.GLUTEN_OFFHEAP_ENABLED, "true")
+      conf.set(GlutenConfig.SPARK_OFFHEAP_ENABLED, "true")

       // We already sized the off-heap per task in a conservative manner, so we can just
       // use it.
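For readers tracing the sizing logic in the hunks above, here is a minimal, standalone sketch of the arithmetic (the object and helper names, and the sample numbers, are made up for illustration; the real logic lives in GlutenDriverPlugin as shown in the diff). With dynamic off-heap sizing enabled, the off-heap pool is derived from the on-heap size minus Spark's hard-coded 300 MB reserve, scaled by the configured fraction; otherwise the user-supplied spark.memory.offHeap.size is used, and the result is then divided across task slots.

// Illustrative sketch only: mirrors the off-heap sizing math shown in the diff above.
object OffHeapSizingSketch {
  def sizeOffHeap(
      dynamicSizingEnabled: Boolean,
      onHeapSizeBytes: Long,
      dynamicFraction: Double,
      userOffHeapSizeBytes: Option[Long],
      taskSlots: Int): (Long, Long) = {
    val reservedBytes = 300L * 1024 * 1024 // Spark's hard-coded 300 MB reserve
    val offHeapSize: Long =
      if (dynamicSizingEnabled) {
        ((onHeapSizeBytes - reservedBytes) * dynamicFraction).toLong
      } else {
        userOffHeapSizeBytes.getOrElse(0L) // Spark's default when off-heap size is unset
      }
    (offHeapSize, offHeapSize / taskSlots)
  }

  def main(args: Array[String]): Unit = {
    // Assumed example: 8 GiB executor heap, fraction 0.6, 4 task slots.
    val (total, perTask) = sizeOffHeap(
      dynamicSizingEnabled = true,
      onHeapSizeBytes = 8L * 1024 * 1024 * 1024,
      dynamicFraction = 0.6d,
      userOffHeapSizeBytes = None,
      taskSlots = 4)
    println(s"off-heap total = $total bytes, per task = $perTask bytes")
    // Roughly 4.62 GiB in total and about 1.16 GiB per task slot.
  }
}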
@@ -18,6 +18,8 @@ package org.apache.spark.util

 import org.apache.spark.{SparkConf, SparkMasterRegex}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
+import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.internal.SQLConf

 object SparkResourceUtil extends Logging {
@@ -80,4 +82,14 @@ object SparkResourceUtil extends Logging {
   def isLocalMaster(conf: SparkConf): Boolean = {
     Utils.isLocalMaster(conf)
   }
+
+  def getMemoryOverheadSize(conf: SparkConf): Long = {
+    conf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse {
+      val executorMemMib = conf.get(EXECUTOR_MEMORY)
+      val factor =
+        conf.getDouble("spark.executor.memoryOverheadFactor", 0.1d)
+      val minMib = conf.getLong("spark.executor.minMemoryOverhead", 384L)
+      ByteUnit.MiB.toBytes((executorMemMib * factor).toLong max minMib)
+    }
+  }
 }
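For intuition about the fallback branch in getMemoryOverheadSize above (the 0.1 factor and 384 MiB floor are the defaults that appear in the code), here is a short self-contained sketch that computes the overhead in MiB. The object and method names and the sample sizes are assumptions for illustration, and unlike the helper above it does not convert the result to bytes via ByteUnit.MiB.toBytes.

// Sketch only: the max(executorMem * factor, minimum) fallback, kept in MiB for readability.
object MemoryOverheadSketch {
  def overheadMib(executorMemMib: Long, explicitOverheadMib: Option[Long]): Long =
    explicitOverheadMib.getOrElse {
      val factor = 0.1d // assumed default overhead factor
      val minMib = 384L // assumed default minimum overhead
      (executorMemMib * factor).toLong max minMib
    }

  def main(args: Array[String]): Unit = {
    println(overheadMib(2048L, None)) // 2 GiB executor -> max(204, 384) = 384 MiB
    println(overheadMib(8192L, None)) // 8 GiB executor -> max(819, 384) = 819 MiB
    println(overheadMib(8192L, Some(1024L))) // explicit overhead wins -> 1024 MiB
  }
}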
22 changes: 18 additions & 4 deletions shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -535,9 +535,11 @@ object GlutenConfig {
   val GLUTEN_CONFIG_PREFIX = "spark.gluten.sql.columnar.backend."

   // Private Spark configs.
-  val GLUTEN_ONHEAP_SIZE_KEY = "spark.executor.memory"
-  val GLUTEN_OFFHEAP_SIZE_KEY = "spark.memory.offHeap.size"
-  val GLUTEN_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled"
+  val SPARK_ONHEAP_SIZE_KEY = "spark.executor.memory"
+  val SPARK_OVERHEAD_SIZE_KEY = "spark.executor.memoryOverhead"
+  val SPARK_OVERHEAD_FACTOR_KEY = "spark.executor.memoryOverheadFactor"
+  val SPARK_OFFHEAP_SIZE_KEY = "spark.memory.offHeap.size"
+  val SPARK_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled"
   val SPARK_REDACTION_REGEX = "spark.redaction.regex"

   // For Soft Affinity Scheduling
@@ -570,6 +572,7 @@ object GlutenConfig {

   // Added back to Spark Conf during executor initialization
   val GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY = "spark.gluten.numTaskSlotsPerExecutor"
+  val GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY = "spark.gluten.memoryOverhead.size.in.bytes"
   val GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY = "spark.gluten.memory.offHeap.size.in.bytes"
   val GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY = "spark.gluten.memory.task.offHeap.size.in.bytes"
   val GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY =
@@ -762,9 +765,10 @@ object GlutenConfig {
       SPARK_SQL_PARQUET_COMPRESSION_CODEC,
       // datasource config end

+      GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY,
       GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY,
       GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
-      GLUTEN_OFFHEAP_ENABLED,
+      SPARK_OFFHEAP_ENABLED,
       SESSION_LOCAL_TIMEZONE.key,
       DECIMAL_OPERATIONS_ALLOW_PREC_LOSS.key,
       SPARK_REDACTION_REGEX
@@ -1244,6 +1248,16 @@ object GlutenConfig {
       .intConf
       .createWithDefaultString("-1")

+  val COLUMNAR_OVERHEAD_SIZE_IN_BYTES =
+    buildConf(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY)
+      .internal()
+      .doc(
+        "Must provide default value since non-execution operations " +
+          "(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate configurations using " +
+          "org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("0")
+
   val COLUMNAR_OFFHEAP_SIZE_IN_BYTES =
     buildConf(GlutenConfig.GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY)
       .internal()
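To tie the renamed SPARK_* keys and the new GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY together, here is a hedged sketch of the driver-side configuration a user might supply. The values are hypothetical, and the derived spark.gluten.* entries are populated by GlutenDriverPlugin at startup rather than set by hand.

import org.apache.spark.SparkConf

// Illustrative only: which keys a user sets versus which internal keys the plugin derives.
object GlutenMemoryConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      // User-facing Spark settings (the SPARK_* keys defined above):
      .set("spark.executor.memory", "8g") // SPARK_ONHEAP_SIZE_KEY
      .set("spark.memory.offHeap.enabled", "true") // SPARK_OFFHEAP_ENABLED
      .set("spark.memory.offHeap.size", "4g") // SPARK_OFFHEAP_SIZE_KEY
      .set("spark.executor.memoryOverhead", "1g") // SPARK_OVERHEAD_SIZE_KEY

    // At startup the plugin would additionally write internal keys such as
    // spark.gluten.memory.offHeap.size.in.bytes and spark.gluten.memoryOverhead.size.in.bytes,
    // whose values depend on task slots and the dynamic-sizing settings seen in GlutenPlugin.scala.
    println(conf.getAll.sortBy(_._1).map { case (k, v) => s"$k=$v" }.mkString("\n"))
  }
}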
