Skip to content

Commit

Permalink
fix: Back out "fix: Fix the driver block hanging issue in serialized …
Browse files Browse the repository at this point in the history
…execution mode" (#11681)

Summary:
Pull Request resolved: #11681

Original commit changeset: c052fa3de2f4

Original Phabricator Diff: D66438632

Reviewed By: amitkdutta, bikramSingh91, weijiadeng-uber

Differential Revision: D66548972

fbshipit-source-id: c36d7230c57b7e90d44e0ce6bd43e6025229d8b4
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Nov 27, 2024
1 parent e80bf12 commit 7e4656f
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 427 deletions.
5 changes: 1 addition & 4 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1146,11 +1146,8 @@ std::string blockingReasonToString(BlockingReason reason) {
return "kYield";
case BlockingReason::kWaitForArbitration:
return "kWaitForArbitration";
default:
break;
}
VELOX_UNREACHABLE(
fmt::format("Unknown blocking reason {}", static_cast<int>(reason)));
VELOX_UNREACHABLE();
}

DriverThreadContext* driverThreadContext() {
Expand Down
105 changes: 10 additions & 95 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,9 @@ RowVectorPtr Task::next(ContinueFuture* future) {
}

VELOX_CHECK_EQ(
state_, TaskState::kRunning, "Task has already finished processing.");
static_cast<int>(state_),
static_cast<int>(kRunning),
"Task has already finished processing.");

// On first call, create the drivers.
if (driverFactories_.empty()) {
Expand Down Expand Up @@ -682,11 +684,6 @@ RowVectorPtr Task::next(ContinueFuture* future) {
}

drivers_ = std::move(drivers);
driverBlockingStates_.reserve(drivers_.size());
for (auto i = 0; i < drivers_.size(); ++i) {
driverBlockingStates_.emplace_back(
std::make_unique<DriverBlockingState>(drivers_[i].get()));
}
}

// Run drivers one at a time. If a driver blocks, continue running the other
Expand All @@ -701,10 +698,7 @@ RowVectorPtr Task::next(ContinueFuture* future) {
int runnableDrivers = 0;
int blockedDrivers = 0;
for (auto i = 0; i < numDrivers; ++i) {
// Holds a reference to driver for access as async task terminate might
// remove drivers from 'drivers_' slot.
auto driver = getDriver(i);
if (driver == nullptr) {
if (drivers_[i] == nullptr) {
// This driver has finished processing.
continue;
}
Expand All @@ -715,25 +709,16 @@ RowVectorPtr Task::next(ContinueFuture* future) {
continue;
}

ContinueFuture blockFuture = ContinueFuture::makeEmpty();
if (driverBlockingStates_[i]->blocked(&blockFuture)) {
VELOX_CHECK(blockFuture.valid());
futures[i] = std::move(blockFuture);
// This driver is still blocked.
++blockedDrivers;
continue;
}
++runnableDrivers;

ContinueFuture driverFuture = ContinueFuture::makeEmpty();
auto result = driver->next(&driverFuture);
if (result != nullptr) {
VELOX_CHECK(!driverFuture.valid());
auto result = drivers_[i]->next(&driverFuture);
if (result) {
return result;
}

if (driverFuture.valid()) {
driverBlockingStates_[i]->setDriverFuture(driverFuture);
futures[i] = std::move(driverFuture);
}

if (error()) {
Expand All @@ -743,7 +728,7 @@ RowVectorPtr Task::next(ContinueFuture* future) {

if (runnableDrivers == 0) {
if (blockedDrivers > 0) {
if (future == nullptr) {
if (!future) {
VELOX_FAIL(
"Cannot make progress as all remaining drivers are blocked and user are not expected to wait.");
} else {
Expand All @@ -753,20 +738,14 @@ RowVectorPtr Task::next(ContinueFuture* future) {
notReadyFutures.emplace_back(std::move(continueFuture));
}
}
*future = folly::collectAny(std::move(notReadyFutures)).unit();
*future = folly::collectAll(std::move(notReadyFutures)).unit();
}
}
return nullptr;
}
}
}

std::shared_ptr<Driver> Task::getDriver(uint32_t driverId) const {
VELOX_CHECK_LT(driverId, drivers_.size());
std::unique_lock<std::timed_mutex> l(mutex_);
return drivers_[driverId];
}

void Task::start(uint32_t maxDrivers, uint32_t concurrentSplitGroups) {
facebook::velox::process::ThreadDebugInfo threadDebugInfo{
queryCtx()->queryId(), taskId_, nullptr};
Expand Down Expand Up @@ -1501,7 +1480,7 @@ void Task::noMoreSplits(const core::PlanNodeId& planNodeId) {
}

if (allFinished) {
terminate(TaskState::kFinished);
terminate(kFinished);
}
}

Expand Down Expand Up @@ -3123,68 +3102,4 @@ void Task::MemoryReclaimer::abort(
<< "Timeout waiting for task to complete during query memory aborting.";
}
}

void Task::DriverBlockingState::setDriverFuture(ContinueFuture& driverFuture) {
VELOX_CHECK(!blocked_);
{
std::lock_guard<std::mutex> l(mutex_);
VELOX_CHECK(promises_.empty());
VELOX_CHECK_NULL(error_);
blocked_ = true;
}
std::move(driverFuture)
.via(&folly::InlineExecutor::instance())
.thenValue(
[&, driverHolder = driver_->shared_from_this()](auto&& /* unused */) {
std::vector<std::unique_ptr<ContinuePromise>> promises;
{
std::lock_guard<std::mutex> l(mutex_);
VELOX_CHECK(blocked_);
VELOX_CHECK_NULL(error_);
promises = std::move(promises_);
blocked_ = false;
}
for (auto& promise : promises) {
promise->setValue();
}
})
.thenError(
folly::tag_t<std::exception>{},
[&, driverHolder = driver_->shared_from_this()](
std::exception const& e) {
std::lock_guard<std::mutex> l(mutex_);
VELOX_CHECK(blocked_);
VELOX_CHECK_NULL(error_);
try {
VELOX_FAIL(
"A driver future from task {} was realized with error: {}",
driver_->task()->taskId(),
e.what());
} catch (const VeloxException&) {
error_ = std::current_exception();
}
blocked_ = false;
});
}

bool Task::DriverBlockingState::blocked(ContinueFuture* future) {
VELOX_CHECK_NOT_NULL(future);
std::lock_guard<std::mutex> l(mutex_);
if (error_ != nullptr) {
std::rethrow_exception(error_);
}
if (!blocked_) {
VELOX_CHECK(promises_.empty());
return false;
}
auto [blockPromise, blockFuture] =
makeVeloxContinuePromiseContract(fmt::format(
"DriverBlockingState {} from task {}",
driver_->driverCtx()->driverId,
driver_->task()->taskId()));
*future = std::move(blockFuture);
promises_.emplace_back(
std::make_unique<ContinuePromise>(std::move(blockPromise)));
return true;
}
} // namespace facebook::velox::exec
39 changes: 2 additions & 37 deletions velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -613,13 +613,13 @@ class Task : public std::enable_shared_from_this<Task> {
/// realized when the last thread stops running for 'this'. This is used to
/// mark cancellation by the user.
ContinueFuture requestCancel() {
return terminate(TaskState::kCanceled);
return terminate(kCanceled);
}

/// Like requestCancel but sets end state to kAborted. This is for stopping
/// Tasks due to failures of other parts of the query.
ContinueFuture requestAbort() {
return terminate(TaskState::kAborted);
return terminate(kAborted);
}

void requestYield() {
Expand Down Expand Up @@ -996,8 +996,6 @@ class Task : public std::enable_shared_from_this<Task> {
// trace enabled.
void maybeInitTrace();

std::shared_ptr<Driver> getDriver(uint32_t driverId) const;

// Universally unique identifier of the task. Used to identify the task when
// calling TaskListener.
const std::string uuid_;
Expand Down Expand Up @@ -1069,39 +1067,6 @@ class Task : public std::enable_shared_from_this<Task> {

std::vector<std::unique_ptr<DriverFactory>> driverFactories_;
std::vector<std::shared_ptr<Driver>> drivers_;

// Tracks the blocking state for each driver under serialized execution mode.
class DriverBlockingState {
public:
explicit DriverBlockingState(const Driver* driver) : driver_(driver) {
VELOX_CHECK_NOT_NULL(driver_);
}

/// Sets driver future by setting the continuation callback via inline
/// executor.
void setDriverFuture(ContinueFuture& diverFuture);

/// Indicates if the associated driver is blocked or not. If blocked,
/// 'future' is set which becomes realized when the driver is unblocked.
///
/// NOTE: the function throws if the driver has encountered error.
bool blocked(ContinueFuture* future);

private:
const Driver* const driver_;

mutable std::mutex mutex_;
// Indicates if the associated driver is blocked or not.
bool blocked_{false};
// Sets the driver future error if not null.
std::exception_ptr error_{nullptr};
// Promises to fulfill when the driver is unblocked.
std::vector<std::unique_ptr<ContinuePromise>> promises_;
};

// Tracks the driver blocking state under serialized execution mode.
std::vector<std::unique_ptr<DriverBlockingState>> driverBlockingStates_;

// When Drivers are closed by the Task, there is a chance that race and/or
// bugs can cause such Drivers to be held forever, in turn holding a pointer
// to the Task making it a zombie Tasks. This vector is used to keep track of
Expand Down
28 changes: 1 addition & 27 deletions velox/exec/TaskStructs.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,8 @@ class MergeSource;
class MergeJoinSource;
struct Split;

#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
enum TaskState {
kRunning = 0,
kFinished = 1,
kCanceled = 2,
kAborted = 3,
kFailed = 4
};
#else
/// Corresponds to Presto TaskState, needed for reporting query completion.
enum class TaskState : int {
kRunning = 0,
kFinished = 1,
kCanceled = 2,
kAborted = 3,
kFailed = 4
};
#endif
enum TaskState { kRunning, kFinished, kCanceled, kAborted, kFailed };

std::string taskStateString(TaskState state);

Expand Down Expand Up @@ -155,13 +139,3 @@ struct SplitGroupState {
};

} // namespace facebook::velox::exec

template <>
struct fmt::formatter<facebook::velox::exec::TaskState>
: formatter<std::string> {
auto format(facebook::velox::exec::TaskState state, format_context& ctx)
const {
return formatter<std::string>::format(
facebook::velox::exec::taskStateString(state), ctx);
}
};
Loading

0 comments on commit 7e4656f

Please sign in to comment.