diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index ec9591e96ca6..9cc09048f7e8 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -156,6 +156,9 @@ class QueryConfig { static constexpr const char* kMaxPartitionedOutputBufferSize = "max_page_partitioning_buffer_size"; + static constexpr const char* kMaxArbitraryBufferSize = + "max_arbitrary_buffer_size"; + /// Preferred size of batches in bytes to be returned by operators from /// Operator::getOutput. It is used when an estimate of average row size is /// known. Otherwise kPreferredOutputBatchRows is used. @@ -394,15 +397,26 @@ class QueryConfig { return get(kOrderBySpillMemoryThreshold, kDefault); } - // Returns the target size for a Task's buffered output. The - // producer Drivers are blocked when the buffered size exceeds - // this. The Drivers are resumed when the buffered size goes below - // OutputBufferManager::kContinuePct % of this. + /// Returns the maximum size in bytes for the task's buffered output when + /// output is partitioned using hash of partitioning keys. See + /// PartitionedOutputNode::Kind::kPartitioned. + /// + /// The producer Drivers are blocked when the buffered size exceeds + /// this. The Drivers are resumed when the buffered size goes below + /// OutputBufferManager::kContinuePct % of this. uint64_t maxPartitionedOutputBufferSize() const { static constexpr uint64_t kDefault = 32UL << 20; return get(kMaxPartitionedOutputBufferSize, kDefault); } + /// Returns the maximum size in bytes for the task's buffered output when + /// output is distributed randomly among consumers. See + /// PartitionedOutputNode::Kind::kArbitrary. 
+ uint64_t maxArbitraryBufferSize() const { + static constexpr uint64_t kDefault = 32UL << 20; + return get(kMaxArbitraryBufferSize, kDefault); + } + uint64_t maxLocalExchangeBufferSize() const { static constexpr uint64_t kDefault = 32UL << 20; return get(kMaxLocalExchangeBufferSize, kDefault); diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index 0ea869ef85c7..191c63169c79 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -94,7 +94,14 @@ Generic Configuration * - max_page_partitioning_buffer_size - integer - 32MB - - The target size for a Task's buffered output. The producer Drivers are blocked when the buffered size exceeds this. + - The maximum size in bytes for the task's buffered output when output is partitioned using hash of partitioning keys. See PartitionedOutputNode::Kind::kPartitioned. + The producer Drivers are blocked when the buffered size exceeds this. + The Drivers are resumed when the buffered size goes below OutputBufferManager::kContinuePct (90)% of this. + * - max_arbitrary_buffer_size + - integer + - 32MB + - The maximum size in bytes for the task's buffered output when output is distributed randomly among consumers. See PartitionedOutputNode::Kind::kArbitrary. + The producer Drivers are blocked when the buffered size exceeds this. The Drivers are resumed when the buffered size goes below OutputBufferManager::kContinuePct (90)% of this. * - min_table_rows_for_parallel_join_build - integer diff --git a/velox/exec/OutputBuffer.cpp b/velox/exec/OutputBuffer.cpp index 7b9bdec21e8a..9faa79e9da18 100644 --- a/velox/exec/OutputBuffer.cpp +++ b/velox/exec/OutputBuffer.cpp @@ -14,6 +14,7 @@ * limitations under the License. 
*/ #include "velox/exec/OutputBuffer.h" +#include "velox/core/QueryConfig.h" #include "velox/exec/Task.h" namespace facebook::velox::exec { @@ -217,6 +218,17 @@ void releaseAfterAcknowledge( promise.setValue(); } } + +uint64_t maxBufferSize( + const core::QueryConfig& config, + PartitionedOutputNode::Kind bufferKind) { + if (bufferKind == PartitionedOutputNode::Kind::kArbitrary) { + return config.maxArbitraryBufferSize(); + } + + return config.maxPartitionedOutputBufferSize(); +} + } // namespace OutputBuffer::OutputBuffer( @@ -226,8 +238,7 @@ OutputBuffer::OutputBuffer( uint32_t numDrivers) : task_(std::move(task)), kind_(kind), - maxSize_( - task_->queryCtx()->queryConfig().maxPartitionedOutputBufferSize()), + maxSize_(maxBufferSize(task_->queryCtx()->queryConfig(), kind)), continueSize_((maxSize_ * kContinuePct) / 100), arbitraryBuffer_( isArbitrary() ? std::make_unique<ArbitraryBuffer>() : nullptr), @@ -295,9 +306,9 @@ void OutputBuffer::addOutputBuffersLocked(int numBuffers) { for (const auto& data : dataToBroadcast_) { buffer->enqueue(data); } - } - if (atEnd_ && isBroadcast()) { - buffer->enqueue(nullptr); + if (atEnd_) { + buffer->enqueue(nullptr); + } } buffers_.emplace_back(std::move(buffer)); }