Skip to content

Commit

Permalink
Add gflag to allow growing buffers that were created in different task (
Browse files Browse the repository at this point in the history
#10768)

Summary:
Add a global bool flag `FLAGS_velox_memory_pool_capacity_transfer_across_tasks` (`false` by default). When it is set to `true`, Velox allows growing a buffer that was created in a different task.

This could fix Gluten issue apache/incubator-gluten#6864.

Pull Request resolved: #10768

Reviewed By: tanjialiang

Differential Revision: D61457781

Pulled By: xiaoxmeng

fbshipit-source-id: f8b1b77cf9d5a1d913a77f880b592e2600e8c716
  • Loading branch information
zhztheplayer authored and facebook-github-bot committed Aug 19, 2024
1 parent 54b16e0 commit 676b170
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 14 deletions.
5 changes: 5 additions & 0 deletions velox/common/memory/MemoryPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@

#include <re2/re2.h>

// Defines the gflag that gates memory capacity transfer between memory pools
// belonging to different tasks. It defaults to false; in that case
// Operator::MemoryReclaimer::enterArbitration() and leaveArbitration() check
// that the running driver and the requesting operator's driver have the same
// task id. When set to true, those checks are skipped.
DEFINE_bool(
velox_memory_pool_capacity_transfer_across_tasks,
false,
"Whether allow to memory capacity transfer between memory pools from different tasks, which might happen in use case like Spark-Gluten");

DECLARE_bool(velox_suppress_memory_capacity_exceeding_error_message);

using facebook::velox::common::testutil::TestValue;
Expand Down
1 change: 1 addition & 0 deletions velox/common/memory/MemoryPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

DECLARE_bool(velox_memory_leak_check_enabled);
DECLARE_bool(velox_memory_pool_debug_enabled);
DECLARE_bool(velox_memory_pool_capacity_transfer_across_tasks);

namespace facebook::velox::exec {
class ParallelMemoryReclaimer;
Expand Down
34 changes: 20 additions & 14 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -563,15 +563,19 @@ void Operator::MemoryReclaimer::enterArbitration() {
}

Driver* const runningDriver = driverThreadCtx->driverCtx.driver;
if (auto opDriver = ensureDriver()) {
// NOTE: the current running driver might not be the driver of the operator
// that requests memory arbitration. The reason is that an operator might
// extend the buffer allocated from the other operator either from the same
// or different drivers. But they must be from the same task.
VELOX_CHECK_EQ(
runningDriver->task()->taskId(),
opDriver->task()->taskId(),
"The current running driver and the request driver must be from the same task");
if (!FLAGS_velox_memory_pool_capacity_transfer_across_tasks) {
if (auto opDriver = ensureDriver()) {
// NOTE: the current running driver might not be the driver of the
// operator that requests memory arbitration. The reason is that an
// operator might extend the buffer allocated from the other operator
// either from the same or different drivers. But they must be from the
// same task, which the following check enforces. Users can set
// FLAGS_velox_memory_pool_capacity_transfer_across_tasks=true to bypass
// this check.
VELOX_CHECK_EQ(
runningDriver->task()->taskId(),
opDriver->task()->taskId(),
"The current running driver and the request driver must be from the same task");
}
}
if (runningDriver->task()->enterSuspended(runningDriver->state()) !=
StopReason::kNone) {
Expand All @@ -589,11 +593,13 @@ void Operator::MemoryReclaimer::leaveArbitration() noexcept {
return;
}
Driver* const runningDriver = driverThreadCtx->driverCtx.driver;
if (auto opDriver = ensureDriver()) {
VELOX_CHECK_EQ(
runningDriver->task()->taskId(),
opDriver->task()->taskId(),
"The current running driver and the request driver must be from the same task");
if (!FLAGS_velox_memory_pool_capacity_transfer_across_tasks) {
if (auto opDriver = ensureDriver()) {
VELOX_CHECK_EQ(
runningDriver->task()->taskId(),
opDriver->task()->taskId(),
"The current running driver and the request driver must be from the same task");
}
}
runningDriver->task()->leaveSuspended(runningDriver->state());
}
Expand Down

0 comments on commit 676b170

Please sign in to comment.