diff --git a/velox/connectors/tpch/tests/SpeedTest.cpp b/velox/connectors/tpch/tests/SpeedTest.cpp index d212386de16e..fbf0337769c0 100644 --- a/velox/connectors/tpch/tests/SpeedTest.cpp +++ b/velox/connectors/tpch/tests/SpeedTest.cpp @@ -103,8 +103,7 @@ class TpchSpeedTest { } // Wait for the task to finish. - auto& inlineExecutor = folly::QueuedImmediateExecutor::instance(); - task->taskCompletionFuture(0).via(&inlineExecutor).wait(); + task->taskCompletionFuture().wait(); std::chrono::duration elapsed = system_clock::now() - startTime; LOG(INFO) << "Summary:"; diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 8d8fe4d47530..a7949bc3d74d 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -343,7 +343,7 @@ RowVectorPtr Driver::next(std::shared_ptr& blockingState) { // error. VELOX_CHECK( stop == StopReason::kBlock || stop == StopReason::kAtEnd || - stop == StopReason::kAlreadyTerminated); + stop == StopReason::kAlreadyTerminated || stop == StopReason::kTerminate); return result; } diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index b503ed80996e..4ce96369d4da 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -2174,7 +2174,7 @@ ContinueFuture Task::stateChangeFuture(uint64_t maxWaitMicros) { return std::move(future); } -ContinueFuture Task::taskCompletionFuture(uint64_t maxWaitMicros) { +ContinueFuture Task::taskCompletionFuture() { std::lock_guard l(mutex_); // If 'this' is running, the future is realized on timeout or when // this no longer is running. 
@@ -2185,9 +2185,6 @@ ContinueFuture Task::taskCompletionFuture(uint64_t maxWaitMicros) { auto [promise, future] = makeVeloxContinuePromiseContract( fmt::format("Task::taskCompletionFuture {}", taskId_)); taskCompletionPromises_.emplace_back(std::move(promise)); - if (maxWaitMicros > 0) { - return std::move(future).within(std::chrono::microseconds(maxWaitMicros)); - } return std::move(future); } @@ -2761,11 +2758,21 @@ void Task::MemoryReclaimer::abort( return; } VELOX_CHECK_EQ(task->pool()->name(), pool->name()); + task->setError(error); - const static int maxTaskAbortWaitUs = 60'000'000; // 60s - // Set timeout to zero to infinite wait until task completes. - task->taskCompletionFuture(maxTaskAbortWaitUs).wait(); - memory::MemoryReclaimer::abort(pool, error); + const static uint32_t maxTaskAbortWaitUs = 6'000'000; // 6s + if (task->taskCompletionFuture().wait( + std::chrono::microseconds(maxTaskAbortWaitUs))) { + // If task is completed within the 6s wait, we can safely propagate abortion. + // Otherwise long running operators might be in the middle of processing, + // making it unsafe to force abort. In this case we let running operators + // finish by hitting operator boundary, and rely on cleanup mechanism to + // release the resource. + memory::MemoryReclaimer::abort(pool, error); + } else { + LOG(WARNING) + << "Timeout waiting for task to complete during query memory aborting."; + } } } // namespace facebook::velox::exec diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 2f5a1081a223..c6aa57233410 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -247,9 +247,8 @@ class Task : public std::enable_shared_from_this { /// Returns a future which is realized when the task is no longer in /// running state. /// If the task is not in running state at the time of call, the future is - /// immediately realized. The future is realized with an exception after - /// maxWaitMicros. A zero max wait means no timeout. 
- ContinueFuture taskCompletionFuture(uint64_t maxWaitMicros); + /// immediately realized. + ContinueFuture taskCompletionFuture(); /// Returns task execution error or nullptr if no error occurred. std::exception_ptr error() const { diff --git a/velox/exec/tests/DriverTest.cpp b/velox/exec/tests/DriverTest.cpp index df5986d42030..10b06d8f9562 100644 --- a/velox/exec/tests/DriverTest.cpp +++ b/velox/exec/tests/DriverTest.cpp @@ -174,8 +174,10 @@ class DriverTest : public OperatorTestBase { // To be realized either after 1s wall time or when the corresponding Task // is no longer running. auto& executor = folly::QueuedImmediateExecutor::instance(); - auto future = - tasks_.back()->taskCompletionFuture(1'000'000).via(&executor); + auto future = tasks_.back() + ->taskCompletionFuture() + .within(std::chrono::microseconds(1'000'000)) + .via(&executor); stateFutures_.emplace(threadId, std::move(future)); EXPECT_FALSE(stateFutures_.at(threadId).isReady()); @@ -450,7 +452,10 @@ TEST_F(DriverTest, error) { EXPECT_EQ(numRead, 0); EXPECT_TRUE(stateFutures_.at(0).isReady()); // Realized immediately since task not running. - EXPECT_TRUE(tasks_[0]->taskCompletionFuture(1'000'000).isReady()); + EXPECT_TRUE(tasks_[0] + ->taskCompletionFuture() + .within(std::chrono::microseconds(1'000'000)) + .isReady()); EXPECT_EQ(tasks_[0]->state(), TaskState::kFailed); } @@ -472,7 +477,10 @@ TEST_F(DriverTest, cancel) { } EXPECT_GE(numRead, 1'000'000); auto& executor = folly::QueuedImmediateExecutor::instance(); - auto future = tasks_[0]->taskCompletionFuture(1'000'000).via(&executor); + auto future = tasks_[0] + ->taskCompletionFuture() + .within(std::chrono::microseconds(1'000'000)) + .via(&executor); future.wait(); EXPECT_TRUE(stateFutures_.at(0).isReady()); @@ -525,7 +533,10 @@ TEST_F(DriverTest, slow) { // are updated some tens of instructions after this. Determinism // requires a barrier. 
auto& executor = folly::QueuedImmediateExecutor::instance(); - auto future = tasks_[0]->taskCompletionFuture(1'000'000).via(&executor); + auto future = tasks_[0] + ->taskCompletionFuture() + .within(std::chrono::microseconds(1'000'000)) + .via(&executor); future.wait(); // Note that the driver count drops after the last thread stops and // realizes the future. @@ -561,7 +572,8 @@ TEST_F(DriverTest, pause) { readResults(params, ResultOperation::kPause, 370'000'000, &numRead); // Each thread will fully read the 1M rows in values. EXPECT_EQ(numRead, 10 * hits); - auto stateFuture = tasks_[0]->taskCompletionFuture(100'000'000); + auto stateFuture = tasks_[0]->taskCompletionFuture().within( + std::chrono::microseconds(100'000'000)); auto& executor = folly::QueuedImmediateExecutor::instance(); auto state = std::move(stateFuture).via(&executor); state.wait(); diff --git a/velox/exec/tests/ExchangeFuzzer.cpp b/velox/exec/tests/ExchangeFuzzer.cpp index ff3a24da3de9..c6076778a327 100644 --- a/velox/exec/tests/ExchangeFuzzer.cpp +++ b/velox/exec/tests/ExchangeFuzzer.cpp @@ -182,7 +182,7 @@ class ExchangeFuzzer : public VectorTestBase { // cleaning up the query if any portition of it fails. 
for (const auto& otherTask : tasks) { auto* taskPtr = otherTask.get(); - otherTask->taskCompletionFuture(0) + otherTask->taskCompletionFuture() .via(executor_.get()) .thenValue([&tasks, taskPtr](auto) { VELOX_CHECK(!taskPtr->isRunning()); diff --git a/velox/exec/tests/LocalPartitionTest.cpp b/velox/exec/tests/LocalPartitionTest.cpp index 8c4ad45f9d80..86549b636a5a 100644 --- a/velox/exec/tests/LocalPartitionTest.cpp +++ b/velox/exec/tests/LocalPartitionTest.cpp @@ -78,7 +78,9 @@ class LocalPartitionTest : public HiveConnectorTestBase { exec::TaskState expected) { if (task->state() != expected) { auto& executor = folly::QueuedImmediateExecutor::instance(); - auto future = task->taskCompletionFuture(1'000'000).via(&executor); + auto future = task->taskCompletionFuture() + .within(std::chrono::microseconds(1'000'000)) + .via(&executor); future.wait(); EXPECT_EQ(expected, task->state()); } diff --git a/velox/exec/tests/PartitionedOutputTest.cpp b/velox/exec/tests/PartitionedOutputTest.cpp index 470fa517f200..3d2ca6f834f8 100644 --- a/velox/exec/tests/PartitionedOutputTest.cpp +++ b/velox/exec/tests/PartitionedOutputTest.cpp @@ -125,7 +125,9 @@ TEST_F(PartitionedOutputTest, flush) { const auto taskWaitUs = std::chrono::duration_cast( std::chrono::seconds{10}) .count(); - auto future = task->taskCompletionFuture(taskWaitUs).via(executor_.get()); + auto future = task->taskCompletionFuture() + .within(std::chrono::microseconds(taskWaitUs)) + .via(executor_.get()); future.wait(); ASSERT_TRUE(waitForTaskDriversToFinish(task.get(), taskWaitUs)); diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index 7963c9ad2a16..dc049e032420 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -280,8 +280,8 @@ class ExternalBlocker { }; // A test node that normally just re-project/passthrough the output from input -// When the node is blocked by external even (via externalBlocker), the operator -// will signal kBlocked. 
The pipeline can ONLY proceed again when it is +// When the node is blocked by external event (via externalBlocker), the +// operator will signal kBlocked. The pipeline can ONLY proceed again when it is // unblocked externally. class TestExternalBlockableNode : public core::PlanNode { public: @@ -1616,6 +1616,127 @@ DEBUG_ONLY_TEST_F(TaskTest, resumeAfterTaskFinish) { waitForAllTasksToBeDeleted(); } +DEBUG_ONLY_TEST_F( + TaskTest, + singleThreadedLongRunningOperatorInTaskReclaimerAbort) { + auto data = makeRowVector({ + makeFlatVector(1'000, [](auto row) { return row; }), + }); + + // Filter + Project. + auto plan = + PlanBuilder().values({data, data, data}).project({"c0"}).planFragment(); + + auto queryCtx = std::make_shared(driverExecutor_.get()); + + auto blockingTask = Task::create("blocking.task.0", plan, 0, queryCtx); + + // Before we block, we expect `next` to get data normally. + EXPECT_NE(nullptr, blockingTask->next()); + + // Now, we want to block the pipeline by blocking Values operator. We expect + // `next` to return null. 
The `future` should be updated for the caller to + // wait before calling next() again + folly::EventCount getOutputWait; + std::atomic_bool getOutputWaitFlag{false}; + folly::EventCount abortWait; + std::atomic_bool abortWaitFlag{false}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Values::getOutput", + std::function([&](Values* /*unused*/) { + abortWaitFlag = true; + abortWait.notify(); + getOutputWait.await([&]() { return getOutputWaitFlag.load(); }); + })); + + const std::string abortErrorMessage("Synthetic Exception"); + auto thread = std::thread( + [&]() { VELOX_ASSERT_THROW(blockingTask->next(), abortErrorMessage); }); + + try { + VELOX_FAIL(abortErrorMessage); + } catch (VeloxException& e) { + abortWait.await([&]() { return abortWaitFlag.load(); }); + blockingTask->pool()->abort(std::current_exception()); + } + + waitForTaskCompletion(blockingTask.get(), 5'000'000); + + // We expect that abort does not trigger the operator abort by checking if the + // memory pool memory has been released or not. 
+ blockingTask->pool()->visitChildren([](auto* child) { + if (child->isLeaf()) { + EXPECT_EQ(child->stats().numReleases, 0); + } + return true; + }); + + getOutputWaitFlag = true; + getOutputWait.notify(); + + thread.join(); + + blockingTask->taskCompletionFuture().wait(); + blockingTask->pool()->visitChildren([](auto* child) { + if (child->isLeaf()) { + EXPECT_EQ(child->stats().numReleases, 1); + } + return true; + }); +} + +DEBUG_ONLY_TEST_F(TaskTest, longRunningOperatorInTaskReclaimerAbort) { + auto data = makeRowVector({ + makeFlatVector(1'000, [](auto row) { return row; }), + }); + folly::EventCount getOutputWait; + std::atomic_bool getOutputWaitFlag{false}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Values::getOutput", + std::function([&](Values* /*unused*/) { + getOutputWait.await([&]() { return getOutputWaitFlag.load(); }); + })); + + // Project only dummy plan + auto plan = + PlanBuilder().values({data, data, data}).project({"c0"}).planFragment(); + + auto queryCtx = std::make_shared(driverExecutor_.get()); + + auto blockingTask = Task::create("blocking.task.0", plan, 0, queryCtx); + + blockingTask->start(4, 1); + const std::string abortErrorMessage("Synthetic Exception"); + try { + VELOX_FAIL(abortErrorMessage); + } catch (VeloxException& e) { + blockingTask->pool()->abort(std::current_exception()); + } + waitForTaskCompletion(blockingTask.get()); + + // We expect that arbitration does not trigger release of the operator pools. 
+ blockingTask->pool()->visitChildren([](auto* child) { + if (child->isLeaf()) { + EXPECT_EQ(child->stats().numReleases, 0); + } + return true; + }); + + getOutputWaitFlag = true; + getOutputWait.notify(); + + VELOX_ASSERT_THROW( + std::rethrow_exception(blockingTask->error()), abortErrorMessage); + + blockingTask->taskCompletionFuture().wait(); + blockingTask->pool()->visitChildren([](auto* child) { + if (child->isLeaf()) { + EXPECT_EQ(child->stats().numReleases, 1); + } + return true; + }); +} + DEBUG_ONLY_TEST_F(TaskTest, taskReclaimStats) { const auto data = makeRowVector({ makeFlatVector(50, folly::identity), diff --git a/velox/exec/tests/utils/Cursor.cpp b/velox/exec/tests/utils/Cursor.cpp index 936a3b663811..a49a27ddfb0d 100644 --- a/velox/exec/tests/utils/Cursor.cpp +++ b/velox/exec/tests/utils/Cursor.cpp @@ -277,7 +277,9 @@ class MultiThreadedTaskCursor : public TaskCursorBase { if (task_->error()) { // Wait for the task to finish (there's' a small period of time between // when the error is set on the Task and terminate is called). - task_->taskCompletionFuture(1'000'000).wait(); + task_->taskCompletionFuture() + .within(std::chrono::microseconds(1'000'000)) + .wait(); // Wait for all task drivers to finish to avoid destroying the executor_ // before task_ finished using it and causing a crash. diff --git a/velox/exec/tests/utils/QueryAssertions.cpp b/velox/exec/tests/utils/QueryAssertions.cpp index ba5bc922ea08..c271b0d4a49f 100644 --- a/velox/exec/tests/utils/QueryAssertions.cpp +++ b/velox/exec/tests/utils/QueryAssertions.cpp @@ -1423,7 +1423,9 @@ bool waitForTaskStateChange( // Wait for task to transition to finished state. if (task->state() != state) { auto& executor = folly::QueuedImmediateExecutor::instance(); - auto future = task->taskCompletionFuture(maxWaitMicros).via(&executor); + auto future = task->taskCompletionFuture() + .within(std::chrono::microseconds(maxWaitMicros)) + .via(&executor); future.wait(); }