diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 3d6da31c7e75..8964cad0d1b6 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -215,6 +215,10 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) { gluten::getJniCommonState()->close(); } +namespace { +const std::string kBacktraceAllocation = "spark.gluten.memory.backtrace.allocation"; +} + JNIEXPORT jlong JNICALL Java_org_apache_gluten_exec_RuntimeJniWrapper_createRuntime( // NOLINT JNIEnv* env, jclass, @@ -226,13 +230,17 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_exec_RuntimeJniWrapper_createRunt if (env->GetJavaVM(&vm) != JNI_OK) { throw gluten::GlutenException("Unable to get JavaVM instance"); } - + auto safeArray = gluten::getByteArrayElementsSafe(env, sessionConf); + auto sparkConf = gluten::parseConfMap(env, safeArray.elems(), safeArray.length()); auto backendType = jStringToCString(env, jbackendType); + std::unique_ptr listener = std::make_unique(vm, jlistener, reserveMemoryMethod, unreserveMemoryMethod); + bool backtrace = sparkConf.at(kBacktraceAllocation) == "true"; + if (backtrace) { + listener = std::make_unique(std::move(listener)); + } - auto safeArray = gluten::getByteArrayElementsSafe(env, sessionConf); - auto sparkConf = gluten::parseConfMap(env, safeArray.elems(), safeArray.length()); auto runtime = gluten::Runtime::create(backendType, std::move(listener), sparkConf); return reinterpret_cast(runtime); JNI_METHOD_END(kInvalidObjectHandle) diff --git a/cpp/core/memory/AllocationListener.cc b/cpp/core/memory/AllocationListener.cc index 2c876e9f19f7..5cbeeb6bd5c9 100644 --- a/cpp/core/memory/AllocationListener.cc +++ b/cpp/core/memory/AllocationListener.cc @@ -19,8 +19,6 @@ namespace gluten { -bool backtrace_allocation = false; - class NoopAllocationListener : public gluten::AllocationListener { public: void allocationChanged(int64_t diff) override { diff --git a/cpp/core/memory/AllocationListener.h b/cpp/core/memory/AllocationListener.h index 41797641fe14..1751b6112ae2 100644 --- a/cpp/core/memory/AllocationListener.h +++ b/cpp/core/memory/AllocationListener.h @@ -23,8 +23,6 @@ namespace gluten { -extern bool backtrace_allocation; - class AllocationListener { public: static std::unique_ptr noop(); diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index a3658faa3a18..2dad6adf2e70 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -104,9 +104,6 @@ void VeloxBackend::init(const std::unordered_map& conf FLAGS_gluten_velox_aysnc_timeout_on_task_stopping = backendConf_->get(kVeloxAsyncTimeoutOnTaskStopping, kVeloxAsyncTimeoutOnTaskStoppingDefault); - // Set backtrace_allocation - gluten::backtrace_allocation = backendConf_->get(kBacktraceAllocation, false); - // Setup and register. velox::filesystems::registerLocalFileSystem(); initJolFilesystem(); diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index 65c7cb61d94d..792beda96f7d 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -104,9 +104,6 @@ const int32_t kVeloxAsyncTimeoutOnTaskStoppingDefault = 30000; // 30s // udf const std::string kVeloxUdfLibraryPaths = "spark.gluten.sql.columnar.backend.velox.internal.udfLibraryPaths"; -// backtrace allocation -const std::string kBacktraceAllocation = "spark.gluten.backtrace.allocation"; - // VeloxShuffleReader print flag. const std::string kVeloxShuffleReaderPrintFlag = "spark.gluten.velox.shuffleReaderPrintFlag"; diff --git a/docs/developers/HowTo.md b/docs/developers/HowTo.md index a13bf02ebede..5b16c965fe63 100644 --- a/docs/developers/HowTo.md +++ b/docs/developers/HowTo.md @@ -163,7 +163,7 @@ wait to add # How to track the memory exhaust problem -When your gluten spark jobs failed because of OOM, you can track the memory allocation's call stack by configuring `spark.gluten.backtrace.allocation = true`. +When your gluten spark jobs failed because of OOM, you can track the memory allocation's call stack by configuring `spark.gluten.memory.backtrace.allocation = true`. The above configuration will use `BacktraceAllocationListener` wrapping from `SparkAllocationListener` to create `VeloxMemoryManager`. `BacktraceAllocationListener` will check every allocation, if a single allocation bytes exceeds a fixed value or the accumulative allocation bytes exceeds 1/2/3...G, diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index 5547feafe331..3184a83310bc 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -242,6 +242,8 @@ class GlutenConfig(conf: SQLConf) extends Logging { def memoryIsolation: Boolean = conf.getConf(COLUMNAR_MEMORY_ISOLATION) + def memoryBacktraceAllocation: Boolean = conf.getConf(COLUMNAR_MEMORY_BACKTRACE_ALLOCATION) + def numTaskSlotsPerExecutor: Int = { val numSlots = conf.getConf(NUM_TASK_SLOTS_PER_EXECUTOR) assert(numSlots > 0, s"Number of task slot not found. This should not happen.") @@ -673,7 +675,10 @@ object GlutenConfig { val keyWithDefault = ImmutableList.of( (SQLConf.CASE_SENSITIVE.key, SQLConf.CASE_SENSITIVE.defaultValueString), - (SQLConf.IGNORE_MISSING_FILES.key, SQLConf.IGNORE_MISSING_FILES.defaultValueString) + (SQLConf.IGNORE_MISSING_FILES.key, SQLConf.IGNORE_MISSING_FILES.defaultValueString), + ( + COLUMNAR_MEMORY_BACKTRACE_ALLOCATION.key, + COLUMNAR_MEMORY_BACKTRACE_ALLOCATION.defaultValueString) ) keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2))) @@ -720,7 +725,9 @@ object GlutenConfig { (AWS_S3_RETRY_MODE.key, AWS_S3_RETRY_MODE.defaultValueString), ( COLUMNAR_VELOX_CONNECTOR_IO_THREADS.key, - conf.getOrElse(GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, "-1")), + conf.getOrElse( + NUM_TASK_SLOTS_PER_EXECUTOR.key, + NUM_TASK_SLOTS_PER_EXECUTOR.defaultValueString)), (COLUMNAR_SHUFFLE_CODEC.key, ""), (COLUMNAR_SHUFFLE_CODEC_BACKEND.key, ""), ("spark.hadoop.input.connect.timeout", "180000"), @@ -1258,6 +1265,14 @@ object GlutenConfig { .booleanConf .createWithDefault(false) + val COLUMNAR_MEMORY_BACKTRACE_ALLOCATION = + buildConf("spark.gluten.memory.backtrace.allocation") + .internal() + .doc("Print backtrace information for large memory allocations. This helps debugging when " + + "Spark OOM happens due to large acquire requests.") + .booleanConf + .createWithDefault(false) + val COLUMNAR_MEMORY_OVER_ACQUIRED_RATIO = buildConf("spark.gluten.memory.overAcquiredMemoryRatio") .internal()