[VL] Recover broken memory-trace option spark.gluten.backtrace.allocation #6635

Merged: 5 commits, Aug 2, 2024
14 changes: 11 additions & 3 deletions cpp/core/jni/JniWrapper.cc
@@ -215,6 +215,10 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
   gluten::getJniCommonState()->close();
 }
 
+namespace {
+const std::string kBacktraceAllocation = "spark.gluten.memory.backtrace.allocation";
+}
+
 JNIEXPORT jlong JNICALL Java_org_apache_gluten_exec_RuntimeJniWrapper_createRuntime( // NOLINT
     JNIEnv* env,
     jclass,
@@ -226,13 +230,17 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_exec_RuntimeJniWrapper_createRunt
   if (env->GetJavaVM(&vm) != JNI_OK) {
     throw gluten::GlutenException("Unable to get JavaVM instance");
   }
+
+  auto safeArray = gluten::getByteArrayElementsSafe(env, sessionConf);
+  auto sparkConf = gluten::parseConfMap(env, safeArray.elems(), safeArray.length());
   auto backendType = jStringToCString(env, jbackendType);
 
   std::unique_ptr<AllocationListener> listener =
       std::make_unique<SparkAllocationListener>(vm, jlistener, reserveMemoryMethod, unreserveMemoryMethod);
+  bool backtrace = sparkConf.at(kBacktraceAllocation) == "true";
+  if (backtrace) {
+    listener = std::make_unique<BacktraceAllocationListener>(std::move(listener));
+  }
-
-  auto safeArray = gluten::getByteArrayElementsSafe(env, sessionConf);
-  auto sparkConf = gluten::parseConfMap(env, safeArray.elems(), safeArray.length());
   auto runtime = gluten::Runtime::create(backendType, std::move(listener), sparkConf);
   return reinterpret_cast<jlong>(runtime);
   JNI_METHOD_END(kInvalidObjectHandle)
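Illustration only, not part of this PR's diff: the hunk above parses the session conf before the listener is built, then conditionally wraps the SparkAllocationListener in a BacktraceAllocationListener. Below is a minimal C++ sketch of that decorator idea, assuming only the interface visible in AllocationListener.h (allocationChanged(int64_t)); the 1 GiB threshold and the glibc backtrace calls are placeholders of this sketch, not Gluten's actual BacktraceAllocationListener.

#include <execinfo.h>  // backtrace, backtrace_symbols_fd (glibc/Linux)
#include <cstdint>
#include <memory>
#include <utility>

// Minimal stand-in for the interface declared in AllocationListener.h.
class AllocationListener {
 public:
  virtual ~AllocationListener() = default;
  virtual void allocationChanged(int64_t diff) = 0;
};

// Decorator: forwards every change to the wrapped listener and, for single
// allocations at or above the (assumed) threshold, writes a backtrace to stderr.
class BacktraceOnLargeAllocation : public AllocationListener {
 public:
  explicit BacktraceOnLargeAllocation(std::unique_ptr<AllocationListener> delegate)
      : delegate_(std::move(delegate)) {}

  void allocationChanged(int64_t diff) override {
    if (diff >= kSingleAllocationThreshold) {
      void* frames[64];
      int depth = ::backtrace(frames, 64);
      ::backtrace_symbols_fd(frames, depth, 2);  // 2 = stderr
    }
    delegate_->allocationChanged(diff);  // always delegate to the real accounting
  }

 private:
  static constexpr int64_t kSingleAllocationThreshold = int64_t{1} << 30;  // 1 GiB, assumed
  std::unique_ptr<AllocationListener> delegate_;
};

The wrapping keeps SparkAllocationListener as the single source of truth for memory accounting; the backtrace layer only observes the same calls.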
2 changes: 0 additions & 2 deletions cpp/core/memory/AllocationListener.cc
@@ -19,8 +19,6 @@
 
 namespace gluten {
 
-bool backtrace_allocation = false;
-
 class NoopAllocationListener : public gluten::AllocationListener {
  public:
   void allocationChanged(int64_t diff) override {
2 changes: 0 additions & 2 deletions cpp/core/memory/AllocationListener.h
@@ -23,8 +23,6 @@
 
 namespace gluten {
 
-extern bool backtrace_allocation;
-
 class AllocationListener {
  public:
   static std::unique_ptr<AllocationListener> noop();
3 changes: 0 additions & 3 deletions cpp/velox/compute/VeloxBackend.cc
@@ -104,9 +104,6 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
   FLAGS_gluten_velox_aysnc_timeout_on_task_stopping =
       backendConf_->get<int32_t>(kVeloxAsyncTimeoutOnTaskStopping, kVeloxAsyncTimeoutOnTaskStoppingDefault);
 
-  // Set backtrace_allocation
-  gluten::backtrace_allocation = backendConf_->get<bool>(kBacktraceAllocation, false);
-
   // Setup and register.
   velox::filesystems::registerLocalFileSystem();
   initJolFilesystem();
3 changes: 0 additions & 3 deletions cpp/velox/config/VeloxConfig.h
@@ -104,9 +104,6 @@ const int32_t kVeloxAsyncTimeoutOnTaskStoppingDefault = 30000; // 30s
 // udf
 const std::string kVeloxUdfLibraryPaths = "spark.gluten.sql.columnar.backend.velox.internal.udfLibraryPaths";
 
-// backtrace allocation
-const std::string kBacktraceAllocation = "spark.gluten.backtrace.allocation";
-
 // VeloxShuffleReader print flag.
 const std::string kVeloxShuffleReaderPrintFlag = "spark.gluten.velox.shuffleReaderPrintFlag";
 
2 changes: 1 addition & 1 deletion docs/developers/HowTo.md
@@ -163,7 +163,7 @@ wait to add
 
 # How to track the memory exhaust problem
 
-When your gluten spark jobs failed because of OOM, you can track the memory allocation's call stack by configuring `spark.gluten.backtrace.allocation = true`.
+When your gluten spark jobs failed because of OOM, you can track the memory allocation's call stack by configuring `spark.gluten.memory.backtrace.allocation = true`.
 The above configuration will use `BacktraceAllocationListener` wrapping from `SparkAllocationListener` to create `VeloxMemoryManager`.
 
 `BacktraceAllocationListener` will check every allocation, if a single allocation bytes exceeds a fixed value or the accumulative allocation bytes exceeds 1/2/3...G,
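Illustration only, not part of the diff: a C++ sketch of the accumulative check the documentation above describes, that is, trigger a backtrace whenever the running total of allocated bytes crosses the next whole-GiB boundary (1 GiB, 2 GiB, 3 GiB, ...). The 1 GiB step and the atomic counter are assumptions of this sketch; the actual thresholds live in Gluten's BacktraceAllocationListener.

#include <atomic>
#include <cstdint>

// Tracks total allocated bytes and reports when the total crosses the next
// whole-GiB boundary, which is when a backtrace would be captured.
class AccumulativeBacktraceTrigger {
 public:
  bool shouldBacktrace(int64_t allocatedBytes) {
    constexpr int64_t kStep = int64_t{1} << 30;  // 1 GiB step, assumed
    int64_t total = accumulated_.fetch_add(allocatedBytes) + allocatedBytes;
    int64_t previous = total - allocatedBytes;  // value before this allocation
    return total / kStep > previous / kStep;    // crossed at least one boundary
  }

 private:
  std::atomic<int64_t> accumulated_{0};
};

A listener like the one sketched after the JniWrapper.cc hunk could call shouldBacktrace(diff) from allocationChanged(diff) and dump the stack when it returns true.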
19 changes: 17 additions & 2 deletions shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -242,6 +242,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 
   def memoryIsolation: Boolean = conf.getConf(COLUMNAR_MEMORY_ISOLATION)
 
+  def memoryBacktraceAllocation: Boolean = conf.getConf(COLUMNAR_MEMORY_BACKTRACE_ALLOCATION)
+
   def numTaskSlotsPerExecutor: Int = {
     val numSlots = conf.getConf(NUM_TASK_SLOTS_PER_EXECUTOR)
     assert(numSlots > 0, s"Number of task slot not found. This should not happen.")
@@ -673,7 +675,10 @@
 
     val keyWithDefault = ImmutableList.of(
       (SQLConf.CASE_SENSITIVE.key, SQLConf.CASE_SENSITIVE.defaultValueString),
-      (SQLConf.IGNORE_MISSING_FILES.key, SQLConf.IGNORE_MISSING_FILES.defaultValueString)
+      (SQLConf.IGNORE_MISSING_FILES.key, SQLConf.IGNORE_MISSING_FILES.defaultValueString),
+      (
+        COLUMNAR_MEMORY_BACKTRACE_ALLOCATION.key,
+        COLUMNAR_MEMORY_BACKTRACE_ALLOCATION.defaultValueString)
     )
     keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2)))
 
@@ -720,7 +725,9 @@
       (AWS_S3_RETRY_MODE.key, AWS_S3_RETRY_MODE.defaultValueString),
       (
         COLUMNAR_VELOX_CONNECTOR_IO_THREADS.key,
-        conf.getOrElse(GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, "-1")),
+        conf.getOrElse(
+          NUM_TASK_SLOTS_PER_EXECUTOR.key,
+          NUM_TASK_SLOTS_PER_EXECUTOR.defaultValueString)),
       (COLUMNAR_SHUFFLE_CODEC.key, ""),
       (COLUMNAR_SHUFFLE_CODEC_BACKEND.key, ""),
       ("spark.hadoop.input.connect.timeout", "180000"),
@@ -1258,6 +1265,14 @@
       .booleanConf
       .createWithDefault(false)
 
+  val COLUMNAR_MEMORY_BACKTRACE_ALLOCATION =
+    buildConf("spark.gluten.memory.backtrace.allocation")
+      .internal()
+      .doc("Print backtrace information for large memory allocations. This helps debugging when " +
+        "Spark OOM happens due to large acquire requests.")
+      .booleanConf
+      .createWithDefault(false)
+
   val COLUMNAR_MEMORY_OVER_ACQUIRED_RATIO =
     buildConf("spark.gluten.memory.overAcquiredMemoryRatio")
      .internal()
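Illustration only, not part of the diff: because the new key is added to keyWithDefault above, it always reaches the native conf map with at least its default value, which is why sparkConf.at(kBacktraceAllocation) in JniWrapper.cc cannot throw. A defensive native-side lookup with an explicit default, an alternative rather than what this PR does, would look like the following C++ sketch (getBoolConf is a hypothetical helper):

#include <string>
#include <unordered_map>

// Hypothetical helper: read a boolean conf entry, falling back to a default
// when the JVM side did not forward the key. Not Gluten's code.
bool getBoolConf(
    const std::unordered_map<std::string, std::string>& conf,
    const std::string& key,
    bool defaultValue) {
  auto it = conf.find(key);
  if (it == conf.end()) {
    return defaultValue;
  }
  return it->second == "true";
}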
Expand Down
Loading