Reclaim unused reserved memory in dwrf writer (facebookincubator#10120)
Summary:
During memory reclaim, the dwrf writer can now release its previously reserved but unused memory back to the memory pool, sparing more memory for the query to run. This PR also fixes the flush overhead calculation by including dictionary memory usage in the overhead estimate, so the estimate aligns with what is recorded.
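
For illustration, a minimal sketch (not part of this commit) of the MemoryPool calls this change builds on; the pool name and sizes are made up, and the root pool is assumed to be created by the caller:

#include "velox/common/memory/MemoryPool.h"

using namespace facebook::velox;

// A leaf pool that reserved far more than it uses reports the unused,
// quantized portion via the new releasableReservation() and hands it back
// with release(); the dwrf writer now does this during memory reclaim.
void releaseUnusedReservation(memory::MemoryPool& root) {
  auto leaf = root.addLeafChild("demo-leaf");  // hypothetical pool name
  leaf->maybeReserve(16 << 20);                // reserve ~16MB up front
  void* buf = leaf->allocate(1 << 20);         // actually use ~1MB
  const int64_t releasable = leaf->releasableReservation();
  leaf->release();                             // hands 'releasable' bytes back to the parent
  leaf->free(buf, 1 << 20);
}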

Pull Request resolved: facebookincubator#10120

Reviewed By: xiaoxmeng

Differential Revision: D58373600

Pulled By: tanjialiang

fbshipit-source-id: 5e1b74a2ac292b27cf479ddc55106d1a06a62aaf
tanjialiang authored and facebook-github-bot committed Jun 17, 2024
1 parent fe20e49 commit 5768570
Showing 12 changed files with 209 additions and 61 deletions.
4 changes: 4 additions & 0 deletions velox/common/base/tests/StatsReporterTest.cpp
@@ -324,6 +324,10 @@ class TestMemoryPool : public memory::MemoryPool {
return 0;
}

int64_t releasableReservation() const override {
return 0;
}

int64_t reservedBytes() const override {
return 0;
}
17 changes: 17 additions & 0 deletions velox/common/memory/MemoryPool.cpp
@@ -703,6 +703,23 @@ int64_t MemoryPoolImpl::usedBytes() const {
return usedBytes;
}

int64_t MemoryPoolImpl::releasableReservation() const {
if (isLeaf()) {
std::lock_guard<std::mutex> l(mutex_);
return std::max<int64_t>(
0, reservationBytes_ - quantizedSize(usedReservationBytes_));
}
if (reservedBytes() == 0) {
return 0;
}
int64_t releasableBytes{0};
visitChildren([&](MemoryPool* pool) {
releasableBytes += pool->releasableReservation();
return true;
});
return releasableBytes;
}

std::shared_ptr<MemoryPool> MemoryPoolImpl::genChild(
std::shared_ptr<MemoryPool> parent,
const std::string& name,
27 changes: 18 additions & 9 deletions velox/common/memory/MemoryPool.h
@@ -208,11 +208,11 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
return threadSafe_;
}

/// Invoked to traverse the memory pool subtree rooted at this, and calls
/// 'visitor' on each visited child memory pool with the parent pool's
/// 'poolMutex_' reader lock held. The 'visitor' must not access the
/// parent memory pool to avoid the potential recursive locking issues. Note
/// that the traversal stops if 'visitor' returns false.
/// Invoked to visit the memory pool's direct children, and calls 'visitor' on
/// each visited child memory pool with the parent pool's 'poolMutex_' reader
/// lock held. The 'visitor' must not access the parent memory pool to avoid
/// the potential recursive locking issues. Note that the traversal stops if
/// 'visitor' returns false.
virtual void visitChildren(
const std::function<bool(MemoryPool*)>& visitor) const;

@@ -328,15 +328,22 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
/// Returns the peak memory usage in bytes of this memory pool.
virtual int64_t peakBytes() const = 0;

/// Returns the reserved but not used memory reservation in bytes of this
/// memory pool.
/// Returns the reserved but not used memory in bytes of this memory pool.
///
/// NOTE: this is always zero for non-leaf memory pool as it only aggregate
/// NOTE: this is always zero for non-leaf memory pool as it only aggregates
/// the memory reservations from its child memory pools but not
/// differentiating whether the aggregated reservations have been actually
/// used in child pools or not.
virtual int64_t availableReservation() const = 0;

/// Returns the reserved but not used memory in bytes that can be released by
/// calling 'release()'. This might differ from 'availableReservation()'
/// because a leaf memory pool makes quantized memory reservations.
///
/// NOTE: for a non-leaf memory pool, this returns the aggregated releasable
/// memory reservations from all its leaf memory pools.
virtual int64_t releasableReservation() const = 0;

/// Returns the reserved memory reservation in bytes including both used and
/// unused reservations.
virtual int64_t reservedBytes() const = 0;
@@ -632,6 +639,8 @@ class MemoryPoolImpl : public MemoryPool {
return availableReservationLocked();
}

int64_t releasableReservation() const override;

int64_t reservedBytes() const override {
return reservationBytes_;
}
@@ -1002,7 +1011,7 @@ class MemoryPoolImpl : public MemoryPool {
// name matches the specified regular expression 'debugPoolNameRegex_'.
const std::string debugPoolNameRegex_;

// Serializes updates on 'grantedReservationBytes_', 'usedReservationBytes_'
// Serializes updates on 'reservationBytes_', 'usedReservationBytes_'
// and 'minReservationBytes_' to make reservation decision on a consistent
// read/write of those counters. incrementReservation()/decrementReservation()
// work based on atomic 'reservationBytes_' without mutex as children updating
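
A rough reading of the new contract above, mirrored as a hedged sketch (the helper name and sizes are made up; the same invariants are exercised by the new MemoryPoolTest case below):

#include <cassert>
#include "velox/common/memory/MemoryPool.h"

using namespace facebook::velox;

// For a leaf pool, release() gives back exactly releasableReservation();
// a non-leaf pool reports the sum over its leaf pools.
void checkReleasableInvariants(memory::MemoryPool& root) {
  auto leaf0 = root.addLeafChild("demo-leaf-0");  // hypothetical names
  auto leaf1 = root.addLeafChild("demo-leaf-1");
  leaf0->maybeReserve(8 << 20);
  leaf1->maybeReserve(16 << 20);
  void* p0 = leaf0->allocate(1 << 20);
  void* p1 = leaf1->allocate(2 << 20);

  assert(
      root.releasableReservation() ==
      leaf0->releasableReservation() + leaf1->releasableReservation());

  const auto reservedBefore = leaf0->reservedBytes();
  const auto releasable = leaf0->releasableReservation();
  leaf0->release();
  assert(reservedBefore - leaf0->reservedBytes() == releasable);

  leaf0->free(p0, 1 << 20);
  leaf1->free(p1, 2 << 20);
}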
43 changes: 43 additions & 0 deletions velox/common/memory/tests/MemoryPoolTest.cpp
@@ -524,6 +524,49 @@ TEST_P(MemoryPoolTest, grow) {
leaf->free(buf, 1 * MB);
}

TEST_P(MemoryPoolTest, releasableMemory) {
struct TestParam {
int64_t usedBytes;
int64_t reservedBytes;
};
std::vector<TestParam> testParams{
{2345, 98760},
{1, 1024},
{4096, 4096},
{1 * MB, 16 * MB},
{6 * MB, 7 * MB},
{123 * MB, 200 * MB},
{100 * MB, 50 * MB}};
auto root = getMemoryManager()->addRootPool("releasableMemory", 4 * GB);
for (auto i = 0; i < testParams.size() - 1; i++) {
auto leaf0 = root->addLeafChild("leafPool-0");
leaf0->maybeReserve(testParams[i].reservedBytes);
void* buffer0 = leaf0->allocate(testParams[i].usedBytes);
const auto reservedBytes0 = leaf0->reservedBytes();
const auto releasableBytes0 = leaf0->releasableReservation();

auto leaf1 = root->addLeafChild("leafPool-1");
leaf1->maybeReserve(testParams[i + 1].reservedBytes);
void* buffer1 = leaf1->allocate(testParams[i + 1].usedBytes);
const auto reservedBytes1 = leaf1->reservedBytes();
const auto releasableBytes1 = leaf1->releasableReservation();

const auto releasableBytesRoot = root->releasableReservation();
const auto reservedBytesRoot = root->reservedBytes();
ASSERT_EQ(releasableBytesRoot, releasableBytes0 + releasableBytes1);

leaf0->release();
ASSERT_EQ(reservedBytes0 - leaf0->reservedBytes(), releasableBytes0);
ASSERT_EQ(reservedBytesRoot - root->reservedBytes(), releasableBytes0);
leaf1->release();
ASSERT_EQ(reservedBytes1 - leaf1->reservedBytes(), releasableBytes1);
ASSERT_EQ(reservedBytesRoot - root->reservedBytes(), releasableBytesRoot);

leaf0->free(buffer0, testParams[i].usedBytes);
leaf1->free(buffer1, testParams[i + 1].usedBytes);
}
}

TEST_P(MemoryPoolTest, ReallocTestSameSize) {
auto manager = getMemoryManager();
auto root = manager->addRootPool();
12 changes: 10 additions & 2 deletions velox/connectors/hive/tests/HiveDataSinkTest.cpp
@@ -24,6 +24,7 @@
#include "velox/common/testutil/TestValue.h"
#include "velox/core/Config.h"
#include "velox/dwio/common/Options.h"
#include "velox/dwio/dwrf/writer/Writer.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/vector/fuzzer/VectorFuzzer.h"
@@ -621,7 +622,7 @@ TEST_F(HiveDataSinkTest, abort) {
}
}

TEST_F(HiveDataSinkTest, memoryReclaim) {
DEBUG_ONLY_TEST_F(HiveDataSinkTest, memoryReclaim) {
const int numBatches = 200;
auto vectors = createVectors(500, 200);

@@ -644,7 +645,7 @@ TEST_F(HiveDataSinkTest, memoryReclaim) {
expectedWriterReclaimed);
}
} testSettings[] = {
// {dwio::common::FileFormat::DWRF, true, true, 1 << 30, true, true},
{dwio::common::FileFormat::DWRF, true, true, 1 << 30, true, true},
{dwio::common::FileFormat::DWRF, true, true, 1, true, true},
{dwio::common::FileFormat::DWRF, true, false, 1 << 30, false, false},
{dwio::common::FileFormat::DWRF, true, false, 1, false, false},
@@ -664,6 +665,13 @@ TEST_F(HiveDataSinkTest, memoryReclaim) {
{dwio::common::FileFormat::PARQUET, false, false, 1, false, false}
#endif
};
SCOPED_TESTVALUE_SET(
"facebook::velox::dwrf::Writer::MemoryReclaimer::reclaimableBytes",
std::function<void(dwrf::Writer*)>([&](dwrf::Writer* writer) {
// Release the reservation up front so that reclaim cannot take memory from
// the reserved portion.
writer->getContext().releaseMemoryReservation();
}));
for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
setupMemoryPools();
9 changes: 8 additions & 1 deletion velox/dwio/dwrf/test/E2EWriterTest.cpp
@@ -2040,7 +2040,14 @@ DEBUG_ONLY_TEST_F(E2EWriterTest, memoryReclaimDuringInit) {
}
}

TEST_F(E2EWriterTest, memoryReclaimThreshold) {
DEBUG_ONLY_TEST_F(E2EWriterTest, memoryReclaimThreshold) {
SCOPED_TESTVALUE_SET(
"facebook::velox::dwrf::Writer::MemoryReclaimer::reclaimableBytes",
std::function<void(dwrf::Writer*)>([&](dwrf::Writer* writer) {
// Release the reservation up front so that reclaim cannot take memory from
// the reserved portion.
writer->getContext().releaseMemoryReservation();
}));
const auto type = ROW(
{{"int_val", INTEGER()},
{"string_val", VARCHAR()},
6 changes: 5 additions & 1 deletion velox/dwio/dwrf/test/WriterFlushTest.cpp
@@ -57,10 +57,14 @@ class MockMemoryPool : public velox::memory::MemoryPool {
VELOX_NYI("{} unsupported", __FUNCTION__);
}

int64_t reservedBytes() const override {
int64_t releasableReservation() const override {
VELOX_NYI("{} unsupported", __FUNCTION__);
}

int64_t reservedBytes() const override {
return localMemoryUsage_;
}

bool maybeReserve(uint64_t size) override {
VELOX_NYI("{} unsupported", __FUNCTION__);
}
72 changes: 45 additions & 27 deletions velox/dwio/dwrf/writer/Writer.cpp
@@ -289,27 +289,27 @@ bool Writer::maybeReserveMemory(
auto& context = getContext();
auto& pool = context.getMemoryPool(memoryUsageCategory);
const uint64_t availableReservation = pool.availableReservation();
const uint64_t usedReservationBytes = pool.usedBytes();
const uint64_t usedBytes = pool.usedBytes();
const uint64_t minReservationBytes =
usedReservationBytes * spillConfig_->minSpillableReservationPct / 100;
usedBytes * spillConfig_->minSpillableReservationPct / 100;
const uint64_t estimatedIncrementBytes =
usedReservationBytes * estimatedMemoryGrowthRatio;
usedBytes * estimatedMemoryGrowthRatio;
if ((availableReservation > minReservationBytes) &&
(availableReservation > 2 * estimatedIncrementBytes)) {
return true;
}

const uint64_t bytesToReserve = std::max(
estimatedIncrementBytes * 2,
usedReservationBytes * spillConfig_->spillableReservationGrowthPct / 100);
usedBytes * spillConfig_->spillableReservationGrowthPct / 100);
return pool.maybeReserve(bytesToReserve);
}

void Writer::releaseMemory() {
int64_t Writer::releaseMemory() {
if (!canReclaim()) {
return;
return 0;
}
getContext().releaseMemoryReservation();
return getContext().releaseMemoryReservation();
}

uint64_t Writer::flushTimeMemoryUsageEstimate(
@@ -406,7 +406,7 @@ void Writer::flushStripe(bool close) {
createRowIndexEntry();
}

const auto preFlushMem = context.getTotalMemoryUsage();
const auto preFlushTotalMemBytes = context.getTotalMemoryUsage();
ensureStripeFlushFits();
// NOTE: ensureStripeFlushFits() might trigger memory arbitration that have
// flushed the current stripe.
@@ -434,7 +434,7 @@ void Writer::flushStripe(bool close) {
metrics.flushOverhead = static_cast<uint64_t>(flushOverhead);
context.recordFlushOverhead(metrics.flushOverhead);

const auto postFlushMem = context.getTotalMemoryUsage();
const auto postFlushTotalMemBytes = context.getTotalMemoryUsage();

auto& sink = writerBase_->getSink();
auto stripeOffset = sink.size();
@@ -559,8 +559,8 @@ void Writer::flushStripe(bool close) {
metrics.stripeIndex,
metrics.flushOverhead,
metrics.stripeSize,
preFlushMem,
postFlushMem,
preFlushTotalMemBytes,
postFlushTotalMemBytes,
metrics.close);
addThreadLocalRuntimeStat(
"stripeSize",
@@ -721,11 +721,18 @@ bool Writer::MemoryReclaimer::reclaimableBytes(
if (!writer_->canReclaim()) {
return false;
}
const uint64_t memoryUsage = writer_->getContext().getTotalMemoryUsage();
if (memoryUsage < writer_->spillConfig_->writerFlushThresholdSize) {
TestValue::adjust(
"facebook::velox::dwrf::Writer::MemoryReclaimer::reclaimableBytes",
writer_);
const auto& context = writer_->getContext();
const auto usedBytes = context.getTotalMemoryUsage();
const auto releasableBytes = context.releasableMemoryReservation();
const bool flushable =
usedBytes >= writer_->spillConfig_->writerFlushThresholdSize;
if (releasableBytes == 0 && !flushable) {
return false;
}
reclaimableBytes = memoryUsage;
reclaimableBytes = (flushable ? usedBytes : 0) + releasableBytes;
return true;
}

@@ -741,7 +748,8 @@ uint64_t Writer::MemoryReclaimer::reclaim(
if (*writer_->nonReclaimableSection_) {
RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
LOG(WARNING)
<< "Can't reclaim from dwrf writer which is under non-reclaimable section: "
<< "Can't reclaim from dwrf writer which is under non-reclaimable "
"section: "
<< pool->name();
++stats.numNonReclaimableAttempts;
return 0;
Expand All @@ -752,24 +760,34 @@ uint64_t Writer::MemoryReclaimer::reclaim(
++stats.numNonReclaimableAttempts;
return 0;
}
const uint64_t memoryUsage = writer_->getContext().getTotalMemoryUsage();
if (memoryUsage < writer_->spillConfig_->writerFlushThresholdSize) {
RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
LOG(WARNING)
<< "Can't reclaim memory from dwrf writer pool " << pool->name()
<< " which doesn't have sufficient memory to flush, writer memory usage: "
<< succinctBytes(memoryUsage) << ", writer flush memory threshold: "
<< succinctBytes(writer_->spillConfig_->writerFlushThresholdSize);
++stats.numNonReclaimableAttempts;
return 0;
}

return memory::MemoryReclaimer::run(
[&]() {
int64_t reclaimedBytes{0};
{
memory::ScopedReclaimedBytesRecorder recorder(pool, &reclaimedBytes);
writer_->flushInternal(false);
const auto& context = writer_->getContext();
const auto usedBytes = context.getTotalMemoryUsage();
const auto releasedBytes = writer_->releaseMemory();
if (releasedBytes == 0 &&
usedBytes < writer_->spillConfig_->writerFlushThresholdSize) {
RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
LOG(WARNING)
<< "Can't reclaim memory from dwrf writer pool " << pool->name()
<< " which doesn't have sufficient memory to release or flush, "
"writer memory usage: "
<< succinctBytes(usedBytes)
<< ", writer memory available reservation: "
<< succinctBytes(context.availableMemoryReservation())
<< ", writer flush memory threshold: "
<< succinctBytes(
writer_->spillConfig_->writerFlushThresholdSize);
++stats.numNonReclaimableAttempts;
} else {
if (usedBytes >= writer_->spillConfig_->writerFlushThresholdSize) {
writer_->flushInternal(false);
}
}
}
return reclaimedBytes;
},
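
Distilled from the reclaim changes above into a standalone sketch; the helper name and parameters are hypothetical, and the real logic lives in Writer::MemoryReclaimer::reclaimableBytes() and reclaim():

#include <cstdint>

// The reclaimable estimate now counts both the releasable reservation and,
// when the writer is large enough to flush, its total memory usage.
int64_t estimateReclaimableBytes(
    int64_t usedBytes,
    int64_t releasableBytes,
    int64_t flushThresholdBytes) {
  const bool flushable = usedBytes >= flushThresholdBytes;
  if (releasableBytes == 0 && !flushable) {
    return 0;  // nothing to release and too small for a stripe flush
  }
  return (flushable ? usedBytes : 0) + releasableBytes;
}

On the reclaim path itself, the writer first releases the reserved-but-unused bytes and only flushes a stripe when its usage is still at or above writerFlushThresholdSize.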
5 changes: 3 additions & 2 deletions velox/dwio/dwrf/writer/Writer.h
@@ -188,8 +188,9 @@ class Writer : public dwio::common::Writer {
MemoryUsageCategory memoryUsageCategory,
double estimatedMemoryGrowthRatio);

// Releases the unused memory reservations after we flush a stripe.
void releaseMemory();
// Releases the unused memory reservations after we flush a stripe. Returns
// the total number of released bytes.
int64_t releaseMemory();

// Create a new stripe. No-op if there is no data written.
void flushInternal(bool close = false);