fix(runner): Fix broadcast in LocalRunner (#11923)
Summary:

This diff fixes the handling of broadcast in LocalRunner and adds a unit test of LocalRunner that runs a query plan containing a broadcast node. It also renames PlanBuilder::shuffle() and shuffleResult() to shufflePartitioned() and shufflePartitionedResult(), and introduces shuffleBroadcastResult() for building broadcast shuffle boundaries.
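The root cause: LocalRunner called Task::updateOutputBuffers() before Task::start(), but a task's output buffers are only created during Task::start(), so the broadcast destination count was applied before the buffers existed. A condensed sketch of the corrected sequence in LocalRunner::makeStages() (it mirrors the change in velox/runner/LocalRunner.cpp below; reading the second argument as "the destination count is final" is our interpretation of the API, not something this diff states):

  // Start the task first: Task::start() is what creates the output buffers.
  task->start(options_.numDrivers);
  if (fragment.numBroadcastDestinations) {
    // Resize to one buffer per consumer; 'true' marks the count as final.
    task->updateOutputBuffers(fragment.numBroadcastDestinations, true);
  }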

Differential Revision: D67549347
kagamiori authored and facebook-github-bot committed Jan 2, 2025
1 parent 98932e5 commit e6df239
Showing 5 changed files with 88 additions and 17 deletions.
31 changes: 29 additions & 2 deletions velox/exec/tests/utils/DistributedPlanBuilder.cpp
@@ -53,7 +53,7 @@ void DistributedPlanBuilder::newFragment() {
planNode_ = nullptr;
}

-PlanBuilder& DistributedPlanBuilder::shuffle(
+PlanBuilder& DistributedPlanBuilder::shufflePartitioned(
const std::vector<std::string>& partitionKeys,
int numPartitions,
bool replicateNullsAndAny,
@@ -74,7 +74,7 @@ PlanBuilder& DistributedPlanBuilder::shuffle(
return *this;
}

-core::PlanNodePtr DistributedPlanBuilder::shuffleResult(
+core::PlanNodePtr DistributedPlanBuilder::shufflePartitionedResult(
const std::vector<std::string>& partitionKeys,
int numPartitions,
bool replicateNullsAndAny,
@@ -108,6 +108,33 @@ core::PlanNodePtr DistributedPlanBuilder::shuffleResult(
return std::move(planNode_);
}

core::PlanNodePtr DistributedPlanBuilder::shuffleBroadcastResult() {
partitionedOutputBroadcast();
auto* output =
dynamic_cast<const core::PartitionedOutputNode*>(planNode_.get());
VELOX_CHECK_NOT_NULL(output);
const auto producerPrefix = current_->taskPrefix;
auto result = planNode_;
newFragment();

VELOX_CHECK_GE(root_->stack_.size(), 2);
root_->stack_.pop_back();
auto* consumer = root_->stack_.back();
VELOX_CHECK_GE(consumer->current_->width, 1);
VELOX_CHECK_EQ(fragments_.back().numBroadcastDestinations, 0);
fragments_.back().numBroadcastDestinations = consumer->current_->width;

for (auto& fragment : fragments_) {
root_->fragments_.push_back(std::move(fragment));
}
exchange(output->outputType(), VectorSerde::Kind::kPresto);
auto* exchange = dynamic_cast<const core::ExchangeNode*>(planNode_.get());
VELOX_CHECK_NOT_NULL(exchange);
consumer->current_->inputStages.push_back(
runner::InputStage{exchange->id(), producerPrefix});
return std::move(planNode_);
}

void DistributedPlanBuilder::gatherScans(const core::PlanNodePtr& plan) {
if (auto scan = std::dynamic_pointer_cast<const core::TableScanNode>(plan)) {
current_->scans.push_back(scan);
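In short, shuffleBroadcastResult() above closes the producer fragment with a broadcast PartitionedOutput, pops the child builder off the root's builder stack, and couples the producer to the consuming stage. A minimal sketch of the two links it records, using the names from the code above (the surrounding control flow is elided):

  // Producer fragment: its broadcast fan-out is the consuming stage's
  // worker count, since every consumer worker receives the full output.
  fragments_.back().numBroadcastDestinations = consumer->current_->width;

  // Consumer fragment: an Exchange reads the producer's output, and an
  // InputStage ties that Exchange's node id to the producer's task prefix.
  consumer->current_->inputStages.push_back(
      runner::InputStage{exchange->id(), producerPrefix});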
6 changes: 4 additions & 2 deletions velox/exec/tests/utils/DistributedPlanBuilder.h
@@ -40,18 +40,20 @@ class DistributedPlanBuilder : public PlanBuilder {
/// is only called on the root builder.
std::vector<runner::ExecutableFragment> fragments();

-PlanBuilder& shuffle(
+PlanBuilder& shufflePartitioned(
const std::vector<std::string>& keys,
int numPartitions,
bool replicateNullsAndAny,
const std::vector<std::string>& outputLayout = {}) override;

-core::PlanNodePtr shuffleResult(
+core::PlanNodePtr shufflePartitionedResult(
const std::vector<std::string>& keys,
int numPartitions,
bool replicateNullsAndAny,
const std::vector<std::string>& outputLayout = {}) override;

core::PlanNodePtr shuffleBroadcastResult() override;

private:
void newFragment();

11 changes: 9 additions & 2 deletions velox/exec/tests/utils/PlanBuilder.h
@@ -1221,7 +1221,7 @@ class PlanBuilder {
/// In a DistributedPlanBuilder, introduces a shuffle boundary. The plan so
/// far is shuffled and subsequent nodes consume the shuffle. Arguments are as
/// in partitionedOutput().
-virtual PlanBuilder& shuffle(
+virtual PlanBuilder& shufflePartitioned(
const std::vector<std::string>& keys,
int numPartitions,
bool replicateNullsAndAny,
@@ -1232,14 +1232,21 @@
/// In a DistributedPlanBuilder, returns an Exchange on top of the plan built
/// so far and couples it to the current stage in the enclosing builder.
/// Arguments are as in shufflePartitioned().
-virtual core::PlanNodePtr shuffleResult(
+virtual core::PlanNodePtr shufflePartitionedResult(
const std::vector<std::string>& keys,
int numPartitions,
bool replicateNullsAndAny,
const std::vector<std::string>& outputLayout = {}) {
VELOX_UNSUPPORTED("Needs DistributedPlanBuilder");
}

/// In a DistributedPlanBuilder, returns an Exchange on top of the plan built
/// so far that ends with a broadcast PartitionedOutput node, and couples the
/// Exchange to the current stage in the enclosing builder.
virtual core::PlanNodePtr shuffleBroadcastResult() {
VELOX_UNSUPPORTED("Needs DistributedPlanBuilder");
}

protected:
// Users who create custom operators might want to extend the PlanBuilder to
// customize extended plan builders. Those functions are needed in such
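Taken together, the three virtuals let one fluent plan span stage boundaries: shufflePartitioned() inserts a hash-partitioned boundary inline, while the two *Result() calls terminate a nested builder and hand an Exchange back to the enclosing stage. A hedged usage sketch of a broadcast build side, mirroring makeJoinPlan() in the new unit test below (table names, row type, and argument values are taken from that test):

  DistributedPlanBuilder rootBuilder(options, idGenerator_, pool_.get());
  rootBuilder.tableScan("T", rowType_)
      .shufflePartitioned({"c0"}, 3, false)
      .hashJoin(
          {"c0"},
          {"b0"},
          // The build side ends in a broadcast PartitionedOutput, so every
          // join worker receives the full "U" input.
          DistributedPlanBuilder(rootBuilder)
              .tableScan("U", rowType_)
              .project({"c0 as b0"})
              .shuffleBroadcastResult(),
          "",
          {"c0", "b0"});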
4 changes: 3 additions & 1 deletion velox/runner/LocalRunner.cpp
@@ -188,11 +188,13 @@ LocalRunner::makeStages() {
0,
onError);
stages_.back().push_back(task);
+// Output buffers are created during Task::start(), so we must start the
+// task before calling updateOutputBuffers().
+task->start(options_.numDrivers);
if (fragment.numBroadcastDestinations) {
// TODO: Add support for Arbitrary partition type.
task->updateOutputBuffers(fragment.numBroadcastDestinations, true);
}
-task->start(options_.numDrivers);
}
}

53 changes: 43 additions & 10 deletions velox/runner/tests/LocalRunnerTest.cpp
@@ -44,9 +44,9 @@ class LocalRunnerTest : public LocalRunnerTestBase {
};

static int32_t counter2;
-counter2 = 0;
+counter2 = kNumRows - 1;
auto customize2 = [&](const RowVectorPtr& rows) {
-makeAscending(rows, counter2);
+makeDescending(rows, counter2);
};

rowType_ = ROW({"c0"}, {BIGINT()});
@@ -87,31 +87,38 @@ class LocalRunnerTest : public LocalRunnerTestBase {
DistributedPlanBuilder rootBuilder(options, idGenerator_, pool_.get());
rootBuilder.tableScan("T", rowType_);
if (numWorkers > 1) {
-rootBuilder.shuffle({}, 1, false);
+rootBuilder.shufflePartitioned({}, 1, false);
}
return std::make_shared<MultiFragmentPlan>(
rootBuilder.fragments(), std::move(options));
}

-MultiFragmentPlanPtr makeJoinPlan(std::string project = "c0") {
+MultiFragmentPlanPtr makeJoinPlan(
+    std::string project = "c0",
+    bool broadcastBuild = false) {
MultiFragmentPlan::Options options = {
.queryId = "test.", .numWorkers = 4, .numDrivers = 2};
const int32_t width = 3;

DistributedPlanBuilder rootBuilder(options, idGenerator_, pool_.get());
rootBuilder.tableScan("T", rowType_)
.project({project})
-.shuffle({"c0"}, 3, false)
+.shufflePartitioned({"c0"}, 3, false)
.hashJoin(
{"c0"},
{"b0"},
-DistributedPlanBuilder(rootBuilder)
-    .tableScan("U", rowType_)
-    .project({"c0 as b0"})
-    .shuffleResult({"b0"}, width, false),
+broadcastBuild
+    ? DistributedPlanBuilder(rootBuilder)
+          .tableScan("U", rowType_)
+          .project({"c0 as b0"})
+          .shuffleBroadcastResult()
+    : DistributedPlanBuilder(rootBuilder)
+          .tableScan("U", rowType_)
+          .project({"c0 as b0"})
+          .shufflePartitionedResult({"b0"}, width, false),
"",
{"c0", "b0"})
-.shuffle({}, 1, false)
+.shufflePartitioned({}, 1, false)
.finalAggregation({}, {"count(1)"}, {{BIGINT()}});
return std::make_shared<MultiFragmentPlan>(
rootBuilder.fragments(), std::move(options));
@@ -125,6 +132,14 @@ class LocalRunnerTest : public LocalRunnerTestBase {
counter += ints->size();
}

static void makeDescending(const RowVectorPtr& rows, int32_t& counter) {
auto ints = rows->childAt(0)->as<FlatVector<int64_t>>();
for (auto i = 0; i < ints->size(); ++i) {
ints->set(i, counter - i);
}
counter -= ints->size();
}

void checkScanCount(const std::string& id, int32_t numWorkers) {
auto scan = makeScanPlan(id, numWorkers);
auto rootPool = makeRootPool(id);
@@ -184,5 +199,23 @@ TEST_F(LocalRunnerTest, scan) {
checkScanCount("s2", 3);
}

TEST_F(LocalRunnerTest, broadcast) {
auto plan = makeJoinPlan("c0", true);
const std::string id = "q1";
auto rootPool = makeRootPool(id);
auto splitSourceFactory = makeSimpleSplitSourceFactory(plan);
auto localRunner = std::make_shared<LocalRunner>(
std::move(plan), makeQueryCtx(id, rootPool.get()), splitSourceFactory);
auto results = readCursor(localRunner);
auto stats = localRunner->stats();
EXPECT_EQ(1, results.size());
EXPECT_EQ(1, results[0]->size());
EXPECT_EQ(
kNumRows, results[0]->childAt(0)->as<FlatVector<int64_t>>()->valueAt(0));
results.clear();
EXPECT_EQ(Runner::State::kFinished, localRunner->state());
localRunner->waitForCompletion(kWaitTimeoutUs);
}
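A note on the expected value, assuming makeAscending fills each batch with counter, counter + 1, ... (the counterpart of makeDescending above): T and U then hold the same kNumRows keys, so the inner join on c0 = b0 matches each key exactly once and count(1) equals kNumRows.

  // Illustration only (not part of the test): with kNumRows = 8 and
  // 4-row batches,
  //   T = [0, 1, 2, 3], [4, 5, 6, 7]   (makeAscending)
  //   U = [7, 6, 5, 4], [3, 2, 1, 0]   (makeDescending)
  // => every key pairs exactly once, so count(1) == 8 == kNumRows.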

} // namespace
} // namespace facebook::velox::runner
