Introduce max_arbitrary_buffer_size config (#8049)
Summary:
Pull Request resolved: #8049

The existing max_page_partitioning_buffer_size config defines the maximum size
in bytes of a task's buffered output when the output is partitioned using a
hash of the partitioning keys. See PartitionedOutputNode::Kind::kPartitioned.

The new max_arbitrary_buffer_size config applies when the output is distributed
randomly among consumers. See PartitionedOutputNode::Kind::kArbitrary.

kArbitrary partitioning is used to distribute output among TableWriter
consumers. The number of table writers is increased dynamically based on how
much data is being produced, and it can be much larger than the number of hash
partitions used for joins and aggregations. Producers therefore need to buffer
more data, which calls for a separate config.

Reviewed By: Yuhta

Differential Revision: D52164424

fbshipit-source-id: d6d80f780a63ff621ab318ab05b26ad8bf3a21e2
mbasmanova authored and facebook-github-bot committed Dec 14, 2023
1 parent ea2d2e1 commit 12061c2
Showing 3 changed files with 42 additions and 10 deletions.
22 changes: 18 additions & 4 deletions velox/core/QueryConfig.h
@@ -156,6 +156,9 @@ class QueryConfig {
   static constexpr const char* kMaxPartitionedOutputBufferSize =
       "max_page_partitioning_buffer_size";
 
+  static constexpr const char* kMaxArbitraryBufferSize =
+      "max_arbitrary_buffer_size";
+
   /// Preferred size of batches in bytes to be returned by operators from
   /// Operator::getOutput. It is used when an estimate of average row size is
   /// known. Otherwise kPreferredOutputBatchRows is used.
@@ -394,15 +397,26 @@ class QueryConfig {
     return get<uint64_t>(kOrderBySpillMemoryThreshold, kDefault);
   }
 
-  // Returns the target size for a Task's buffered output. The
-  // producer Drivers are blocked when the buffered size exceeds
-  // this. The Drivers are resumed when the buffered size goes below
-  // OutputBufferManager::kContinuePct % of this.
+  /// Returns the maximum size in bytes for the task's buffered output when
+  /// output is partitioned using hash of partitioning keys. See
+  /// PartitionedOutputNode::Kind::kPartitioned.
+  ///
+  /// The producer Drivers are blocked when the buffered size exceeds
+  /// this. The Drivers are resumed when the buffered size goes below
+  /// OutputBufferManager::kContinuePct % of this.
   uint64_t maxPartitionedOutputBufferSize() const {
     static constexpr uint64_t kDefault = 32UL << 20;
     return get<uint64_t>(kMaxPartitionedOutputBufferSize, kDefault);
   }
 
+  /// Returns the maximum size in bytes for the task's buffered output when
+  /// output is distributed randomly among consumers. See
+  /// PartitionedOutputNode::Kind::kArbitrary.
+  uint64_t maxArbitraryBufferSize() const {
+    static constexpr uint64_t kDefault = 32UL << 20;
+    return get<uint64_t>(kMaxArbitraryBufferSize, kDefault);
+  }
+
   uint64_t maxLocalExchangeBufferSize() const {
     static constexpr uint64_t kDefault = 32UL << 20;
     return get<uint64_t>(kMaxLocalExchangeBufferSize, kDefault);
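Both accessors in the QueryConfig.h hunk follow the same pattern: look up a key and fall back to a 32 MB default (`32UL << 20` is 33,554,432 bytes). The following is a minimal, self-contained sketch of that pattern. `ToyConfig`, its string-valued map, and the `getUint64` helper are assumptions made for illustration; they are not Velox's actual `QueryConfig` implementation.

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for a query config: a string-to-string map with
// typed accessors. This is NOT the real velox::core::QueryConfig.
class ToyConfig {
 public:
  static constexpr uint64_t kDefaultBufferSize = 32UL << 20; // 32 MB

  explicit ToyConfig(std::unordered_map<std::string, std::string> values)
      : values_(std::move(values)) {}

  // Limit for hash-partitioned output (key name taken from the commit).
  uint64_t maxPartitionedOutputBufferSize() const {
    return getUint64("max_page_partitioning_buffer_size", kDefaultBufferSize);
  }

  // Limit for arbitrary (randomly distributed) output.
  uint64_t maxArbitraryBufferSize() const {
    return getUint64("max_arbitrary_buffer_size", kDefaultBufferSize);
  }

 private:
  // Returns the parsed value for 'key', or 'defaultValue' if the key is unset.
  uint64_t getUint64(const std::string& key, uint64_t defaultValue) const {
    auto it = values_.find(key);
    return it == values_.end() ? defaultValue : std::stoull(it->second);
  }

  std::unordered_map<std::string, std::string> values_;
};
```

In this sketch, setting only `max_arbitrary_buffer_size` changes the arbitrary limit while the hash-partitioned limit stays at its default, which is the independence the new config is meant to provide.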
9 changes: 8 additions & 1 deletion velox/docs/configs.rst
@@ -94,7 +94,14 @@ Generic Configuration
    * - max_page_partitioning_buffer_size
      - integer
      - 32MB
-     - The target size for a Task's buffered output. The producer Drivers are blocked when the buffered size exceeds this.
+     - The maximum size in bytes for the task's buffered output when output is partitioned using hash of partitioning keys. See PartitionedOutputNode::Kind::kPartitioned.
+       The producer Drivers are blocked when the buffered size exceeds this.
        The Drivers are resumed when the buffered size goes below OutputBufferManager::kContinuePct (90)% of this.
+   * - max_arbitrary_buffer_size
+     - integer
+     - 32MB
+     - The maximum size in bytes for the task's buffered output when output is distributed randomly among consumers. See PartitionedOutputNode::Kind::kArbitrary.
+       The producer Drivers are blocked when the buffered size exceeds this.
+       The Drivers are resumed when the buffered size goes below OutputBufferManager::kContinuePct (90)% of this.
    * - min_table_rows_for_parallel_join_build
      - integer
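The block/resume rule described in the two configs.rst entries can be checked with a little arithmetic: with the 32 MB default and kContinuePct of 90, the resume threshold is (33554432 * 90) / 100 = 30198988 bytes under integer division. The sketch below models that rule as a small hysteresis gate. `ToyBackpressure` and `update` are hypothetical names, and the choice to keep the previous state while the buffered size sits between the two thresholds is a modeling assumption; the real OutputBuffer tracks considerably more state.

```cpp
#include <cstdint>

// Hypothetical model of the block/resume rule; not the real OutputBuffer.
class ToyBackpressure {
 public:
  explicit ToyBackpressure(uint64_t maxSize, uint64_t continuePct = 90)
      : maxSize_(maxSize), continueSize_((maxSize * continuePct) / 100) {}

  // Takes the current number of buffered bytes and returns whether the
  // producers are blocked after observing it.
  bool update(uint64_t bufferedBytes) {
    if (bufferedBytes > maxSize_) {
      blocked_ = true; // Exceeded the limit: block producers.
    } else if (bufferedBytes < continueSize_) {
      blocked_ = false; // Dropped below the resume threshold.
    }
    // Between the two thresholds the previous state is kept (assumption).
    return blocked_;
  }

  uint64_t continueSize() const {
    return continueSize_;
  }

 private:
  const uint64_t maxSize_;
  const uint64_t continueSize_;
  bool blocked_{false};
};
```

The gap between the two thresholds keeps producers from toggling between blocked and running on every small change in buffered size.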
21 changes: 16 additions & 5 deletions velox/exec/OutputBuffer.cpp
@@ -14,6 +14,7 @@
  * limitations under the License.
  */
 #include "velox/exec/OutputBuffer.h"
+#include "velox/core/QueryConfig.h"
 #include "velox/exec/Task.h"
 
 namespace facebook::velox::exec {
@@ -217,6 +218,17 @@ void releaseAfterAcknowledge(
     promise.setValue();
   }
 }
+
+uint64_t maxBufferSize(
+    const core::QueryConfig& config,
+    PartitionedOutputNode::Kind bufferKind) {
+  if (bufferKind == PartitionedOutputNode::Kind::kArbitrary) {
+    return config.maxArbitraryBufferSize();
+  }
+
+  return config.maxPartitionedOutputBufferSize();
+}
+
 } // namespace
 
 OutputBuffer::OutputBuffer(
@@ -226,8 +238,7 @@ OutputBuffer::OutputBuffer(
     uint32_t numDrivers)
     : task_(std::move(task)),
       kind_(kind),
-      maxSize_(
-          task_->queryCtx()->queryConfig().maxPartitionedOutputBufferSize()),
+      maxSize_(maxBufferSize(task_->queryCtx()->queryConfig(), kind)),
       continueSize_((maxSize_ * kContinuePct) / 100),
       arbitraryBuffer_(
           isArbitrary() ? std::make_unique<ArbitraryBuffer>() : nullptr),
@@ -295,9 +306,9 @@ void OutputBuffer::addOutputBuffersLocked(int numBuffers) {
       for (const auto& data : dataToBroadcast_) {
         buffer->enqueue(data);
       }
-    }
-    if (atEnd_ && isBroadcast()) {
-      buffer->enqueue(nullptr);
+      if (atEnd_) {
+        buffer->enqueue(nullptr);
+      }
     }
     buffers_.emplace_back(std::move(buffer));
   }
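The `maxBufferSize` helper in OutputBuffer.cpp picks the limit from the output kind: kArbitrary gets the new config, and every other kind falls through to the existing one. A self-contained sketch of that dispatch might look as follows. The `Kind` enum here is a hypothetical mirror of `PartitionedOutputNode::Kind` (kPartitioned and kArbitrary appear in the commit; kBroadcast is assumed from the `isBroadcast()` call), and `ToyLimits` is an illustrative stand-in for the query config.

```cpp
#include <cstdint>

// Hypothetical mirror of PartitionedOutputNode::Kind. kBroadcast is an
// assumption inferred from isBroadcast() in the diff.
enum class Kind { kPartitioned, kBroadcast, kArbitrary };

// Hypothetical config holding just the two limits, both defaulting to 32 MB.
struct ToyLimits {
  uint64_t maxPartitionedOutputBufferSize{32UL << 20};
  uint64_t maxArbitraryBufferSize{32UL << 20};
};

// Same shape as the helper in the commit: only kArbitrary uses the new limit.
uint64_t maxBufferSize(const ToyLimits& limits, Kind kind) {
  if (kind == Kind::kArbitrary) {
    return limits.maxArbitraryBufferSize;
  }
  return limits.maxPartitionedOutputBufferSize;
}
```

Under this sketch, raising only the arbitrary limit (for example to `128UL << 20` for a writer-heavy query) leaves hash-partitioned and broadcast output on the existing limit.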
