From 663dc36b3398dc831ba11c300c089d80b7053309 Mon Sep 17 00:00:00 2001 From: Xiaoxuan Meng Date: Sat, 7 Dec 2024 21:55:08 -0800 Subject: [PATCH] feat: Make skewed partition balancer thread-safe and share among scale writer partitioners (#11786) Summary: Make skewed partition balancer thread-safe and share it with all the scale writer local partitioners. Skewed partition balance is created by driver factory init and saved in the associated exchange plan node's state. This will help further reduces the unnecessary written files and make memory usage more efficient as all the local scale writer partitioners have the consistent partition assignment views Differential Revision: D66628614 --- velox/common/base/SkewedPartitionBalancer.cpp | 85 +++++++++++++---- velox/common/base/SkewedPartitionBalancer.h | 62 ++++++++++--- .../tests/SkewedPartitionBalancerTest.cpp | 92 ++++++++++++++++++- velox/exec/Driver.h | 11 ++- velox/exec/ScaleWriterLocalPartition.cpp | 22 +++-- velox/exec/ScaleWriterLocalPartition.h | 2 +- velox/exec/Task.cpp | 50 ++++++++-- velox/exec/Task.h | 10 +- velox/exec/TaskStructs.h | 7 +- 9 files changed, 277 insertions(+), 64 deletions(-) diff --git a/velox/common/base/SkewedPartitionBalancer.cpp b/velox/common/base/SkewedPartitionBalancer.cpp index 0442e2f8c5146..21693bbe01aca 100644 --- a/velox/common/base/SkewedPartitionBalancer.cpp +++ b/velox/common/base/SkewedPartitionBalancer.cpp @@ -16,6 +16,10 @@ #include "velox/common/base/SkewedPartitionBalancer.h" +#include "velox/common/testutil/TestValue.h" + +using facebook::velox::common::testutil::TestValue; + namespace facebook::velox::common { SkewedPartitionRebalancer::SkewedPartitionRebalancer( uint32_t partitionCount, @@ -28,40 +32,74 @@ SkewedPartitionRebalancer::SkewedPartitionRebalancer( minProcessedBytesRebalanceThresholdPerPartition), minProcessedBytesRebalanceThreshold_(std::max( minProcessedBytesRebalanceThreshold, - minProcessedBytesRebalanceThresholdPerPartition_)) { + minProcessedBytesRebalanceThresholdPerPartition_)), + partitionRowCount_(partitionCount_), + partitionAssignments_(partitionCount_) { VELOX_CHECK_GT(partitionCount_, 0); VELOX_CHECK_GT(taskCount_, 0); - partitionRowCount_.resize(partitionCount_, 0); partitionBytes_.resize(partitionCount_, 0); partitionBytesAtLastRebalance_.resize(partitionCount_, 0); partitionBytesSinceLastRebalancePerTask_.resize(partitionCount_, 0); estimatedTaskBytesSinceLastRebalance_.resize(taskCount_, 0); - partitionAssignments_.resize(partitionCount_); - // Assigns one task for each partition intitially. for (auto partition = 0; partition < partitionCount_; ++partition) { - const uint32_t taskId = partition % taskCount; - partitionAssignments_[partition].emplace_back(taskId); + const uint32_t taskId = partition % taskCount_; + partitionAssignments_[partition].addTaskId(taskId); } } +void SkewedPartitionRebalancer::PartitionAssignment::addTaskId( + uint32_t taskId) { + std::unique_lock guard{lock_}; + taskIds_.push_back(taskId); +} + +uint32_t SkewedPartitionRebalancer::PartitionAssignment::nextTaskId( + uint64_t index) const { + std::shared_lock guard{lock_}; + const auto taskIndex = index % taskIds_.size(); + return taskIds_[taskIndex]; +} + +uint32_t SkewedPartitionRebalancer::PartitionAssignment::size() const { + std::shared_lock guard{lock_}; + return taskIds_.size(); +} + +const std::vector +SkewedPartitionRebalancer::PartitionAssignment::taskIds() const { + std::shared_lock guard{lock_}; + return taskIds_; +} + void SkewedPartitionRebalancer::rebalance() { - if (shouldRebalance()) { - rebalancePartitions(); + const int64_t processedBytes = processedBytes_.load(); + if (shouldRebalance(processedBytes)) { + rebalancePartitions(processedBytes); } } -bool SkewedPartitionRebalancer::shouldRebalance() const { - VELOX_CHECK_GE(processedBytes_, processedBytesAtLastRebalance_); - return (processedBytes_ - processedBytesAtLastRebalance_) >= +bool SkewedPartitionRebalancer::shouldRebalance(int64_t processedBytes) const { + return (processedBytes - processedBytesAtLastRebalance_) >= minProcessedBytesRebalanceThreshold_; } -void SkewedPartitionRebalancer::rebalancePartitions() { - VELOX_DCHECK(shouldRebalance()); - ++stats_.numBalanceTriggers; +void SkewedPartitionRebalancer::rebalancePartitions(int64_t processedBytes) { + if (rebalancing_.exchange(true)) { + return; + } + + SCOPE_EXIT { + VELOX_CHECK(rebalancing_); + rebalancing_ = false; + }; + ++numBalanceTriggers_; + + TestValue::adjust( + "facebook::velox::common::SkewedPartitionRebalancer::rebalancePartitions", + this); // Updates the processed bytes for each partition. calculatePartitionProcessedBytes(); @@ -83,7 +121,7 @@ void SkewedPartitionRebalancer::rebalancePartitions() { taskMaxPartitions{taskCount_}; for (auto partition = 0; partition < partitionCount_; ++partition) { auto& taskAssignments = partitionAssignments_[partition]; - for (uint32_t taskId : taskAssignments) { + for (uint32_t taskId : taskAssignments.taskIds()) { auto& taskQueue = taskMaxPartitions[taskId]; taskQueue.addOrUpdate( partition, partitionBytesSinceLastRebalancePerTask_[partition]); @@ -102,7 +140,7 @@ void SkewedPartitionRebalancer::rebalancePartitions() { } rebalanceBasedOnTaskSkewness(maxTasks, minTasks, taskMaxPartitions); - processedBytesAtLastRebalance_ = processedBytes_; + processedBytesAtLastRebalance_.store(processedBytes); } void SkewedPartitionRebalancer::rebalanceBasedOnTaskSkewness( @@ -159,7 +197,7 @@ void SkewedPartitionRebalancer::rebalanceBasedOnTaskSkewness( } } - stats_.numScaledPartitions += scaledPartitions.size(); + numScaledPartitions_ += scaledPartitions.size(); } bool SkewedPartitionRebalancer::rebalancePartition( @@ -168,13 +206,13 @@ bool SkewedPartitionRebalancer::rebalancePartition( IndexedPriorityQueue& maxTasks, IndexedPriorityQueue& minTasks) { auto& taskAssignments = partitionAssignments_[rebalancePartition]; - for (auto taskId : taskAssignments) { + for (auto taskId : taskAssignments.taskIds()) { if (taskId == targetTaskId) { return false; } } - taskAssignments.push_back(targetTaskId); + taskAssignments.addTaskId(targetTaskId); VELOX_CHECK_GT(partitionAssignments_[rebalancePartition].size(), 1); const auto newTaskCount = taskAssignments.size(); @@ -182,7 +220,7 @@ bool SkewedPartitionRebalancer::rebalancePartition( // Since a partition is rebalanced from max to min skewed tasks, // decrease the priority of max taskBucket as well as increase the priority // of min taskBucket. - for (uint32_t taskId : taskAssignments) { + for (uint32_t taskId : taskAssignments.taskIds()) { if (taskId == targetTaskId) { estimatedTaskBytesSinceLastRebalance_[taskId] += (partitionBytesSinceLastRebalancePerTask_[rebalancePartition] * @@ -208,6 +246,13 @@ void SkewedPartitionRebalancer::calculatePartitionProcessedBytes() { for (auto partition = 0; partition < partitionCount_; ++partition) { totalPartitionRowCount += partitionRowCount_[partition]; } + if (totalPartitionRowCount <= 0) { + LOG(ERROR) << "processedBytes " << processedBytes_ + << " processedBytesAtLastRebalance_ " + << minProcessedBytesRebalanceThreshold_ + << " minProcessedBytesRebalanceThreshold_ " + << minProcessedBytesRebalanceThreshold_; + } VELOX_CHECK_GT(totalPartitionRowCount, 0); for (auto partition = 0; partition < partitionCount_; ++partition) { diff --git a/velox/common/base/SkewedPartitionBalancer.h b/velox/common/base/SkewedPartitionBalancer.h index 6c3c00a7cc68e..a9ce3a9ea5c67 100644 --- a/velox/common/base/SkewedPartitionBalancer.h +++ b/velox/common/base/SkewedPartitionBalancer.h @@ -27,7 +27,7 @@ class SkewedPartitionRebalancerTestHelper; /// tasks to busy partition measured by processed data size. This is used by /// local partition to scale table writers for now. /// -/// NOTE: this object is not thread-safe. +/// NOTE: this object is thread-safe. class SkewedPartitionRebalancer { public: /// 'partitionCount' is the number of partitions to process. 'taskCount' is @@ -46,14 +46,18 @@ class SkewedPartitionRebalancer { uint64_t minProcessedBytesRebalanceThresholdPerPartition, uint64_t minProcessedBytesRebalanceThreshold); + ~SkewedPartitionRebalancer() { + VELOX_CHECK(!rebalancing_); + } + /// Invoked to rebalance the partition assignments if applicable. void rebalance(); /// Gets the assigned task id for a given 'partition'. 'index' is used to /// choose one of multiple assigned tasks in a round-robin order. uint32_t getTaskId(uint32_t partition, uint64_t index) const { - const auto& taskList = partitionAssignments_[partition]; - return taskList[index % taskList.size()]; + auto& taskList = partitionAssignments_[partition]; + return taskList.nextTaskId(index); } /// Adds the processed partition row count. This is used to estimate the @@ -64,11 +68,19 @@ class SkewedPartitionRebalancer { } /// Adds the total processed bytes from all the partitions. - void addProcessedBytes(long bytes) { + void addProcessedBytes(int64_t bytes) { VELOX_CHECK_GT(bytes, 0); processedBytes_ += bytes; } + uint32_t numPartitions() const { + return partitionCount_; + } + + uint32_t numTasks() const { + return taskCount_; + } + /// The rebalancer internal stats. struct Stats { /// The number of times that triggers rebalance. @@ -85,13 +97,15 @@ class SkewedPartitionRebalancer { }; Stats stats() const { - return stats_; + return Stats{ + .numBalanceTriggers = numBalanceTriggers_.load(), + .numScaledPartitions = numScaledPartitions_.load()}; } private: - bool shouldRebalance() const; + bool shouldRebalance(int64_t processedBytes) const; - void rebalancePartitions(); + void rebalancePartitions(int64_t processedBytes); // Calculates the partition processed data size based on the number of // processed rows and the averaged row size. @@ -138,13 +152,16 @@ class SkewedPartitionRebalancer { const uint64_t minProcessedBytesRebalanceThreshold_; // The accumulated number of rows processed by each partition. - std::vector partitionRowCount_; + std::vector partitionRowCount_; + + // Indicates if the rebalancer is running or not. + std::atomic_bool rebalancing_{false}; // The accumulated number of bytes processed by all the partitions. - uint64_t processedBytes_{0}; + std::atomic_int64_t processedBytes_{0}; // 'processedBytes_' at the last rebalance. It is used to calculate the // processed bytes changes since the last rebalance. - uint64_t processedBytesAtLastRebalance_{0}; + std::atomic_int64_t processedBytesAtLastRebalance_{0}; // The accumulated number of bytes processed by each partition. std::vector partitionBytes_; // 'partitionBytes_' at the last rebalance. It is used to calculate the @@ -157,10 +174,29 @@ class SkewedPartitionRebalancer { // The estimated task processed bytes since the last rebalance. std::vector estimatedTaskBytesSinceLastRebalance_; - // The assigned task id list for each partition. - std::vector> partitionAssignments_; + // The assigned task id assignment for a partition. + class PartitionAssignment { + public: + PartitionAssignment() = default; + + void addTaskId(uint32_t taskId); + + uint32_t nextTaskId(uint64_t index) const; + + const std::vector taskIds() const; + + uint32_t size() const; + + private: + mutable folly::SharedMutex lock_; + std::vector taskIds_; + }; + std::vector partitionAssignments_; - Stats stats_; + /// The number of times that triggers rebalance. + /// The number of times that a scaled partition processing. + std::atomic_uint64_t numBalanceTriggers_{0}; + std::atomic_uint32_t numScaledPartitions_{0}; friend class test::SkewedPartitionRebalancerTestHelper; }; diff --git a/velox/common/base/tests/SkewedPartitionBalancerTest.cpp b/velox/common/base/tests/SkewedPartitionBalancerTest.cpp index 2b50892631397..9485cf45868e9 100644 --- a/velox/common/base/tests/SkewedPartitionBalancerTest.cpp +++ b/velox/common/base/tests/SkewedPartitionBalancerTest.cpp @@ -19,11 +19,19 @@ #include #include "folly/Random.h" +#include "folly/experimental/EventCount.h" #include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/testutil/TestValue.h" + +using namespace facebook::velox::common::testutil; namespace facebook::velox::common::test { class SkewedPartitionRebalancerTestHelper { public: + static void SetUpTestCase() { + TestValue::enable(); + } + explicit SkewedPartitionRebalancerTestHelper( SkewedPartitionRebalancer* balancer) : balancer_(balancer) { @@ -33,9 +41,8 @@ class SkewedPartitionRebalancerTestHelper { void verifyPartitionAssignment( uint32_t partition, const std::set& expectedAssignedTasks) const { - const std::set assignedTasks( - balancer_->partitionAssignments_[partition].begin(), - balancer_->partitionAssignments_[partition].end()); + const auto taskIds = balancer_->partitionAssignments_[partition].taskIds(); + const std::set assignedTasks(taskIds.begin(), taskIds.end()); ASSERT_EQ(assignedTasks, expectedAssignedTasks) << "\nExpected: " << folly::join(",", expectedAssignedTasks) << "\nActual: " << folly::join(",", assignedTasks); @@ -59,7 +66,8 @@ class SkewedPartitionRebalancerTestHelper { } bool shouldRebalance() const { - return balancer_->shouldRebalance(); + const int64_t processedBytes = balancer_->processedBytes_; + return balancer_->shouldRebalance(processedBytes); } private: @@ -88,6 +96,8 @@ TEST_F(SkewedPartitionRebalancerTest, basic) { helper.verifyPartitionAssignment(i, {i % helper.taskCount()}); } ASSERT_EQ(balancer->stats(), SkewedPartitionRebalancer::Stats{}); + ASSERT_EQ(balancer->numPartitions(), 32); + ASSERT_EQ(balancer->numTasks(), 4); ASSERT_EQ( balancer->stats().toString(), "numBalanceTriggers 0, numScaledPartitions 0"); @@ -316,6 +326,35 @@ TEST_F(SkewedPartitionRebalancerTest, skewTasksCondition) { } } +DEBUG_ONLY_TEST_F(SkewedPartitionRebalancerTest, serializedRebalanceExecution) { + auto balancer = createBalancer(32, 4, 128, 256); + SkewedPartitionRebalancerTestHelper helper(balancer.get()); + folly::EventCount rebalanceWait; + std::atomic_bool rebalanceWaitFlag{true}; + SCOPED_TESTVALUE_SET( + "facebook::velox::common::SkewedPartitionRebalancer::rebalancePartitions", + std::function( + [&](SkewedPartitionRebalancer*) { + rebalanceWait.await([&] { return !rebalanceWaitFlag.load(); }); + })); + + for (int partition = 0; partition < helper.taskCount(); ++partition) { + balancer->addProcessedBytes(1000); + balancer->addPartitionRowCount(partition, partition == 0 ? 20 : 1); + } + + std::thread rebalanceThread([&]() { balancer->rebalance(); }); + + balancer->rebalance(); + + rebalanceWaitFlag = false; + rebalanceWait.notifyAll(); + + rebalanceThread.join(); + ASSERT_EQ(balancer->stats().numBalanceTriggers, 1); + ASSERT_GT(balancer->stats().numScaledPartitions, 0); +} + TEST_F(SkewedPartitionRebalancerTest, error) { auto balancer = createBalancer(32, 4, 128, 256); VELOX_ASSERT_THROW(balancer->addProcessedBytes(0), ""); @@ -325,7 +364,7 @@ TEST_F(SkewedPartitionRebalancerTest, error) { VELOX_ASSERT_THROW(createBalancer(0, 4, 0, 0), ""); } -TEST_F(SkewedPartitionRebalancerTest, fuzz) { +TEST_F(SkewedPartitionRebalancerTest, singleThreadFuzz) { std::mt19937 rng{100}; for (int taskCount = 1; taskCount <= 10; ++taskCount) { const uint64_t rebalanceThreshold = folly::Random::rand32(128, rng); @@ -355,4 +394,47 @@ TEST_F(SkewedPartitionRebalancerTest, fuzz) { } } } + +TEST_F(SkewedPartitionRebalancerTest, concurrentFuzz) { + for (int numProducers = 1; numProducers <= 10; ++numProducers) { + std::mt19937 rng{100}; + const uint64_t rebalanceThreshold = folly::Random::rand32(128, rng); + const uint64_t perPartitionRebalanceThreshold = + folly::Random::rand32(rebalanceThreshold / 2, rng); + for (int taskCount = 1; taskCount <= 10; ++taskCount) { + auto balancer = createBalancer( + 32, taskCount, perPartitionRebalanceThreshold, rebalanceThreshold); + SkewedPartitionRebalancerTestHelper helper(balancer.get()); + std::vector threads; + for (int producer = 0; producer < numProducers; ++producer) { + threads.emplace_back([&]() { + std::mt19937 localRng{200}; + for (int iteration = 0; iteration < 1'000; ++iteration) { + SCOPED_TRACE(fmt::format( + "taskCount {}, iteration {}", taskCount, iteration)); + const uint64_t processedBytes = + 1 + folly::Random::rand32(512, localRng); + balancer->addProcessedBytes(processedBytes); + const auto numPartitons = folly::Random::rand32(32, localRng); + for (auto i = 0; i < numPartitons; ++i) { + const auto partition = folly::Random::rand32(32, localRng); + const auto numRows = 1 + folly::Random::rand32(32, localRng); + balancer->addPartitionRowCount(partition, numRows); + } + balancer->rebalance(); + for (int round = 0; round < 10; ++round) { + for (int partition = 0; partition < helper.partitionCount(); + ++partition) { + ASSERT_LT(balancer->getTaskId(partition, round), taskCount); + } + } + } + }); + } + for (auto& thread : threads) { + thread.join(); + } + } + } +} } // namespace facebook::velox::common::test diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h index 9bd25072cc427..e7ae1782a4a0c 100644 --- a/velox/exec/Driver.h +++ b/velox/exec/Driver.h @@ -688,16 +688,17 @@ struct DriverFactory { return std::nullopt; } - /// Returns LocalPartition plan node ID if the pipeline gets data from a - /// local exchange. - std::optional needsLocalExchange() const { + /// Returns true if the pipeline gets data from a local exchange. The function + /// sets plan node in 'planNode'. + bool needsLocalExchange(core::PlanNodePtr& planNode) const { VELOX_CHECK(!planNodes.empty()); if (auto exchangeNode = std::dynamic_pointer_cast( planNodes.front())) { - return exchangeNode->id(); + planNode = exchangeNode; + return true; } - return std::nullopt; + return false; } /// Returns plan node IDs for which Hash Join Bridges must be created based diff --git a/velox/exec/ScaleWriterLocalPartition.cpp b/velox/exec/ScaleWriterLocalPartition.cpp index c4995d0975281..46ef91a31bf74 100644 --- a/velox/exec/ScaleWriterLocalPartition.cpp +++ b/velox/exec/ScaleWriterLocalPartition.cpp @@ -32,14 +32,16 @@ ScaleWriterPartitioningLocalPartition::ScaleWriterPartitioningLocalPartition( ctx->queryConfig().scaleWriterMaxPartitionsPerWriter()), numTablePartitions_(maxTablePartitionsPerWriter_ * numPartitions_), queryPool_(pool()->root()), - tablePartitionRebalancer_(std::make_unique( - numTablePartitions_, - numPartitions_, - ctx->queryConfig() - .scaleWriterMinPartitionProcessedBytesRebalanceThreshold(), - ctx->queryConfig() - .scaleWriterMinProcessedBytesRebalanceThreshold())) { + tablePartitionRebalancer_( + ctx->task->getScaleWriterPartitionBalancerLocked( + ctx->splitGroupId, + planNodeId())) { VELOX_CHECK_GT(maxTablePartitionsPerWriter_, 0); + VELOX_CHECK_NOT_NULL(tablePartitionRebalancer_); + + VELOX_CHECK_EQ( + numTablePartitions_, tablePartitionRebalancer_->numPartitions()); + VELOX_CHECK_EQ(numPartitions_, tablePartitionRebalancer_->numTasks()); writerAssignmentCounts_.resize(numPartitions_, 0); tablePartitionRowCounts_.resize(numTablePartitions_, 0); @@ -222,6 +224,12 @@ uint32_t ScaleWriterPartitioningLocalPartition::getNextWriterId( void ScaleWriterPartitioningLocalPartition::close() { LocalPartition::close(); + if (operatorCtx_->driverCtx()->driverId != 0) { + return; + } + + // The first driver operator reports the shared table partition rebalancer + // stats. const auto scaleStats = tablePartitionRebalancer_->stats(); auto lockedStats = stats_.wlock(); if (scaleStats.numScaledPartitions != 0) { diff --git a/velox/exec/ScaleWriterLocalPartition.h b/velox/exec/ScaleWriterLocalPartition.h index 03b16517c9f16..3fba1e781d260 100644 --- a/velox/exec/ScaleWriterLocalPartition.h +++ b/velox/exec/ScaleWriterLocalPartition.h @@ -66,7 +66,7 @@ class ScaleWriterPartitioningLocalPartition : public LocalPartition { memory::MemoryPool* const queryPool_; // The skewed partition balancer for writer scaling. - const std::unique_ptr tablePartitionRebalancer_; + const std::shared_ptr tablePartitionRebalancer_; std::shared_ptr memoryManager_; diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index b055105e2047b..984557129d9c5 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -1066,12 +1066,12 @@ void Task::createSplitGroupStateLocked(uint32_t splitGroupId) { continue; } - auto exchangeId = factory->needsLocalExchange(); - if (exchangeId.has_value()) { + core::PlanNodePtr partitionNode; + if (factory->needsLocalExchange(partitionNode)) { + VELOX_CHECK_NOT_NULL(partitionNode); createLocalExchangeQueuesLocked( - splitGroupId, exchangeId.value(), factory->numDrivers); + splitGroupId, partitionNode, factory->numDrivers); } - addHashJoinBridgesLocked(splitGroupId, factory->needsHashJoinBridges()); addNestedLoopJoinBridgesLocked( splitGroupId, factory->needsNestedLoopJoinBridges()); @@ -1144,7 +1144,7 @@ std::vector> Task::createDriversLocked( for (auto& bridgeEntry : splitGroupState.bridges) { bridgeEntry.second->start(); } - for (auto& bridgeEntry : splitGroupState.custom_bridges) { + for (auto& bridgeEntry : splitGroupState.customBridges) { bridgeEntry.second->start(); } @@ -1817,7 +1817,7 @@ void Task::addCustomJoinBridgesLocked( auto& splitGroupState = splitGroupStates_[splitGroupId]; for (const auto& planNode : planNodes) { if (auto joinBridge = Operator::joinBridgeFromPlanNode(planNode)) { - auto const inserted = splitGroupState.custom_bridges + auto const inserted = splitGroupState.customBridges .emplace(planNode->id(), std::move(joinBridge)) .second; VELOX_CHECK( @@ -1913,7 +1913,7 @@ std::shared_ptr Task::getCustomJoinBridgeInternal( const core::PlanNodeId& planNodeId) { std::lock_guard l(mutex_); return getJoinBridgeInternalLocked( - splitGroupId, planNodeId, &SplitGroupState::custom_bridges); + splitGroupId, planNodeId, &SplitGroupState::customBridges); } // static @@ -2032,7 +2032,7 @@ ContinueFuture Task::terminate(TaskState terminalState) { for (auto& pair : splitGroupState.second.bridges) { oldBridges.emplace_back(std::move(pair.second)); } - for (auto& pair : splitGroupState.second.custom_bridges) { + for (auto& pair : splitGroupState.second.customBridges) { oldBridges.emplace_back(std::move(pair.second)); } splitGroupStates.push_back(std::move(splitGroupState.second)); @@ -2509,9 +2509,10 @@ std::shared_ptr Task::getMergeJoinSource( void Task::createLocalExchangeQueuesLocked( uint32_t splitGroupId, - const core::PlanNodeId& planNodeId, + const core::PlanNodePtr& planNode, int numPartitions) { auto& splitGroupState = splitGroupStates_[splitGroupId]; + const auto& planNodeId = planNode->id(); VELOX_CHECK( splitGroupState.localExchanges.find(planNodeId) == splitGroupState.localExchanges.end(), @@ -2530,6 +2531,21 @@ void Task::createLocalExchangeQueuesLocked( std::make_shared(exchange.memoryManager, i)); } + const auto partitionNode = + std::dynamic_pointer_cast(planNode); + VELOX_CHECK_NOT_NULL(partitionNode); + if (partitionNode->scaleWriter()) { + exchange.scaleWriterPartitionBalancer = + std::make_shared( + queryCtx_->queryConfig().scaleWriterMaxPartitionsPerWriter() * + numPartitions, + numPartitions, + queryCtx_->queryConfig() + .scaleWriterMinPartitionProcessedBytesRebalanceThreshold(), + queryCtx_->queryConfig() + .scaleWriterMinProcessedBytesRebalanceThreshold()); + } + splitGroupState.localExchanges.insert({planNodeId, std::move(exchange)}); } @@ -2572,6 +2588,22 @@ Task::getLocalExchangeQueues( return it->second.queues; } +const std::shared_ptr& +Task::getScaleWriterPartitionBalancerLocked( + uint32_t splitGroupId, + const core::PlanNodeId& planNodeId) { + auto& splitGroupState = splitGroupStates_[splitGroupId]; + + auto it = splitGroupState.localExchanges.find(planNodeId); + VELOX_CHECK( + it != splitGroupState.localExchanges.end(), + "Incorrect local exchange ID {} for group {}, task {}", + planNodeId, + splitGroupId, + taskId()); + return it->second.scaleWriterPartitionBalancer; +} + const std::shared_ptr& Task::getLocalExchangeMemoryManager( uint32_t splitGroupId, diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 4a81b3353f816..1baac51427c38 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -15,6 +15,7 @@ */ #pragma once +#include "velox/common/base/SkewedPartitionBalancer.h" #include "velox/core/PlanFragment.h" #include "velox/core/QueryCtx.h" #include "velox/exec/Driver.h" @@ -529,7 +530,7 @@ class Task : public std::enable_shared_from_this { void createLocalExchangeQueuesLocked( uint32_t splitGroupId, - const core::PlanNodeId& planNodeId, + const core::PlanNodePtr& planNode, int numPartitions); void noMoreLocalExchangeProducers(uint32_t splitGroupId); @@ -549,6 +550,13 @@ class Task : public std::enable_shared_from_this { uint32_t splitGroupId, const core::PlanNodeId& planNodeId); + /// Returns the shared skewed partition balancer for scale writer local + /// partitioning with the given split group id and plan node id. + const std::shared_ptr& + getScaleWriterPartitionBalancerLocked( + uint32_t splitGroupId, + const core::PlanNodeId& planNodeId); + void setError(const std::exception_ptr& exception); void setError(const std::string& message); diff --git a/velox/exec/TaskStructs.h b/velox/exec/TaskStructs.h index f89126d65db92..7c193240689a7 100644 --- a/velox/exec/TaskStructs.h +++ b/velox/exec/TaskStructs.h @@ -89,6 +89,8 @@ struct SplitsState { struct LocalExchangeState { std::shared_ptr memoryManager; std::vector> queues; + std::shared_ptr + scaleWriterPartitionBalancer; }; /// Stores inter-operator state (exchange, bridges) for split groups. @@ -98,8 +100,7 @@ struct SplitGroupState { std::unordered_map> bridges; /// This map will contain all other custom bridges. std::unordered_map> - custom_bridges; - + customBridges; /// Holds states for Task::allPeersFinished. std::unordered_map barriers; @@ -135,7 +136,7 @@ struct SplitGroupState { void clear() { if (!mixedExecutionMode) { bridges.clear(); - custom_bridges.clear(); + customBridges.clear(); barriers.clear(); } localMergeSources.clear();