Skip to content

Commit

Permalink
Serialization Mix
Browse files Browse the repository at this point in the history
  • Loading branch information
Orri Erling committed Dec 14, 2023
1 parent 12061c2 commit f8195b1
Show file tree
Hide file tree
Showing 19 changed files with 818 additions and 725 deletions.
2 changes: 1 addition & 1 deletion velox/connectors/hive/HiveConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ HiveConnector::HiveConnector(
SimpleLRUCache<std::string, std::shared_ptr<FileHandle>>>(
hiveConfig_->numCacheFileHandles())
: nullptr,
std::make_unique<FileHandleGenerator>(config)),
std::make_unique<FileHandleGenerator>(nullptr)),
executor_(executor) {
if (hiveConfig_->isFileHandleCacheEnabled()) {
LOG(INFO) << "Hive connector " << connectorId()
Expand Down
17 changes: 0 additions & 17 deletions velox/connectors/hive/storage_adapters/s3fs/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,3 @@ target_link_libraries(
velox_exec
gtest
gtest_main)

add_executable(velox_s3read_test S3ReadTest.cpp)
add_test(
NAME velox_s3read_test
COMMAND velox_s3read_test
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
target_link_libraries(
velox_s3read_test
velox_file
velox_s3fs
velox_hive_config
velox_core
velox_exec_test_lib
velox_dwio_common_exception
velox_exec
gtest
gtest_main)
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
* limitations under the License.
*/

#pragma once

#include "velox/core/Config.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"

Expand Down
98 changes: 0 additions & 98 deletions velox/connectors/hive/storage_adapters/s3fs/tests/S3ReadTest.cpp

This file was deleted.

22 changes: 4 additions & 18 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,6 @@ class QueryConfig {
static constexpr const char* kMaxPartitionedOutputBufferSize =
"max_page_partitioning_buffer_size";

static constexpr const char* kMaxArbitraryBufferSize =
"max_arbitrary_buffer_size";

/// Preferred size of batches in bytes to be returned by operators from
/// Operator::getOutput. It is used when an estimate of average row size is
/// known. Otherwise kPreferredOutputBatchRows is used.
Expand Down Expand Up @@ -397,26 +394,15 @@ class QueryConfig {
return get<uint64_t>(kOrderBySpillMemoryThreshold, kDefault);
}

/// Returns the maximum size in bytes for the task's buffered output when
/// output is partitioned using hash of partitioning keys. See
/// PartitionedOutputNode::Kind::kPartitioned.
///
/// The producer Drivers are blocked when the buffered size exceeds
/// this. The Drivers are resumed when the buffered size goes below
/// OutputBufferManager::kContinuePct % of this.
// Returns the target size for a Task's buffered output. The
// producer Drivers are blocked when the buffered size exceeds
// this. The Drivers are resumed when the buffered size goes below
// OutputBufferManager::kContinuePct % of this.
uint64_t maxPartitionedOutputBufferSize() const {
static constexpr uint64_t kDefault = 32UL << 20;
return get<uint64_t>(kMaxPartitionedOutputBufferSize, kDefault);
}

/// Returns the maximum size in bytes for the task's buffered output when
/// output is distributed randomly among consumers. See
/// PartitionedOutputNode::Kind::kArbitrary.
uint64_t maxArbitraryBufferSize() const {
static constexpr uint64_t kDefault = 32UL << 20;
return get<uint64_t>(kMaxArbitraryBufferSize, kDefault);
}

uint64_t maxLocalExchangeBufferSize() const {
static constexpr uint64_t kDefault = 32UL << 20;
return get<uint64_t>(kMaxLocalExchangeBufferSize, kDefault);
Expand Down
9 changes: 1 addition & 8 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,7 @@ Generic Configuration
* - max_page_partitioning_buffer_size
- integer
- 32MB
- The maximum size in bytes for the task's buffered output when output is partitioned using hash of partitioning keys. See PartitionedOutputNode::Kind::kPartitioned.
The producer Drivers are blocked when the buffered size exceeds this.
The Drivers are resumed when the buffered size goes below OutputBufferManager::kContinuePct (90)% of this.
* - max_arbitrary_buffer_size
- integer
- 32MB
- The maximum size in bytes for the task's buffered output when output is distributed randomly among consumers. See PartitionedOutputNode::Kind::kArbitrary.
The producer Drivers are blocked when the buffered size exceeds this.
- The target size for a Task's buffered output. The producer Drivers are blocked when the buffered size exceeds this.
The Drivers are resumed when the buffered size goes below OutputBufferManager::kContinuePct (90)% of this.
* - min_table_rows_for_parallel_join_build
- integer
Expand Down
21 changes: 5 additions & 16 deletions velox/exec/OutputBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* limitations under the License.
*/
#include "velox/exec/OutputBuffer.h"
#include "velox/core/QueryConfig.h"
#include "velox/exec/Task.h"

namespace facebook::velox::exec {
Expand Down Expand Up @@ -218,17 +217,6 @@ void releaseAfterAcknowledge(
promise.setValue();
}
}

uint64_t maxBufferSize(
const core::QueryConfig& config,
PartitionedOutputNode::Kind bufferKind) {
if (bufferKind == PartitionedOutputNode::Kind::kArbitrary) {
return config.maxArbitraryBufferSize();
}

return config.maxPartitionedOutputBufferSize();
}

} // namespace

OutputBuffer::OutputBuffer(
Expand All @@ -238,7 +226,8 @@ OutputBuffer::OutputBuffer(
uint32_t numDrivers)
: task_(std::move(task)),
kind_(kind),
maxSize_(maxBufferSize(task_->queryCtx()->queryConfig(), kind)),
maxSize_(
task_->queryCtx()->queryConfig().maxPartitionedOutputBufferSize()),
continueSize_((maxSize_ * kContinuePct) / 100),
arbitraryBuffer_(
isArbitrary() ? std::make_unique<ArbitraryBuffer>() : nullptr),
Expand Down Expand Up @@ -306,9 +295,9 @@ void OutputBuffer::addOutputBuffersLocked(int numBuffers) {
for (const auto& data : dataToBroadcast_) {
buffer->enqueue(data);
}
if (atEnd_) {
buffer->enqueue(nullptr);
}
}
if (atEnd_ && isBroadcast()) {
buffer->enqueue(nullptr);
}
buffers_.emplace_back(std::move(buffer));
}
Expand Down
52 changes: 21 additions & 31 deletions velox/exec/PartitionedOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,36 +28,27 @@ BlockingReason Destination::advance(
OutputBufferManager& bufferManager,
const std::function<void()>& bufferReleaseFn,
bool* atEnd,
ContinueFuture* future) {
if (rangeIdx_ >= ranges_.size()) {
ContinueFuture* future,
Scratch& scratch) {
if (rowIdx_ >= rows_.size()) {
*atEnd = true;
return BlockingReason::kNotBlocked;
}

const auto firstRow = rowIdx_;
const uint32_t adjustedMaxBytes = (maxBytes * targetSizePct_) / 100;
if (bytesInCurrent_ >= adjustedMaxBytes) {
return flush(bufferManager, bufferReleaseFn, future);
}

// Collect ranges to serialize.
rangesToSerialize_.clear();
// Collect rows to serialize.
bool shouldFlush = false;
while (rangeIdx_ < ranges_.size() && !shouldFlush) {
auto& currRange = ranges_[rangeIdx_];
auto startRow = rowsInCurrentRange_;
for (; rowsInCurrentRange_ < currRange.size && !shouldFlush;
rowsInCurrentRange_++) {
++rowsInCurrent_;
bytesInCurrent_ += sizes[currRange.begin + rowsInCurrentRange_];
shouldFlush = bytesInCurrent_ >= adjustedMaxBytes ||
rowsInCurrent_ >= targetNumRows_;
}
rangesToSerialize_.push_back(
{currRange.begin + startRow, rowsInCurrentRange_ - startRow});
if (rowsInCurrentRange_ == currRange.size) {
rowsInCurrentRange_ = 0;
rangeIdx_++;
}
while (rowIdx_ < rows_.size() && !shouldFlush) {
bytesInCurrent_ += sizes[rowIdx_];
++rowIdx_;
++rowsInCurrent_;
shouldFlush =
bytesInCurrent_ >= adjustedMaxBytes || rowsInCurrent_ >= targetNumRows_;
}

// Serialize
Expand All @@ -67,9 +58,9 @@ BlockingReason Destination::advance(
current_->createStreamTree(rowType, rowsInCurrent_);
}
current_->append(
output, folly::Range(&rangesToSerialize_[0], rangesToSerialize_.size()));
output, folly::Range(&rows_[firstRow], rowIdx_ - firstRow), scratch);
// Update output state variable.
if (rangeIdx_ == ranges_.size()) {
if (rowIdx_ == rows_.size()) {
*atEnd = true;
}
if (shouldFlush || (eagerFlush_ && rowsInCurrent_ > 0)) {
Expand Down Expand Up @@ -189,12 +180,7 @@ void PartitionedOutput::initializeDestinations() {

void PartitionedOutput::initializeSizeBuffers() {
auto numInput = input_->size();
if (numInput > topLevelRanges_.size()) {
vector_size_t numOld = topLevelRanges_.size();
topLevelRanges_.resize(numInput);
for (auto i = numOld; i < numInput; ++i) {
topLevelRanges_[i] = IndexRange{i, 1};
}
if (numInput > rowSize_.size()) {
rowSize_.resize(numInput);
sizePointers_.resize(numInput);
// Set all the size pointers since 'rowSize_' may have been reallocated.
Expand All @@ -207,11 +193,14 @@ void PartitionedOutput::initializeSizeBuffers() {
void PartitionedOutput::estimateRowSizes() {
auto numInput = input_->size();
std::fill(rowSize_.begin(), rowSize_.end(), 0);
raw_vector<vector_size_t> storage;
auto numbers = iota(numInput, storage);
for (int i = 0; i < output_->childrenSize(); ++i) {
VectorStreamGroup::estimateSerializedSize(
output_->childAt(i),
folly::Range(topLevelRanges_.data(), numInput),
sizePointers_.data());
folly::Range(numbers, numInput),
sizePointers_.data(),
scratch_);
}
}

Expand Down Expand Up @@ -332,7 +321,8 @@ RowVectorPtr PartitionedOutput::getOutput() {
*bufferManager,
bufferReleaseFn_,
&atEnd,
&future_);
&future_,
scratch_);
if (blockingReason_ != BlockingReason::kNotBlocked) {
blockedDestination = destination.get();
workLeft = false;
Expand Down
Loading

0 comments on commit f8195b1

Please sign in to comment.