feat: Make PartitionedOutput reclaimable #11856

Draft · wants to merge 1 commit into base: main
96 changes: 48 additions & 48 deletions velox/common/file/File.h
@@ -42,43 +42,44 @@

namespace facebook::velox {

// A read-only file. All methods in this object should be thread safe.
/// A read-only file. All methods in this object should be thread safe.
class ReadFile {
public:
virtual ~ReadFile() = default;

// Reads the data at [offset, offset + length) into the provided pre-allocated
// buffer 'buf'. The bytes are returned as a string_view pointing to 'buf'.
//
// This method should be thread safe.
/// Reads the data at [offset, offset + length) into the provided
/// pre-allocated buffer 'buf'. The bytes are returned as a string_view
/// pointing to 'buf'.
///
/// This method should be thread safe.
virtual std::string_view pread(uint64_t offset, uint64_t length, void* buf)
const = 0;

// Same as above, but returns owned data directly.
//
// This method should be thread safe.
/// Same as above, but returns owned data directly.
///
/// This method should be thread safe.
virtual std::string pread(uint64_t offset, uint64_t length) const;

// Reads starting at 'offset' into the memory referenced by the
// Ranges in 'buffers'. The buffers are filled left to right. A
// buffer with nullptr data will cause its size worth of bytes to be skipped.
//
// This method should be thread safe.
/// Reads starting at 'offset' into the memory referenced by the
/// Ranges in 'buffers'. The buffers are filled left to right. A
/// buffer with nullptr data will cause its size worth of bytes to be skipped.
///
/// This method should be thread safe.
virtual uint64_t preadv(
uint64_t /*offset*/,
const std::vector<folly::Range<char*>>& /*buffers*/) const;

// Vectorized read API. Implementations can coalesce and parallelize.
// The offsets don't need to be sorted.
// `iobufs` is a range of IOBufs to store the read data. They
// will be stored in the same order as the input `regions` vector. So the
// array must be pre-allocated by the caller, with the same size as `regions`,
// but don't need to be initialized, since each iobuf will be copy-constructed
// by the preadv.
// Returns the total number of bytes read, which might be different than the
// sum of all buffer sizes (for example, if coalescing was used).
//
// This method should be thread safe.
/// Vectorized read API. Implementations can coalesce and parallelize.
/// The offsets don't need to be sorted.
/// `iobufs` is a range of IOBufs to store the read data. They
/// will be stored in the same order as the input `regions` vector. So the
/// array must be pre-allocated by the caller, with the same size as
/// `regions`, but doesn't need to be initialized, since each iobuf will be
/// copy-constructed by the preadv call. Returns the total number of bytes
/// read, which might differ from the sum of all buffer sizes (for example,
/// if coalescing was used).
///
/// This method should be thread safe.
virtual uint64_t preadv(
folly::Range<const common::Region*> regions,
folly::Range<folly::IOBuf*> iobufs) const;
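
To illustrate the contract above, a minimal caller-side sketch; the offsets, lengths, and the 'file' pointer are hypothetical, not taken from this diff:

// Sketch only: two hypothetical regions read in one vectorized call.
// 'iobufs' is pre-allocated to the same size and order as 'regions';
// its elements need no setup, since preadv copy-constructs each one.
std::vector<common::Region> regions = {{0, 1024}, {8192, 512}};
std::vector<folly::IOBuf> iobufs(regions.size());
folly::Range<const common::Region*> regionRange(regions.data(), regions.size());
folly::Range<folly::IOBuf*> iobufRange(iobufs.data(), iobufs.size());
const uint64_t totalRead = file->preadv(regionRange, iobufRange);
// 'totalRead' may differ from 1536 if the implementation coalesced reads.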
@@ -98,25 +99,25 @@ class ReadFile {
}
}

// Returns true if preadvAsync has a native implementation that is
// asynchronous. The default implementation is synchronous.
/// Returns true if preadvAsync has a native implementation that is
/// asynchronous. The default implementation is synchronous.
virtual bool hasPreadvAsync() const {
return false;
}

// Whether preads should be coalesced where possible. E.g. remote disk would
// set to true, in-memory to false.
/// Whether preads should be coalesced where possible. E.g. remote disk would
/// set to true, in-memory to false.
virtual bool shouldCoalesce() const = 0;

// Number of bytes in the file.
/// Number of bytes in the file.
virtual uint64_t size() const = 0;

// An estimate for the total amount of memory *this uses.
/// An estimate for the total amount of memory *this uses.
virtual uint64_t memoryUsage() const = 0;

// The total number of bytes *this had been used to read since creation or
// the last resetBytesRead. We sum all the |length| variables passed to
// preads, not the actual amount of bytes read (which might be less).
/// The total number of bytes *this has been used to read since creation or
/// the last resetBytesRead. We sum all the |length| arguments passed to
/// preads, not the actual number of bytes read (which might be less).
virtual uint64_t bytesRead() const {
return bytesRead_;
}
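
Concretely, an illustrative sketch (the 'file' pointer and offsets are made up): bytesRead() sums the requested lengths, not the delivered bytes.

char buf[100];
file->pread(/*offset=*/0, /*length=*/100, buf);   // +100
file->pread(/*offset=*/500, /*length=*/50, buf);  // +50
// file->bytesRead() == 150, even if the second read stopped short at EOF.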
@@ -135,8 +136,8 @@ class ReadFile {
mutable std::atomic<uint64_t> bytesRead_ = 0;
};

// A write-only file. Nothing written to the file should be read back until it
// is closed.
/// A write-only file. Nothing written to the file should be read back until it
/// is closed.
class WriteFile {
public:
virtual ~WriteFile() = default;
@@ -193,14 +194,13 @@ class WriteFile {
virtual uint64_t size() const = 0;
};

// We currently do a simple implementation for the in-memory files
// that simply resizes a string as needed. If there ever gets used in
// a performance sensitive path we'd probably want to move to a Cord-like
// implementation for underlying storage.

// We don't provide registration functions for the in-memory files, as they
// aren't intended for any robust use needing a filesystem.

/// We currently do a simple implementation for the in-memory files
/// that simply resizes a string as needed. If these ever get used in
/// a performance-sensitive path we'd probably want to move to a Cord-like
/// implementation for underlying storage.
///
/// We don't provide registration functions for the in-memory files, as they
/// aren't intended for any robust use needing a filesystem.
class InMemoryReadFile : public ReadFile {
public:
explicit InMemoryReadFile(std::string_view file) : file_(file) {}
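
As a quick illustration of the in-memory variant above (a sketch; the contents and offsets are made up):

InMemoryReadFile file("hello world");
char buf[5];
// pread copies bytes [6, 11) into 'buf' and returns a view over 'buf'.
std::string_view view = file.pread(/*offset=*/6, /*length=*/5, buf);  // "world"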
@@ -307,18 +307,18 @@ class LocalReadFile final : public ReadFile {
class LocalWriteFile final : public WriteFile {
public:
struct Attributes {
// If set to true, the file will not be subject to copy-on-write updates.
// This flag has an effect only on filesystems that support copy-on-write
// semantics, such as Btrfs.
/// If set to true, the file will not be subject to copy-on-write updates.
/// This flag has an effect only on filesystems that support copy-on-write
/// semantics, such as Btrfs.
static constexpr std::string_view kNoCow{"write-on-copy-disabled"};
static constexpr bool kDefaultNoCow{false};

static bool cowDisabled(
const std::unordered_map<std::string, std::string>& attrs);
};
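
For illustration, a hedged sketch of consulting these attributes; the "true" value format is an assumption, not taken from this diff:

std::unordered_map<std::string, std::string> attrs{
    {std::string(LocalWriteFile::Attributes::kNoCow), "true"}};
// Checks whether copy-on-write updates were disabled via the attributes.
const bool noCow = LocalWriteFile::Attributes::cowDisabled(attrs);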

// An error is thrown is a file already exists at |path|,
// unless flag shouldThrowOnFileAlreadyExists is false
/// An error is thrown if a file already exists at |path|,
/// unless the flag shouldThrowOnFileAlreadyExists is false.
explicit LocalWriteFile(
std::string_view path,
bool shouldCreateParentDirectories = false,
3 changes: 3 additions & 0 deletions velox/common/memory/ByteStream.cpp
@@ -359,6 +359,9 @@ void ByteOutputStream::extend(int32_t bytes) {
ranges_.size() == 1 ? nullptr : &ranges_[ranges_.size() - 2],
current_);
allocatedBytes_ += current_->size;
if (allocatedBytes_ <= 0) {
VELOX_CHECK_GT(allocatedBytes_, 0);
}
VELOX_CHECK_GT(allocatedBytes_, 0);
if (isBits_) {
// size and position are in units of bits for a bits stream.
1 change: 1 addition & 0 deletions velox/common/memory/StreamArena.cpp
@@ -76,6 +76,7 @@ void StreamArena::newTinyRange(
range->buffer = reinterpret_cast<uint8_t*>(tinyRanges_.back().data());
range->size = bytes;
}

void StreamArena::clear() {
allocations_.clear();
pool_->freeNonContiguous(allocation_);
4 changes: 4 additions & 0 deletions velox/core/PlanNode.h
@@ -1385,6 +1385,10 @@ class PartitionedOutputNode : public PlanNode {
return sources_;
}

bool canSpill(const QueryConfig& queryConfig) const override {
return isPartitioned() && queryConfig.partitionedOutputSpillEnabled();
}

const RowTypePtr& inputType() const {
return sources_[0]->outputType();
}
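For context, a hedged sketch of how a caller might consult the new hook; the surrounding names are illustrative, not the PR's actual call site:

// Illustrative only: spilling is considered for this node when both the
// node type opts in and the per-query config enables it.
if (planNode->canSpill(queryCtx->queryConfig())) {
  // ... create the operator with a spill config (see Driver.cpp below).
}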
9 changes: 9 additions & 0 deletions velox/core/QueryConfig.h
@@ -211,6 +211,11 @@ class QueryConfig {
static constexpr const char* kTopNRowNumberSpillEnabled =
"topn_row_number_spill_enabled";

/// PartitionedOutput spilling flag, only applies if "spill_enabled" flag is
/// set.
static constexpr const char* kPartitionedOutputSpillEnabled =
"partitioned_output_spill_enabled";

/// The max row numbers to fill and spill for each spill run. This is used to
/// cap the memory used for spilling. If it is zero, then there is no limit
/// and spilling might run out of memory.
@@ -694,6 +699,10 @@ class QueryConfig {
return get<bool>(kTopNRowNumberSpillEnabled, true);
}

bool partitionedOutputSpillEnabled() const {
return get<bool>(kPartitionedOutputSpillEnabled, true);
}

int32_t maxSpillLevel() const {
return get<int32_t>(kMaxSpillLevel, 1);
}
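A minimal sketch of enabling the new flag in a test, assuming the existing "spill_enabled" gate and the testingOverrideConfigUnsafe helper shown in QueryCtx.h below:

queryCtx->testingOverrideConfigUnsafe({
    {core::QueryConfig::kSpillEnabled, "true"},
    {core::QueryConfig::kPartitionedOutputSpillEnabled, "true"},
});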
4 changes: 2 additions & 2 deletions velox/core/QueryCtx.h
@@ -96,8 +96,8 @@ class QueryCtx : public std::enable_shared_from_this<QueryCtx> {
this->queryConfig_.testingOverrideConfigUnsafe(std::move(values));
}

// Overrides the previous connector-specific configuration. Note that this
// function is NOT thread-safe and should probably only be used in tests.
/// Overrides the previous connector-specific configuration. Note that this
/// function is NOT thread-safe and should probably only be used in tests.
void setConnectorSessionOverridesUnsafe(
const std::string& connectorId,
std::unordered_map<std::string, std::string>&& configOverrides) {
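Usage is along these lines; the connector id and property key here are hypothetical:

// Hypothetical connector id and session property; tests only, not thread-safe.
queryCtx->setConnectorSessionOverridesUnsafe(
    "hive", {{"max_split_size", "128MB"}});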
18 changes: 10 additions & 8 deletions velox/exec/Driver.cpp
@@ -108,7 +108,8 @@ velox::memory::MemoryPool* DriverCtx::addOperatorPool(

std::optional<common::SpillConfig> DriverCtx::makeSpillConfig(
int32_t operatorId) const {
const auto& queryConfig = task->queryCtx()->queryConfig();
Contributor Author (review comment): Another option would be to keep it here and lazily initialize the spill component of the OutputBuffer in the PartitionedOutput operator.

const auto& queryCtx = task->queryCtx();
const auto& queryConfig = queryCtx->queryConfig();
if (!queryConfig.spillEnabled()) {
return std::nullopt;
}
@@ -119,20 +120,18 @@ std::optional<common::SpillConfig> DriverCtx::makeSpillConfig(
[this]() -> std::string_view {
return task->getOrCreateSpillDirectory();
};
const auto& spillFilePrefix =
fmt::format("{}_{}_{}", pipelineId, driverId, operatorId);
common::UpdateAndCheckSpillLimitCB updateAndCheckSpillLimitCb =
[this](uint64_t bytes) {
task->queryCtx()->updateSpilledBytesAndCheckLimit(bytes);
[queryCtx](uint64_t bytes) {
queryCtx->updateSpilledBytesAndCheckLimit(bytes);
};
return common::SpillConfig(
std::move(getSpillDirPathCb),
std::move(updateAndCheckSpillLimitCb),
spillFilePrefix,
fmt::format("{}_{}_{}", pipelineId, driverId, operatorId),
queryConfig.maxSpillFileSize(),
queryConfig.spillWriteBufferSize(),
queryConfig.spillReadBufferSize(),
task->queryCtx()->spillExecutor(),
queryCtx->spillExecutor(),
queryConfig.minSpillableReservationPct(),
queryConfig.spillableReservationGrowthPct(),
queryConfig.spillStartPartitionBit(),
@@ -142,7 +141,10 @@ std::optional<common::SpillConfig> DriverCtx::makeSpillConfig(
queryConfig.writerFlushThresholdBytes(),
queryConfig.spillCompressionKind(),
queryConfig.spillPrefixSortEnabled()
? std::optional<common::PrefixSortConfig>(prefixSortConfig())
? std::optional(common::PrefixSortConfig{
queryConfig.prefixSortNormalizedKeyMaxBytes(),
queryConfig.prefixSortMinRows(),
queryConfig.prefixSortMaxStringPrefixLength()})
: std::nullopt,
queryConfig.spillFileCreateConfig());
}
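One way to read the capture change in this hunk: the spill-limit callback now holds the QueryCtx shared_ptr itself rather than reaching it through this->task, so the SpillConfig can safely outlive the driver. A self-contained sketch of the pattern, with illustrative names:

#include <functional>
#include <memory>

// Sketch: the returned callback co-owns QueryCtx, extending its lifetime
// to that of whatever stores the callback (here, the SpillConfig).
std::function<void(uint64_t)> makeSpillLimitCb(
    std::shared_ptr<core::QueryCtx> queryCtx) {
  return [queryCtx = std::move(queryCtx)](uint64_t bytes) {
    queryCtx->updateSpilledBytesAndCheckLimit(bytes);
  };
}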
2 changes: 1 addition & 1 deletion velox/exec/ExchangeQueue.h
@@ -60,7 +60,7 @@ class SerializedPage {
// Buffers containing the serialized data. The memory is owned by 'iobuf_'.
std::vector<ByteRange> ranges_;

// IOBuf holding the data in 'ranges_.
// IOBuf holding the data in 'ranges_'.
std::unique_ptr<folly::IOBuf> iobuf_;

// Number of payload bytes in 'iobuf_'.
14 changes: 9 additions & 5 deletions velox/exec/Operator.cpp
@@ -335,6 +335,14 @@ OperatorStats Operator::stats(bool clear) {
return stats;
}

void Operator::postReclaimCheck(int64_t reclaimedBytes) const {
VELOX_CHECK_GE(
reclaimedBytes,
0,
"Unexpected memory growth after reclaim from operator memory pool {}",
pool()->name());
}

void Operator::close() {
input_ = nullptr;
results_.clear();
@@ -750,11 +758,7 @@ uint64_t Operator::MemoryReclaimer::reclaim(
memory::ScopedReclaimedBytesRecorder recoder(pool, &reclaimedBytes);
op_->reclaim(targetBytes, stats);
}
VELOX_CHECK_GE(
reclaimedBytes,
0,
"Unexpected memory growth after reclaim from operator memory pool {}",
pool->name());
op_->postReclaimCheck(reclaimedBytes);
return reclaimedBytes;
},
stats);
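Making the post-reclaim check a virtual hook lets an operator whose reclaim releases memory outside its own pool relax the non-negative invariant; PartitionedOutput spilling into the shared output buffer is the motivating case in this PR. A hedged sketch of such an override, not the PR's actual implementation:

// Hypothetical override: this operator hands data to a buffer owned by
// another pool, so its own pool's usage may not shrink during reclaim.
void PartitionedOutput::postReclaimCheck(int64_t /*reclaimedBytes*/) const {
  // Intentionally skip VELOX_CHECK_GE(reclaimedBytes, 0).
}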
2 changes: 2 additions & 0 deletions velox/exec/Operator.h
@@ -560,6 +560,8 @@ class Operator : public BaseRuntimeStatWriter {
uint64_t targetBytes,
memory::MemoryReclaimer::Stats& stats) {}

virtual void postReclaimCheck(int64_t reclaimedBytes) const;

const core::PlanNodeId& planNodeId() const {
return operatorCtx_->planNodeId();
}