Skip to content

Commit

Permalink
feat: Make skewed partition balancer thread-safe and share among scal…
Browse files Browse the repository at this point in the history
…e writer partitioners (#11786)

Summary:

Make skewed partition balancer thread-safe and share it with all the scale writer local partitioners.
The skewed partition balancer is created during driver factory initialization and saved in the associated exchange plan node's state.
This further reduces the number of unnecessarily written files and makes memory usage more efficient, as
all the local scale writer partitioners share a consistent view of the partition assignments.

Differential Revision: D66628614
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Dec 8, 2024
1 parent 929affe commit 663dc36
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 64 deletions.
85 changes: 65 additions & 20 deletions velox/common/base/SkewedPartitionBalancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

#include "velox/common/base/SkewedPartitionBalancer.h"

#include "velox/common/testutil/TestValue.h"

using facebook::velox::common::testutil::TestValue;

namespace facebook::velox::common {
SkewedPartitionRebalancer::SkewedPartitionRebalancer(
uint32_t partitionCount,
Expand All @@ -28,40 +32,74 @@ SkewedPartitionRebalancer::SkewedPartitionRebalancer(
minProcessedBytesRebalanceThresholdPerPartition),
minProcessedBytesRebalanceThreshold_(std::max(
minProcessedBytesRebalanceThreshold,
minProcessedBytesRebalanceThresholdPerPartition_)) {
minProcessedBytesRebalanceThresholdPerPartition_)),
partitionRowCount_(partitionCount_),
partitionAssignments_(partitionCount_) {
VELOX_CHECK_GT(partitionCount_, 0);
VELOX_CHECK_GT(taskCount_, 0);

partitionRowCount_.resize(partitionCount_, 0);
partitionBytes_.resize(partitionCount_, 0);
partitionBytesAtLastRebalance_.resize(partitionCount_, 0);
partitionBytesSinceLastRebalancePerTask_.resize(partitionCount_, 0);
estimatedTaskBytesSinceLastRebalance_.resize(taskCount_, 0);

partitionAssignments_.resize(partitionCount_);

// Assigns one task for each partition initially.
for (auto partition = 0; partition < partitionCount_; ++partition) {
const uint32_t taskId = partition % taskCount;
partitionAssignments_[partition].emplace_back(taskId);
const uint32_t taskId = partition % taskCount_;
partitionAssignments_[partition].addTaskId(taskId);
}
}

// Appends 'taskId' to this partition's task id list. 'lock_' is held
// exclusively for the duration of the append.
void SkewedPartitionRebalancer::PartitionAssignment::addTaskId(
    uint32_t taskId) {
  std::unique_lock writeGuard(lock_);
  taskIds_.emplace_back(taskId);
}

uint32_t SkewedPartitionRebalancer::PartitionAssignment::nextTaskId(
uint64_t index) const {
std::shared_lock guard{lock_};
const auto taskIndex = index % taskIds_.size();
return taskIds_[taskIndex];
}

// Returns the current number of task ids, read while holding 'lock_' in
// shared mode.
uint32_t SkewedPartitionRebalancer::PartitionAssignment::size() const {
  std::shared_lock readGuard(lock_);
  const auto numTaskIds = taskIds_.size();
  return static_cast<uint32_t>(numTaskIds);
}

// Returns a copy of the task id list. The copy is made while holding 'lock_'
// in shared mode, so the caller can iterate over the result without holding
// the lock.
const std::vector<uint32_t>
SkewedPartitionRebalancer::PartitionAssignment::taskIds() const {
  std::shared_lock readGuard(lock_);
  std::vector<uint32_t> snapshot = taskIds_;
  return snapshot;
}

void SkewedPartitionRebalancer::rebalance() {
if (shouldRebalance()) {
rebalancePartitions();
const int64_t processedBytes = processedBytes_.load();
if (shouldRebalance(processedBytes)) {
rebalancePartitions(processedBytes);
}
}

bool SkewedPartitionRebalancer::shouldRebalance() const {
VELOX_CHECK_GE(processedBytes_, processedBytesAtLastRebalance_);
return (processedBytes_ - processedBytesAtLastRebalance_) >=
bool SkewedPartitionRebalancer::shouldRebalance(int64_t processedBytes) const {
return (processedBytes - processedBytesAtLastRebalance_) >=
minProcessedBytesRebalanceThreshold_;
}

void SkewedPartitionRebalancer::rebalancePartitions() {
VELOX_DCHECK(shouldRebalance());
++stats_.numBalanceTriggers;
void SkewedPartitionRebalancer::rebalancePartitions(int64_t processedBytes) {
if (rebalancing_.exchange(true)) {
return;
}

SCOPE_EXIT {
VELOX_CHECK(rebalancing_);
rebalancing_ = false;
};
++numBalanceTriggers_;

TestValue::adjust(
"facebook::velox::common::SkewedPartitionRebalancer::rebalancePartitions",
this);

// Updates the processed bytes for each partition.
calculatePartitionProcessedBytes();
Expand All @@ -83,7 +121,7 @@ void SkewedPartitionRebalancer::rebalancePartitions() {
taskMaxPartitions{taskCount_};
for (auto partition = 0; partition < partitionCount_; ++partition) {
auto& taskAssignments = partitionAssignments_[partition];
for (uint32_t taskId : taskAssignments) {
for (uint32_t taskId : taskAssignments.taskIds()) {
auto& taskQueue = taskMaxPartitions[taskId];
taskQueue.addOrUpdate(
partition, partitionBytesSinceLastRebalancePerTask_[partition]);
Expand All @@ -102,7 +140,7 @@ void SkewedPartitionRebalancer::rebalancePartitions() {
}

rebalanceBasedOnTaskSkewness(maxTasks, minTasks, taskMaxPartitions);
processedBytesAtLastRebalance_ = processedBytes_;
processedBytesAtLastRebalance_.store(processedBytes);
}

void SkewedPartitionRebalancer::rebalanceBasedOnTaskSkewness(
Expand Down Expand Up @@ -159,7 +197,7 @@ void SkewedPartitionRebalancer::rebalanceBasedOnTaskSkewness(
}
}

stats_.numScaledPartitions += scaledPartitions.size();
numScaledPartitions_ += scaledPartitions.size();
}

bool SkewedPartitionRebalancer::rebalancePartition(
Expand All @@ -168,21 +206,21 @@ bool SkewedPartitionRebalancer::rebalancePartition(
IndexedPriorityQueue<uint32_t, true>& maxTasks,
IndexedPriorityQueue<uint32_t, false>& minTasks) {
auto& taskAssignments = partitionAssignments_[rebalancePartition];
for (auto taskId : taskAssignments) {
for (auto taskId : taskAssignments.taskIds()) {
if (taskId == targetTaskId) {
return false;
}
}

taskAssignments.push_back(targetTaskId);
taskAssignments.addTaskId(targetTaskId);
VELOX_CHECK_GT(partitionAssignments_[rebalancePartition].size(), 1);

const auto newTaskCount = taskAssignments.size();
const auto oldTaskCount = newTaskCount - 1;
// Since a partition is rebalanced from max to min skewed tasks,
// decrease the priority of max taskBucket as well as increase the priority
// of min taskBucket.
for (uint32_t taskId : taskAssignments) {
for (uint32_t taskId : taskAssignments.taskIds()) {
if (taskId == targetTaskId) {
estimatedTaskBytesSinceLastRebalance_[taskId] +=
(partitionBytesSinceLastRebalancePerTask_[rebalancePartition] *
Expand All @@ -208,6 +246,13 @@ void SkewedPartitionRebalancer::calculatePartitionProcessedBytes() {
for (auto partition = 0; partition < partitionCount_; ++partition) {
totalPartitionRowCount += partitionRowCount_[partition];
}
// Logs the rebalance counters before the check below fails, to help diagnose
// a rebalance that was triggered without any recorded partition rows.
if (totalPartitionRowCount <= 0) {
  LOG(ERROR) << "processedBytes " << processedBytes_
             << " processedBytesAtLastRebalance_ "
             << processedBytesAtLastRebalance_
             << " minProcessedBytesRebalanceThreshold_ "
             << minProcessedBytesRebalanceThreshold_;
}
VELOX_CHECK_GT(totalPartitionRowCount, 0);

for (auto partition = 0; partition < partitionCount_; ++partition) {
Expand Down
62 changes: 49 additions & 13 deletions velox/common/base/SkewedPartitionBalancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class SkewedPartitionRebalancerTestHelper;
/// tasks to busy partition measured by processed data size. This is used by
/// local partition to scale table writers for now.
///
/// NOTE: this object is not thread-safe.
/// NOTE: this object is thread-safe.
class SkewedPartitionRebalancer {
public:
/// 'partitionCount' is the number of partitions to process. 'taskCount' is
Expand All @@ -46,14 +46,18 @@ class SkewedPartitionRebalancer {
uint64_t minProcessedBytesRebalanceThresholdPerPartition,
uint64_t minProcessedBytesRebalanceThreshold);

/// Checks that 'rebalancing_' is false, i.e. no rebalance is in progress, at
/// the time the rebalancer is destroyed.
~SkewedPartitionRebalancer() {
VELOX_CHECK(!rebalancing_);
}

/// Invoked to rebalance the partition assignments if applicable.
void rebalance();

/// Gets the assigned task id for a given 'partition'. 'index' is used to
/// choose one of multiple assigned tasks in a round-robin order.
uint32_t getTaskId(uint32_t partition, uint64_t index) const {
const auto& taskList = partitionAssignments_[partition];
return taskList[index % taskList.size()];
auto& taskList = partitionAssignments_[partition];
return taskList.nextTaskId(index);
}

/// Adds the processed partition row count. This is used to estimate the
Expand All @@ -64,11 +68,19 @@ class SkewedPartitionRebalancer {
}

/// Adds the total processed bytes from all the partitions.
void addProcessedBytes(long bytes) {
void addProcessedBytes(int64_t bytes) {
VELOX_CHECK_GT(bytes, 0);
processedBytes_ += bytes;
}

/// Returns 'partitionCount_', the number of partitions handled by this
/// rebalancer.
uint32_t numPartitions() const {
return partitionCount_;
}

/// Returns 'taskCount_', the number of tasks that partitions are assigned
/// to.
uint32_t numTasks() const {
return taskCount_;
}

/// The rebalancer internal stats.
struct Stats {
/// The number of times that triggers rebalance.
Expand All @@ -85,13 +97,15 @@ class SkewedPartitionRebalancer {
};

Stats stats() const {
return stats_;
return Stats{
.numBalanceTriggers = numBalanceTriggers_.load(),
.numScaledPartitions = numScaledPartitions_.load()};
}

private:
bool shouldRebalance() const;
bool shouldRebalance(int64_t processedBytes) const;

void rebalancePartitions();
void rebalancePartitions(int64_t processedBytes);

// Calculates the partition processed data size based on the number of
// processed rows and the averaged row size.
Expand Down Expand Up @@ -138,13 +152,16 @@ class SkewedPartitionRebalancer {
const uint64_t minProcessedBytesRebalanceThreshold_;

// The accumulated number of rows processed by each partition.
std::vector<uint64_t> partitionRowCount_;
std::vector<std::atomic_uint64_t> partitionRowCount_;

// Indicates if the rebalancer is running or not.
std::atomic_bool rebalancing_{false};

// The accumulated number of bytes processed by all the partitions.
uint64_t processedBytes_{0};
std::atomic_int64_t processedBytes_{0};
// 'processedBytes_' at the last rebalance. It is used to calculate the
// processed bytes changes since the last rebalance.
uint64_t processedBytesAtLastRebalance_{0};
std::atomic_int64_t processedBytesAtLastRebalance_{0};
// The accumulated number of bytes processed by each partition.
std::vector<uint64_t> partitionBytes_;
// 'partitionBytes_' at the last rebalance. It is used to calculate the
Expand All @@ -157,10 +174,29 @@ class SkewedPartitionRebalancer {
// The estimated task processed bytes since the last rebalance.
std::vector<uint64_t> estimatedTaskBytesSinceLastRebalance_;

// The assigned task id list for each partition.
std::vector<std::vector<uint32_t>> partitionAssignments_;
// The list of task ids assigned to a single partition. Every accessor takes
// 'lock_' (exclusive for addTaskId(), shared for the readers), so one thread
// can add a task id while other threads read the list.
class PartitionAssignment {
public:
PartitionAssignment() = default;

// Appends 'taskId' to the end of the task id list.
void addTaskId(uint32_t taskId);

// Returns the task id at position 'index' modulo the number of task ids.
uint32_t nextTaskId(uint64_t index) const;

// Returns a copy of the task id list.
const std::vector<uint32_t> taskIds() const;

// Returns the number of task ids in the list.
uint32_t size() const;

private:
// Protects 'taskIds_'. Declared mutable so that the const readers can lock
// it.
mutable folly::SharedMutex lock_;
std::vector<uint32_t> taskIds_;
};
std::vector<PartitionAssignment> partitionAssignments_;

Stats stats_;
/// The number of times a rebalance has been triggered. Reported as
/// 'Stats::numBalanceTriggers'.
std::atomic_uint64_t numBalanceTriggers_{0};
/// The accumulated number of partitions that have been scaled by rebalances.
/// Reported as 'Stats::numScaledPartitions'.
std::atomic_uint32_t numScaledPartitions_{0};

friend class test::SkewedPartitionRebalancerTestHelper;
};
Expand Down
Loading

0 comments on commit 663dc36

Please sign in to comment.