diff --git a/velox/common/memory/MemoryPool.cpp b/velox/common/memory/MemoryPool.cpp index 804d7bca3ac4..74e45b74c856 100644 --- a/velox/common/memory/MemoryPool.cpp +++ b/velox/common/memory/MemoryPool.cpp @@ -27,6 +27,11 @@ #include +DEFINE_bool( + velox_memory_pool_capacity_transfer_across_tasks, + false, + "Whether to allow memory capacity transfer between memory pools from different tasks, which might happen in use cases like Spark-Gluten"); + DECLARE_bool(velox_suppress_memory_capacity_exceeding_error_message); using facebook::velox::common::testutil::TestValue; diff --git a/velox/common/memory/MemoryPool.h b/velox/common/memory/MemoryPool.h index 23422c8e9662..4110e25626d8 100644 --- a/velox/common/memory/MemoryPool.h +++ b/velox/common/memory/MemoryPool.h @@ -33,6 +33,7 @@ DECLARE_bool(velox_memory_leak_check_enabled); DECLARE_bool(velox_memory_pool_debug_enabled); +DECLARE_bool(velox_memory_pool_capacity_transfer_across_tasks); namespace facebook::velox::exec { class ParallelMemoryReclaimer; diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 8bf3bad0fca3..449a2aaece91 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -563,15 +563,19 @@ void Operator::MemoryReclaimer::enterArbitration() { } Driver* const runningDriver = driverThreadCtx->driverCtx.driver; - if (auto opDriver = ensureDriver()) { - // NOTE: the current running driver might not be the driver of the operator - // that requests memory arbitration. The reason is that an operator might - // extend the buffer allocated from the other operator either from the same - // or different drivers. But they must be from the same task. 
- VELOX_CHECK_EQ( - runningDriver->task()->taskId(), - opDriver->task()->taskId(), - "The current running driver and the request driver must be from the same task"); + if (!FLAGS_velox_memory_pool_capacity_transfer_across_tasks) { + if (auto opDriver = ensureDriver()) { + // NOTE: the current running driver might not be the driver of the + // operator that requests memory arbitration. The reason is that an + // operator might extend the buffer allocated from the other operator + // either from the same or different drivers. But they must be from the + // same task, which the check below enforces. To bypass it, set + // FLAGS_velox_memory_pool_capacity_transfer_across_tasks=true. + VELOX_CHECK_EQ( + runningDriver->task()->taskId(), + opDriver->task()->taskId(), + "The current running driver and the request driver must be from the same task"); + } } if (runningDriver->task()->enterSuspended(runningDriver->state()) != StopReason::kNone) { @@ -589,11 +593,13 @@ void Operator::MemoryReclaimer::leaveArbitration() noexcept { return; } Driver* const runningDriver = driverThreadCtx->driverCtx.driver; - if (auto opDriver = ensureDriver()) { - VELOX_CHECK_EQ( - runningDriver->task()->taskId(), - opDriver->task()->taskId(), - "The current running driver and the request driver must be from the same task"); + if (!FLAGS_velox_memory_pool_capacity_transfer_across_tasks) { + if (auto opDriver = ensureDriver()) { + VELOX_CHECK_EQ( + runningDriver->task()->taskId(), + opDriver->task()->taskId(), + "The current running driver and the request driver must be from the same task"); + } } runningDriver->task()->leaveSuspended(runningDriver->state()); }