From e6df239252ad1641825cb54e1b53edc3cd8cec30 Mon Sep 17 00:00:00 2001 From: Wei He Date: Thu, 2 Jan 2025 10:56:29 -0800 Subject: [PATCH] fix(runner): Fix broadcast in LocalRunner (#11923) Summary: This diff adds a unit test of LocalRunner with a broadcast node in the query plan and fixes the handling of broadcast in LocalRunner. Differential Revision: D67549347 --- .../tests/utils/DistributedPlanBuilder.cpp | 31 ++++++++++- .../exec/tests/utils/DistributedPlanBuilder.h | 6 ++- velox/exec/tests/utils/PlanBuilder.h | 11 +++- velox/runner/LocalRunner.cpp | 4 +- velox/runner/tests/LocalRunnerTest.cpp | 53 +++++++++++++++---- 5 files changed, 88 insertions(+), 17 deletions(-) diff --git a/velox/exec/tests/utils/DistributedPlanBuilder.cpp b/velox/exec/tests/utils/DistributedPlanBuilder.cpp index fddb79a6f3caf..ce9c90643efa3 100644 --- a/velox/exec/tests/utils/DistributedPlanBuilder.cpp +++ b/velox/exec/tests/utils/DistributedPlanBuilder.cpp @@ -53,7 +53,7 @@ void DistributedPlanBuilder::newFragment() { planNode_ = nullptr; } -PlanBuilder& DistributedPlanBuilder::shuffle( +PlanBuilder& DistributedPlanBuilder::shufflePartitioned( const std::vector<std::string>& partitionKeys, int numPartitions, bool replicateNullsAndAny, @@ -74,7 +74,7 @@ PlanBuilder& DistributedPlanBuilder::shuffle( return *this; } -core::PlanNodePtr DistributedPlanBuilder::shuffleResult( +core::PlanNodePtr DistributedPlanBuilder::shufflePartitionedResult( const std::vector<std::string>& partitionKeys, int numPartitions, bool replicateNullsAndAny, @@ -108,6 +108,33 @@ core::PlanNodePtr DistributedPlanBuilder::shuffleResult( return std::move(planNode_); } +core::PlanNodePtr DistributedPlanBuilder::shuffleBroadcastResult() { + partitionedOutputBroadcast(); + auto* output = + dynamic_cast<core::PartitionedOutputNode*>(planNode_.get()); + VELOX_CHECK_NOT_NULL(output); + const auto producerPrefix = current_->taskPrefix; + auto result = planNode_; + newFragment(); + + VELOX_CHECK_GE(root_->stack_.size(), 2); + root_->stack_.pop_back(); + auto* consumer = 
root_->stack_.back(); + VELOX_CHECK_GE(consumer->current_->width, 1); + VELOX_CHECK_EQ(fragments_.back().numBroadcastDestinations, 0); + fragments_.back().numBroadcastDestinations = consumer->current_->width; + + for (auto& fragment : fragments_) { + root_->fragments_.push_back(std::move(fragment)); + } + exchange(output->outputType(), VectorSerde::Kind::kPresto); + auto* exchange = dynamic_cast<core::ExchangeNode*>(planNode_.get()); + VELOX_CHECK_NOT_NULL(exchange); + consumer->current_->inputStages.push_back( + runner::InputStage{exchange->id(), producerPrefix}); + return std::move(planNode_); +} + void DistributedPlanBuilder::gatherScans(const core::PlanNodePtr& plan) { if (auto scan = std::dynamic_pointer_cast<core::TableScanNode>(plan)) { current_->scans.push_back(scan); diff --git a/velox/exec/tests/utils/DistributedPlanBuilder.h b/velox/exec/tests/utils/DistributedPlanBuilder.h index ba091ef8c66a6..56c135e3fdc23 100644 --- a/velox/exec/tests/utils/DistributedPlanBuilder.h +++ b/velox/exec/tests/utils/DistributedPlanBuilder.h @@ -40,18 +40,20 @@ class DistributedPlanBuilder : public PlanBuilder { /// is only called on the root builder. std::vector<runner::ExecutableFragment> fragments(); - PlanBuilder& shuffle( + PlanBuilder& shufflePartitioned( const std::vector<std::string>& keys, int numPartitions, bool replicateNullsAndAny, const std::vector<std::string>& outputLayout = {}) override; - core::PlanNodePtr shuffleResult( + core::PlanNodePtr shufflePartitionedResult( const std::vector<std::string>& keys, int numPartitions, bool replicateNullsAndAny, const std::vector<std::string>& outputLayout = {}) override; + core::PlanNodePtr shuffleBroadcastResult() override; + private: void newFragment(); diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index 12b303c55db72..63ca50797cda3 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -1221,7 +1221,7 @@ class PlanBuilder { /// In a DistributedPlanBuilder, introduces a shuffle boundary. 
The plan so /// far is shuffled and subsequent nodes consume the shuffle. Arguments are as /// in partitionedOutput(). - virtual PlanBuilder& shuffle( + virtual PlanBuilder& shufflePartitioned( const std::vector<std::string>& keys, int numPartitions, bool replicateNullsAndAny, @@ -1232,7 +1232,7 @@ class PlanBuilder { /// In a DistributedPlanBuilder, returns an Exchange on top of the plan built /// so far and couples it to the current stage in the enclosing builder. /// Arguments are as in shuffle(). - virtual core::PlanNodePtr shuffleResult( + virtual core::PlanNodePtr shufflePartitionedResult( const std::vector<std::string>& keys, int numPartitions, bool replicateNullsAndAny, @@ -1240,6 +1240,13 @@ class PlanBuilder { VELOX_UNSUPPORTED("Needs DistributedPlanBuilder"); } + /// In a DistributedPlanBuilder, returns an Exchange on top of the plan built + /// so far that ends with a broadcast PartitionedOutput node, and couples the + /// Exchange to the current stage in the enclosing builder. + virtual core::PlanNodePtr shuffleBroadcastResult() { + VELOX_UNSUPPORTED("Needs DistributedPlanBuilder"); + } + protected: // Users who create custom operators might want to extend the PlanBuilder to // customize extended plan builders. Those functions are needed in such diff --git a/velox/runner/LocalRunner.cpp b/velox/runner/LocalRunner.cpp index b16da5fdc702d..4b5dd176b2243 100644 --- a/velox/runner/LocalRunner.cpp +++ b/velox/runner/LocalRunner.cpp @@ -188,11 +188,13 @@ LocalRunner::makeStages() { 0, onError); stages_.back().push_back(task); + // Output buffers are created during Task::start(), so we must start the + // task before calling updateOutputBuffers(). + task->start(options_.numDrivers); if (fragment.numBroadcastDestinations) { // TODO: Add support for Arbitrary partition type. 
task->updateOutputBuffers(fragment.numBroadcastDestinations, true); } - task->start(options_.numDrivers); } } diff --git a/velox/runner/tests/LocalRunnerTest.cpp b/velox/runner/tests/LocalRunnerTest.cpp index 0320382ba8a44..d219b24568794 100644 --- a/velox/runner/tests/LocalRunnerTest.cpp +++ b/velox/runner/tests/LocalRunnerTest.cpp @@ -44,9 +44,9 @@ class LocalRunnerTest : public LocalRunnerTestBase { }; static int32_t counter2; - counter2 = 0; + counter2 = kNumRows - 1; auto customize2 = [&](const RowVectorPtr& rows) { - makeAscending(rows, counter2); + makeDescending(rows, counter2); }; rowType_ = ROW({"c0"}, {BIGINT()}); @@ -87,13 +87,15 @@ class LocalRunnerTest : public LocalRunnerTestBase { DistributedPlanBuilder rootBuilder(options, idGenerator_, pool_.get()); rootBuilder.tableScan("T", rowType_); if (numWorkers > 1) { - rootBuilder.shuffle({}, 1, false); + rootBuilder.shufflePartitioned({}, 1, false); } return std::make_shared<MultiFragmentPlan>( rootBuilder.fragments(), std::move(options)); } - MultiFragmentPlanPtr makeJoinPlan(std::string project = "c0") { + MultiFragmentPlanPtr makeJoinPlan( + std::string project = "c0", + bool broadcastBuild = false) { MultiFragmentPlan::Options options = { .queryId = "test.", .numWorkers = 4, .numDrivers = 2}; const int32_t width = 3; @@ -101,17 +103,22 @@ class LocalRunnerTest : public LocalRunnerTestBase { DistributedPlanBuilder rootBuilder(options, idGenerator_, pool_.get()); rootBuilder.tableScan("T", rowType_) .project({project}) - .shuffle({"c0"}, 3, false) + .shufflePartitioned({"c0"}, 3, false) .hashJoin( {"c0"}, {"b0"}, - DistributedPlanBuilder(rootBuilder) - .tableScan("U", rowType_) - .project({"c0 as b0"}) - .shuffleResult({"b0"}, width, false), + broadcastBuild + ? 
DistributedPlanBuilder(rootBuilder) .tableScan("U", rowType_) .project({"c0 as b0"}) .shuffleBroadcastResult() : DistributedPlanBuilder(rootBuilder) .tableScan("U", rowType_) .project({"c0 as b0"}) .shufflePartitionedResult({"b0"}, width, false), "", {"c0", "b0"}) - .shuffle({}, 1, false) + .shufflePartitioned({}, 1, false) .finalAggregation({}, {"count(1)"}, {{BIGINT()}}); return std::make_shared<MultiFragmentPlan>( rootBuilder.fragments(), std::move(options)); } @@ -125,6 +132,14 @@ class LocalRunnerTest : public LocalRunnerTestBase { counter += ints->size(); } + static void makeDescending(const RowVectorPtr& rows, int32_t& counter) { + auto ints = rows->childAt(0)->as<FlatVector<int64_t>>(); + for (auto i = 0; i < ints->size(); ++i) { + ints->set(i, counter - i); + } + counter -= ints->size(); + } + void checkScanCount(const std::string& id, int32_t numWorkers) { auto scan = makeScanPlan(id, numWorkers); auto rootPool = makeRootPool(id); @@ -184,5 +199,23 @@ TEST_F(LocalRunnerTest, scan) { checkScanCount("s2", 3); } +TEST_F(LocalRunnerTest, broadcast) { + auto plan = makeJoinPlan("c0", true); + const std::string id = "q1"; + auto rootPool = makeRootPool(id); + auto splitSourceFactory = makeSimpleSplitSourceFactory(plan); + auto localRunner = std::make_shared<LocalRunner>( + std::move(plan), makeQueryCtx(id, rootPool.get()), splitSourceFactory); + auto results = readCursor(localRunner); + auto stats = localRunner->stats(); + EXPECT_EQ(1, results.size()); + EXPECT_EQ(1, results[0]->size()); + EXPECT_EQ( + kNumRows, results[0]->childAt(0)->as<FlatVector<int64_t>>()->valueAt(0)); + results.clear(); + EXPECT_EQ(Runner::State::kFinished, localRunner->state()); + localRunner->waitForCompletion(kWaitTimeoutUs); +} + } // namespace } // namespace facebook::velox::runner