Skip to content

Commit

Permalink
[VL] Recover broken memory-trace option spark.gluten.backtrace.allocation (#6635)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer authored Aug 2, 2024
1 parent 807d9f7 commit 3fbf488
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 16 deletions.
14 changes: 11 additions & 3 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
gluten::getJniCommonState()->close();
}

namespace {
// Spark conf key that enables allocation backtracing. The Java side
// (GlutenConfig.COLUMNAR_MEMORY_BACKTRACE_ALLOCATION) always forwards this key
// with a default, so createRuntime below can read it via sparkConf.at(...).
// Kept in an anonymous namespace: it is only used within this translation unit.
const std::string kBacktraceAllocation = "spark.gluten.memory.backtrace.allocation";
}

JNIEXPORT jlong JNICALL Java_org_apache_gluten_exec_RuntimeJniWrapper_createRuntime( // NOLINT
JNIEnv* env,
jclass,
Expand All @@ -226,13 +230,17 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_exec_RuntimeJniWrapper_createRunt
if (env->GetJavaVM(&vm) != JNI_OK) {
throw gluten::GlutenException("Unable to get JavaVM instance");
}

auto safeArray = gluten::getByteArrayElementsSafe(env, sessionConf);
auto sparkConf = gluten::parseConfMap(env, safeArray.elems(), safeArray.length());
auto backendType = jStringToCString(env, jbackendType);

std::unique_ptr<AllocationListener> listener =
std::make_unique<SparkAllocationListener>(vm, jlistener, reserveMemoryMethod, unreserveMemoryMethod);
bool backtrace = sparkConf.at(kBacktraceAllocation) == "true";
if (backtrace) {
listener = std::make_unique<BacktraceAllocationListener>(std::move(listener));
}

auto safeArray = gluten::getByteArrayElementsSafe(env, sessionConf);
auto sparkConf = gluten::parseConfMap(env, safeArray.elems(), safeArray.length());
auto runtime = gluten::Runtime::create(backendType, std::move(listener), sparkConf);
return reinterpret_cast<jlong>(runtime);
JNI_METHOD_END(kInvalidObjectHandle)
Expand Down
2 changes: 0 additions & 2 deletions cpp/core/memory/AllocationListener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

namespace gluten {

bool backtrace_allocation = false;

class NoopAllocationListener : public gluten::AllocationListener {
public:
void allocationChanged(int64_t diff) override {
Expand Down
2 changes: 0 additions & 2 deletions cpp/core/memory/AllocationListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@

namespace gluten {

extern bool backtrace_allocation;

class AllocationListener {
public:
static std::unique_ptr<AllocationListener> noop();
Expand Down
3 changes: 0 additions & 3 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,6 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
FLAGS_gluten_velox_aysnc_timeout_on_task_stopping =
backendConf_->get<int32_t>(kVeloxAsyncTimeoutOnTaskStopping, kVeloxAsyncTimeoutOnTaskStoppingDefault);

// Set backtrace_allocation
gluten::backtrace_allocation = backendConf_->get<bool>(kBacktraceAllocation, false);

// Setup and register.
velox::filesystems::registerLocalFileSystem();
initJolFilesystem();
Expand Down
3 changes: 0 additions & 3 deletions cpp/velox/config/VeloxConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,6 @@ const int32_t kVeloxAsyncTimeoutOnTaskStoppingDefault = 30000; // 30s
// udf
const std::string kVeloxUdfLibraryPaths = "spark.gluten.sql.columnar.backend.velox.internal.udfLibraryPaths";

// backtrace allocation
const std::string kBacktraceAllocation = "spark.gluten.backtrace.allocation";

// VeloxShuffleReader print flag.
const std::string kVeloxShuffleReaderPrintFlag = "spark.gluten.velox.shuffleReaderPrintFlag";

Expand Down
2 changes: 1 addition & 1 deletion docs/developers/HowTo.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ wait to add

# How to track the memory exhaust problem

When your gluten spark jobs failed because of OOM, you can track the memory allocation's call stack by configuring `spark.gluten.backtrace.allocation = true`.
When your gluten spark jobs failed because of OOM, you can track the memory allocation's call stack by configuring `spark.gluten.memory.backtrace.allocation = true`.
The above configuration will use `BacktraceAllocationListener` wrapping from `SparkAllocationListener` to create `VeloxMemoryManager`.

`BacktraceAllocationListener` will check every allocation, if a single allocation bytes exceeds a fixed value or the accumulative allocation bytes exceeds 1/2/3...G,
Expand Down
19 changes: 17 additions & 2 deletions shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {

def memoryIsolation: Boolean = conf.getConf(COLUMNAR_MEMORY_ISOLATION)

def memoryBacktraceAllocation: Boolean = conf.getConf(COLUMNAR_MEMORY_BACKTRACE_ALLOCATION)

def numTaskSlotsPerExecutor: Int = {
val numSlots = conf.getConf(NUM_TASK_SLOTS_PER_EXECUTOR)
assert(numSlots > 0, s"Number of task slot not found. This should not happen.")
Expand Down Expand Up @@ -659,7 +661,10 @@ object GlutenConfig {

val keyWithDefault = ImmutableList.of(
(SQLConf.CASE_SENSITIVE.key, SQLConf.CASE_SENSITIVE.defaultValueString),
(SQLConf.IGNORE_MISSING_FILES.key, SQLConf.IGNORE_MISSING_FILES.defaultValueString)
(SQLConf.IGNORE_MISSING_FILES.key, SQLConf.IGNORE_MISSING_FILES.defaultValueString),
(
COLUMNAR_MEMORY_BACKTRACE_ALLOCATION.key,
COLUMNAR_MEMORY_BACKTRACE_ALLOCATION.defaultValueString)
)
keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2)))

Expand Down Expand Up @@ -706,7 +711,9 @@ object GlutenConfig {
(AWS_S3_RETRY_MODE.key, AWS_S3_RETRY_MODE.defaultValueString),
(
COLUMNAR_VELOX_CONNECTOR_IO_THREADS.key,
conf.getOrElse(GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, "-1")),
conf.getOrElse(
NUM_TASK_SLOTS_PER_EXECUTOR.key,
NUM_TASK_SLOTS_PER_EXECUTOR.defaultValueString)),
(COLUMNAR_SHUFFLE_CODEC.key, ""),
(COLUMNAR_SHUFFLE_CODEC_BACKEND.key, ""),
("spark.hadoop.input.connect.timeout", "180000"),
Expand Down Expand Up @@ -1244,6 +1251,14 @@ object GlutenConfig {
.booleanConf
.createWithDefault(false)

// Config entry for "spark.gluten.memory.backtrace.allocation". When true, the
// native side wraps SparkAllocationListener in a BacktraceAllocationListener so
// large/accumulating allocations get their call stacks recorded (see
// JniWrapper.cc createRuntime). Forwarded to native code via keyWithDefault so
// the key is always present in the native conf map. Defaults to false.
val COLUMNAR_MEMORY_BACKTRACE_ALLOCATION =
buildConf("spark.gluten.memory.backtrace.allocation")
.internal()
.doc("Print backtrace information for large memory allocations. This helps debugging when " +
"Spark OOM happens due to large acquire requests.")
.booleanConf
.createWithDefault(false)

val COLUMNAR_MEMORY_OVER_ACQUIRED_RATIO =
buildConf("spark.gluten.memory.overAcquiredMemoryRatio")
.internal()
Expand Down

0 comments on commit 3fbf488

Please sign in to comment.