diff --git a/velox/common/file/File.h b/velox/common/file/File.h
index 294961a32c55..bcd159356803 100644
--- a/velox/common/file/File.h
+++ b/velox/common/file/File.h
@@ -42,43 +42,44 @@ namespace facebook::velox {
 
-// A read-only file. All methods in this object should be thread safe.
+/// A read-only file. All methods in this object should be thread safe.
 class ReadFile {
  public:
   virtual ~ReadFile() = default;
 
-  // Reads the data at [offset, offset + length) into the provided pre-allocated
-  // buffer 'buf'. The bytes are returned as a string_view pointing to 'buf'.
-  //
-  // This method should be thread safe.
+  /// Reads the data at [offset, offset + length) into the provided
+  /// pre-allocated buffer 'buf'. The bytes are returned as a string_view
+  /// pointing to 'buf'.
+  ///
+  /// This method should be thread safe.
   virtual std::string_view
   pread(uint64_t offset, uint64_t length, void* buf) const = 0;
 
-  // Same as above, but returns owned data directly.
-  //
-  // This method should be thread safe.
+  /// Same as above, but returns owned data directly.
+  ///
+  /// This method should be thread safe.
   virtual std::string pread(uint64_t offset, uint64_t length) const;
 
-  // Reads starting at 'offset' into the memory referenced by the
-  // Ranges in 'buffers'. The buffers are filled left to right. A
-  // buffer with nullptr data will cause its size worth of bytes to be skipped.
-  //
-  // This method should be thread safe.
+  /// Reads starting at 'offset' into the memory referenced by the
+  /// Ranges in 'buffers'. The buffers are filled left to right. A
+  /// buffer with nullptr data will cause its size worth of bytes to be skipped.
+  ///
+  /// This method should be thread safe.
   virtual uint64_t preadv(
       uint64_t /*offset*/,
       const std::vector<folly::Range<char*>>& /*buffers*/) const;
 
-  // Vectorized read API. Implementations can coalesce and parallelize.
-  // The offsets don't need to be sorted.
-  // `iobufs` is a range of IOBufs to store the read data. They
-  // will be stored in the same order as the input `regions` vector. So the
-  // array must be pre-allocated by the caller, with the same size as `regions`,
-  // but don't need to be initialized, since each iobuf will be copy-constructed
-  // by the preadv.
-  // Returns the total number of bytes read, which might be different than the
-  // sum of all buffer sizes (for example, if coalescing was used).
-  //
-  // This method should be thread safe.
+  /// Vectorized read API. Implementations can coalesce and parallelize.
+  /// The offsets don't need to be sorted.
+  /// `iobufs` is a range of IOBufs to store the read data. They
+  /// will be stored in the same order as the input `regions` vector. So the
+  /// array must be pre-allocated by the caller, with the same size as
+  /// `regions`, but doesn't need to be initialized, since each iobuf will be
+  /// copy-constructed by the preadv. Returns the total number of bytes read,
+  /// which might differ from the sum of all buffer sizes (for example, if
+  /// coalescing was used).
+  ///
+  /// This method should be thread safe.
   virtual uint64_t preadv(
       folly::Range<const common::Region*> regions,
       folly::Range<folly::IOBuf*> iobufs) const;
@@ -98,25 +99,25 @@ class ReadFile {
     }
   }
 
-  // Returns true if preadvAsync has a native implementation that is
-  // asynchronous. The default implementation is synchronous.
+  /// Returns true if preadvAsync has a native implementation that is
+  /// asynchronous. The default implementation is synchronous.
   virtual bool hasPreadvAsync() const {
     return false;
   }
 
-  // Whether preads should be coalesced where possible. E.g. remote disk would
-  // set to true, in-memory to false.
+  /// Whether preads should be coalesced where possible. E.g. remote disk would
+  /// set to true, in-memory to false.
   virtual bool shouldCoalesce() const = 0;
 
-  // Number of bytes in the file.
+  /// Number of bytes in the file.
   virtual uint64_t size() const = 0;
 
-  // An estimate for the total amount of memory *this uses.
+  /// An estimate for the total amount of memory *this uses.
   virtual uint64_t memoryUsage() const = 0;
 
-  // The total number of bytes *this had been used to read since creation or
-  // the last resetBytesRead. We sum all the |length| variables passed to
-  // preads, not the actual amount of bytes read (which might be less).
+  /// The total number of bytes *this has been used to read since creation or
+  /// the last resetBytesRead. We sum all the |length| variables passed to
+  /// preads, not the actual amount of bytes read (which might be less).
   virtual uint64_t bytesRead() const {
     return bytesRead_;
   }
@@ -135,8 +136,8 @@ class ReadFile {
   mutable std::atomic<uint64_t> bytesRead_ = 0;
 };
 
-// A write-only file. Nothing written to the file should be read back until it
-// is closed.
+/// A write-only file. Nothing written to the file should be read back until it
+/// is closed.
 class WriteFile {
  public:
   virtual ~WriteFile() = default;
@@ -193,14 +194,13 @@ class WriteFile {
   virtual uint64_t size() const = 0;
 };
 
-// We currently do a simple implementation for the in-memory files
-// that simply resizes a string as needed. If there ever gets used in
-// a performance sensitive path we'd probably want to move to a Cord-like
-// implementation for underlying storage.
-
-// We don't provide registration functions for the in-memory files, as they
-// aren't intended for any robust use needing a filesystem.
-
+/// We currently do a simple implementation for the in-memory files
+/// that simply resizes a string as needed. If these ever get used in
+/// a performance-sensitive path we'd probably want to move to a Cord-like
+/// implementation for underlying storage.
+///
+/// We don't provide registration functions for the in-memory files, as they
+/// aren't intended for any robust use needing a filesystem.
 class InMemoryReadFile : public ReadFile {
  public:
   explicit InMemoryReadFile(std::string_view file) : file_(file) {}
@@ -307,9 +307,9 @@ class LocalReadFile final : public ReadFile {
 
 class LocalWriteFile final : public WriteFile {
  public:
   struct Attributes {
-    // If set to true, the file will not be subject to copy-on-write updates.
-    // This flag has an effect only on filesystems that support copy-on-write
-    // semantics, such as Btrfs.
+    /// If set to true, the file will not be subject to copy-on-write updates.
+    /// This flag has an effect only on filesystems that support copy-on-write
+    /// semantics, such as Btrfs.
     static constexpr std::string_view kNoCow{"write-on-copy-disabled"};
     static constexpr bool kDefaultNoCow{false};
 
     static bool cowDisabled(
         const std::unordered_map<std::string, std::string>& attrs);
   };
 
@@ -317,8 +317,8 @@ class LocalWriteFile final : public WriteFile {
-  // An error is thrown is a file already exists at |path|,
-  // unless flag shouldThrowOnFileAlreadyExists is false
+  /// An error is thrown if a file already exists at |path|,
+  /// unless the flag shouldThrowOnFileAlreadyExists is false.
   explicit LocalWriteFile(
       std::string_view path,
      bool shouldCreateParentDirectories = false,
diff --git a/velox/common/memory/ByteStream.cpp b/velox/common/memory/ByteStream.cpp
index c8f489854988..823cef19775a 100644
--- a/velox/common/memory/ByteStream.cpp
+++ b/velox/common/memory/ByteStream.cpp
@@ -359,6 +359,9 @@ void ByteOutputStream::extend(int32_t bytes) {
       ranges_.size() == 1 ? nullptr : &ranges_[ranges_.size() - 2],
       current_);
   allocatedBytes_ += current_->size;
+  if (allocatedBytes_ <= 0) {
+    VELOX_CHECK_GT(allocatedBytes_, 0);
+  }
   VELOX_CHECK_GT(allocatedBytes_, 0);
   if (isBits_) {
     // size and position are in units of bits for a bits stream.
diff --git a/velox/common/memory/StreamArena.cpp b/velox/common/memory/StreamArena.cpp
index 1153afbb8cf6..7ffaa545415d 100644
--- a/velox/common/memory/StreamArena.cpp
+++ b/velox/common/memory/StreamArena.cpp
@@ -76,6 +76,7 @@ void StreamArena::newTinyRange(
   range->buffer = reinterpret_cast<uint8_t*>(tinyRanges_.back().data());
   range->size = bytes;
 }
+
 void StreamArena::clear() {
   allocations_.clear();
   pool_->freeNonContiguous(allocation_);
diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h
index d9085215860a..fb65022a5944 100644
--- a/velox/core/PlanNode.h
+++ b/velox/core/PlanNode.h
@@ -1385,6 +1385,10 @@ class PartitionedOutputNode : public PlanNode {
     return sources_;
   }
 
+  bool canSpill(const QueryConfig& queryConfig) const override {
+    return isPartitioned() && queryConfig.partitionedOutputSpillEnabled();
+  }
+
   const RowTypePtr& inputType() const {
     return sources_[0]->outputType();
   }
diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h
index 630bebdc8fc0..9fa99968cdda 100644
--- a/velox/core/QueryConfig.h
+++ b/velox/core/QueryConfig.h
@@ -211,6 +211,11 @@ class QueryConfig {
   static constexpr const char* kTopNRowNumberSpillEnabled =
       "topn_row_number_spill_enabled";
 
+  /// PartitionedOutput spilling flag, only applies if the "spill_enabled" flag
+  /// is set.
+  static constexpr const char* kPartitionedOutputSpillEnabled =
+      "partitioned_output_spill_enabled";
+
   /// The max row numbers to fill and spill for each spill run. This is used to
   /// cap the memory used for spilling. If it is zero, then there is no limit
   /// and spilling might run out of memory.
@@ -694,6 +699,10 @@ class QueryConfig {
     return get<bool>(kTopNRowNumberSpillEnabled, true);
   }
 
+  bool partitionedOutputSpillEnabled() const {
+    return get<bool>(kPartitionedOutputSpillEnabled, true);
+  }
+
   int32_t maxSpillLevel() const {
     return get<int32_t>(kMaxSpillLevel, 1);
   }
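(Not part of the diff: a usage sketch.) The new flag defaults to true, so PartitionedOutput spilling is governed primarily by the session-level "spill_enabled" flag. A minimal sketch of enabling both, assuming a QueryCtx is at hand and reusing the testingOverrideConfigUnsafe() helper visible in the QueryCtx.h hunk that follows:

    // Sketch only: DriverCtx::makeSpillConfig() returns std::nullopt unless
    // "spill_enabled" is set, and PartitionedOutputNode::canSpill()
    // additionally consults "partitioned_output_spill_enabled".
    std::unordered_map<std::string, std::string> configOverrides{
        {core::QueryConfig::kSpillEnabled, "true"},
        {core::QueryConfig::kPartitionedOutputSpillEnabled, "true"},
    };
    queryCtx->testingOverrideConfigUnsafe(std::move(configOverrides));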
diff --git a/velox/core/QueryCtx.h b/velox/core/QueryCtx.h
index 88ce04b1b9a0..655d9fe02e4c 100644
--- a/velox/core/QueryCtx.h
+++ b/velox/core/QueryCtx.h
@@ -96,8 +96,8 @@ class QueryCtx : public std::enable_shared_from_this<QueryCtx> {
     this->queryConfig_.testingOverrideConfigUnsafe(std::move(values));
   }
 
-  // Overrides the previous connector-specific configuration. Note that this
-  // function is NOT thread-safe and should probably only be used in tests.
+  /// Overrides the previous connector-specific configuration. Note that this
+  /// function is NOT thread-safe and should probably only be used in tests.
   void setConnectorSessionOverridesUnsafe(
       const std::string& connectorId,
       std::unordered_map<std::string, std::string>&& configOverrides) {
diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp
index 9a6a4740ddd8..961b7a86dd6b 100644
--- a/velox/exec/Driver.cpp
+++ b/velox/exec/Driver.cpp
@@ -108,7 +108,8 @@ velox::memory::MemoryPool* DriverCtx::addOperatorPool(
 
 std::optional<common::SpillConfig> DriverCtx::makeSpillConfig(
     int32_t operatorId) const {
-  const auto& queryConfig = task->queryCtx()->queryConfig();
+  const auto& queryCtx = task->queryCtx();
+  const auto& queryConfig = queryCtx->queryConfig();
   if (!queryConfig.spillEnabled()) {
     return std::nullopt;
   }
@@ -119,20 +120,18 @@ std::optional<common::SpillConfig> DriverCtx::makeSpillConfig(
       [this]() -> std::string_view {
         return task->getOrCreateSpillDirectory();
       };
-  const auto& spillFilePrefix =
-      fmt::format("{}_{}_{}", pipelineId, driverId, operatorId);
   common::UpdateAndCheckSpillLimitCB updateAndCheckSpillLimitCb =
-      [this](uint64_t bytes) {
-        task->queryCtx()->updateSpilledBytesAndCheckLimit(bytes);
+      [queryCtx](uint64_t bytes) {
+        queryCtx->updateSpilledBytesAndCheckLimit(bytes);
       };
   return common::SpillConfig(
       std::move(getSpillDirPathCb),
       std::move(updateAndCheckSpillLimitCb),
-      spillFilePrefix,
+      fmt::format("{}_{}_{}", pipelineId, driverId, operatorId),
       queryConfig.maxSpillFileSize(),
       queryConfig.spillWriteBufferSize(),
       queryConfig.spillReadBufferSize(),
-      task->queryCtx()->spillExecutor(),
+      queryCtx->spillExecutor(),
       queryConfig.minSpillableReservationPct(),
       queryConfig.spillableReservationGrowthPct(),
       queryConfig.spillStartPartitionBit(),
@@ -142,7 +141,10 @@ std::optional<common::SpillConfig> DriverCtx::makeSpillConfig(
       queryConfig.writerFlushThresholdBytes(),
       queryConfig.spillCompressionKind(),
       queryConfig.spillPrefixSortEnabled()
-          ? std::optional<common::PrefixSortConfig>(prefixSortConfig())
+          ? std::optional<common::PrefixSortConfig>(common::PrefixSortConfig{
+                queryConfig.prefixSortNormalizedKeyMaxBytes(),
+                queryConfig.prefixSortMinRows(),
+                queryConfig.prefixSortMaxStringPrefixLength()})
           : std::nullopt,
       queryConfig.spillFileCreateConfig());
 }
diff --git a/velox/exec/ExchangeQueue.h b/velox/exec/ExchangeQueue.h
index 91e3a663aa06..4bed38d841c6 100644
--- a/velox/exec/ExchangeQueue.h
+++ b/velox/exec/ExchangeQueue.h
@@ -60,7 +60,7 @@ class SerializedPage {
   // Buffers containing the serialized data. The memory is owned by 'iobuf_'.
   std::vector<ByteRange> ranges_;
 
-  // IOBuf holding the data in 'ranges_.
+  // IOBuf holding the data in 'ranges_'.
   std::unique_ptr<folly::IOBuf> iobuf_;
 
   // Number of payload bytes in 'iobuf_'.
diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp
index 5ee37bd29814..0763386ec41a 100644
--- a/velox/exec/Operator.cpp
+++ b/velox/exec/Operator.cpp
@@ -335,6 +335,14 @@ OperatorStats Operator::stats(bool clear) {
   return stats;
 }
 
+void Operator::postReclaimCheck(int64_t reclaimedBytes) const {
+  VELOX_CHECK_GE(
+      reclaimedBytes,
+      0,
+      "Unexpected memory growth after reclaim from operator memory pool {}",
+      pool()->name());
+}
+
 void Operator::close() {
   input_ = nullptr;
   results_.clear();
@@ -750,11 +758,7 @@ uint64_t Operator::MemoryReclaimer::reclaim(
           memory::ScopedReclaimedBytesRecorder recoder(pool, &reclaimedBytes);
           op_->reclaim(targetBytes, stats);
         }
-        VELOX_CHECK_GE(
-            reclaimedBytes,
-            0,
-            "Unexpected memory growth after reclaim from operator memory pool {}",
-            pool->name());
+        op_->postReclaimCheck(reclaimedBytes);
         return reclaimedBytes;
       },
       stats);
diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h
index 4a1b7f3f43ab..432fcc4c604a 100644
--- a/velox/exec/Operator.h
+++ b/velox/exec/Operator.h
@@ -560,6 +560,8 @@ class Operator : public BaseRuntimeStatWriter {
       uint64_t targetBytes,
       memory::MemoryReclaimer::Stats& stats) {}
 
+  virtual void postReclaimCheck(int64_t reclaimedBytes) const;
+
   const core::PlanNodeId& planNodeId() const {
     return operatorCtx_->planNodeId();
   }
diff --git a/velox/exec/OutputBuffer.cpp b/velox/exec/OutputBuffer.cpp
index a066f471e7bd..0f8d4a2ddbb0 100644
--- a/velox/exec/OutputBuffer.cpp
+++ b/velox/exec/OutputBuffer.cpp
@@ -80,6 +80,412 @@ std::string ArbitraryBuffer::toString() const {
       hasNoMoreData());
 }
 
+uint64_t DestinationBuffer::BufferedPages::PageSpiller::size() const {
+  return pageSizes_.size();
+}
+
+uint64_t DestinationBuffer::BufferedPages::PageSpiller::totalBytes() const {
+  return totalBytes_;
+}
+
+std::shared_ptr<SerializedPage>
+DestinationBuffer::BufferedPages::PageSpiller::at(uint64_t index) {
+  VELOX_CHECK_LT(index, pageSizes_.size());
+  const auto numBufferedPages = bufferedPages_.size();
+  if (index < numBufferedPages) {
+    return bufferedPages_[index];
+  }
+
+  const auto pagesToUnspill = index + 1 - numBufferedPages;
+  bufferedPages_.reserve(numBufferedPages + pagesToUnspill);
+  for (uint32_t i = 0; i < pagesToUnspill; ++i) {
+    bufferedPages_.push_back(unspillNextPage());
+  }
+  return bufferedPages_[index];
+}
+
+bool DestinationBuffer::BufferedPages::PageSpiller::isNullAt(
+    uint64_t index) const {
+  VELOX_CHECK_LT(index, pageSizes_.size());
+  return !pageSizes_[index].has_value();
+}
+
+uint64_t DestinationBuffer::BufferedPages::PageSpiller::sizeAt(
+    uint64_t index) const {
+  VELOX_CHECK_LT(index, pageSizes_.size());
+  const auto& pageSize = pageSizes_[index];
+  VELOX_CHECK(pageSize.has_value());
+  return pageSize.value();
+}
+
+void DestinationBuffer::BufferedPages::PageSpiller::deleteFront(
+    uint64_t numPages) {
+  VELOX_CHECK_LE(numPages, pageSizes_.size());
+  for (uint32_t i = 0; i < numPages; ++i) {
+    totalBytes_ -= pageSizes_[i].has_value() ? pageSizes_[i].value() : 0;
+  }
+  pageSizes_.erase(pageSizes_.begin(), pageSizes_.begin() + numPages);
+
+  const auto numBuffered = std::min(numPages, (uint64_t)bufferedPages_.size());
+  bufferedPages_.erase(
+      bufferedPages_.begin(), bufferedPages_.begin() + numBuffered);
+  numPages -= numBuffered;
+
+  for (; numPages > 0; --numPages) {
+    unspillNextPage();
+  }
+}
+
+std::vector<std::shared_ptr<SerializedPage>>
+DestinationBuffer::BufferedPages::PageSpiller::deleteAll() {
+  while (curFileStream_ != nullptr || !spillFilePaths_.empty()) {
+    bufferedPages_.push_back(unspillNextPage());
+  }
+  VELOX_CHECK(spillFilePaths_.empty());
+  VELOX_CHECK_NULL(curFileStream_);
+  auto deletedPages = std::move(bufferedPages_);
+  bufferedPages_.clear();
+  pageSizes_.clear();
+  totalBytes_ = 0;
+  return deletedPages;
+}
+
+std::tuple<std::string, std::unique_ptr<WriteFile>>
+DestinationBuffer::BufferedPages::PageSpiller::nextSpillWriteFile() {
+  std::string path = fmt::format("{}-{}", filePrefix_, nextFileId_++);
+  auto fs = filesystems::getFileSystem(path, nullptr);
+  return {
+      path,
+      fs->openFileForWrite(
+          path,
+          filesystems::FileOptions{
+              {{filesystems::FileOptions::kFileCreateConfig.toString(),
+                fileCreateConfig_}},
+              nullptr,
+              std::nullopt})};
+}
+
+namespace {
+// A wrapper around a write file that provides buffering capability.
+class BufferedWriteFile {
+ public:
+  BufferedWriteFile(
+      std::unique_ptr<WriteFile> writeFile,
+      uint64_t flushThresholdBytes,
+      memory::MemoryPool* pool)
+      : flushThresholdBytes_(flushThresholdBytes),
+        pool_(pool),
+        writeFile_(std::move(writeFile)),
+        bufferStream_(std::make_unique<IOBufOutputStream>(*pool_)) {}
+
+  ~BufferedWriteFile() {
+    close();
+  }
+
+  void append(char* payload, int64_t bytes) {
+    VELOX_CHECK_NOT_NULL(bufferStream_);
+    if (flushThresholdBytes_ == 0) {
+      // Bypass the copy if no buffering is intended.
+      writeFile_->append(std::string_view(payload, bytes));
+      return;
+    }
+    bufferStream_->write(payload, bytes);
+    bufferBytes_ += bytes;
+    checkFlush();
+  }
+
+  void append(std::unique_ptr<folly::IOBuf>&& iobuf) {
+    VELOX_CHECK_NOT_NULL(bufferStream_);
+    if (flushThresholdBytes_ == 0) {
+      // Bypass the copy if no buffering is intended.
+      writeFile_->append(std::move(iobuf));
+      return;
+    }
+    for (auto range = iobuf->begin(); range != iobuf->end(); range++) {
+      bufferStream_->write(
+          reinterpret_cast<const char*>(range->data()), range->size());
+      bufferBytes_ += range->size();
+    }
+    checkFlush();
+  }
+
+  void close() {
+    auto iobuf = bufferStream_->getIOBuf();
+    if (iobuf->computeChainDataLength() != 0) {
+      writeFile_->append(std::move(iobuf));
+    }
+    writeFile_->close();
+    bufferStream_.reset();
+  }
+
+ private:
+  void checkFlush() {
+    if (bufferBytes_ < flushThresholdBytes_) {
+      return;
+    }
+    writeFile_->append(bufferStream_->getIOBuf());
+    bufferStream_ = std::make_unique<IOBufOutputStream>(*pool_);
+    bufferBytes_ = 0;
+  }
+
+  const uint64_t flushThresholdBytes_;
+  memory::MemoryPool* const pool_;
+
+  std::unique_ptr<WriteFile> writeFile_;
+  std::unique_ptr<IOBufOutputStream> bufferStream_;
+  uint64_t bufferBytes_{0};
+};
+
+} // namespace
+
+void DestinationBuffer::BufferedPages::PageSpiller::spill() {
+  if (pages_->empty()) {
+    return;
+  }
+
+  auto [path, writeFile] = nextSpillWriteFile();
+  BufferedWriteFile bufferedWriteFile(
+      std::move(writeFile), writeBufferSize_, pool_);
+  spillFilePaths_.push_back(path);
+
+  // Spill file layout:
+  // --- Payload 0 ---
+  // (1B) is null page at 0
+  // [
+  //   (8B) payload size at 0
+  //   (1B) has num rows at 0
+  //   [(8B) num rows at 0]
+  //   (xB) payload at 0
+  // ]
+  // --- Payload 1 ---
+  // ...
+  // --- Payload n ---
+
+  const auto totalBytesBeforeSpill = totalBytes_;
+  pageSizes_.reserve(pageSizes_.size() + pages_->size());
+  for (auto& page : *pages_) {
+    // Fill spilled page metadata to keep in memory.
+    if (page == nullptr) {
+      pageSizes_.push_back(std::nullopt);
+    } else {
+      const auto pageSize = page->size();
+      pageSizes_.push_back(pageSize);
+      totalBytes_ += pageSize;
+    }
+
+    // Spill payload headers.
+    uint8_t isNull = (page == nullptr) ? 1 : 0;
+    bufferedWriteFile.append(
+        reinterpret_cast<char*>(&isNull), sizeof(uint8_t));
+    if (page == nullptr) {
+      continue;
+    }
+
+    int64_t pageBytes = page->size();
+    bufferedWriteFile.append(
+        reinterpret_cast<char*>(&pageBytes), sizeof(int64_t));
+
+    auto numRowsOpt = page->numRows();
+    uint8_t hasNumRows = numRowsOpt.has_value() ? 1 : 0;
+    bufferedWriteFile.append(reinterpret_cast<char*>(&hasNumRows), 1);
+    if (numRowsOpt.has_value()) {
+      int64_t numRows = numRowsOpt.value();
+      bufferedWriteFile.append(
+          reinterpret_cast<char*>(&numRows), sizeof(int64_t));
+    }
+
+    // Spill payload.
+    bufferedWriteFile.append(page->getIOBuf());
+  }
+  VELOX_CHECK_GE(totalBytes_, totalBytesBeforeSpill);
+  spillStats_->wlock()->spilledBytes += totalBytes_ - totalBytesBeforeSpill;
+}
+
+bool DestinationBuffer::BufferedPages::PageSpiller::empty() const {
+  if (!bufferedPages_.empty()) {
+    return false;
+  }
+  return curFileStream_ == nullptr && spillFilePaths_.empty();
+}
+
+void DestinationBuffer::BufferedPages::PageSpiller::ensureFileStream() {
+  if (curFileStream_ != nullptr) {
+    return;
+  }
+  VELOX_CHECK(!spillFilePaths_.empty());
+  auto filePath = spillFilePaths_.front();
+  auto fs = filesystems::getFileSystem(filePath, nullptr);
+  auto file = fs->openFileForRead(filePath);
+  curFileStream_ = std::make_unique<common::FileInputStream>(
+      std::move(file), readBufferSize_, pool_);
+  spillFilePaths_.pop_front();
+}
+
+namespace {
+struct FreeData {
+  std::shared_ptr<memory::MemoryPool> pool;
+  int64_t bytesToFree;
+};
+
+void freeFunc(void* data, void* userData) {
+  auto* freeData = reinterpret_cast<FreeData*>(userData);
+  freeData->pool->free(data, freeData->bytesToFree);
+  delete freeData;
+}
+} // namespace
+
+std::shared_ptr<SerializedPage>
+DestinationBuffer::BufferedPages::PageSpiller::unspillNextPage() {
+  VELOX_CHECK(!empty());
+  ensureFileStream();
+
+  // Read payload headers.
+  auto isNull = !!(curFileStream_->read<uint8_t>());
+  if (isNull) {
+    if (curFileStream_->atEnd()) {
+      curFileStream_.reset();
+    }
+    return nullptr;
+  }
+  auto iobufBytes = curFileStream_->read<int64_t>();
+  auto hasNumRows = curFileStream_->read<uint8_t>() != 0;
+  int64_t numRows{0};
+  if (hasNumRows) {
+    numRows = curFileStream_->read<int64_t>();
+  }
+
+  // Read payload.
+  VELOX_CHECK_GE(curFileStream_->remainingSize(), iobufBytes);
+  void* rawBuf = pool_->allocate(iobufBytes);
+  curFileStream_->readBytes(reinterpret_cast<uint8_t*>(rawBuf), iobufBytes);
+  if (curFileStream_->atEnd()) {
+    curFileStream_.reset();
+  }
+
+  auto* userData = new FreeData();
+  userData->pool = pool_->shared_from_this();
+  userData->bytesToFree = iobufBytes;
+  auto iobuf =
+      folly::IOBuf::takeOwnership(rawBuf, iobufBytes, freeFunc, userData, true);
+
+  return std::make_shared<SerializedPage>(
+      std::move(iobuf),
+      nullptr,
+      hasNumRows ? std::optional<int64_t>(numRows) : std::nullopt);
+}
+
+void DestinationBuffer::setupSpiller(
+    memory::MemoryPool* pool,
+    const common::SpillConfig* spillConfig,
+    folly::Synchronized<common::SpillStats>* spillStats) {
+  data_.setupSpiller(pool, spillConfig, destinationIdx_, spillStats);
+}
+
+void DestinationBuffer::BufferedPages::setupSpiller(
+    memory::MemoryPool* pool,
+    const common::SpillConfig* spillConfig,
+    int32_t destinationIdx,
+    folly::Synchronized<common::SpillStats>* spillStats) {
+  auto spillDir = spillConfig->getSpillDirPathCb();
+  VELOX_CHECK(!spillDir.empty(), "Spill directory does not exist");
+
+  spiller_ = std::make_unique<PageSpiller>(
+      &pages_,
+      fmt::format(
+          "{}/{}-dest-{}-spill",
+          spillDir,
+          spillConfig->fileNamePrefix,
+          destinationIdx),
+      spillConfig->fileCreateConfig,
+      spillConfig->readBufferSize,
+      spillConfig->writeBufferSize,
+      pool,
+      spillStats);
+}
+
+uint64_t DestinationBuffer::BufferedPages::size() const {
+  return (spiller_ == nullptr ? 0 : spiller_->size()) + pages_.size();
+}
+
+std::shared_ptr<SerializedPage> DestinationBuffer::BufferedPages::at(
+    uint64_t index) {
+  VELOX_CHECK_LT(index, size());
+  if (spiller_ == nullptr) {
+    return pages_[index];
+  }
+  const auto numSpilledPages = spiller_->size();
+  if (index >= numSpilledPages) {
+    return pages_[index - numSpilledPages];
+  }
+  return spiller_->at(index);
+}
+
+bool DestinationBuffer::BufferedPages::isNullAt(uint64_t index) const {
+  VELOX_CHECK_LT(index, size());
+  if (spiller_ == nullptr) {
+    return pages_[index] == nullptr;
+  }
+  const auto numSpilledPages = spiller_->size();
+  if (index >= numSpilledPages) {
+    return pages_[index - numSpilledPages] == nullptr;
+  }
+  return spiller_->isNullAt(index);
+}
+
+uint64_t DestinationBuffer::BufferedPages::sizeAt(uint64_t index) const {
+  VELOX_CHECK_LT(index, size());
+  if (spiller_ == nullptr) {
+    VELOX_CHECK_NOT_NULL(pages_[index]);
+    return pages_[index]->size();
+  }
+  const auto numSpilledPages = spiller_->size();
+  if (index >= numSpilledPages) {
+    VELOX_CHECK_NOT_NULL(pages_[index - numSpilledPages]);
+    return pages_[index - numSpilledPages]->size();
+  }
+  return spiller_->sizeAt(index);
+}
+
+bool DestinationBuffer::BufferedPages::empty() const {
+  return (spiller_ == nullptr || spiller_->empty()) && pages_.empty();
+}
+
+void DestinationBuffer::spill() {
+  data_.spill();
+  stats_.bytesSpilled = data_.spilledBytes();
+}
+
+void DestinationBuffer::BufferedPages::spill() {
+  VELOX_CHECK_NOT_NULL(spiller_);
+  spiller_->spill();
+  pages_.clear();
+}
+
+uint64_t DestinationBuffer::BufferedPages::spilledBytes() const {
+  return spiller_ == nullptr ? 0 : spiller_->totalBytes();
+}
+
+void DestinationBuffer::BufferedPages::append(
+    std::shared_ptr<SerializedPage> page) {
+  pages_.push_back(std::move(page));
+}
+
+void DestinationBuffer::BufferedPages::deleteFront(uint64_t numPages) {
+  VELOX_CHECK_LE(numPages, size());
+  if (spiller_ != nullptr) {
+    const auto numSpillerPages = std::min(spiller_->size(), numPages);
+    spiller_->deleteFront(numSpillerPages);
+    numPages -= numSpillerPages;
+    if (numPages == 0) {
+      return;
+    }
+  }
+  pages_.erase(pages_.begin(), pages_.begin() + numPages);
+}
+
 void DestinationBuffer::Stats::recordEnqueue(const SerializedPage& data) {
   const auto numRows = data.numRows();
   VELOX_CHECK(numRows.has_value(), "SerializedPage's numRows must be valid");
@@ -119,15 +525,16 @@ DestinationBuffer::Data DestinationBuffer::getData(
     loadData(arbitraryBuffer, maxBytes);
   }
 
-  if (sequence - sequence_ >= data_.size()) {
-    if (sequence - sequence_ > data_.size()) {
+  const auto totalPages = data_.size();
+  if (sequence - sequence_ >= totalPages) {
+    if (sequence - sequence_ > totalPages) {
       VLOG(1) << this << " Out of order get: " << sequence << " over "
               << sequence_ << " Setting second notify " << notifySequence_
               << " / " << sequence;
     }
     if (maxBytes == 0) {
       std::vector<int64_t> remainingBytes;
-      if (arbitraryBuffer) {
+      if (arbitraryBuffer != nullptr) {
         arbitraryBuffer->getAvailablePageSizes(remainingBytes);
       }
       if (!remainingBytes.empty()) {
@@ -136,7 +543,7 @@ DestinationBuffer::Data DestinationBuffer::getData(
     }
     notify_ = std::move(notify);
     aliveCheck_ = std::move(activeCheck);
-    if (sequence - sequence_ > data_.size()) {
+    if (sequence - sequence_ > totalPages) {
       notifySequence_ = std::min(notifySequence_, sequence);
     } else {
       notifySequence_ = sequence;
@@ -149,15 +556,16 @@ DestinationBuffer::Data DestinationBuffer::getData(
   uint64_t resultBytes = 0;
   auto i = sequence - sequence_;
   if (maxBytes > 0) {
-    for (; i < data_.size(); ++i) {
+    for (; i < totalPages; ++i) {
       // nullptr is used as end marker
-      if (data_[i] == nullptr) {
-        VELOX_CHECK_EQ(i, data_.size() - 1, "null marker found in the middle");
+      auto page = data_.at(i);
+      if (page == nullptr) {
+        VELOX_CHECK_EQ(i, totalPages - 1, "null marker found in the middle");
         data.push_back(nullptr);
         break;
       }
-      data.push_back(data_[i]->getIOBuf());
-      resultBytes += data_[i]->size();
+      data.push_back(page->getIOBuf());
+      resultBytes += page->size();
       if (resultBytes >= maxBytes) {
        ++i;
        break;
@@ -166,16 +574,17 @@ DestinationBuffer::Data DestinationBuffer::getData(
   }
   bool atEnd = false;
   std::vector<int64_t> remainingBytes;
-  remainingBytes.reserve(data_.size() - i);
-  for (; i < data_.size(); ++i) {
-    if (data_[i] == nullptr) {
-      VELOX_CHECK_EQ(i, data_.size() - 1, "null marker found in the middle");
+  remainingBytes.reserve(totalPages - i);
+  for (; i < totalPages; ++i) {
+    if (data_.isNullAt(i)) {
+      VELOX_CHECK_EQ(i, totalPages - 1, "null marker found in the middle");
       atEnd = true;
       break;
     }
-    remainingBytes.push_back(data_[i]->size());
+    remainingBytes.push_back(data_.sizeAt(i));
   }
-  if (!atEnd && arbitraryBuffer) {
+  if (!atEnd && arbitraryBuffer != nullptr) {
     arbitraryBuffer->getAvailablePageSizes(remainingBytes);
   }
   if (data.empty() && remainingBytes.empty() && atEnd) {
@@ -184,16 +593,19 @@ DestinationBuffer::Data DestinationBuffer::getData(
   return {std::move(data), std::move(remainingBytes), true};
 }
 
+DestinationBuffer::DestinationBuffer(int32_t destinationIdx)
+    : destinationIdx_(destinationIdx) {}
+
 void DestinationBuffer::enqueue(std::shared_ptr<SerializedPage> data) {
   // Drop duplicate end markers.
-  if (data == nullptr && !data_.empty() && data_.back() == nullptr) {
+  if (data == nullptr && !data_.empty() && data_.isNullAt(data_.size() - 1)) {
     return;
   }
 
   if (data != nullptr) {
     stats_.recordEnqueue(*data);
   }
-  data_.push_back(std::move(data));
+  data_.append(std::move(data));
 }
 
 DataAvailable DestinationBuffer::getAndClearNotify() {
@@ -265,34 +677,49 @@ std::vector<std::shared_ptr<SerializedPage>> DestinationBuffer::acknowledge(
     return {};
   }
 
+  const auto totalPages = data_.size();
   VELOX_CHECK_LE(
-      numDeleted, data_.size(), "Ack received for a not yet produced item");
+      numDeleted, totalPages, "Ack received for a not yet produced item");
   std::vector<std::shared_ptr<SerializedPage>> freed;
   for (auto i = 0; i < numDeleted; ++i) {
-    if (data_[i] == nullptr) {
-      VELOX_CHECK_EQ(i, data_.size() - 1, "null marker found in the middle");
+    auto page = data_.at(i);
+    if (page == nullptr) {
+      VELOX_CHECK_EQ(i, totalPages - 1, "null marker found in the middle");
       break;
     }
-    stats_.recordAcknowledge(*data_[i]);
-    freed.push_back(std::move(data_[i]));
+    stats_.recordAcknowledge(*page);
+    freed.push_back(std::move(page));
   }
-  data_.erase(data_.begin(), data_.begin() + numDeleted);
+  data_.deleteFront(numDeleted);
+  stats_.bytesSpilled = data_.spilledBytes();
   sequence_ += numDeleted;
   return freed;
 }
 
-std::vector<std::shared_ptr<SerializedPage>>
-DestinationBuffer::deleteResults() {
+std::vector<std::shared_ptr<SerializedPage>>
+DestinationBuffer::BufferedPages::deleteAll() {
   std::vector<std::shared_ptr<SerializedPage>> freed;
-  for (auto i = 0; i < data_.size(); ++i) {
-    if (data_[i] == nullptr) {
-      VELOX_CHECK_EQ(i, data_.size() - 1, "null marker found in the middle");
+  if (spiller_ != nullptr) {
+    spiller_->deleteAll();
+  }
+  for (auto i = 0; i < pages_.size(); ++i) {
+    if (pages_[i] == nullptr) {
+      VELOX_CHECK_EQ(i, pages_.size() - 1, "null marker found in the middle");
       break;
     }
-    stats_.recordDelete(*data_[i]);
-    freed.push_back(std::move(data_[i]));
+    freed.push_back(std::move(pages_[i]));
+  }
+  pages_.clear();
+  return freed;
+}
+
+std::vector<std::shared_ptr<SerializedPage>>
+DestinationBuffer::deleteResults() {
+  std::vector<std::shared_ptr<SerializedPage>> freed = data_.deleteAll();
+  stats_.bytesSpilled = data_.spilledBytes();
+  for (const auto& page : freed) {
+    stats_.recordDelete(*page);
   }
-  data_.clear();
   return freed;
 }
@@ -321,23 +748,61 @@ void releaseAfterAcknowledge(
   }
 }
 
+bool isPartitionedOutputPool(const memory::MemoryPool& pool) {
+  return folly::StringPiece(pool.name()).endsWith("PartitionedOutput");
+}
 } // namespace
 
+PartitionedOutputNodeReclaimer::PartitionedOutputNodeReclaimer(
+    core::PartitionedOutputNode::Kind kind,
+    int32_t priority)
+    : MemoryReclaimer(priority), kind_(kind) {}
+
+bool PartitionedOutputNodeReclaimer::reclaimableBytes(
+    const memory::MemoryPool& pool,
+    uint64_t& reclaimableBytes) const {
+  reclaimableBytes = 0;
+  if (kind_ != core::PartitionedOutputNode::Kind::kPartitioned) {
+    return false;
+  }
+  reclaimableBytes = pool.reservedBytes();
+  return true;
+}
+
+uint64_t PartitionedOutputNodeReclaimer::reclaim(
+    memory::MemoryPool* pool,
+    uint64_t targetBytes,
+    uint64_t maxWaitMs,
+    memory::MemoryReclaimer::Stats& stats) {
+  const auto prevNodeReservedMemory = pool->reservedBytes();
+  pool->visitChildren([&](memory::MemoryPool* child) {
+    VELOX_CHECK_EQ(child->kind(), memory::MemoryPool::Kind::kLeaf);
+    if (isPartitionedOutputPool(*child)) {
+      child->reclaim(targetBytes, maxWaitMs, stats);
+      return false;
+    }
+    return true;
+  });
+  return prevNodeReservedMemory - pool->reservedBytes();
+}
+
 OutputBuffer::OutputBuffer(
     std::shared_ptr<Task> task,
     PartitionedOutputNode::Kind kind,
     int numDestinations,
-    uint32_t numDrivers)
+    uint32_t numDrivers,
+    memory::MemoryPool* pool)
     : task_(std::move(task)),
       kind_(kind),
       maxSize_(task_->queryCtx()->queryConfig().maxOutputBufferSize()),
       continueSize_((maxSize_ * kContinuePct) / 100),
       arbitraryBuffer_(
           isArbitrary() ? std::make_unique<ArbitraryBuffer>() : nullptr),
+      pool_(pool),
       numDrivers_(numDrivers) {
   buffers_.reserve(numDestinations);
   for (int i = 0; i < numDestinations; i++) {
-    buffers_.push_back(std::make_unique<DestinationBuffer>());
+    buffers_.push_back(createDestinationBuffer(i));
   }
   finishedBufferStats_.resize(numDestinations);
 }
@@ -389,12 +854,17 @@ void OutputBuffer::updateNumDrivers(uint32_t newNumDrivers) {
   }
 }
 
+std::unique_ptr<DestinationBuffer> OutputBuffer::createDestinationBuffer(
+    int32_t destinationIdx) const {
+  return std::make_unique<DestinationBuffer>(destinationIdx);
+}
+
 void OutputBuffer::addOutputBuffersLocked(int numBuffers) {
   VELOX_CHECK(!noMoreBuffers_);
   VELOX_CHECK(!isPartitioned());
   buffers_.reserve(numBuffers);
   for (int32_t i = buffers_.size(); i < numBuffers; ++i) {
-    auto buffer = std::make_unique<DestinationBuffer>();
+    auto buffer = createDestinationBuffer(i);
     if (isBroadcast()) {
       for (const auto& data : dataToBroadcast_) {
         buffer->enqueue(data);
@@ -741,6 +1211,97 @@ void OutputBuffer::getData(
   }
 }
 
+bool OutputBuffer::canReclaim() const {
+  // We only enable spilling for partitioned mode.
+  return isPartitioned() && spillConfig_ != nullptr;
+}
+
+void OutputBuffer::reclaim(uint64_t targetBytes) {
+  VELOX_CHECK(canReclaim());
+  VELOX_CHECK_GT(targetBytes, 0);
+  VELOX_CHECK(isPartitioned());
+  VELOX_CHECK_NOT_NULL(spillConfig_);
+
+  struct Candidate {
+    uint32_t destinationIdx;
+    int64_t reclaimableBytes;
+  };
+
+  std::lock_guard<std::mutex> l(mutex_);
+
+  // Order reclaim candidates by the buffers' in-memory size, from high to low.
+  std::vector<Candidate> candidates;
+  candidates.reserve(buffers_.size());
+  for (uint32_t i = 0; i < buffers_.size(); ++i) {
+    if (buffers_[i] == nullptr) {
+      continue;
+    }
+    auto bufferStats = buffers_[i]->stats();
+    candidates.push_back(
+        {i, bufferStats.bytesBuffered - bufferStats.bytesSpilled});
+  }
+  std::sort(
+      candidates.begin(),
+      candidates.end(),
+      [&](auto& lhsCandidate, auto& rhsCandidate) {
+        return lhsCandidate.reclaimableBytes > rhsCandidate.reclaimableBytes;
+      });
+
+  struct SpillResult {
+    const std::exception_ptr error{nullptr};
+
+    explicit SpillResult(std::exception_ptr _error)
+        : error(std::move(_error)) {}
+  };
+
+  auto* spillExecutor = spillConfig_->executor;
+  std::vector<std::shared_ptr<memory::AsyncSource<SpillResult>>> spillTasks;
+  spillTasks.reserve(candidates.size());
+  uint64_t spillBytes{0};
+  for (const auto candidate : candidates) {
+    if (candidate.reclaimableBytes == 0) {
+      break;
+    }
+    spillTasks.push_back(memory::createAsyncMemoryReclaimTask<SpillResult>(
+        [destinationIdx = candidate.destinationIdx,
+         buffer = buffers_[candidate.destinationIdx].get()]() {
+          try {
+            buffer->spill();
+            return std::make_unique<SpillResult>(nullptr);
+          } catch (const std::exception& e) {
+            LOG(ERROR) << "Reclaim from DestinationBuffer " << destinationIdx
+                       << " failed: " << e.what();
+            return std::make_unique<SpillResult>(std::current_exception());
+          }
+        }));
+    if ((spillTasks.size() > 1) && (spillExecutor != nullptr)) {
+      spillExecutor->add([source = spillTasks.back()]() { source->prepare(); });
+    }
+    spillBytes += candidate.reclaimableBytes;
+    if (spillBytes >= targetBytes) {
+      break;
+    }
+  }
+
+  SCOPE_EXIT {
+    for (auto& spillTask : spillTasks) {
+      // We consume the result for the pending tasks. This is a cleanup in the
+      // guard and must not throw. The first error is already captured before
+      // this runs.
+      try {
+        spillTask->move();
+      } catch (const std::exception&) {
+      }
+    }
+  };
+
+  for (auto& spillTask : spillTasks) {
+    const auto result = spillTask->move();
+    if (result->error) {
+      std::rethrow_exception(result->error);
+    }
+  }
+}
+
 void OutputBuffer::terminate() {
   VELOX_CHECK(!task_->isRunning());
 
@@ -780,7 +1341,7 @@ double OutputBuffer::getUtilization() const {
   return bufferedBytes_ / static_cast<double>(maxSize_);
 }
 
-bool OutputBuffer::isOverutilized() const {
+bool OutputBuffer::isOverUtilized() const {
   return (bufferedBytes_ > (0.5 * maxSize_)) || atEnd_;
 }
diff --git a/velox/exec/OutputBuffer.h b/velox/exec/OutputBuffer.h
index 640c1bfcd8e7..fc14b50e8f92 100644
--- a/velox/exec/OutputBuffer.h
+++ b/velox/exec/OutputBuffer.h
@@ -15,11 +15,13 @@
  */
 #pragma once
 
+#include "velox/common/file/FileInputStream.h"
 #include "velox/core/PlanNode.h"
 #include "velox/exec/ExchangeQueue.h"
+#include "velox/exec/MemoryReclaimer.h"
 
+class OutputBufferTest;
 namespace facebook::velox::exec {
-
 /// nullptr in pages indicates that there is no more data.
 /// sequence is the same as specified in BufferManager::getData call. The
 /// caller is expected to advance sequence by the number of entries in groups
@@ -92,6 +94,8 @@ class ArbitraryBuffer {
 
 class DestinationBuffer {
  public:
+  explicit DestinationBuffer(int32_t destinationIdx = -1);
+
   /// The data transferred by the destination buffer has two phases:
   /// 1. Buffered: the data resides in the buffer after enqueued and before
   ///    acked / deleted.
@@ -106,12 +110,16 @@ class DestinationBuffer {
 
     bool finished{false};
 
-    /// Number of buffered bytes / rows / pages.
+    /// Snapshot of the number of buffered bytes / rows / pages (both
+    /// in-memory and spilled).
    int64_t bytesBuffered{0};
    int64_t rowsBuffered{0};
    int64_t pagesBuffered{0};
 
-    /// Number of sent bytes / rows / pages.
+    /// Snapshot of the number of spilled bytes.
+    int64_t bytesSpilled{0};
+
+    /// Cumulative number of sent bytes / rows / pages.
    int64_t bytesSent{0};
    int64_t rowsSent{0};
    int64_t pagesSent{0};
@@ -176,6 +184,14 @@ class DestinationBuffer {
   /// the callback.
   DataAvailable getAndClearNotify();
 
+  void setupSpiller(
+      memory::MemoryPool* pool,
+      const common::SpillConfig* spillConfig,
+      folly::Synchronized<common::SpillStats>* spillStats);
+
+  /// Spills the current 'data_'.
+  void spill();
+
   /// Finishes this destination buffer, set finished stats.
   void finish();
 
@@ -184,17 +200,163 @@ class DestinationBuffer {
 
   std::string toString();
 
+  /// Representation of an ordered list of pages, including both spilled (if
+  /// spill is enabled) and in-memory ones. If spill is enabled, it preserves
+  /// the order of the pages across all spilled and in-memory pages.
+  ///
+  /// The spilled pages, if any, are always at the front. This is because a
+  /// spill covers all pages buffered at that point, and any pages arriving
+  /// afterwards are appended in memory at the back.
+  class BufferedPages {
+   public:
+    BufferedPages() = default;
+
+    class PageSpiller {
+     public:
+      PageSpiller(
+          std::vector<std::shared_ptr<SerializedPage>>* pages,
+          const std::string& filePrefix,
+          const std::string& fileCreateConfig,
+          uint64_t readBufferSize,
+          uint64_t writeBufferSize,
+          memory::MemoryPool* pool,
+          folly::Synchronized<common::SpillStats>* spillStats)
+          : filePrefix_(filePrefix),
+            fileCreateConfig_(fileCreateConfig),
+            readBufferSize_(readBufferSize),
+            writeBufferSize_(writeBufferSize),
+            spillStats_(spillStats),
+            pages_(pages),
+            pool_(pool) {
+        VELOX_CHECK_NOT_NULL(pool_);
+      }
+
+      /// Spills all the in-memory buffers to a file. All currently in-memory
+      /// serialized pages are spilled into the same file. The method does not
+      /// free the original in-memory structure. It is the caller's
+      /// responsibility to free them.
+      void spill();
+
+      /// Returns true if there are no pages left to unspill.
+      bool empty() const;
+
+      uint64_t size() const;
+
+      uint64_t totalBytes() const;
+
+      std::shared_ptr<SerializedPage> at(uint64_t index);
+
+      bool isNullAt(uint64_t index) const;
+
+      uint64_t sizeAt(uint64_t index) const;
+
+      /// Deletes 'numPages' from the front.
+      void deleteFront(uint64_t numPages);
+
+      /// Deletes all data, spilled or buffered. Returns the deleted data.
+      std::vector<std::shared_ptr<SerializedPage>> deleteAll();
+
+     private:
+      std::tuple<std::string, std::unique_ptr<WriteFile>> nextSpillWriteFile();
+
+      // Unspills one serialized page and returns it.
+      std::shared_ptr<SerializedPage> unspillNextPage();
+
+      void ensureFileStream();
+
+      const std::string filePrefix_;
+
+      const std::string fileCreateConfig_;
+
+      const uint64_t readBufferSize_;
+
+      const uint64_t writeBufferSize_;
+
+      folly::Synchronized<common::SpillStats>* const spillStats_;
+
+      std::vector<std::shared_ptr<SerializedPage>>* const pages_;
+
+      memory::MemoryPool* const pool_;
+
+      // Each spilled file represents a series of 'SerializedPage's.
+      std::deque<std::string> spillFilePaths_;
+
+      std::unique_ptr<common::FileInputStream> curFileStream_;
+
+      // Page sizes in all spilled files. A nullopt represents a null page.
+      std::vector<std::optional<uint64_t>> pageSizes_;
+
+      // A small number of front pages buffered in memory from spilled pages.
+      // These pages will be kept in memory and won't be spilled again.
+      std::vector<std::shared_ptr<SerializedPage>> bufferedPages_;
+
+      uint64_t totalBytes_{0};
+
+      uint32_t nextFileId_{0};
+
+      friend class ::OutputBufferTest;
+    };
+
+    void setupSpiller(
+        memory::MemoryPool* pool,
+        const common::SpillConfig* spillConfig,
+        int32_t destinationIdx,
+        folly::Synchronized<common::SpillStats>* spillStats);
+
+    /// Returns the total number of pages currently in the 'DestinationBuffer',
+    /// including both in-memory ones in 'data_' and spilled ones in
+    /// 'spiller_'.
+    uint64_t size() const;
+
+    /// Returns the page at 'index'.
+    std::shared_ptr<SerializedPage> at(uint64_t index);
+
+    /// Returns true if the page at 'index' is null.
+    bool isNullAt(uint64_t index) const;
+
+    /// Returns the size of the page at 'index'.
+    uint64_t sizeAt(uint64_t index) const;
+
+    bool empty() const;
+
+    /// Appends 'page' to the back of the buffered pages.
+    void append(std::shared_ptr<SerializedPage> page);
+
+    /// Deletes the first 'numPages' from 'this'.
+    void deleteFront(uint64_t numPages);
+
+    /// Deletes all pages from the buffer.
+    std::vector<std::shared_ptr<SerializedPage>> deleteAll();
+
+    /// Spills all the pages and removes them from memory.
+    void spill();
+
+    /// Snapshot of currently spilled bytes.
+    uint64_t spilledBytes() const;
+
+   private:
+    std::vector<std::shared_ptr<SerializedPage>> pages_;
+    std::unique_ptr<PageSpiller> spiller_;
+  };
+
  private:
   void clearNotify();
 
-  std::vector<std::shared_ptr<SerializedPage>> data_;
+  const int32_t destinationIdx_;
+
+  BufferedPages data_;
+
   // The sequence number of the first in 'data_'.
   int64_t sequence_ = 0;
+
   DataAvailableCallback notify_{nullptr};
+
   DataConsumerActiveCheckCallback aliveCheck_{nullptr};
+
   // The sequence number of the first item to pass to 'notify'.
   int64_t notifySequence_{0};
+
   uint64_t notifyMaxBytes_{0};
+
   Stats stats_;
 };
@@ -261,7 +423,8 @@ class OutputBuffer {
       std::shared_ptr<Task> task,
       core::PartitionedOutputNode::Kind kind,
       int numDestinations,
-      uint32_t numDrivers);
+      uint32_t numDrivers,
+      memory::MemoryPool* pool = nullptr);
 
   core::PartitionedOutputNode::Kind kind() const {
     return kind_;
@@ -319,7 +482,26 @@ class OutputBuffer {
   /// and will start blocking producers soon. This is used to dynamically scale
   /// the number of consumers, for example, increase number of TableWriter
   /// tasks.
-  bool isOverutilized() const;
+  bool isOverUtilized() const;
+
+  /// Returns true if this 'OutputBuffer' can be reclaimed. Currently only
+  /// partitioned mode supports reclaim.
+  ///
+  /// TODO: Functionality-wise, reclaim could be supported for all modes, but
+  /// the performance of arbitrary and broadcast spill is sub-optimal.
+  /// Optimizations are needed before spill can be enabled for these two modes.
+  bool canReclaim() const;
+
+  void reclaim(uint64_t targetBytes);
+
+  void setupSpiller(
+      const common::SpillConfig* spillConfig,
+      folly::Synchronized<common::SpillStats>* spillStats) {
+    spillConfig_ = spillConfig;
+    for (auto& buffer : buffers_) {
+      buffer->setupSpiller(pool_, spillConfig, spillStats);
+    }
+  }
 
   /// Gets the Stats of this output buffer.
   Stats stats();
@@ -348,8 +530,11 @@ class OutputBuffer {
       const std::vector<std::shared_ptr<SerializedPage>>& freed,
       std::vector<ContinuePromise>& promises);
 
-  /// Given an updated total number of broadcast buffers, add any missing ones
-  /// and enqueue data that has been produced so far (e.g. dataToBroadcast_).
+  std::unique_ptr<DestinationBuffer> createDestinationBuffer(
+      int32_t destinationIdx) const;
+
+  // Given an updated total number of broadcast buffers, add any missing ones
+  // and enqueue data that has been produced so far (e.g. dataToBroadcast_).
   void addOutputBuffersLocked(int numBuffers);
 
   void enqueueBroadcastOutputLocked(
@@ -380,15 +565,23 @@ class OutputBuffer {
   }
 
   const std::shared_ptr<Task> task_;
+
   const core::PartitionedOutputNode::Kind kind_;
 
-  /// If 'bufferedBytes_' > 'maxSize_', each producer is blocked after adding
-  /// data.
+  // If 'bufferedBytes_' > 'maxSize_', each producer is blocked after adding
+  // data.
   const uint64_t maxSize_;
+
   // When 'bufferedBytes_' goes below 'continueSize_', blocked producers are
   // resumed.
   const uint64_t continueSize_;
+
   const std::unique_ptr<ArbitraryBuffer> arbitraryBuffer_;
 
+  memory::MemoryPool* const pool_;
+
+  const common::SpillConfig* spillConfig_{nullptr};
+
   // Total number of drivers expected to produce results. This number will
   // decrease in the end of grouped execution, when we understand the real
   // number of producer drivers (depending on the number of split groups).
@@ -404,27 +597,37 @@ class OutputBuffer {
   std::vector<std::shared_ptr<SerializedPage>> dataToBroadcast_;
 
   std::mutex mutex_;
+
   // Actual data size in 'buffers_'.
   int64_t bufferedBytes_{0};
+
   // The number of buffered pages which corresponds to 'bufferedBytes_'.
   int64_t bufferedPages_{0};
+
   // The total number of output bytes, rows and pages.
   uint64_t numOutputBytes_{0};
   uint64_t numOutputRows_{0};
   uint64_t numOutputPages_{0};
+
   std::vector<ContinuePromise> promises_;
+
   // The next buffer index in 'buffers_' to load data from arbitrary buffer
   // which is only used by arbitrary output type.
   int32_t nextArbitraryLoadBufferIndex_{0};
+
   // One buffer per destination.
   std::vector<std::unique_ptr<DestinationBuffer>> buffers_;
+
   // The sizes of buffers_ and finishedBufferStats_ are the same, but
   // finishedBufferStats_[i] is set if and only if buffers_[i] is null as
   // the buffer is finished and deleted.
   std::vector<DestinationBuffer::Stats> finishedBufferStats_;
+
   uint32_t numFinished_{0};
+
   // When this reaches buffers_.size(), 'this' can be freed.
   int numFinalAcknowledges_ = 0;
+
   bool atEnd_ = false;
 
   // Time since last change in bufferedBytes_. Used to compute total time data
@@ -435,4 +638,30 @@ class OutputBuffer {
   double totalBufferedBytesMs_;
 };
 
+class PartitionedOutputNodeReclaimer final : public exec::MemoryReclaimer {
+ public:
+  static std::unique_ptr<memory::MemoryReclaimer> create(
+      core::PartitionedOutputNode::Kind kind,
+      int32_t priority) {
+    return std::unique_ptr<memory::MemoryReclaimer>(
+        new PartitionedOutputNodeReclaimer(kind, priority));
+  }
+
+  bool reclaimableBytes(
+      const memory::MemoryPool& pool,
+      uint64_t& reclaimableBytes) const final;
+
+  uint64_t reclaim(
+      memory::MemoryPool* pool,
+      uint64_t targetBytes,
+      uint64_t maxWaitMs,
+      memory::MemoryReclaimer::Stats& stats) final;
+
+ private:
+  PartitionedOutputNodeReclaimer(
+      core::PartitionedOutputNode::Kind kind,
+      int32_t priority);
+
+  core::PartitionedOutputNode::Kind kind_;
+};
 } // namespace facebook::velox::exec
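(Not part of the diff.) The spill-file record framing that PageSpiller::spill() writes and unspillNextPage() reads back is simple enough to show end to end. Below is a self-contained sketch of the same framing with hypothetical helper names; the real code streams through BufferedWriteFile and common::FileInputStream rather than iostreams:

    #include <cstdint>
    #include <iostream>
    #include <optional>
    #include <sstream>
    #include <string>

    // One record per page: (1B) null marker; for non-null pages (8B) payload
    // size, (1B) has-num-rows marker, optionally (8B) num rows, then the raw
    // payload bytes.
    void writePageRecord(
        std::ostream& out,
        const std::string* payload,
        std::optional<int64_t> numRows) {
      const uint8_t isNull = (payload == nullptr) ? 1 : 0;
      out.write(reinterpret_cast<const char*>(&isNull), 1);
      if (payload == nullptr) {
        return; // A null (end-marker) page is just the 1-byte marker.
      }
      const int64_t payloadBytes = payload->size();
      out.write(
          reinterpret_cast<const char*>(&payloadBytes), sizeof(payloadBytes));
      const uint8_t hasNumRows = numRows.has_value() ? 1 : 0;
      out.write(reinterpret_cast<const char*>(&hasNumRows), 1);
      if (numRows.has_value()) {
        const int64_t rows = numRows.value();
        out.write(reinterpret_cast<const char*>(&rows), sizeof(rows));
      }
      out.write(payload->data(), payloadBytes);
    }

    int main() {
      std::ostringstream file;
      const std::string page = "serialized page bytes";
      writePageRecord(file, &page, 100); // Page of 100 rows: 1+8+1+8+21 bytes.
      writePageRecord(file, nullptr, std::nullopt); // End marker: 1 byte.
      std::cout << file.str().size() << " bytes written\n"; // Prints 40.
      return 0;
    }

unspillNextPage() reverses this framing and wraps the payload in a folly::IOBuf whose takeOwnership() callback frees the buffer back into the memory pool.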
diff --git a/velox/exec/OutputBufferManager.cpp b/velox/exec/OutputBufferManager.cpp
index 0425218d175a..2ace1b939129 100644
--- a/velox/exec/OutputBufferManager.cpp
+++ b/velox/exec/OutputBufferManager.cpp
@@ -113,23 +113,31 @@ bool OutputBufferManager::getData(
   return false;
 }
 
-void OutputBufferManager::initializeTask(
+std::shared_ptr<OutputBuffer> OutputBufferManager::initializeTask(
     std::shared_ptr<Task> task,
     core::PartitionedOutputNode::Kind kind,
     int numDestinations,
-    int numDrivers) {
+    int numDrivers,
+    memory::MemoryPool* pool) {
   const auto& taskId = task->taskId();
 
+  std::shared_ptr<OutputBuffer> buffer;
   buffers_.withLock([&](auto& buffers) {
     auto it = buffers.find(taskId);
     if (it == buffers.end()) {
-      buffers[taskId] = std::make_shared<OutputBuffer>(
-          std::move(task), kind, numDestinations, numDrivers);
+      buffer = std::make_shared<OutputBuffer>(
+          std::move(task),
+          kind,
+          numDestinations,
+          numDrivers,
+          pool);
+      buffers[taskId] = buffer;
     } else {
       VELOX_FAIL(
          "Registering an output buffer for pre-existing taskId {}", taskId);
     }
   });
+  return buffer;
 }
 
 bool OutputBufferManager::updateOutputBuffers(
@@ -190,10 +198,10 @@ double OutputBufferManager::getUtilization(const std::string& taskId) {
   return 0;
 }
 
-bool OutputBufferManager::isOverutilized(const std::string& taskId) {
+bool OutputBufferManager::isOverUtilized(const std::string& taskId) {
   auto buffer = getBufferIfExists(taskId);
   if (buffer != nullptr) {
-    return buffer->isOverutilized();
+    return buffer->isOverUtilized();
   }
   return false;
 }
diff --git a/velox/exec/OutputBufferManager.h b/velox/exec/OutputBufferManager.h
index 038d42cdce77..129a350cf91e 100644
--- a/velox/exec/OutputBufferManager.h
+++ b/velox/exec/OutputBufferManager.h
@@ -29,11 +29,12 @@ class OutputBufferManager {
 
   explicit OutputBufferManager(Options /*unused*/) {}
 
-  void initializeTask(
+  std::shared_ptr<OutputBuffer> initializeTask(
      std::shared_ptr<Task> task,
      core::PartitionedOutputNode::Kind kind,
      int numDestinations,
-      int numDrivers);
+      int numDrivers,
+      memory::MemoryPool* pool = nullptr);
 
   /// Updates the number of buffers. Returns true if the buffer exists for a
   /// given taskId, else returns false.
@@ -122,7 +123,7 @@ class OutputBufferManager {
 
   // If the output buffer from a task of taskId is over-utilized and blocks its
   // producers. When the task of this taskId is not found, return false.
-  bool isOverutilized(const std::string& taskId);
+  bool isOverUtilized(const std::string& taskId);
 
   // Returns nullopt when the specified output buffer doesn't exist.
   std::optional<OutputBuffer::Stats> stats(const std::string& taskId);
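(Not part of the diff.) Pieced together from the Task.cpp and PartitionedOutput.cpp hunks elsewhere in this change, the intended wiring is roughly the following; a condensed sketch, not literal code from any single function:

    // 1. During Task::initializePartitionOutput(), the task registers the
    //    output buffer and hands it a leaf memory pool created under the
    //    partitioned output node pool.
    auto outputBuffer = bufferManager->initializeTask(
        task,
        partitionedOutputNode->kind(),
        partitionedOutputNode->numPartitions(),
        numOutputDrivers,
        addOutputBufferPool(partitionedOutputNode->id()));

    // 2. Each PartitionedOutput operator constructed with a spill config
    //    attaches a PageSpiller to every destination buffer; memory
    //    arbitration later drives OutputBuffer::reclaim() through
    //    PartitionedOutput::reclaim().
    bufferManager->getBufferIfExists(taskId)->setupSpiller(
        spillConfig, &spillStats);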
std::optional stats(const std::string& taskId); diff --git a/velox/exec/PartitionedOutput.cpp b/velox/exec/PartitionedOutput.cpp index a08114210245..29439eeebb62 100644 --- a/velox/exec/PartitionedOutput.cpp +++ b/velox/exec/PartitionedOutput.cpp @@ -73,7 +73,8 @@ BlockingReason Destination::advance( const auto firstRow = rowIdx_; const uint32_t adjustedMaxBytes = (maxBytes * targetSizePct_) / 100; if (bytesInCurrent_ >= adjustedMaxBytes) { - return flush(bufferManager, bufferReleaseFn, future); + int64_t bytesFlushed{0}; + return flush(bufferManager, bufferReleaseFn, bytesFlushed, future); } // Collect rows to serialize. @@ -110,7 +111,8 @@ BlockingReason Destination::advance( *atEnd = true; } if (shouldFlush || (eagerFlush_ && rowsInCurrent_ > 0)) { - return flush(bufferManager, bufferReleaseFn, future); + int64_t bytesFlushed{0}; + return flush(bufferManager, bufferReleaseFn, bytesFlushed, future); } return BlockingReason::kNotBlocked; } @@ -118,7 +120,9 @@ BlockingReason Destination::advance( BlockingReason Destination::flush( OutputBufferManager& bufferManager, const std::function& bufferReleaseFn, + int64_t& bytesFlushed, ContinueFuture* future) { + VELOX_CHECK_EQ(bytesFlushed, 0); if (!current_ || rowsInCurrent_ == 0) { return BlockingReason::kNotBlocked; } @@ -141,11 +145,12 @@ BlockingReason Destination::flush( rowsInCurrent_ = 0; setTargetSizePct(); + auto ioBuf = stream.getIOBuf(bufferReleaseFn); + bytesFlushed = ioBuf->computeChainDataLength(); bool blocked = bufferManager.enqueue( taskId_, destination_, - std::make_unique( - stream.getIOBuf(bufferReleaseFn), nullptr, flushedRows), + std::make_unique(std::move(ioBuf), nullptr, flushedRows), future); recordEnqueued_(flushedBytes, flushedRows); @@ -177,7 +182,10 @@ PartitionedOutput::PartitionedOutput( planNode->outputType(), operatorId, planNode->id(), - "PartitionedOutput"), + "PartitionedOutput", + planNode->canSpill(ctx->queryConfig()) + ? ctx->makeSpillConfig(operatorId) + : std::nullopt), keyChannels_(toChannels(planNode->inputType(), planNode->keys())), numDestinations_(planNode->numPartitions()), replicateNullsAndAny_(planNode->isReplicateNullsAndAny()), @@ -204,6 +212,10 @@ PartitionedOutput::PartitionedOutput( serdeOptions_(getVectorSerdeOptions( operatorCtx_->driverCtx()->queryConfig(), planNode->serdeKind())) { + if (spillConfig_.has_value()) { + bufferManager_.lock()->getBufferIfExists(taskId())->setupSpiller( + spillConfig(), &spillStats_); + } if (!planNode->isPartitioned()) { VELOX_USER_CHECK_EQ(numDestinations_, 1); } @@ -391,6 +403,15 @@ RowVectorPtr PartitionedOutput::getOutput() { return nullptr; } + auto testSpillGuard = folly::makeGuard([self = this]() { + // Test-only spill path. 
+ if (testingTriggerSpill(self->pool()->name())) { + Operator::ReclaimableSectionGuard guard(self); + memory::testingRunArbitration(self->pool()); + return; + } + }); + blockingReason_ = BlockingReason::kNotBlocked; detail::Destination* blockedDestination = nullptr; auto bufferManager = bufferManager_.lock(); @@ -442,7 +463,9 @@ RowVectorPtr PartitionedOutput::getOutput() { destination->serializedBytes() < kMinDestinationSize) { continue; } - destination->flush(*bufferManager, bufferReleaseFn_, nullptr); + int64_t bytesFlushed{0}; + destination->flush( + *bufferManager, bufferReleaseFn_, bytesFlushed, nullptr); } return nullptr; } @@ -454,7 +477,9 @@ RowVectorPtr PartitionedOutput::getOutput() { if (destination->isFinished()) { continue; } - destination->flush(*bufferManager, bufferReleaseFn_, nullptr); + int64_t bytesFlushed{0}; + destination->flush( + *bufferManager, bufferReleaseFn_, bytesFlushed, nullptr); destination->setFinished(); destination->updateStats(this); } @@ -488,4 +513,33 @@ void PartitionedOutput::close() { destinations_.clear(); } +void PartitionedOutput::reclaim( + uint64_t targetBytes, + memory::MemoryReclaimer::Stats& /* stats */) { + auto* driver = operatorCtx_->driver(); + VELOX_CHECK_NOT_NULL(driver); + VELOX_CHECK(!nonReclaimableSection_); + const auto& task = driver->task(); + VELOX_CHECK(task->pauseRequested()); + + auto bufferManager = bufferManager_.lock(); + const std::vector operators = + task->findPeerOperators(operatorCtx_->driverCtx()->pipelineId, this); + for (int32_t i = 0; i < operators.size(); ++i) { + auto* partitionedOutputOp = dynamic_cast(operators[i]); + VELOX_CHECK_NOT_NULL(partitionedOutputOp); + for (auto& destination : partitionedOutputOp->destinations_) { + int64_t bytesFlushed{0}; + destination->flush( + *bufferManager, + partitionedOutputOp->bufferReleaseFn_, + bytesFlushed, + nullptr); + targetBytes += bytesFlushed; + } + } + auto outputBuffer = bufferManager->getBufferIfExists(task->taskId()); + VELOX_CHECK_NOT_NULL(outputBuffer); + outputBuffer->reclaim(targetBytes); +} } // namespace facebook::velox::exec diff --git a/velox/exec/PartitionedOutput.h b/velox/exec/PartitionedOutput.h index 5a1c44cf0b19..d17487ecd82c 100644 --- a/velox/exec/PartitionedOutput.h +++ b/velox/exec/PartitionedOutput.h @@ -55,7 +55,7 @@ class Destination { } /// Serializes row from 'output' till either 'maxBytes' have been serialized - /// or + /// or 'targetNumRows_' have been serialized. BlockingReason advance( uint64_t maxBytes, const std::vector& sizes, @@ -71,6 +71,7 @@ class Destination { BlockingReason flush( OutputBufferManager& bufferManager, const std::function& bufferReleaseFn, + int64_t& bytesFlushed, ContinueFuture* future); bool isFinished() const { @@ -185,6 +186,9 @@ class PartitionedOutput : public Operator { void close() override; + void reclaim(uint64_t targetBytes, memory::MemoryReclaimer::Stats& stats) + override; + static void testingSetMinCompressionRatio(float ratio) { minCompressionRatio_ = ratio; } @@ -205,6 +209,15 @@ class PartitionedOutput : public Operator { // Collect all rows with null keys into nullRows_. void collectNullRows(); + // Partitioned output has a coordinated reclaim mechanism. The overall memory + // from partitioned output node level (across all partitioned output + // operators) is guaranteed to decrease. But on a single operator level it is + // not guaranteed. This is due to: + // 1) The flushing during reclaim might increase the memory of this operator. 
+ // 2) The spilled pages do not necessarily all coming from this particular + // operator. + void postReclaimCheck(int64_t reclaimedBytes) const override {}; + // If compression in serde is enabled, this is the minimum compression that // must be achieved before starting to skip compression. Used for testing. inline static float minCompressionRatio_ = 0.8; @@ -241,7 +254,7 @@ class PartitionedOutput : public Operator { // do it only once for an entire input processing across different // destinations. std::unique_ptr outputCompactRow_; - // Simialr to 'outputcompactRow_' for unsafe row serde format. + // Similar to 'outputCompactRow_' for unsafe row serde format. std::unique_ptr outputUnsafeRow_; // Reusable memory. diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 587c571b76f9..8aeb8cbc6dc5 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -19,6 +19,7 @@ #include #include "velox/common/base/Counters.h" +#include "velox/common/base/PrefixSortConfig.h" #include "velox/common/base/StatsReporter.h" #include "velox/common/file/FileSystems.h" #include "velox/common/testutil/TestValue.h" @@ -161,6 +162,10 @@ bool isHashJoinOperator(const std::string& operatorType) { return (operatorType == "HashBuild") || (operatorType == "HashProbe"); } +bool isPartitionedOutputOperator(const std::string& operatorType) { + return operatorType == "PartitionedOutput"; +} + // Moves split promises from one vector to another. void movePromisesOut( std::vector& from, @@ -509,6 +514,27 @@ velox::memory::MemoryPool* Task::getOrAddNodePool( return nodePool; } +memory::MemoryPool* Task::getOrAddPartitionedOutputNodePool( + const core::PlanNodeId& planNodeId) { + if (nodePools_.count(planNodeId) == 1) { + return nodePools_[planNodeId]; + } + for (auto& factory : driverFactories_) { + if (auto partitionedOutputNode = factory->needsPartitionedOutput()) { + VELOX_CHECK_EQ(partitionedOutputNode->id(), planNodeId); + childPools_.push_back(pool_->addAggregateChild( + fmt::format("node.{}", planNodeId), createNodeReclaimer([&]() { + return PartitionedOutputNodeReclaimer::create( + partitionedOutputNode->kind(), 0); + }))); + auto* nodePool = childPools_.back().get(); + nodePools_[planNodeId] = nodePool; + return nodePool; + } + } + VELOX_UNREACHABLE("No partitioned output node found."); +} + memory::MemoryPool* Task::getOrAddJoinNodePool( const core::PlanNodeId& planNodeId, uint32_t splitGroupId) { @@ -548,6 +574,14 @@ std::unique_ptr Task::createExchangeClientReclaimer() return exec::MemoryReclaimer::create(); } +std::unique_ptr Task::createOutputBufferReclaimer() + const { + if (pool()->reclaimer() == nullptr) { + return nullptr; + } + return exec::MemoryReclaimer::create(); +} + std::unique_ptr Task::createTaskReclaimer() { // We shall only create the task memory reclaimer once on task memory pool // creation. 
@@ -568,6 +602,8 @@ velox::memory::MemoryPool* Task::addOperatorPool(
   velox::memory::MemoryPool* nodePool;
   if (isHashJoinOperator(operatorType)) {
     nodePool = getOrAddJoinNodePool(planNodeId, splitGroupId);
+  } else if (isPartitionedOutputOperator(operatorType)) {
+    nodePool = getOrAddPartitionedOutputNodePool(planNodeId);
   } else {
     nodePool = getOrAddNodePool(planNodeId);
   }
@@ -618,6 +654,16 @@ velox::memory::MemoryPool* Task::addExchangeClientPool(
   return childPools_.back().get();
 }
 
+velox::memory::MemoryPool* Task::addOutputBufferPool(
+    const core::PlanNodeId& planNodeId) {
+  auto* nodePool = getOrAddPartitionedOutputNodePool(planNodeId);
+  childPools_.push_back(nodePool->addLeafChild(
+      fmt::format("outputBuffer.{}", planNodeId),
+      true,
+      createOutputBufferReclaimer()));
+  return childPools_.back().get();
+}
+
 bool Task::supportSerialExecutionMode() const {
   if (consumerSupplier_) {
     return false;
@@ -978,11 +1024,12 @@ void Task::initializePartitionOutput() {
   if (partitionedOutputNode != nullptr) {
     VELOX_CHECK(hasPartitionedOutput());
     VELOX_CHECK_GT(numOutputDrivers, 0);
-    bufferManager->initializeTask(
+    auto outputBuffer = bufferManager->initializeTask(
         shared_from_this(),
         partitionedOutputNode->kind(),
         partitionedOutputNode->numPartitions(),
-        numOutputDrivers);
+        numOutputDrivers,
+        addOutputBufferPool(partitionedOutputNode->id()));
   }
 }
 
@@ -2291,7 +2338,7 @@ TaskStats Task::taskStats() const {
   auto bufferManager = bufferManager_.lock();
   taskStats.outputBufferUtilization = bufferManager->getUtilization(taskId_);
-  taskStats.outputBufferOverutilized = bufferManager->isOverutilized(taskId_);
+  taskStats.outputBufferOverutilized = bufferManager->isOverUtilized(taskId_);
   if (!taskStats.outputBufferStats.has_value()) {
     taskStats.outputBufferStats = bufferManager->stats(taskId_);
   }
diff --git a/velox/exec/Task.h b/velox/exec/Task.h
index 253153420a83..ad2917f0c977 100644
--- a/velox/exec/Task.h
+++ b/velox/exec/Task.h
@@ -796,6 +796,11 @@ class Task : public std::enable_shared_from_this<Task> {
   // to ensure lifetime and returns a raw pointer.
   memory::MemoryPool* getOrAddNodePool(const core::PlanNodeId& planNodeId);
 
+  // Similar to getOrAddNodePool but creates the memory pool instance for a
+  // partitioned output plan node.
+  memory::MemoryPool* getOrAddPartitionedOutputNodePool(
+      const core::PlanNodeId& planNodeId);
+
   // Similar to getOrAddNodePool but creates the memory pool instance for a hash
   // join plan node. If 'splitGroupId' is not kUngroupedGroupId, it specifies
   // the split group id under the grouped execution mode.
@@ -818,6 +823,10 @@ class Task : public std::enable_shared_from_this<Task> {
   std::unique_ptr<memory::MemoryReclaimer> createExchangeClientReclaimer()
       const;
 
+  // Creates a memory reclaimer instance for the output buffer of the task, if
+  // the task memory pool has a memory reclaimer set.
+  std::unique_ptr<memory::MemoryReclaimer> createOutputBufferReclaimer() const;
+
   // Creates a memory reclaimer instance for this task. If the query memory
   // pool doesn't set memory reclaimer, then the function simply returns null.
   // Otherwise, it creates a customized memory reclaimer for this task.
@@ -830,6 +839,9 @@ class Task : public std::enable_shared_from_this<Task> {
       const core::PlanNodeId& planNodeId,
       uint32_t pipelineId);
 
+  velox::memory::MemoryPool* addOutputBufferPool(
+      const core::PlanNodeId& planNodeId);
+
   // Invoked to remove this task from the output buffer manager if it has set
   // output buffer.
   void maybeRemoveFromOutputBufferManager();
diff --git a/velox/exec/tests/MultiFragmentTest.cpp b/velox/exec/tests/MultiFragmentTest.cpp
index c92038334d3c..ba24d91cee2b 100644
--- a/velox/exec/tests/MultiFragmentTest.cpp
+++ b/velox/exec/tests/MultiFragmentTest.cpp
@@ -28,6 +28,7 @@
 #include "velox/exec/tests/utils/LocalExchangeSource.h"
 #include "velox/exec/tests/utils/PlanBuilder.h"
 #include "velox/exec/tests/utils/SerializedPageUtil.h"
+#include "velox/exec/tests/utils/TempDirectoryPath.h"
 
 using namespace facebook::velox::exec::test;
 
@@ -2825,6 +2826,79 @@ TEST_P(MultiFragmentTest, scaledTableScan) {
   }
 }
 
+DEBUG_ONLY_TEST_P(MultiFragmentTest, partitionedOutputSpill) {
+  filesystems::registerLocalFileSystem();
+  auto spillDirectory = exec::test::TempDirectoryPath::create();
+
+  const int numSplits = 20;
+  std::vector<std::shared_ptr<TempFilePath>> splitFiles;
+  std::vector<RowVectorPtr> splitVectors;
+  for (auto i = 0; i < numSplits; ++i) {
+    auto vectors = makeVectors(10, 1'000);
+    auto filePath = TempFilePath::create();
+    writeToFile(filePath->getPath(), vectors);
+    splitFiles.push_back(std::move(filePath));
+    splitVectors.insert(splitVectors.end(), vectors.begin(), vectors.end());
+  }
+
+  createDuckDbTable(splitVectors);
+
+  std::atomic_bool noMoreInput{false};
+  folly::EventCount consumerBlocker;
+  SCOPED_TESTVALUE_SET(
+      "facebook::velox::exec::Driver::runInternal::noMoreInput",
+      std::function<void(Operator*)>(([&](Operator* op) {
+        if (dynamic_cast<PartitionedOutput*>(op) != nullptr) {
+          ASSERT_FALSE(noMoreInput.load());
+          noMoreInput = true;
+          consumerBlocker.notifyAll();
+        }
+      })));
+
+  // Create a task with partitioned output.
+  configSettings_[core::QueryConfig::kSpillEnabled] = "true";
+  configSettings_[core::QueryConfig::kPartitionedOutputSpillEnabled] = "true";
+  core::PlanNodeId planNodeId;
+  auto partitionedOutputPlan =
+      PlanBuilder()
+          .tableScan(rowType_)
+          .partitionedOutput({}, 1, {}, GetParam().serdeKind)
+          .capturePlanNodeId(planNodeId)
+          .planNode();
+  auto partitionedOutputTaskId = makeTaskId("producer", 0);
+  auto partitionedOutputTask =
+      makeTask(partitionedOutputTaskId, partitionedOutputPlan, 0);
+  TestScopedSpillInjection scopedSpillInjection(100);
+  partitionedOutputTask->setSpillDirectory(spillDirectory->getPath());
+  partitionedOutputTask->start(1);
+  addHiveSplits(partitionedOutputTask, splitFiles);
+
+  // Create a task with an exchange consuming from the partitioned output.
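+  // The consumer is intentionally held back until the producer has seen all
+  // of its input (see 'consumerBlocker' above), so that the injected spills
+  // have forced pages to disk before the exchange starts draining them.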
+  auto exchangePlan =
+      PlanBuilder()
+          .exchange(partitionedOutputPlan->outputType(), GetParam().serdeKind)
+          .planNode();
+
+  consumerBlocker.await([&]() { return noMoreInput.load(); });
+
+  test::AssertQueryBuilder(exchangePlan, duckDbQueryRunner_)
+      .split(remoteSplit(partitionedOutputTaskId))
+      .config(
+          core::QueryConfig::kShuffleCompressionKind,
+          common::compressionKindToString(GetParam().compressionKind))
+      .assertResults("SELECT c0, c1, c2, c3, c4, c5 FROM tmp");
+
+  ASSERT_TRUE(waitForTaskCompletion(partitionedOutputTask.get()));
+  auto taskStats = partitionedOutputTask->taskStats();
+  auto planStatsMap = exec::toPlanStats(taskStats);
+  auto& planStats = planStatsMap.at(planNodeId);
+  ASSERT_GT(taskStats.memoryReclaimCount, 0);
+  ASSERT_GT(taskStats.memoryReclaimMs, 0);
+  ASSERT_GT(planStats.spilledBytes, 0);
+}
+
 VELOX_INSTANTIATE_TEST_SUITE_P(
     MultiFragmentTest,
     MultiFragmentTest,
diff --git a/velox/exec/tests/OutputBufferManagerTest.cpp b/velox/exec/tests/OutputBufferManagerTest.cpp
index 3902ead5ffc4..4dbd89672be1 100644
--- a/velox/exec/tests/OutputBufferManagerTest.cpp
+++ b/velox/exec/tests/OutputBufferManagerTest.cpp
@@ -19,8 +19,11 @@
 #include "velox/common/base/tests/GTestUtils.h"
 #include "velox/dwio/common/tests/utils/BatchMaker.h"
 #include "velox/exec/Task.h"
+#include "velox/exec/tests/utils/OperatorTestBase.h"
 #include "velox/exec/tests/utils/PlanBuilder.h"
 #include "velox/exec/tests/utils/SerializedPageUtil.h"
+#include "velox/exec/tests/utils/TempDirectoryPath.h"
+#include "velox/exec/tests/utils/TempFilePath.h"
 #include "velox/serializers/CompactRowSerializer.h"
 #include "velox/serializers/PrestoSerializer.h"
 #include "velox/serializers/UnsafeRowSerializer.h"
@@ -1613,3 +1616,593 @@ VELOX_INSTANTIATE_TEST_SUITE_P(
     AllOutputBufferManagerTestSuite,
     AllOutputBufferManagerTest,
     testing::ValuesIn(AllOutputBufferManagerTest::getTestParams()));
+
+using PageSpiller = DestinationBuffer::BufferedPages::PageSpiller;
+using BufferedPages = DestinationBuffer::BufferedPages;
+
+class OutputBufferTest : public exec::test::OperatorTestBase {
+ public:
+  void SetUp() override {
+    OperatorTestBase::SetUp();
+    filesystems::registerLocalFileSystem();
+    rng_.seed(0);
+  }
+
+ protected:
+  std::vector<std::shared_ptr<SerializedPage>> generateData(
+      uint32_t numPages,
+      int64_t maxPageSize,
+      bool hasVoidedNumRows,
+      int64_t maxNumRows) {
+    std::vector<std::shared_ptr<SerializedPage>> pages;
+    pages.reserve(numPages);
+    for (auto i = 0; i < numPages; ++i) {
+      auto iobufBytes = folly::Random().rand64(maxPageSize, rng_);
+
+      // Setup a chained iobuf.
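+      // A page backed by a two-buffer chain ('x' bytes followed by 'y'
+      // bytes) exercises spilling of non-contiguous IOBufs; pages of at most
+      // one byte fall through to the single-buffer branch below.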
+      std::unique_ptr<folly::IOBuf> iobuf;
+      if (iobufBytes > 1) {
+        auto firstHalfBytes = iobufBytes / 2;
+        iobuf = folly::IOBuf::create(firstHalfBytes);
+        std::memset(iobuf->writableData(), 'x', firstHalfBytes);
+        iobuf->append(firstHalfBytes);
+
+        auto secondHalfBytes = iobufBytes - firstHalfBytes;
+        auto secondHalfBuf = folly::IOBuf::create(secondHalfBytes);
+        std::memset(secondHalfBuf->writableData(), 'y', secondHalfBytes);
+        secondHalfBuf->append(secondHalfBytes);
+        iobuf->prependChain(std::move(secondHalfBuf));
+      } else {
+        iobuf = folly::IOBuf::create(iobufBytes);
+        std::memset(iobuf->writableData(), 'x', iobufBytes);
+        iobuf->append(iobufBytes);
+      }
+
+      std::optional<int64_t> numRowsOpt;
+      if (!hasVoidedNumRows || folly::Random().oneIn(2, rng_)) {
+        numRowsOpt = std::optional(folly::Random().rand64(maxNumRows, rng_));
+      }
+      pages.push_back(std::make_shared<SerializedPage>(
+          std::move(iobuf), nullptr, numRowsOpt));
+    }
+    return pages;
+  }
+
+  void checkIOBufsEqual(
+      std::unique_ptr<folly::IOBuf>& buf1,
+      std::unique_ptr<folly::IOBuf>& buf2) {
+    auto coalescedBuf1 = buf1->coalesce();
+    auto coalescedBuf2 = buf2->coalesce();
+    ASSERT_EQ(coalescedBuf1.size(), coalescedBuf2.size());
+    ASSERT_EQ(
+        std::memcmp(
+            coalescedBuf1.data(), coalescedBuf2.data(), coalescedBuf1.size()),
+        0);
+  }
+
+  void checkSerializedPageEqual(SerializedPage& page1, SerializedPage& page2) {
+    ASSERT_EQ(page1.numRows().has_value(), page2.numRows().has_value());
+    if (page1.numRows().has_value()) {
+      ASSERT_EQ(page1.numRows().value(), page2.numRows().value());
+    }
+    auto buf1 = page1.getIOBuf();
+    auto buf2 = page2.getIOBuf();
+    checkIOBufsEqual(buf1, buf2);
+  }
+
+  void checkSpillerConsistency(PageSpiller& spiller) {
+    ASSERT_GE(spiller.pageSizes_.size(), spiller.bufferedPages_.size());
+    for (int32_t i = 0; i < spiller.bufferedPages_.size(); ++i) {
+      auto& pageSizeOpt = spiller.pageSizes_[i];
+      if (!pageSizeOpt.has_value()) {
+        ASSERT_EQ(spiller.bufferedPages_[i], nullptr);
+        continue;
+      }
+      ASSERT_EQ(pageSizeOpt.value(), spiller.bufferedPages_[i]->size());
+    }
+    ASSERT_LE(spiller.spillFilePaths_.size(), spiller.nextFileId_ + 1);
+
+    uint64_t totalBytes{0};
+    for (const auto& pageSizeOpt : spiller.pageSizes_) {
+      totalBytes += pageSizeOpt.has_value() ? pageSizeOpt.value() : 0;
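+      // Entries without a recorded size correspond to null pages and
+      // contribute nothing to the byte total.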
+    }
+    ASSERT_EQ(spiller.totalBytes(), totalBytes);
+  }
+
+  void assertNumBufferedPages(PageSpiller& spiller, uint64_t numPages) {
+    ASSERT_EQ(spiller.bufferedPages_.size(), numPages);
+  }
+
+  folly::Random::DefaultGenerator rng_;
+};
+
+TEST_F(OutputBufferTest, pageSpillerBasic) {
+  auto pool = rootPool_->addLeafChild("destinationBufferSpiller");
+
+  struct TestValue {
+    uint32_t numPages;
+    int64_t maxPageSize;
+    bool hasVoidedNumRows;
+    int64_t maxNumRows;
+    uint64_t readBufferSize;
+    uint64_t writeBufferSize;
+
+    std::string debugString() const {
+      return fmt::format(
+          "numPages {}, maxPageSize {}, hasVoidedNumRows {}, maxNumRows {}, "
+          "readBufferSize {}, writeBufferSize {}",
+          numPages,
+          maxPageSize,
+          hasVoidedNumRows,
+          maxNumRows,
+          readBufferSize,
+          writeBufferSize);
+    }
+  };
+
+  std::vector<TestValue> testValues{
+      {10, 64, true, 20, 1024, 1024},
+      {10, 64, false, 20, 1024, 0},
+      {0, 64, true, 20, 1024, 256},
+      {10, 64, true, 20, 1, 2048},
+      {10, 0, true, 20, 128, 2048}};
+
+  for (const auto& testValue : testValues) {
+    SCOPED_TRACE(testValue.debugString());
+
+    auto tempFile = exec::test::TempFilePath::create();
+    const auto& prefixPath = tempFile->getPath();
+    auto fs = filesystems::getFileSystem(prefixPath, {});
+    SCOPE_EXIT {
+      fs->remove(prefixPath);
+    };
+
+    auto pages = generateData(
+        testValue.numPages,
+        testValue.maxPageSize,
+        testValue.hasVoidedNumRows,
+        testValue.maxNumRows);
+
+    folly::Synchronized<common::SpillStats> spillStats;
+    PageSpiller spiller(
+        &pages,
+        prefixPath,
+        "",
+        testValue.readBufferSize,
+        testValue.writeBufferSize,
+        pool.get(),
+        &spillStats);
+    checkSpillerConsistency(spiller);
+
+    ASSERT_TRUE(spiller.empty());
+    ASSERT_EQ(spiller.size(), 0);
+
+    VELOX_ASSERT_THROW(spiller.at(0), "");
+    checkSpillerConsistency(spiller);
+
+    VELOX_ASSERT_THROW(spiller.isNullAt(0), "");
+    checkSpillerConsistency(spiller);
+
+    VELOX_ASSERT_THROW(spiller.sizeAt(0), "");
+    checkSpillerConsistency(spiller);
+
+    VELOX_ASSERT_THROW(spiller.deleteFront(1), "");
+    checkSpillerConsistency(spiller);
+
+    ASSERT_NO_THROW(spiller.deleteAll());
+    checkSpillerConsistency(spiller);
+
+    spiller.spill();
+    checkSpillerConsistency(spiller);
+
+    if (pages.empty()) {
+      ASSERT_TRUE(spiller.empty());
+      continue;
+    }
+
+    ASSERT_FALSE(spiller.empty());
+    uint32_t i = 0;
+    while (!spiller.empty()) {
+      ASSERT_EQ(spiller.isNullAt(0), pages[i] == nullptr);
+      if (spiller.isNullAt(0)) {
+        VELOX_ASSERT_THROW(spiller.sizeAt(0), "");
+      } else {
+        ASSERT_EQ(spiller.sizeAt(0), pages[i]->size());
+      }
+      checkSpillerConsistency(spiller);
+      auto unspilledPage = spiller.at(0);
+      checkSpillerConsistency(spiller);
+      ASSERT_LT(i, pages.size());
+      ASSERT_EQ(unspilledPage->numRows(), pages[i]->numRows());
+      ASSERT_EQ(unspilledPage->size(), pages[i]->size());
+      auto originalIOBuf = pages[i]->getIOBuf();
+      auto unspilledIOBuf = unspilledPage->getIOBuf();
+      checkIOBufsEqual(originalIOBuf, unspilledIOBuf);
+      if (testValue.maxPageSize == 0) {
+        ASSERT_GE(pool->usedBytes(), 0);
+      } else {
+        ASSERT_GT(pool->usedBytes(), 0);
+      }
+      spiller.deleteFront(1);
+      ++i;
+    }
+    ASSERT_EQ(i, pages.size());
+    ASSERT_TRUE(spiller.empty());
+    ASSERT_EQ(spiller.size(), 0);
+
+    VELOX_ASSERT_THROW(spiller.at(0), "");
+    checkSpillerConsistency(spiller);
+
+    VELOX_ASSERT_THROW(spiller.isNullAt(0), "");
+    checkSpillerConsistency(spiller);
+
+    VELOX_ASSERT_THROW(spiller.sizeAt(0), "");
+    checkSpillerConsistency(spiller);
+
+    VELOX_ASSERT_THROW(spiller.deleteFront(1), "");
+    checkSpillerConsistency(spiller);
+
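+    // A fully drained spiller behaves like a freshly constructed empty one:
+    // the accessors throw, while deleteAll() must still be safe to call.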
+    ASSERT_NO_THROW(spiller.deleteAll());
+    checkSpillerConsistency(spiller);
+  }
+  ASSERT_EQ(pool->usedBytes(), 0);
+}
+
+TEST_F(OutputBufferTest, pageSpillerAccess) {
+  auto pool = rootPool_->addLeafChild("pageSpillerAccess");
+  auto pages = generateData(20, 1LL << 20, true, 1000);
+
+  struct TestValue {
+    std::string testName;
+    std::function<void(
+        std::vector<std::shared_ptr<SerializedPage>>&,
+        PageSpiller&,
+        uint64_t)>
+        accessorVerifier;
+
+    std::string debugString() {
+      return testName;
+    }
+  };
+
+  std::vector<TestValue> testValues{
+      {"PageSpiller::at",
+       [this](auto& originalPages, auto& spiller, auto index) {
+         // Accessor verifier for PageSpiller::at().
+         if (index >= originalPages.size()) {
+           VELOX_ASSERT_THROW(spiller.at(index), "");
+           return;
+         }
+         auto originalPage = originalPages[index];
+         auto unspilledPage = spiller.at(index);
+         if (originalPage == nullptr) {
+           ASSERT_EQ(unspilledPage, nullptr);
+           return;
+         }
+         ASSERT_EQ(originalPage->size(), unspilledPage->size());
+         ASSERT_EQ(originalPage->numRows(), unspilledPage->numRows());
+         auto originalIOBuf = originalPage->getIOBuf();
+         auto unspilledIOBuf = unspilledPage->getIOBuf();
+         checkIOBufsEqual(originalIOBuf, unspilledIOBuf);
+       }},
+      {"PageSpiller::isNullAt",
+       [this](auto& originalPages, auto& spiller, auto index) {
+         // Accessor verifier for PageSpiller::isNullAt().
+         if (index >= originalPages.size()) {
+           VELOX_ASSERT_THROW(spiller.isNullAt(index), "");
+           return;
+         }
+         ASSERT_EQ(originalPages[index] == nullptr, spiller.isNullAt(index));
+       }},
+      {"PageSpiller::sizeAt",
+       [this](auto& originalPages, auto& spiller, auto index) {
+         // Accessor verifier for PageSpiller::sizeAt().
+         if (index >= originalPages.size()) {
+           VELOX_ASSERT_THROW(spiller.sizeAt(index), "");
+           return;
+         }
+         auto originalPage = originalPages[index];
+         if (originalPage == nullptr) {
+           VELOX_ASSERT_THROW(spiller.sizeAt(index), "");
+           return;
+         }
+         ASSERT_EQ(originalPage->size(), spiller.sizeAt(index));
+       }}};
+
+  for (auto& testValue : testValues) {
+    SCOPED_TRACE(testValue.debugString());
+    auto tempFile = exec::test::TempFilePath::create();
+    const auto& prefixPath = tempFile->getPath();
+    auto fs = filesystems::getFileSystem(prefixPath, {});
+    SCOPE_EXIT {
+      fs->remove(prefixPath);
+    };
+
+    folly::Synchronized<common::SpillStats> spillStats;
+    PageSpiller spiller(
+        &pages, prefixPath, "", 1024, 1024, pool.get(), &spillStats);
+    spiller.spill();
+
+    // Only PageSpiller::at loads the actual buffer.
+    const bool loadBuffer = testValue.testName == "PageSpiller::at";
+
+    testValue.accessorVerifier(pages, spiller, 1);
+    checkSpillerConsistency(spiller);
+    assertNumBufferedPages(spiller, loadBuffer ? 2 : 0);
+
+    testValue.accessorVerifier(pages, spiller, 10);
+    checkSpillerConsistency(spiller);
+    assertNumBufferedPages(spiller, loadBuffer ? 11 : 0);
+
+    testValue.accessorVerifier(pages, spiller, 25);
+    checkSpillerConsistency(spiller);
+    assertNumBufferedPages(spiller, loadBuffer ? 11 : 0);
+
+    testValue.accessorVerifier(pages, spiller, 19);
+    checkSpillerConsistency(spiller);
+    assertNumBufferedPages(spiller, loadBuffer ? 20 : 0);
+
+    testValue.accessorVerifier(pages, spiller, 5);
+    checkSpillerConsistency(spiller);
+    assertNumBufferedPages(spiller, loadBuffer ? 20 : 0);
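+    // Revisiting a lower index (5 after 19) loads nothing new: unspilled
+    // pages stay buffered until they are explicitly deleted.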
+  }
+  ASSERT_EQ(pool->usedBytes(), 0);
+}
+
+TEST_F(OutputBufferTest, pageSpillerDelete) {
+  auto pool = rootPool_->addLeafChild("pageSpillerDelete");
+  const auto kNumPages = 20;
+  auto pages = generateData(kNumPages, 1LL << 20, true, 1000);
+
+  struct TestValue {
+    uint32_t numBufferedPages;
+    uint32_t numDelete;
+
+    std::string debugString() {
+      return fmt::format(
+          "numBufferedPages {}, numDelete {}", numBufferedPages, numDelete);
+    }
+  };
+
+  std::vector<TestValue> testValues{
+      {0, 0},
+      {0, 10},
+      {0, 20},
+      {0, 25},
+      {10, 0},
+      {10, 5},
+      {10, 15},
+      {10, 20},
+      {10, 25}};
+  for (auto& testValue : testValues) {
+    SCOPED_TRACE(testValue.debugString());
+    // Test delete front.
+    auto tempFile = exec::test::TempFilePath::create();
+    const auto& prefixPath = tempFile->getPath();
+    auto fs = filesystems::getFileSystem(prefixPath, {});
+    SCOPE_EXIT {
+      fs->remove(prefixPath);
+    };
+
+    folly::Synchronized<common::SpillStats> spillStats;
+    PageSpiller spiller(
+        &pages, prefixPath, "", 1024, 1024, pool.get(), &spillStats);
+    spiller.spill();
+    checkSpillerConsistency(spiller);
+
+    // Unspill pages to buffer.
+    if (testValue.numBufferedPages > 0) {
+      spiller.at(testValue.numBufferedPages - 1);
+    }
+    checkSpillerConsistency(spiller);
+    assertNumBufferedPages(spiller, testValue.numBufferedPages);
+
+    if (testValue.numDelete > kNumPages) {
+      VELOX_ASSERT_THROW(spiller.deleteFront(testValue.numDelete), "");
+      checkSpillerConsistency(spiller);
+      assertNumBufferedPages(spiller, testValue.numBufferedPages);
+      continue;
+    } else {
+      spiller.deleteFront(testValue.numDelete);
+    }
+    checkSpillerConsistency(spiller);
+    if (testValue.numDelete <= testValue.numBufferedPages) {
+      assertNumBufferedPages(
+          spiller, (testValue.numBufferedPages - testValue.numDelete));
+    } else {
+      assertNumBufferedPages(spiller, 0);
+    }
+  }
+
+  {
+    // Test delete all.
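+    // Unspill a prefix first (spiller.at(10) below) so that deleteAll() has
+    // to drop both buffered and still-spilled pages.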
+    auto tempFile = exec::test::TempFilePath::create();
+    const auto& prefixPath = tempFile->getPath();
+    auto fs = filesystems::getFileSystem(prefixPath, {});
+    SCOPE_EXIT {
+      fs->remove(prefixPath);
+    };
+
+    folly::Synchronized<common::SpillStats> spillStats;
+    PageSpiller spiller(
+        &pages, prefixPath, "", 1024, 1024, pool.get(), &spillStats);
+    spiller.spill();
+    checkSpillerConsistency(spiller);
+    spiller.at(10);
+
+    auto removedPages = spiller.deleteAll();
+    checkSpillerConsistency(spiller);
+    assertNumBufferedPages(spiller, 0);
+
+    ASSERT_EQ(removedPages.size(), pages.size());
+    for (uint32_t i = 0; i < pages.size(); ++i) {
+      ASSERT_EQ(removedPages[i] == nullptr, pages[i] == nullptr);
+      if (removedPages[i] == nullptr) {
+        continue;
+      }
+      checkSerializedPageEqual(*removedPages[i], *pages[i]);
+    }
+  }
+}
+
+TEST_F(OutputBufferTest, bufferedPagesBasic) {
+  const uint64_t kNumPages = 50;
+  auto verifyFn = [this](
+                      BufferedPages& bufferedPages,
+                      std::vector<std::shared_ptr<SerializedPage>>& pages) {
+    ASSERT_EQ(bufferedPages.size(), pages.size());
+    const auto numPages = bufferedPages.size();
+    for (auto i = 0; i < numPages; ++i) {
+      if (pages[i] == nullptr) {
+        ASSERT_TRUE(bufferedPages.isNullAt(i));
+        VELOX_ASSERT_THROW(bufferedPages.sizeAt(i), "");
+        ASSERT_EQ(bufferedPages.at(i), nullptr);
+        continue;
+      }
+      ASSERT_FALSE(bufferedPages.isNullAt(i));
+      ASSERT_EQ(bufferedPages.sizeAt(i), pages[i]->size());
+      auto originalPage = pages[i];
+      auto bufferedPage = bufferedPages.at(i);
+      checkSerializedPageEqual(*originalPage, *bufferedPage);
+    }
+  };
+
+  BufferedPages bufferedPages;
+  auto pages = generateData(kNumPages, 1LL << 20, true, 100);
+
+  ASSERT_TRUE(bufferedPages.empty());
+  for (auto& page : pages) {
+    bufferedPages.append(page);
+  }
+  ASSERT_EQ(bufferedPages.size(), pages.size());
+  ASSERT_FALSE(bufferedPages.empty());
+  verifyFn(bufferedPages, pages);
+
+  for (auto i = 0; i < kNumPages / 2; ++i) {
+    bufferedPages.deleteFront(0);
+    bufferedPages.deleteFront(2);
+    pages.erase(pages.begin(), pages.begin() + 2);
+    verifyFn(bufferedPages, pages);
+  }
+  VELOX_ASSERT_THROW(bufferedPages.deleteFront(kNumPages * 2), "");
+  bufferedPages.deleteAll();
+  ASSERT_EQ(bufferedPages.size(), 0);
+  ASSERT_TRUE(bufferedPages.empty());
+}
+
+TEST_F(OutputBufferTest, bufferedPagesWithSpill) {
+  struct TestValue {
+    uint32_t numInMemoryPages;
+    uint32_t numSpills;
+    uint32_t numPagesPerSpill;
+    uint64_t maxPageSize;
+    uint64_t advanceSize;
+    uint64_t deleteSize;
+
+    std::string debugString() {
+      return fmt::format(
+          "numInMemoryPages {}, numSpills {}, numPagesPerSpill {}, "
+          "maxPageSize {}, advanceSize {}, deleteSize {}",
+          numInMemoryPages,
+          numSpills,
+          numPagesPerSpill,
+          maxPageSize,
+          advanceSize,
+          deleteSize);
+    }
+  };
+  auto pool = rootPool_->addLeafChild("bufferedPagesWithSpill");
+
+  std::vector<TestValue> testValues{
+      {30, 10, 20, 1L << 20, 3, 5},
+      {50, 10, 10, 1L << 20, 15, 5},
+      {50, 10, 10, 1L << 20, 5, 15},
+      {0, 10, 20, 1L << 20, 15, 5}};
+
+  for (auto& testValue : testValues) {
+    SCOPED_TRACE(testValue.debugString());
+
+    std::vector<std::vector<std::shared_ptr<SerializedPage>>> pageBatches;
+    pageBatches.reserve(testValue.numSpills + 1);
+    for (uint32_t i = 0; i < testValue.numSpills; ++i) {
+      pageBatches.push_back(generateData(
+          testValue.numPagesPerSpill, testValue.maxPageSize, true, 100));
+    }
+    pageBatches.push_back(generateData(
+        testValue.numInMemoryPages, testValue.maxPageSize, true, 100));
+
+    auto tempDir = exec::test::TempDirectoryPath::create();
+    const auto dirPath = tempDir->getPath();
+    auto fs = filesystems::getFileSystem(dirPath, {});
+    SCOPE_EXIT {
+      fs->rmdir(dirPath);
+    };
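+
+    // Only the spill knobs exercised by this test are set; the remaining
+    // SpillConfig fields keep their defaults.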
+    common::SpillConfig spillConfig;
+    spillConfig.readBufferSize = 1024;
+    spillConfig.writeBufferSize = 1024;
+    spillConfig.fileNamePrefix = "bufferedPagesWithSpill";
+    spillConfig.getSpillDirPathCb = [&]() { return std::string_view(dirPath); };
+    folly::Synchronized<common::SpillStats> spillStats;
+
+    BufferedPages bufferedPages;
+    bufferedPages.setupSpiller(pool.get(), &spillConfig, 0, &spillStats);
+
+    for (uint32_t i = 0; i < testValue.numSpills; ++i) {
+      const auto& curPageBatch = pageBatches[i];
+      for (const auto& page : curPageBatch) {
+        bufferedPages.append(page);
+      }
+      bufferedPages.spill();
+    }
+    const auto& inMemoryPages = pageBatches.back();
+    for (auto& page : inMemoryPages) {
+      bufferedPages.append(page);
+    }
+
+    const auto totalPages = testValue.numSpills * testValue.numPagesPerSpill +
+        testValue.numInMemoryPages;
+    ASSERT_EQ(bufferedPages.size(), totalPages);
+
+    uint32_t numDeleted{0};
+    while (numDeleted < totalPages) {
+      // Read 'advanceSize' pages.
+      for (uint32_t i = 0;
+           i < testValue.advanceSize && numDeleted + i < totalPages;
+           ++i) {
+        // Find the batch and page indexes of the corresponding original page.
+        uint32_t totalSize{0};
+        uint32_t batchIndex{0};
+        uint32_t pageIndex{0};
+        for (; batchIndex < pageBatches.size(); ++batchIndex) {
+          totalSize += pageBatches[batchIndex].size();
+          if (numDeleted + i < totalSize) {
+            pageIndex =
+                numDeleted + i - (totalSize - pageBatches[batchIndex].size());
+            break;
+          }
+        }
+
+        auto page = bufferedPages.at(i);
+        auto originalPage = pageBatches[batchIndex][pageIndex];
+        if (originalPage == nullptr) {
+          ASSERT_TRUE(page == nullptr);
+          ASSERT_TRUE(bufferedPages.isNullAt(i));
+          VELOX_ASSERT_THROW(bufferedPages.sizeAt(i), "");
+        } else {
+          checkSerializedPageEqual(*page, *originalPage);
+          ASSERT_FALSE(bufferedPages.isNullAt(i));
+          ASSERT_EQ(bufferedPages.sizeAt(i), originalPage->size());
+        }
+      }
+
+      // Delete 'deleteSize' pages.
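+      // Clamp the last batch so the delete never runs past the remaining
+      // pages.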
+      uint32_t deleteSize = testValue.deleteSize;
+      if (numDeleted + testValue.deleteSize > totalPages) {
+        deleteSize = totalPages - numDeleted;
+      }
+      numDeleted += deleteSize;
+      bufferedPages.deleteFront(deleteSize);
+      ASSERT_EQ(bufferedPages.size(), totalPages - numDeleted);
+    }
+    ASSERT_TRUE(bufferedPages.empty());
+    ASSERT_EQ(bufferedPages.size(), 0);
+  }
+}
diff --git a/velox/exec/tests/PartitionedOutputTest.cpp b/velox/exec/tests/PartitionedOutputTest.cpp
index 4ce97e38d02e..6a4deced5982 100644
--- a/velox/exec/tests/PartitionedOutputTest.cpp
+++ b/velox/exec/tests/PartitionedOutputTest.cpp
@@ -20,6 +20,7 @@
 #include "velox/exec/Task.h"
 #include "velox/exec/tests/utils/OperatorTestBase.h"
 #include "velox/exec/tests/utils/PlanBuilder.h"
+#include "velox/exec/tests/utils/TempDirectoryPath.h"
 
 namespace facebook::velox::exec::test {
 
@@ -27,6 +28,8 @@ class PartitionedOutputTest
     : public OperatorTestBase,
       public testing::WithParamInterface<VectorSerde::Kind> {
  public:
+  static constexpr int64_t kTimeoutSecond = 10;
+
   static std::vector<VectorSerde::Kind> getTestParams() {
     const std::vector<VectorSerde::Kind> kinds(
         {VectorSerde::Kind::kPresto,
@@ -46,7 +49,7 @@ class PartitionedOutputTest
   getData(const std::string& taskId, int destination, int64_t sequence) {
     auto [promise, semiFuture] = folly::makePromiseContract<
         std::vector<std::unique_ptr<folly::IOBuf>>>();
-    VELOX_CHECK(bufferManager_->getData(
+    EXPECT_TRUE(bufferManager_->getData(
         taskId,
         destination,
         PartitionedOutput::kMinDestinationSize,
@@ -60,8 +63,8 @@ class PartitionedOutputTest
           result->setValue(std::move(pages));
         }));
     auto future = std::move(semiFuture).via(executor_.get());
-    future.wait(std::chrono::seconds{10});
-    VELOX_CHECK(future.isReady());
+    future.wait(std::chrono::seconds{kTimeoutSecond});
+    EXPECT_TRUE(future.isReady());
     return std::move(future).value();
   }
 
@@ -73,7 +76,7 @@ class PartitionedOutputTest
     bool done = false;
     while (!done) {
       attempts++;
-      VELOX_CHECK_LT(attempts, 100);
+      EXPECT_LT(attempts, 100);
       std::vector<std::unique_ptr<folly::IOBuf>> pages =
           getData(taskId, destination, result.size());
       for (auto& page : pages) {
@@ -89,6 +92,22 @@ class PartitionedOutputTest
     return result;
   }
 
+  void consumeAllData(const std::string& taskId, int destination) {
+    int sequence = 0;
+    bool done = false;
+    while (!done) {
+      std::vector<std::unique_ptr<folly::IOBuf>> pages =
+          getData(taskId, destination, sequence++);
+      for (auto& page : pages) {
+        if (page == nullptr) {
+          bufferManager_->deleteResults(taskId, destination);
+          done = true;
+          break;
+        }
+      }
+    }
+  }
+
  private:
   const std::shared_ptr<OutputBufferManager> bufferManager_{
       OutputBufferManager::getInstance().lock()};
@@ -206,6 +225,90 @@ TEST_P(PartitionedOutputTest, keyChannelNotAtBeginningWithNulls) {
           .count()));
 }
 
+TEST_P(PartitionedOutputTest, spill) {
+  filesystems::registerLocalFileSystem();
+  const auto rowType = ROW({"c0", "c1"}, {BIGINT(), VARCHAR()});
+  VectorFuzzer::Options fuzzerOpt;
+  fuzzerOpt.stringLength = 200;
+  fuzzerOpt.vectorSize = 100;
+  auto input = createVectors(rowType, 5UL << 20, fuzzerOpt);
+
+  struct TestValue {
+    int32_t numDrivers;
+    int32_t numPartitions;
+    bool parallelConsumption;
+
+    std::string debugString() const {
+      return fmt::format(
+          "numDrivers {}, numPartitions {}, parallelConsumption {}",
+          numDrivers,
+          numPartitions,
+          parallelConsumption);
+    }
+  };
+
+  std::vector<TestValue> testValues{{1, 2, false}, {2, 2, false}, {2, 4, true}};
+  for (const auto& testValue : testValues) {
+    SCOPED_TRACE(testValue.debugString());
+    core::PlanNodeId planNodeId;
+    auto plan = PlanBuilder()
+                    .values({input}, true, 6)
+                    .partitionedOutput(
+                        {"c0"},
+                        testValue.numPartitions,
+                        true,
+                        std::vector<std::string>{"c1"},
+                        GetParam())
+                    .capturePlanNodeId(planNodeId)
+                    .planNode();
+
+    auto taskId = "local://test-partitioned-output-0";
+    auto task = Task::create(
+        taskId,
+        core::PlanFragment{plan},
+        0,
+        createQueryContext(
+            {{core::QueryConfig::kPartitionedOutputSpillEnabled, "true"},
+             {core::QueryConfig::kSpillEnabled, "true"}}),
+        Task::ExecutionMode::kParallel);
+
+    auto spillDirectory = exec::test::TempDirectoryPath::create();
+    task->setSpillDirectory(spillDirectory->getPath());
+
+    TestScopedSpillInjection scopedSpillInjection(100);
+    task->start(testValue.numDrivers);
+
+    if (testValue.parallelConsumption) {
+      std::vector<std::thread> threads;
+      for (auto i = 0; i < testValue.numPartitions; ++i) {
+        threads.push_back(
+            std::thread([this, taskId, i]() { consumeAllData(taskId, i); }));
+      }
+      for (auto& t : threads) {
+        t.join();
+      }
+    } else {
+      for (auto i = 0; i < testValue.numPartitions; ++i) {
+        consumeAllData(taskId, i);
+      }
+    }
+
+    ASSERT_TRUE(waitForTaskCompletion(
+        task.get(),
+        std::chrono::duration_cast<std::chrono::microseconds>(
+            std::chrono::seconds(kTimeoutSecond))
+            .count()));
+
+    auto taskStats = task->taskStats();
+    auto planStatsMap = exec::toPlanStats(taskStats);
+    auto& planStats = planStatsMap.at(planNodeId);
+    ASSERT_GT(taskStats.memoryReclaimCount, 0);
+    ASSERT_GT(taskStats.memoryReclaimMs, 0);
+    ASSERT_GT(planStats.spilledBytes, 0);
+  }
+}
+
 VELOX_INSTANTIATE_TEST_SUITE_P(
     PartitionedOutputTest,
     PartitionedOutputTest,